- ✅ Generate thousands of synthetic trading scenarios deterministically
- ✅ Analyze reward distribution, feature importance & partial dependence
- ✅ Built‑in invariant & statistical validation layers (fail‑fast)
+- ✅ PBRS (Potential-Based Reward Shaping) integration with canonical invariance
- ✅ Export reproducible artifacts (parameter hash + execution manifest)
- ✅ Compare synthetic vs real trading data (distribution shift metrics)
- ✅ Parameter bounds validation & automatic sanitization
--num_samples 30000 \
--params win_reward_factor=4.0 \
--output aggressive_rewards
+
+# Test PBRS potential shaping
+python reward_space_analysis.py \
+ --num_samples 30000 \
+ --params hold_potential_enabled=true potential_gamma=0.9 exit_potential_mode=progressive_release \
+ --output pbrs_analysis
```
**Compare:** reward distributions between runs in `statistical_analysis.md`
- `idle_penalty_scale` (default: 0.5) - Scale of idle penalty
- `idle_penalty_power` (default: 1.025) - Power applied to idle penalty scaling
-_Holding penalty configuration:_
+_Hold penalty configuration:_
-- `holding_penalty_scale` (default: 0.25) - Scale of holding penalty
-- `holding_penalty_power` (default: 1.025) - Power applied to holding penalty scaling
+- `hold_penalty_scale` (default: 0.25) - Scale of hold penalty
+- `hold_penalty_power` (default: 1.025) - Power applied to hold penalty scaling
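+
+Both penalties share a `-factor * scale * ratio**power` shape. A minimal sketch of that shape (mirroring `_idle_penalty` and `_hold_penalty`; argument names here are illustrative):
+
+```python
+def idle_penalty(idle_factor: float, idle_ratio: float, scale: float = 0.5, power: float = 1.025) -> float:
+    # idle_ratio = idle_duration / max_idle_duration; penalty deepens as idling continues
+    return -idle_factor * scale * idle_ratio**power
+
+
+def hold_penalty(hold_factor: float, duration_ratio: float, scale: float = 0.25, power: float = 1.025) -> float:
+    # Applies only once the trade exceeds max duration (duration_ratio >= 1.0)
+    if duration_ratio < 1.0:
+        return 0.0
+    return -hold_factor * scale * (duration_ratio - 1.0) ** power
+```
+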
_Exit attenuation configuration:_
- `win_reward_factor` (default: 2.0) - Asymptotic bonus multiplier for PnL above target. Raw `profit_target_factor` ∈ [1, 1 + win_reward_factor] (tanh bounds it); overall amplification may exceed this once multiplied by `efficiency_factor`.
- `pnl_factor_beta` (default: 0.5) - Sensitivity of amplification around target
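+
+Because tanh saturates, the amplification above stays bounded. A minimal sketch of the implied profit-target factor (assuming the tanh form described above; names are illustrative, not the exact implementation):
+
+```python
+import math
+
+def profit_target_factor(pnl: float, target: float, win_reward_factor: float = 2.0, pnl_factor_beta: float = 0.5) -> float:
+    # Bounded in [1, 1 + win_reward_factor]; beta sets sensitivity near the target
+    if target <= 0.0 or pnl <= target:
+        return 1.0
+    return 1.0 + win_reward_factor * math.tanh(pnl_factor_beta * (pnl / target - 1.0))
+```
+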
+_PBRS (Potential-Based Reward Shaping) configuration:_
+
+- `potential_gamma` (default: 0.95) - Discount factor γ for PBRS potential term (0 ≤ γ ≤ 1)
+- `potential_softsign_sharpness` (default: 1.0) - Sharpness parameter for softsign_sharp transform (smaller = sharper)
+- `exit_potential_mode` (default: canonical) - Exit potential mode: 'canonical' (Φ=0), 'progressive_release', 'spike_cancel', 'retain_previous'
+- `exit_potential_decay` (default: 0.5) - Decay factor for progressive_release exit mode (0 ≤ decay ≤ 1)
+- `hold_potential_enabled` (default: true) - Enable PBRS hold potential function Φ(s)
+- `hold_potential_scale` (default: 1.0) - Scale factor for hold potential function
+- `hold_potential_gain` (default: 1.0) - Gain factor applied before transforms in hold potential
+- `hold_potential_transform_pnl` (default: tanh) - Transform function for PnL: tanh, softsign, softsign_sharp, arctan, logistic, asinh_norm, clip
+- `hold_potential_transform_duration` (default: tanh) - Transform function for duration ratio
+- `entry_additive_enabled` (default: false) - Enable entry additive reward (non-PBRS component)
+- `entry_additive_scale` (default: 1.0) - Scale factor for entry additive reward
+- `entry_additive_gain` (default: 1.0) - Gain factor for entry additive reward
+- `entry_additive_transform_pnl` (default: tanh) - Transform function for PnL in entry additive
+- `entry_additive_transform_duration` (default: tanh) - Transform function for duration ratio in entry additive
+- `exit_additive_enabled` (default: false) - Enable exit additive reward (non-PBRS component)
+- `exit_additive_scale` (default: 1.0) - Scale factor for exit additive reward
+- `exit_additive_gain` (default: 1.0) - Gain factor for exit additive reward
+- `exit_additive_transform_pnl` (default: tanh) - Transform function for PnL in exit additive
+- `exit_additive_transform_duration` (default: tanh) - Transform function for duration ratio in exit additive
+
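+The shaping term follows Ng et al. (1999): R'(s,a,s') = R(s,a,s') + γΦ(s') − Φ(s). A minimal sketch of the hold potential under the default tanh transforms, plus one hold→hold step (illustrative companion to `_compute_hold_potential`):
+
+```python
+import math
+
+def hold_potential(pnl: float, duration_ratio: float, scale: float = 1.0, gain: float = 1.0) -> float:
+    # Φ(s) = scale * 0.5 * [tanh(gain * pnl) + tanh(gain * duration_ratio)]
+    return scale * 0.5 * (math.tanh(gain * pnl) + math.tanh(gain * duration_ratio))
+
+gamma = 0.95  # potential_gamma
+shaping = gamma * hold_potential(0.02, 0.6) - hold_potential(0.01, 0.5)
+```
+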
_Invariant / safety controls:_
- `check_invariants` (default: true) - Enable/disable runtime invariant & safety validations (simulation invariants, mathematical bounds, distribution checks). Set to `false` only for performance experiments; not recommended for production validation.
--params win_reward_factor=3.0 idle_penalty_scale=1.5 \
--output sensitivity_test
+# PBRS potential shaping analysis
+python reward_space_analysis.py \
+ --num_samples 40000 \
+ --params hold_potential_enabled=true exit_potential_mode=spike_cancel potential_gamma=0.95 \
+ --output pbrs_test
+
# Real vs synthetic comparison
python reward_space_analysis.py \
--num_samples 100000 \
- **Global Statistics** - Reward distributions and component activation rates
- **Sample Representativity** - Coverage of critical market scenarios
- **Component Analysis** - Relationships between rewards and conditions
+- **PBRS Analysis** - Potential-based reward shaping component activation rates, statistics, and invariance validation
- **Feature Importance** - Machine learning analysis of key drivers
- **Statistical Validation** - Hypothesis tests, confidence intervals, normality + effect sizes
- **Distribution Shift** - Real vs synthetic divergence (KL, JS, Wasserstein, KS)
- **Diagnostics Validation Summary**
- Consolidated pass/fail snapshot of every validation layer (invariants, parameter bounds, bootstrap CIs, distribution metrics, diagnostics, hypothesis tests)
+ - PBRS invariance validation (canonical mode check: ∑shaping_rewards ≈ 0)
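+
+A quick way to confirm canonical invariance on exported samples is to check that the shaping column sums to approximately zero (a sketch; the CSV path is hypothetical, while the `reward_shaping` column matches the export schema):
+
+```python
+import pandas as pd
+
+df = pd.read_csv("reward_samples.csv")  # hypothetical export path
+total_shaping = df["reward_shaping"].sum()
+assert abs(total_shaping) < 1e-6, f"non-canonical residual: {total_shaping:.6f}"
+```
+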
### Data Exports
--params exit_attenuation_mode=power exit_power_tau=0.5 efficiency_weight=0.8 \
--output custom_test
-# Test aggressive holding penalties
+# Test aggressive hold penalties
+python reward_space_analysis.py \
+ --num_samples 25000 \
+ --params hold_penalty_scale=0.5 \
+ --output aggressive_hold
+
+# Test PBRS configurations
+python reward_space_analysis.py \
+ --num_samples 25000 \
+ --params hold_potential_enabled=true entry_additive_enabled=true exit_additive_enabled=false exit_potential_mode=canonical \
+ --output pbrs_canonical
+
python reward_space_analysis.py \
--num_samples 25000 \
- --params holding_penalty_scale=0.5 \
- --output aggressive_holding
+ --params hold_potential_transform_pnl=softsign_sharp potential_softsign_sharpness=0.5 \
+ --output pbrs_sharp_transforms
```
### Real Data Comparison
| Statistical Validation | TestStatisticalValidation | Mathematical bounds, heteroscedasticity, invariants |
| Boundary Conditions | TestBoundaryConditions | Extreme params & unknown mode fallback |
| Helper Functions | TestHelperFunctions | Report writers, model analysis, utility conversions |
-| Private Functions (via public API) | TestPrivateFunctions | Idle / holding / invalid penalties, exit scenarios |
+| Private Functions (via public API) | TestPrivateFunctions | Idle / hold / invalid penalties, exit scenarios |
| Robustness | TestRewardRobustness | Monotonic attenuation (where applicable), decomposition integrity, boundary regimes |
| Parameter Validation | TestParameterValidation | Bounds clamping, warning threshold, penalty power scaling |
| Continuity | TestContinuityPlateau | Plateau boundary continuity & small‑epsilon attenuation scaling |
+| PBRS Integration | TestPBRSIntegration | Potential-based reward shaping, transforms, exit modes, canonical invariance |
### Test Architecture
- Review parameter overrides with `--params`
- Check trading mode settings (spot vs margin/futures)
- Verify `base_factor` matches your environment config
+- Check PBRS settings: `hold_potential_enabled`, `exit_potential_mode`, and transform functions
+- Review parameter adjustments in output logs for any automatic bound clamping
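+
+For example, to isolate PBRS effects, disable the hold potential on a small run and diff the reports (output name is arbitrary):
+
+```bash
+python reward_space_analysis.py \
+    --num_samples 5000 \
+    --hold_potential_enabled 0 \
+    --exit_potential_mode canonical \
+    --output debug_pbrs
+```
+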
### Slow Execution
return bool(text)
-def _get_param_float(
+def _get_float_param(
params: RewardParams, key: str, default: RewardParamValue
) -> float:
"""Extract float parameter with type safety and default fallback."""
# Mathematical constants pre-computed for performance
_LOG_2 = math.log(2.0)
+DEFAULT_IDLE_DURATION_MULTIPLIER = 4
RewardParamValue = Union[float, str, bool, None]
RewardParams = Dict[str, RewardParamValue]
+# Internal safe fallback helper for numeric failures (centralizes semantics)
+def _fail_safely(reason: str) -> float:
+ """Return 0.0 on recoverable numeric failure (reason available for future debug hooks)."""
+ # NOTE: presently silent to preserve legacy behaviour; hook logging here if needed.
+ _ = reason
+ return 0.0
+
+
# Allowed exit attenuation modes
ALLOWED_EXIT_MODES = {"legacy", "sqrt", "linear", "power", "half_life"}
+# PBRS constants
+ALLOWED_TRANSFORMS = {
+ "tanh",
+ "softsign",
+ "softsign_sharp",
+ "arctan",
+ "logistic",
+ "asinh_norm",
+ "clip",
+}
+ALLOWED_EXIT_POTENTIAL_MODES = {
+ "canonical",
+ "progressive_release",
+ "spike_cancel",
+ "retain_previous",
+}
+
DEFAULT_MODEL_REWARD_PARAMETERS: RewardParams = {
"invalid_action": -2.0,
"base_factor": 100.0,
"idle_penalty_power": 1.025,
- # Fallback: 2 * max_trade_duration_candles
+ # Fallback: DEFAULT_IDLE_DURATION_MULTIPLIER * max_trade_duration_candles
"max_idle_duration_candles": None,
- # Holding keys (env defaults)
- "holding_penalty_scale": 0.25,
- "holding_penalty_power": 1.025,
+ # Hold keys (env defaults)
+ "hold_penalty_scale": 0.25,
+ "hold_penalty_power": 1.025,
# Exit attenuation configuration (env default)
"exit_attenuation_mode": "linear",
"exit_plateau": True,
# Invariant / safety controls (env defaults)
"check_invariants": True,
"exit_factor_threshold": 10000.0,
+ # === PBRS PARAMETERS ===
+ # Potential-based reward shaping core parameters
+ # Discount factor γ for potential term (0 ≤ γ ≤ 1)
+ "potential_gamma": 0.95,
+ "potential_softsign_sharpness": 1.0,
+ # Exit potential modes: canonical | progressive_release | spike_cancel | retain_previous
+ "exit_potential_mode": "canonical",
+ "exit_potential_decay": 0.5,
+ # Hold potential (PBRS function Φ)
+ "hold_potential_enabled": True,
+ "hold_potential_scale": 1.0,
+ "hold_potential_gain": 1.0,
+ "hold_potential_transform_pnl": "tanh",
+ "hold_potential_transform_duration": "tanh",
+ # Entry additive (non-PBRS additive term)
+ "entry_additive_enabled": False,
+ "entry_additive_scale": 1.0,
+ "entry_additive_gain": 1.0,
+ "entry_additive_transform_pnl": "tanh",
+ "entry_additive_transform_duration": "tanh",
+ # Exit additive (non-PBRS additive term)
+ "exit_additive_enabled": False,
+ "exit_additive_scale": 1.0,
+ "exit_additive_gain": 1.0,
+ "exit_additive_transform_pnl": "tanh",
+ "exit_additive_transform_duration": "tanh",
}
DEFAULT_MODEL_REWARD_PARAMETERS_HELP: Dict[str, str] = {
"idle_penalty_power": "Power applied to idle penalty scaling.",
"idle_penalty_scale": "Scale of idle penalty.",
"max_idle_duration_candles": "Maximum idle duration candles before full idle penalty scaling.",
- "holding_penalty_scale": "Scale of holding penalty.",
- "holding_penalty_power": "Power applied to holding penalty scaling.",
+ "hold_penalty_scale": "Scale of hold penalty.",
+ "hold_penalty_power": "Power applied to hold penalty scaling.",
"exit_attenuation_mode": "Attenuation kernel (legacy|sqrt|linear|power|half_life).",
"exit_plateau": "Enable plateau. If true, full strength until grace boundary then apply attenuation.",
"exit_plateau_grace": "Grace boundary duration ratio for plateau (full strength until this boundary).",
"pnl_factor_beta": "Sensitivity of amplification around target.",
"check_invariants": "Boolean flag (true/false) to enable runtime invariant & safety checks.",
"exit_factor_threshold": "If |exit factor| exceeds this threshold, emit warning.",
+ # PBRS parameters
+ "potential_gamma": "Discount factor γ for PBRS potential-based reward shaping (0 ≤ γ ≤ 1).",
+ "potential_softsign_sharpness": "Sharpness parameter for softsign_sharp transform (smaller = sharper).",
+ "exit_potential_mode": "Exit potential mode: 'canonical' (Φ=0), 'progressive_release', 'spike_cancel', 'retain_previous'.",
+ "exit_potential_decay": "Decay factor for progressive_release exit mode (0 ≤ decay ≤ 1).",
+ "hold_potential_enabled": "Enable PBRS hold potential function Φ(s).",
+ "hold_potential_scale": "Scale factor for hold potential function.",
+ "hold_potential_gain": "Gain factor applied before transforms in hold potential.",
+ "hold_potential_transform_pnl": "Transform function for PnL in hold potential: tanh, softsign, softsign_sharp, arctan, logistic, asinh_norm, clip.",
+ "hold_potential_transform_duration": "Transform function for duration ratio in hold potential.",
+ "entry_additive_enabled": "Enable entry additive reward (non-PBRS component).",
+ "entry_additive_scale": "Scale factor for entry additive reward.",
+ "entry_additive_gain": "Gain factor for entry additive reward.",
+ "entry_additive_transform_pnl": "Transform function for PnL in entry additive.",
+ "entry_additive_transform_duration": "Transform function for duration ratio in entry additive.",
+ "exit_additive_enabled": "Enable exit additive reward (non-PBRS component).",
+ "exit_additive_scale": "Scale factor for exit additive reward.",
+ "exit_additive_gain": "Gain factor for exit additive reward.",
+ "exit_additive_transform_pnl": "Transform function for PnL in exit additive.",
+ "exit_additive_transform_duration": "Transform function for duration ratio in exit additive.",
}
"idle_penalty_power": {"min": 0.0},
"idle_penalty_scale": {"min": 0.0},
"max_idle_duration_candles": {"min": 0.0},
- "holding_penalty_scale": {"min": 0.0},
- "holding_penalty_power": {"min": 0.0},
+ "hold_penalty_scale": {"min": 0.0},
+ "hold_penalty_power": {"min": 0.0},
"exit_linear_slope": {"min": 0.0},
"exit_plateau_grace": {"min": 0.0},
"exit_power_tau": {"min": 1e-6, "max": 1.0}, # open (0,1] approximated
"efficiency_center": {"min": 0.0, "max": 1.0},
"win_reward_factor": {"min": 0.0},
"pnl_factor_beta": {"min": 1e-6},
+ # PBRS parameter bounds
+ "potential_gamma": {"min": 0.0, "max": 1.0},
+ # Softsign sharpness: only lower bound enforced (upper bound limited implicitly by transform stability)
+ "potential_softsign_sharpness": {"min": 1e-6},
+ "exit_potential_decay": {"min": 0.0, "max": 1.0},
+ "hold_potential_scale": {"min": 0.0},
+ "hold_potential_gain": {"min": 0.0},
+ "entry_additive_scale": {"min": 0.0},
+ "entry_additive_gain": {"min": 0.0},
+ "exit_additive_scale": {"min": 0.0},
+ "exit_additive_gain": {"min": 0.0},
}
"""
sanitized = dict(params)
adjustments: Dict[str, Dict[str, Any]] = {}
+ # Normalize boolean-like parameters explicitly to avoid inconsistent types
+ _bool_keys = [
+ "check_invariants",
+ "hold_potential_enabled",
+ "entry_additive_enabled",
+ "exit_additive_enabled",
+ ]
+ for bkey in _bool_keys:
+ if bkey in sanitized:
+ original_val = sanitized[bkey]
+ coerced = _to_bool(original_val)
+ if coerced is not original_val:
+ sanitized[bkey] = coerced
+ adjustments.setdefault(
+ bkey,
+ {
+ "original": original_val,
+ "adjusted": coerced,
+ "reason": "bool_coerce",
+ },
+ )
for key, bounds in _PARAMETER_BOUNDS.items():
if key not in sanitized:
continue
- If the key is absent or value is ``None``: leave untouched (upstream defaults
will inject 'linear').
"""
- exit_attenuation_mode = params.get("exit_attenuation_mode")
- if exit_attenuation_mode is None:
+ if "exit_attenuation_mode" not in params:
return
- exit_attenuation_mode = str(exit_attenuation_mode)
+
+ exit_attenuation_mode = _get_str_param(params, "exit_attenuation_mode", "linear")
if exit_attenuation_mode not in ALLOWED_EXIT_MODES:
params["exit_attenuation_mode"] = "linear"
default=None,
help=help_text,
)
+ elif key == "exit_potential_mode":
+ parser.add_argument(
+ f"--{key}",
+ type=str,
+ choices=sorted(ALLOWED_EXIT_POTENTIAL_MODES),
+ default=None,
+ help=help_text,
+ )
+ elif key in [
+ "hold_potential_transform_pnl",
+ "hold_potential_transform_duration",
+ "entry_additive_transform_pnl",
+ "entry_additive_transform_duration",
+ "exit_additive_transform_pnl",
+ "exit_additive_transform_duration",
+ ]:
+ parser.add_argument(
+ f"--{key}",
+ type=str,
+ choices=sorted(ALLOWED_TRANSFORMS),
+ default=None,
+ help=help_text,
+ )
+ elif key in [
+ "hold_potential_enabled",
+ "entry_additive_enabled",
+ "exit_additive_enabled",
+ ]:
+ parser.add_argument(
+ f"--{key}",
+ type=int,
+ choices=[0, 1],
+ default=None,
+ help=help_text,
+ )
else:
# Map numerics to float; leave strings as str
if isinstance(default, (int, float)):
total: float = 0.0
invalid_penalty: float = 0.0
idle_penalty: float = 0.0
- holding_penalty: float = 0.0
+ hold_penalty: float = 0.0
exit_component: float = 0.0
+ # PBRS components
+ shaping_reward: float = 0.0
+ entry_additive: float = 0.0
+ exit_additive: float = 0.0
+ current_potential: float = 0.0
+ next_potential: float = 0.0
def _get_exit_factor(
Assumptions:
- ``_normalize_and_validate_mode`` has already run (invalid modes replaced by 'linear').
- ``exit_attenuation_mode`` is therefore either a member of ``ALLOWED_EXIT_MODES`` or 'linear'.
- - All numeric tunables are accessed through ``_get_param_float`` for safety.
+ - All numeric tunables are accessed through ``_get_float_param`` for safety.
Algorithm steps:
1. Finiteness & non-negative guard on inputs.
or not np.isfinite(pnl)
or not np.isfinite(duration_ratio)
):
- return 0.0
+ return _fail_safely("non_finite_exit_factor_inputs")
# Guard: duration ratio should never be negative
if duration_ratio < 0.0:
duration_ratio = 0.0
- exit_attenuation_mode = str(params.get("exit_attenuation_mode", "linear"))
- exit_plateau = _to_bool(params.get("exit_plateau", True))
+ exit_attenuation_mode = _get_str_param(params, "exit_attenuation_mode", "linear")
+ exit_plateau = _get_bool_param(params, "exit_plateau", True)
- exit_plateau_grace = _get_param_float(params, "exit_plateau_grace", 1.0)
+ exit_plateau_grace = _get_float_param(params, "exit_plateau_grace", 1.0)
if exit_plateau_grace < 0.0:
exit_plateau_grace = 1.0
- exit_linear_slope = _get_param_float(params, "exit_linear_slope", 1.0)
+ exit_linear_slope = _get_float_param(params, "exit_linear_slope", 1.0)
if exit_linear_slope < 0.0:
exit_linear_slope = 1.0
return f / (1.0 + exit_linear_slope * dr)
def _power_kernel(f: float, dr: float) -> float:
- tau = _get_param_float(
+ tau = _get_float_param(
params,
"exit_power_tau",
DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_power_tau", 0.5),
return f / math.pow(1.0 + dr, alpha)
def _half_life_kernel(f: float, dr: float) -> float:
- hl = _get_param_float(params, "exit_half_life", 0.5)
+ hl = _get_float_param(params, "exit_half_life", 0.5)
if hl <= 0.0:
hl = 0.5
return f * math.pow(2.0, -dr / hl)
base_factor *= pnl_factor
# Invariant & safety checks
- if _to_bool(params.get("check_invariants", True)):
+ if _get_bool_param(params, "check_invariants", True):
if not np.isfinite(base_factor):
- return 0.0
+ return _fail_safely("non_finite_exit_factor_after_kernel")
if base_factor < 0.0 and pnl >= 0.0:
# Clamp: avoid negative amplification on non-negative pnl
base_factor = 0.0
- exit_factor_threshold = _get_param_float(
+ exit_factor_threshold = _get_float_param(
params,
"exit_factor_threshold",
DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_factor_threshold", 10000.0),
pnl = context.pnl
if not np.isfinite(pnl) or not np.isfinite(profit_target):
- return 0.0
+ return _fail_safely("non_finite_pnl_or_target")
profit_target_factor = 1.0
if profit_target > 0.0 and pnl > profit_target:
- win_reward_factor = _get_param_float(
+ win_reward_factor = _get_float_param(
params,
"win_reward_factor",
DEFAULT_MODEL_REWARD_PARAMETERS.get("win_reward_factor", 2.0),
)
- pnl_factor_beta = _get_param_float(
+ pnl_factor_beta = _get_float_param(
params,
"pnl_factor_beta",
DEFAULT_MODEL_REWARD_PARAMETERS.get("pnl_factor_beta", 0.5),
)
efficiency_factor = 1.0
- efficiency_weight = _get_param_float(
+ efficiency_weight = _get_float_param(
params,
"efficiency_weight",
DEFAULT_MODEL_REWARD_PARAMETERS.get("efficiency_weight", 1.0),
)
- efficiency_center = _get_param_float(
+ efficiency_center = _get_float_param(
params,
"efficiency_center",
DEFAULT_MODEL_REWARD_PARAMETERS.get("efficiency_center", 0.5),
context: RewardContext, idle_factor: float, params: RewardParams
) -> float:
"""Mirror the environment's idle penalty behaviour."""
- idle_penalty_scale = _get_param_float(
+ idle_penalty_scale = _get_float_param(
params,
"idle_penalty_scale",
DEFAULT_MODEL_REWARD_PARAMETERS.get("idle_penalty_scale", 0.5),
)
- idle_penalty_power = _get_param_float(
+ idle_penalty_power = _get_float_param(
params,
"idle_penalty_power",
DEFAULT_MODEL_REWARD_PARAMETERS.get("idle_penalty_power", 1.025),
max_idle_duration_candles = params.get("max_idle_duration_candles")
if max_idle_duration_candles is None:
- max_idle_duration = 2 * max_trade_duration_candles
+ max_idle_duration = (
+ DEFAULT_IDLE_DURATION_MULTIPLIER * max_trade_duration_candles
+ )
else:
try:
max_idle_duration = int(max_idle_duration_candles)
except (TypeError, ValueError):
- max_idle_duration = 2 * max_trade_duration_candles
+ max_idle_duration = (
+ DEFAULT_IDLE_DURATION_MULTIPLIER * max_trade_duration_candles
+ )
idle_duration_ratio = context.idle_duration / max(1, max_idle_duration)
return -idle_factor * idle_penalty_scale * idle_duration_ratio**idle_penalty_power
-def _holding_penalty(
- context: RewardContext, holding_factor: float, params: RewardParams
+def _hold_penalty(
+ context: RewardContext, hold_factor: float, params: RewardParams
) -> float:
- """Mirror the environment's holding penalty behaviour."""
- holding_penalty_scale = _get_param_float(
+ """Mirror the environment's hold penalty behaviour."""
+ hold_penalty_scale = _get_float_param(
params,
- "holding_penalty_scale",
- DEFAULT_MODEL_REWARD_PARAMETERS.get("holding_penalty_scale", 0.25),
+ "hold_penalty_scale",
+ DEFAULT_MODEL_REWARD_PARAMETERS.get("hold_penalty_scale", 0.25),
)
- holding_penalty_power = _get_param_float(
+ hold_penalty_power = _get_float_param(
params,
- "holding_penalty_power",
- DEFAULT_MODEL_REWARD_PARAMETERS.get("holding_penalty_power", 1.025),
+ "hold_penalty_power",
+ DEFAULT_MODEL_REWARD_PARAMETERS.get("hold_penalty_power", 1.025),
)
duration_ratio = _compute_duration_ratio(
context.trade_duration, context.max_trade_duration
)
if duration_ratio < 1.0:
- return 0.0
+ return _fail_safely("hold_penalty_duration_ratio_lt_1")
return (
- -holding_factor
- * holding_penalty_scale
- * (duration_ratio - 1.0) ** holding_penalty_power
+ -hold_factor * hold_penalty_scale * (duration_ratio - 1.0) ** hold_penalty_power
)
*,
short_allowed: bool,
action_masking: bool,
+ previous_potential: float = 0.0,
) -> RewardBreakdown:
breakdown = RewardBreakdown()
short_allowed=short_allowed,
)
if not is_valid and not action_masking:
- breakdown.invalid_penalty = _get_param_float(params, "invalid_action", -2.0)
+ breakdown.invalid_penalty = _get_float_param(params, "invalid_action", -2.0)
breakdown.total = breakdown.invalid_penalty
return breakdown
- factor = _get_param_float(params, "base_factor", base_factor)
+ factor = _get_float_param(params, "base_factor", base_factor)
if "profit_target" in params:
- profit_target = _get_param_float(params, "profit_target", float(profit_target))
+ profit_target = _get_float_param(params, "profit_target", float(profit_target))
if "risk_reward_ratio" in params:
- risk_reward_ratio = _get_param_float(
+ risk_reward_ratio = _get_float_param(
params, "risk_reward_ratio", float(risk_reward_ratio)
)
# Scale profit target by risk-reward ratio (reward multiplier)
# E.g., profit_target=0.03, RR=2.0 → profit_target_final=0.06
profit_target_final = profit_target * risk_reward_ratio
- idle_factor = factor * profit_target_final / 3.0
+ idle_factor = factor * profit_target_final / 4.0
pnl_factor = _get_pnl_factor(params, context, profit_target_final)
- holding_factor = idle_factor
+ hold_factor = idle_factor
- if context.action == Actions.Neutral and context.position == Positions.Neutral:
- breakdown.idle_penalty = _idle_penalty(context, idle_factor, params)
- breakdown.total = breakdown.idle_penalty
- return breakdown
+ # Base reward calculation (existing logic)
+ base_reward = 0.0
- if (
+ if context.action == Actions.Neutral and context.position == Positions.Neutral:
+ base_reward = _idle_penalty(context, idle_factor, params)
+ breakdown.idle_penalty = base_reward
+ elif (
context.position in (Positions.Long, Positions.Short)
and context.action == Actions.Neutral
):
- breakdown.holding_penalty = _holding_penalty(context, holding_factor, params)
- breakdown.total = breakdown.holding_penalty
- return breakdown
+ base_reward = _hold_penalty(context, hold_factor, params)
+ breakdown.hold_penalty = base_reward
+ elif context.action == Actions.Long_exit and context.position == Positions.Long:
+ base_reward = _compute_exit_reward(factor, pnl_factor, context, params)
+ breakdown.exit_component = base_reward
+ elif context.action == Actions.Short_exit and context.position == Positions.Short:
+ base_reward = _compute_exit_reward(factor, pnl_factor, context, params)
+ breakdown.exit_component = base_reward
+ else:
+ base_reward = 0.0
+
+ # === PBRS INTEGRATION ===
+ # Determine state transitions for PBRS
+ current_pnl = context.pnl if context.position != Positions.Neutral else 0.0
+ current_duration_ratio = (
+ context.trade_duration / context.max_trade_duration
+ if context.position != Positions.Neutral and context.max_trade_duration > 0
+ else 0.0
+ )
- if context.action == Actions.Long_exit and context.position == Positions.Long:
- exit_reward = _compute_exit_reward(
- factor,
- pnl_factor,
- context,
- params,
+ # Simulate next state for PBRS calculation
+ is_terminal = context.action in (Actions.Long_exit, Actions.Short_exit)
+
+ # For terminal transitions, next state is neutral (PnL=0, duration=0)
+ if is_terminal:
+ next_pnl = 0.0
+ next_duration_ratio = 0.0
+ else:
+ # For non-terminal, use current values (simplified simulation)
+ next_pnl = current_pnl
+ next_duration_ratio = current_duration_ratio
+
+ # Apply PBRS if any PBRS parameters are enabled
+ pbrs_enabled = (
+ _get_bool_param(params, "hold_potential_enabled", True)
+ or _get_bool_param(params, "entry_additive_enabled", False)
+ or _get_bool_param(params, "exit_additive_enabled", False)
+ )
+
+ if pbrs_enabled:
+ total_reward, shaping_reward, next_potential = apply_potential_shaping(
+ base_reward=base_reward,
+ current_pnl=current_pnl,
+ current_duration_ratio=current_duration_ratio,
+ next_pnl=next_pnl,
+ next_duration_ratio=next_duration_ratio,
+ is_terminal=is_terminal,
+ last_potential=previous_potential,
+ params=params,
)
- breakdown.exit_component = exit_reward
- breakdown.total = exit_reward
- return breakdown
- if context.action == Actions.Short_exit and context.position == Positions.Short:
- exit_reward = _compute_exit_reward(
- factor,
- pnl_factor,
- context,
- params,
+ # Update breakdown with PBRS components
+ breakdown.shaping_reward = shaping_reward
+ breakdown.current_potential = _compute_hold_potential(
+ current_pnl, current_duration_ratio, params
)
- breakdown.exit_component = exit_reward
- breakdown.total = exit_reward
- return breakdown
+ breakdown.next_potential = next_potential
+ # Use invariance-enforced params so the breakdown matches the shaped total
+ _pbrs_params = _enforce_pbrs_invariance(params)
+ breakdown.entry_additive = _compute_entry_additive(
+ current_pnl, current_duration_ratio, _pbrs_params
+ )
+ breakdown.exit_additive = (
+ _compute_exit_additive(next_pnl, next_duration_ratio, _pbrs_params)
+ if is_terminal
+ else 0.0
+ )
+ breakdown.total = total_reward
+ else:
+ breakdown.total = base_reward
- breakdown.total = 0.0
return breakdown
) -> pd.DataFrame:
rng = random.Random(seed)
short_allowed = _is_short_allowed(trading_mode)
- action_masking = _to_bool(params.get("action_masking", True))
+ action_masking = _get_bool_param(params, "action_masking", True)
samples: list[Dict[str, float]] = []
for _ in range(num_samples):
if short_allowed:
"reward_total": breakdown.total,
"reward_invalid": breakdown.invalid_penalty,
"reward_idle": breakdown.idle_penalty,
- "reward_holding": breakdown.holding_penalty,
+ "reward_hold": breakdown.hold_penalty,
"reward_exit": breakdown.exit_component,
+ # PBRS components
+ "reward_shaping": breakdown.shaping_reward,
+ "reward_entry_additive": breakdown.entry_additive,
+ "reward_exit_additive": breakdown.exit_additive,
+ "current_potential": breakdown.current_potential,
+ "next_potential": breakdown.next_potential,
"is_invalid": float(breakdown.invalid_penalty != 0.0),
}
)
["count", "mean", "std", "min", "max"]
)
component_share = df[
- ["reward_invalid", "reward_idle", "reward_holding", "reward_exit"]
+ [
+ "reward_invalid",
+ "reward_idle",
+ "reward_hold",
+ "reward_exit",
+ "reward_shaping",
+ "reward_entry_additive",
+ "reward_exit_additive",
+ ]
].apply(lambda col: (col != 0).mean())
components = [
"reward_invalid",
"reward_idle",
- "reward_holding",
+ "reward_hold",
"reward_exit",
"reward_total",
]
pnl_bins = np.linspace(pnl_min, pnl_max, 13)
idle_stats = _binned_stats(df, "idle_duration", "reward_idle", idle_bins)
- holding_stats = _binned_stats(df, "trade_duration", "reward_holding", trade_bins)
+ hold_stats = _binned_stats(df, "trade_duration", "reward_hold", trade_bins)
exit_stats = _binned_stats(df, "pnl", "reward_exit", pnl_bins)
idle_stats = idle_stats.round(6)
- holding_stats = holding_stats.round(6)
+ hold_stats = hold_stats.round(6)
exit_stats = exit_stats.round(6)
correlation_fields = [
"reward_total",
"reward_invalid",
"reward_idle",
- "reward_holding",
+ "reward_hold",
"reward_exit",
"pnl",
"trade_duration",
return {
"idle_stats": idle_stats,
- "holding_stats": holding_stats,
+ "hold_stats": hold_stats,
"exit_stats": exit_stats,
"correlation": correlation,
}
duration_overage_share = float((df["duration_ratio"] > 1.0).mean())
idle_activated = float((df["reward_idle"] != 0).mean())
- holding_activated = float((df["reward_holding"] != 0).mean())
+ hold_activated = float((df["reward_hold"] != 0).mean())
exit_activated = float((df["reward_exit"] != 0).mean())
return {
"pnl_extreme": pnl_extreme,
"duration_overage_share": duration_overage_share,
"idle_activated": idle_activated,
- "holding_activated": holding_activated,
+ "hold_activated": hold_activated,
"exit_activated": exit_activated,
}
numeric_optional = {
"reward_exit",
"reward_idle",
- "reward_holding",
+ "reward_hold",
"reward_invalid",
"duration_ratio",
"idle_ratio",
nargs="*",
default=[],
metavar="KEY=VALUE",
- help="Override reward parameters, e.g. holding_penalty_scale=0.5",
+ help="Override reward parameters, e.g. hold_penalty_scale=0.5",
)
# Dynamically add CLI options for all tunables
add_tunable_cli_args(parser)
metrics_for_ci = [
"reward_total",
"reward_idle",
- "reward_holding",
+ "reward_hold",
"reward_exit",
"pnl",
]
f"| Duration overage (>1.0) | {representativity_stats['duration_overage_share']:.1%} |\n"
)
f.write(
- f"| Extreme PnL (|pnl|≥0.14) | {representativity_stats['pnl_extreme']:.1%} |\n"
+ f"| Extreme PnL (\\|pnl\\|≥0.14) | {representativity_stats['pnl_extreme']:.1%} |\n"
)
f.write("\n")
f.write("| Component | Activation Rate |\n")
f.write("|-----------|----------------|\n")
f.write(f"| Idle penalty | {representativity_stats['idle_activated']:.1%} |\n")
- f.write(
- f"| Holding penalty | {representativity_stats['holding_activated']:.1%} |\n"
- )
+ f.write(f"| Hold penalty | {representativity_stats['hold_activated']:.1%} |\n")
f.write(f"| Exit reward | {representativity_stats['exit_activated']:.1%} |\n")
f.write("\n")
idle_df.index.name = "bin"
f.write(_df_to_md(idle_df, index_name=idle_df.index.name, ndigits=6))
- f.write("### 3.2 Holding Penalty vs Trade Duration\n\n")
- if relationship_stats["holding_stats"].empty:
- f.write("_No holding samples present._\n\n")
+ f.write("### 3.2 Hold Penalty vs Trade Duration\n\n")
+ if relationship_stats["hold_stats"].empty:
+ f.write("_No hold samples present._\n\n")
else:
- holding_df = relationship_stats["holding_stats"].copy()
- if holding_df.index.name is None:
- holding_df.index.name = "bin"
- f.write(_df_to_md(holding_df, index_name=holding_df.index.name, ndigits=6))
+ hold_df = relationship_stats["hold_stats"].copy()
+ if hold_df.index.name is None:
+ hold_df.index.name = "bin"
+ f.write(_df_to_md(hold_df, index_name=hold_df.index.name, ndigits=6))
f.write("### 3.3 Exit Reward vs PnL\n\n")
if relationship_stats["exit_stats"].empty:
corr_df.index.name = "feature"
f.write(_df_to_md(corr_df, index_name=corr_df.index.name, ndigits=4))
+ # Section 3.5: PBRS Analysis
+ f.write("### 3.5 PBRS (Potential-Based Reward Shaping) Analysis\n\n")
+
+ # Check if PBRS components are present in the data
+ pbrs_components = [
+ "reward_shaping",
+ "reward_entry_additive",
+ "reward_exit_additive",
+ ]
+ pbrs_present = all(col in df.columns for col in pbrs_components)
+
+ if pbrs_present:
+ # PBRS activation rates
+ pbrs_activation = {}
+ for comp in pbrs_components:
+ pbrs_activation[comp.replace("reward_", "")] = (df[comp] != 0).mean()
+
+ f.write("**PBRS Component Activation Rates:**\n\n")
+ f.write("| Component | Activation Rate | Description |\n")
+ f.write("|-----------|-----------------|-------------|\n")
+ f.write(
+ f"| Shaping (Φ) | {pbrs_activation['shaping']:.1%} | Potential-based reward shaping |\n"
+ )
+ f.write(
+ f"| Entry Additive | {pbrs_activation['entry_additive']:.1%} | Non-PBRS entry reward |\n"
+ )
+ f.write(
+ f"| Exit Additive | {pbrs_activation['exit_additive']:.1%} | Non-PBRS exit reward |\n"
+ )
+ f.write("\n")
+
+ # PBRS statistics
+ f.write("**PBRS Component Statistics:**\n\n")
+ pbrs_stats = df[pbrs_components].describe(
+ percentiles=[0.1, 0.25, 0.5, 0.75, 0.9]
+ )
+ pbrs_stats_df = pbrs_stats.round(6).T  # Transpose so components become rows for markdown rendering
+ pbrs_stats_df.index.name = "component"
+ f.write(_df_to_md(pbrs_stats_df, index_name="component", ndigits=6))
+
+ # PBRS invariance check (canonical mode)
+ total_shaping = df["reward_shaping"].sum()
+ if abs(total_shaping) < 1e-6:
+ f.write(
+ "✅ **PBRS Invariance:** Total shaping reward ≈ 0 (canonical mode preserved)\n\n"
+ )
+ else:
+ f.write(
+ f"❌ **PBRS Invariance:** Total shaping reward = {total_shaping:.6f} (non-canonical behavior)\n\n"
+ )
+
+ else:
+ f.write("_PBRS components not present in this analysis._\n\n")
+
# Section 4: Feature Importance Analysis
f.write("---\n\n")
f.write("## 4. Feature Importance\n\n")
f.write(f"- p-value: {h['p_value']:.4g}\n")
if "p_value_adj" in h:
f.write(
- f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅' if h['significant_adj'] else '❌'} (α=0.05)\n"
+ f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n"
)
f.write(f"- 95% CI: [{h['ci_95'][0]:.4f}, {h['ci_95'][1]:.4f}]\n")
f.write(f"- Sample size: {h['n_samples']:,}\n")
f.write(f"- p-value: {h['p_value']:.4g}\n")
if "p_value_adj" in h:
f.write(
- f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅' if h['significant_adj'] else '❌'} (α=0.05)\n"
+ f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n"
)
f.write(f"- Effect size (ε²): {h['effect_size_epsilon_sq']:.4f}\n")
f.write(f"- Number of groups: {h['n_groups']}\n")
if "pnl_sign_reward_difference" in hypothesis_tests:
h = hypothesis_tests["pnl_sign_reward_difference"]
- f.write("#### 5.1.4 Positive vs Negative PnL Comparison\n\n")
+ f.write("#### 5.1.3 Positive vs Negative PnL Comparison\n\n")
f.write(f"**Test Method:** {h['test']}\n\n")
f.write(f"- U-statistic: **{h['statistic']:.4f}**\n")
f.write(f"- p-value: {h['p_value']:.4g}\n")
if "p_value_adj" in h:
f.write(
- f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅' if h['significant_adj'] else '❌'} (α=0.05)\n"
+ f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n"
)
f.write(f"- Median (PnL+): {h['median_pnl_positive']:.4f}\n")
f.write(f"- Median (PnL-): {h['median_pnl_negative']:.4f}\n")
f.write("**Interpretation Guide:**\n\n")
f.write("| Metric | Threshold | Meaning |\n")
f.write("|--------|-----------|--------|\n")
- f.write("| KL Divergence | < 0.3 | ✅ Good representativeness |\n")
- f.write("| JS Distance | < 0.2 | ✅ Similar distributions |\n")
- f.write("| KS p-value | > 0.05 | ✅ No significant difference |\n\n")
+ f.write("| KL Divergence | < 0.3 | ✅ Yes: Good representativeness |\n")
+ f.write("| JS Distance | < 0.2 | ✅ Yes: Similar distributions |\n")
+ f.write(
+ "| KS p-value | > 0.05 | ✅ Yes: No significant difference |\n\n"
+ )
# Footer
f.write("---\n\n")
"2. **Sample Representativity** - Coverage of critical market scenarios\n"
)
f.write(
- "3. **Component Analysis** - Relationships between rewards and conditions\n"
+ "3. **Component Analysis** - Relationships between rewards and conditions (including PBRS)\n"
)
f.write(
"4. **Feature Importance** - Machine learning analysis of key drivers\n"
# Early parameter validation (moved before simulation for alignment with docs)
params_validated, adjustments = validate_reward_parameters(params)
params = params_validated
+ if adjustments:
+ # Compact adjustments summary (param: original->adjusted [reason])
+ adj_lines = [
+ f" - {k}: {v['original']} -> {v['adjusted']} ({v['reason']})"
+ for k, v in adjustments.items()
+ ]
+ print("Parameter adjustments applied:\n" + "\n".join(adj_lines))
# Normalize attenuation mode
_normalize_and_validate_mode(params)
- base_factor = _get_param_float(params, "base_factor", float(args.base_factor))
- profit_target = _get_param_float(params, "profit_target", float(args.profit_target))
- risk_reward_ratio = _get_param_float(
+ base_factor = _get_float_param(params, "base_factor", float(args.base_factor))
+ profit_target = _get_float_param(params, "profit_target", float(args.profit_target))
+ risk_reward_ratio = _get_float_param(
params, "risk_reward_ratio", float(args.risk_reward_ratio)
)
print(f"Artifacts saved to: {args.output.resolve()}")
+# === PBRS TRANSFORM FUNCTIONS ===
+
+
+def _apply_transform_tanh(value: float, scale: float = 1.0) -> float:
+ """tanh(scale*value) ∈ (-1,1)."""
+ return float(np.tanh(scale * value))
+
+
+def _apply_transform_softsign(value: float, scale: float = 1.0) -> float:
+ """softsign: x/(1+|x|) with x=scale*value."""
+ x = scale * value
+ return float(x / (1.0 + abs(x)))
+
+
+def _apply_transform_softsign_sharp(
+ value: float, scale: float = 1.0, sharpness: float = 1.0
+) -> float:
+ """softsign_sharp: x/(sharpness+|x|) with x=scale*value (smaller sharpness = steeper)."""
+ x = scale * value
+ return float(x / (sharpness + abs(x)))
+
+
+def _apply_transform_arctan(value: float, scale: float = 1.0) -> float:
+ """arctan normalized: (2/pi)*atan(scale*value) ∈ (-1,1)."""
+ x = scale * value
+ return float((2.0 / math.pi) * math.atan(x))
+
+
+def _apply_transform_logistic(value: float, scale: float = 1.0) -> float:
+ """Overflow‑safe logistic transform mapped to (-1,1): 2σ(kx)−1 where k=scale."""
+ x = scale * value
+ try:
+ if x >= 0:
+ z = math.exp(-x) # z in (0,1]
+ return float((1.0 - z) / (1.0 + z))
+ else:
+ z = math.exp(x) # z in (0,1]
+ return float((z - 1.0) / (z + 1.0))
+ except OverflowError:
+ return 1.0 if x > 0 else -1.0
+
+
+def _apply_transform_asinh_norm(value: float, scale: float = 1.0) -> float:
+ """Normalized asinh: x / sqrt(1 + x²) producing range (-1,1)."""
+ scaled = scale * value
+ return float(scaled / math.hypot(1.0, scaled))
+
+
+def _apply_transform_clip(value: float, scale: float = 1.0) -> float:
+ """clip(scale*value) to [-1,1]."""
+ return float(np.clip(scale * value, -1.0, 1.0))
+
+
+def apply_transform(transform_name: str, value: float, **kwargs: Any) -> float:
+ """Apply named transform; unknown names fallback to tanh with warning."""
+ transforms = {
+ "tanh": _apply_transform_tanh,
+ "softsign": _apply_transform_softsign,
+ "softsign_sharp": _apply_transform_softsign_sharp,
+ "arctan": _apply_transform_arctan,
+ "logistic": _apply_transform_logistic,
+ "asinh_norm": _apply_transform_asinh_norm,
+ "clip": _apply_transform_clip,
+ }
+
+ if transform_name not in transforms:
+ warnings.warn(
+ f"Unknown potential transform '{transform_name}'; falling back to tanh",
+ category=UserWarning,
+ stacklevel=2,
+ )
+ return _apply_transform_tanh(value, **kwargs)
+
+ return transforms[transform_name](value, **kwargs)
+
+
+# === PBRS HELPER FUNCTIONS ===
+
+
+def _get_potential_gamma(params: RewardParams) -> float:
+ """Return potential_gamma with fallback (missing/invalid -> 0.95 + warning)."""
+ value = params.get("potential_gamma", None)
+
+ if value is None:
+ warnings.warn(
+ "potential_gamma not found in config, using default value of 0.95. "
+ "This parameter controls the discount factor for PBRS potential shaping.",
+ category=UserWarning,
+ stacklevel=2,
+ )
+ return 0.95
+
+ if isinstance(value, (int, float)):
+ return float(value)
+
+ warnings.warn(
+ f"Invalid potential_gamma value: {value}. Using default 0.95. "
+ "Expected numeric value in [0, 1].",
+ category=UserWarning,
+ stacklevel=2,
+ )
+ return 0.95
+
+
+def _get_str_param(params: RewardParams, key: str, default: str) -> str:
+ """Extract string parameter with type safety."""
+ value = params.get(key, default)
+ if isinstance(value, str):
+ return value
+ return default
+
+
+def _get_bool_param(params: RewardParams, key: str, default: bool) -> bool:
+ """Extract boolean parameter with type safety."""
+ value = params.get(key, default)
+ try:
+ return _to_bool(value)
+ except Exception:
+ return bool(default)
+
+
+# === PBRS IMPLEMENTATION ===
+
+
+def _compute_hold_potential(
+ pnl: float, duration_ratio: float, params: RewardParams
+) -> float:
+ """
+ Compute PBRS hold potential: Φ(s) = scale * 0.5 * [T_pnl(g*pnl_ratio) + T_dur(g*duration_ratio)].
+
+ This is the potential Φ used in the canonical PBRS shaping rule from Ng et al. (1999):
+ R'(s,a,s') = R_base(s,a,s') + γΦ(s') - Φ(s)
+
+ Args:
+ pnl: Current profit/loss ratio
+ duration_ratio: Current duration as fraction of max_trade_duration
+ params: Reward parameters containing PBRS configuration
+
+ Returns:
+ Potential value Φ(s)
+ """
+ if not _get_bool_param(params, "hold_potential_enabled", True):
+ return _fail_safely("hold_potential_disabled")
+
+ scale = _get_float_param(params, "hold_potential_scale", 1.0)
+ gain = _get_float_param(params, "hold_potential_gain", 1.0)
+ transform_pnl = _get_str_param(params, "hold_potential_transform_pnl", "tanh")
+ transform_duration = _get_str_param(
+ params, "hold_potential_transform_duration", "tanh"
+ )
+ sharpness = _get_float_param(params, "potential_softsign_sharpness", 1.0)
+
+ # Apply transforms
+ if transform_pnl == "softsign_sharp":
+ t_pnl = apply_transform(transform_pnl, gain * pnl, sharpness=sharpness)
+ else:
+ t_pnl = apply_transform(transform_pnl, gain * pnl)
+
+ if transform_duration == "softsign_sharp":
+ t_dur = apply_transform(
+ transform_duration, gain * duration_ratio, sharpness=sharpness
+ )
+ else:
+ t_dur = apply_transform(transform_duration, gain * duration_ratio)
+
+ potential = scale * 0.5 * (t_pnl + t_dur)
+
+ # Validate numerical safety
+ if not np.isfinite(potential):
+ return _fail_safely("non_finite_hold_potential")
+
+ return float(potential)
+
+
+def _compute_entry_additive(
+ pnl: float, duration_ratio: float, params: RewardParams
+) -> float:
+ """
+ Compute entry additive reward (non-PBRS component).
+
+ Args:
+ pnl: Current profit/loss ratio
+ duration_ratio: Current duration as fraction of max_trade_duration
+ params: Reward parameters
+
+ Returns:
+ Entry additive reward
+ """
+ if not _get_bool_param(params, "entry_additive_enabled", False):
+ return _fail_safely("entry_additive_disabled")
+
+ scale = _get_float_param(params, "entry_additive_scale", 1.0)
+ gain = _get_float_param(params, "entry_additive_gain", 1.0)
+ transform_pnl = _get_str_param(params, "entry_additive_transform_pnl", "tanh")
+ transform_duration = _get_str_param(
+ params, "entry_additive_transform_duration", "tanh"
+ )
+ sharpness = _get_float_param(params, "potential_softsign_sharpness", 1.0)
+
+ # Apply transforms
+ if transform_pnl == "softsign_sharp":
+ t_pnl = apply_transform(transform_pnl, gain * pnl, sharpness=sharpness)
+ else:
+ t_pnl = apply_transform(transform_pnl, gain * pnl)
+
+ if transform_duration == "softsign_sharp":
+ t_dur = apply_transform(
+ transform_duration, gain * duration_ratio, sharpness=sharpness
+ )
+ else:
+ t_dur = apply_transform(transform_duration, gain * duration_ratio)
+
+ additive = scale * 0.5 * (t_pnl + t_dur)
+
+ # Validate numerical safety
+ if not np.isfinite(additive):
+ return _fail_safely("non_finite_entry_additive")
+
+ return float(additive)
+
+
+def _compute_exit_additive(
+ pnl: float, duration_ratio: float, params: RewardParams
+) -> float:
+ """
+ Compute exit additive reward (non-PBRS component).
+
+ Args:
+ pnl: Final profit/loss ratio at exit
+ duration_ratio: Final duration as fraction of max_trade_duration
+ params: Reward parameters
+
+ Returns:
+ Exit additive reward
+ """
+ if not _get_bool_param(params, "exit_additive_enabled", False):
+ return _fail_safely("exit_additive_disabled")
+
+ scale = _get_float_param(params, "exit_additive_scale", 1.0)
+ gain = _get_float_param(params, "exit_additive_gain", 1.0)
+ transform_pnl = _get_str_param(params, "exit_additive_transform_pnl", "tanh")
+ transform_duration = _get_str_param(
+ params, "exit_additive_transform_duration", "tanh"
+ )
+ sharpness = _get_float_param(params, "potential_softsign_sharpness", 1.0)
+
+ # Apply transforms
+ if transform_pnl == "softsign_sharp":
+ t_pnl = apply_transform(transform_pnl, gain * pnl, sharpness=sharpness)
+ else:
+ t_pnl = apply_transform(transform_pnl, gain * pnl)
+
+ if transform_duration == "softsign_sharp":
+ t_dur = apply_transform(
+ transform_duration, gain * duration_ratio, sharpness=sharpness
+ )
+ else:
+ t_dur = apply_transform(transform_duration, gain * duration_ratio)
+
+ additive = scale * 0.5 * (t_pnl + t_dur)
+
+ # Validate numerical safety
+ if not np.isfinite(additive):
+ return _fail_safely("non_finite_exit_additive")
+
+ return float(additive)
+
+
+def _compute_exit_potential(
+ pnl: float, duration_ratio: float, params: RewardParams, last_potential: float = 0.0
+) -> float:
+ """Compute next potential Φ(s') for closing/exit transitions.
+
+ Mirrors the original environment semantics:
+ - canonical: Φ' = 0.0
+ - progressive_release: Φ' = Φ * (1 - decay) with decay clamped to [0,1]
+ - spike_cancel: Φ' = Φ/γ if γ>0 else Φ (so γΦ' − Φ ≈ 0, cancelling the exit shaping spike)
+ - retain_previous: Φ' = Φ
+ Invalid modes fall back to canonical.
+ Any non-finite resulting potential is coerced to 0.0.
+ """
+ mode = _get_str_param(params, "exit_potential_mode", "canonical")
+ if mode == "canonical":
+ return _fail_safely("canonical_exit_potential")
+
+ if mode == "progressive_release":
+ decay = _get_float_param(params, "exit_potential_decay", 0.5)
+ if not np.isfinite(decay) or decay < 0.0:
+ decay = 0.5
+ if decay > 1.0:
+ decay = 1.0
+ next_potential = last_potential * (1.0 - decay)
+ elif mode == "spike_cancel":
+ gamma = _get_potential_gamma(params)
+ if gamma > 0.0 and np.isfinite(gamma):
+ next_potential = last_potential / gamma
+ else:
+ next_potential = last_potential
+ elif mode == "retain_previous":
+ next_potential = last_potential
+ else:
+ next_potential = _fail_safely("invalid_exit_potential_mode")
+
+ if not np.isfinite(next_potential):
+ next_potential = _fail_safely("non_finite_next_exit_potential")
+ return float(next_potential)
+
+
+def apply_potential_shaping(
+ base_reward: float,
+ current_pnl: float,
+ current_duration_ratio: float,
+ next_pnl: float,
+ next_duration_ratio: float,
+ is_terminal: bool,
+ last_potential: float,
+ params: RewardParams,
+) -> tuple[float, float, float]:
+ """
+ Apply PBRS potential-based reward shaping following Ng et al. (1999).
+
+ Implements: R'(s,a,s') = R_base(s,a,s') + γΦ(s') - Φ(s)
+
+ This function computes the complete PBRS transformation including:
+ - Hold potential: Φ(s) based on current state features
+ - Closing potential: Φ(s') with mode-specific terminal handling
+ - Entry/exit additives: non-PBRS additive reward components
+ - Gamma discounting: Standard RL discount factor
+ - Invariance guarantees: Optimal policy preservation
+
+ Theory:
+ PBRS preserves the optimal policy because the shaping reward is a discounted
+ difference of a single potential function Φ. Terminal states require special
+ handling (Φ = 0) to preserve this property.
+
+ Args:
+ base_reward: Base environment reward R_base(s,a,s')
+ current_pnl: Current state PnL ratio
+ current_duration_ratio: Current state duration ratio
+ next_pnl: Next state PnL ratio
+ next_duration_ratio: Next state duration ratio
+ is_terminal: Whether next state is terminal
+ last_potential: Previous potential for closing mode calculations
+ params: Reward parameters containing PBRS configuration
+
+ Returns:
+ tuple[total_reward, shaping_reward, next_potential]:
+ - total_reward: R_base + R_shaping + additives
+ - shaping_reward: Pure PBRS component γΦ(s') - Φ(s)
+ - next_potential: Φ(s') for next iteration
+
+ Raises:
+ ValueError: If gamma lies outside [0, 1]
+ """
+ # Enforce PBRS invariance (auto-disable additives in canonical mode)
+ params = _enforce_pbrs_invariance(params)
+
+ # Validate gamma (with None handling matching original environment)
+ gamma = _get_potential_gamma(params)
+ if not (0.0 <= gamma <= 1.0):
+ raise ValueError(f"Invalid gamma: {gamma}. Must be in [0, 1]")
+
+ # Compute current potential Φ(s)
+ current_potential = _compute_hold_potential(
+ current_pnl, current_duration_ratio, params
+ )
+
+ # Compute next potential Φ(s')
+ if is_terminal:
+ next_potential = _compute_exit_potential(
+ next_pnl, next_duration_ratio, params, last_potential
+ )
+ else:
+ next_potential = _compute_hold_potential(next_pnl, next_duration_ratio, params)
+
+ # PBRS shaping reward: γΦ(s') - Φ(s)
+ shaping_reward = gamma * next_potential - current_potential
+
+ # Compute additive components (non-PBRS)
+ entry_additive = _compute_entry_additive(
+ current_pnl, current_duration_ratio, params
+ )
+ exit_additive = (
+ _compute_exit_additive(next_pnl, next_duration_ratio, params)
+ if is_terminal
+ else 0.0
+ )
+
+ # Invariance diagnostic
+ _log_pbrs_invariance_warning(params)
+
+ # Total reward
+ total_reward = base_reward + shaping_reward + entry_additive + exit_additive
+
+ # Numerical validation & normalization of tiny shaping
+ if not np.isfinite(total_reward):
+ return float(base_reward), 0.0, 0.0
+ if np.isclose(shaping_reward, 0.0):
+ shaping_reward = 0.0
+ return float(total_reward), float(shaping_reward), float(next_potential)
+
+
+def _enforce_pbrs_invariance(params: RewardParams) -> RewardParams:
+ """Enforce PBRS invariance by auto-disabling additives in canonical mode.
+
+ Matches original environment behavior: canonical mode automatically
+ disables entry/exit additives to preserve theoretical invariance.
+
+ PBRS invariance (Ng et al. 1999) requires:
+ - canonical exit_potential_mode (terminal Φ=0)
+ - No path-dependent additive reward components enabled.
+
+ Returns modified params dict with invariance enforced.
+ """
+ mode = _get_str_param(params, "exit_potential_mode", "canonical")
+ if mode == "canonical":
+ # Make a copy to avoid mutating input
+ enforced_params = dict(params)
+ entry_enabled = _get_bool_param(params, "entry_additive_enabled", False)
+ exit_enabled = _get_bool_param(params, "exit_additive_enabled", False)
+
+ if entry_enabled:
+ warnings.warn(
+ "Disabling entry additive to preserve PBRS invariance (canonical mode).",
+ category=UserWarning,
+ stacklevel=2,
+ )
+ enforced_params["entry_additive_enabled"] = False
+
+ if exit_enabled:
+ warnings.warn(
+ "Disabling exit additive to preserve PBRS invariance (canonical mode).",
+ category=UserWarning,
+ stacklevel=2,
+ )
+ enforced_params["exit_additive_enabled"] = False
+
+ return enforced_params
+ return params
+
+
+def _log_pbrs_invariance_warning(params: RewardParams) -> None:
+ """Log an informational message if invariance conditions are violated.
+
+ PBRS invariance (Ng et al. 1999) requires:
+ - canonical exit_potential_mode (terminal Φ=0)
+ - No path-dependent additive reward components enabled.
+ This mirrors original environment diagnostic behavior.
+ """
+ mode = _get_str_param(params, "exit_potential_mode", "canonical")
+ if mode == "canonical":
+ if _get_bool_param(params, "entry_additive_enabled", False) or _get_bool_param(
+ params, "exit_additive_enabled", False
+ ):
+ warnings.warn(
+ (
+ "PBRS invariance relaxed: canonical mode with additive components enabled "
+ f"(entry_additive_enabled={_get_bool_param(params, 'entry_additive_enabled', False)}, "
+ f"exit_additive_enabled={_get_bool_param(params, 'exit_additive_enabled', False)})"
+ ),
+ category=UserWarning,
+ stacklevel=2,
+ )
+
+
if __name__ == "__main__":
main()
import unittest
import warnings
from pathlib import Path
+from typing import Iterable, Sequence
import numpy as np
import pandas as pd
+# Central PBRS parameter lists
+PBRS_INTEGRATION_PARAMS = [
+ "potential_gamma",
+ "hold_potential_enabled",
+ "hold_potential_scale",
+ "entry_additive_enabled",
+ "exit_additive_enabled",
+]
+PBRS_REQUIRED_PARAMS = PBRS_INTEGRATION_PARAMS + ["exit_potential_mode"]
+
# Import functions to test
try:
from reward_space_analysis import (
Actions,
Positions,
RewardContext,
+ _compute_entry_additive,
+ _compute_exit_additive,
+ _compute_exit_potential,
+ _compute_hold_potential,
+ _get_bool_param,
_get_exit_factor,
- _get_param_float,
+ _get_float_param,
_get_pnl_factor,
+ _get_str_param,
+ apply_potential_shaping,
+ apply_transform,
bootstrap_confidence_intervals,
build_argument_parser,
calculate_reward,
class RewardSpaceTestBase(unittest.TestCase):
- """Base class with common test utilities."""
+ """Base class with common test utilities.
+
+ Central tolerance policy (avoid per-test commentary):
+ - Generic numeric equality: 1e-6
+ - Component decomposition / identity: 1e-9
+ - Monotonic attenuation allowance: +1e-9 drift
+ """
@classmethod
def setUpClass(cls):
def assertAlmostEqualFloat(
self,
- first: float,
- second: float,
+ first: float | int,
+ second: float | int,
tolerance: float = 1e-6,
msg: str | None = None,
) -> None:
"""Absolute tolerance compare with explicit failure and finite check."""
- if not (np.isfinite(first) and np.isfinite(second)):
- self.fail(msg or f"Non-finite comparison (a={first}, b={second})")
+ self.assertFinite(first, name="a")
+ self.assertFinite(second, name="b")
diff = abs(first - second)
if diff > tolerance:
self.fail(
or f"Difference {diff} exceeds tolerance {tolerance} (a={first}, b={second})"
)
+ # --- Statistical bounds helpers (factorize redundancy) ---
+ def assertPValue(self, value: float | int, msg: str = "") -> None:
+ """Assert a p-value is finite and within [0,1]."""
+ self.assertFinite(value, name="p-value")
+ self.assertGreaterEqual(value, 0.0, msg or f"p-value < 0: {value}")
+ self.assertLessEqual(value, 1.0, msg or f"p-value > 1: {value}")
+
+ def assertDistanceMetric(
+ self,
+ value: float | int,
+ *,
+ non_negative: bool = True,
+ upper: float | None = None,
+ name: str = "metric",
+ ) -> None:
+ """Generic distance/divergence bounds: finite, optional non-negativity and optional upper bound."""
+ self.assertFinite(value, name=name)
+ if non_negative:
+ self.assertGreaterEqual(value, 0.0, f"{name} negative: {value}")
+ if upper is not None:
+ self.assertLessEqual(value, upper, f"{name} > {upper}: {value}")
+
+ def assertEffectSize(
+ self,
+ value: float | int,
+ *,
+ lower: float = -1.0,
+ upper: float = 1.0,
+ name: str = "effect size",
+ ) -> None:
+ """Assert effect size within symmetric interval and finite."""
+ self.assertFinite(value, name=name)
+ self.assertGreaterEqual(value, lower, f"{name} < {lower}: {value}")
+ self.assertLessEqual(value, upper, f"{name} > {upper}: {value}")
+
+ def assertFinite(self, value: float | int, name: str = "value") -> None:
+ """Assert scalar is finite."""
+ if not np.isfinite(value): # low-level base check to avoid recursion
+ self.fail(f"{name} not finite: {value}")
+
+ def assertMonotonic(
+ self,
+ seq: Sequence[float | int] | Iterable[float | int],
+ *,
+ non_increasing: bool | None = None,
+ non_decreasing: bool | None = None,
+ tolerance: float = 0.0,
+ name: str = "sequence",
+ ) -> None:
+ """Assert a sequence is monotonic under specified direction.
+
+ Provide exactly one of non_increasing/non_decreasing=True.
+ tolerance allows tiny positive drift in expected monotone direction.
+ """
+ data = list(seq)
+ if len(data) < 2:
+ return
+ if (non_increasing and non_decreasing) or (
+ not non_increasing and not non_decreasing
+ ):
+ self.fail("Specify exactly one monotonic direction")
+ for a, b in zip(data, data[1:]):
+ if non_increasing:
+ if b > a + tolerance:
+ self.fail(f"{name} not non-increasing at pair ({a}, {b})")
+ elif non_decreasing:
+ if b + tolerance < a:
+ self.fail(f"{name} not non-decreasing at pair ({a}, {b})")
+
+ def assertWithin(
+ self,
+ value: float | int,
+ low: float | int,
+ high: float | int,
+ *,
+ name: str = "value",
+ inclusive: bool = True,
+ ) -> None:
+ """Assert that value is within [low, high] (inclusive) or (low, high) if inclusive=False."""
+ self.assertFinite(value, name=name)
+ if inclusive:
+ self.assertGreaterEqual(value, low, f"{name} < {low}")
+ self.assertLessEqual(value, high, f"{name} > {high}")
+ else:
+ self.assertGreater(value, low, f"{name} <= {low}")
+ self.assertLess(value, high, f"{name} >= {high}")
+
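
Taken together, these helpers turn repeated bound checks into one-liners. A quick illustration of how they compose inside a test body (a hypothetical example with made-up metric names and values, not a test from the suite):

```python
def test_example_bounds(self):
    # Hypothetical metrics; each helper replaces several inline asserts.
    metrics = {"ks_pvalue": 0.12, "js_distance": 0.07, "cliffs_delta": -0.3}
    self.assertPValue(metrics["ks_pvalue"])
    self.assertDistanceMetric(metrics["js_distance"], upper=1.0, name="js_distance")
    self.assertEffectSize(metrics["cliffs_delta"], name="cliffs_delta")
    # Attenuation curves: non-increasing, with a tiny counter-drift allowance.
    self.assertMonotonic([3.0, 2.5, 2.5, 1.0], non_increasing=True, tolerance=1e-9)
    self.assertWithin(0.75, 0.0, 1.0, name="ratio")
```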
class TestIntegration(RewardSpaceTestBase):
"""Integration tests for CLI and file outputs."""
# Values should be finite and reasonable
for metric_name, value in metrics.items():
if "pnl" in metric_name:
- self.assertTrue(np.isfinite(value), f"{metric_name} should be finite")
+ # All metrics must be finite; selected metrics must be non-negative
if any(
- suffix in metric_name for suffix in ["js_distance", "ks_statistic"]
+ suffix in metric_name
+ for suffix in [
+ "js_distance",
+ "ks_statistic",
+ "wasserstein",
+ "kl_divergence",
+ ]
):
- self.assertGreaterEqual(
- value, 0, f"{metric_name} should be non-negative"
- )
+ self.assertDistanceMetric(value, name=metric_name)
+ else:
+ self.assertFinite(value, name=metric_name)
def test_distribution_shift_identity_null_metrics(self):
"""Identical distributions should yield (near) zero shift metrics."""
"Idle durations should be non-negative",
)
- # Idle rewards should generally be negative (penalty for holding)
+ # Idle rewards should generally be negative (penalty for remaining idle)
negative_rewards = (idle_rew < 0).sum()
total_rewards = len(idle_rew)
negative_ratio = negative_rewards / total_rewards
for suffix in expected_suffixes:
key = f"{prefix}{suffix}"
if key in diagnostics:
- self.assertTrue(
- np.isfinite(diagnostics[key]), f"{key} should be finite"
- )
+ self.assertFinite(diagnostics[key], name=key)
class TestRewardAlignment(RewardSpaceTestBase):
# Should return valid breakdown
self.assertIsInstance(breakdown.total, (int, float))
- self.assertTrue(np.isfinite(breakdown.total))
+ self.assertFinite(breakdown.total, name="breakdown.total")
# Exit reward should be positive for profitable trade
self.assertGreater(
pnl_factor = _get_pnl_factor(params, ctx, profit_target)
# Expect no efficiency modulation: factor should be >= 0 and close to 1.0
- self.assertTrue(np.isfinite(pnl_factor))
+ self.assertFinite(pnl_factor, name="pnl_factor")
self.assertAlmostEqualFloat(pnl_factor, 1.0, tolerance=1e-6)
def test_max_idle_duration_candles_logic(self):
f"Expected less severe penalty with larger max_idle_duration_candles (large={breakdown_large.idle_penalty}, small={breakdown_small.idle_penalty})",
)
+ def test_pbrs_progressive_release_decay_clamped(self):
+ """progressive_release with decay>1 must clamp to 1 so Φ' = 0 and Δ = -Φ_prev."""
+ params = self.DEFAULT_PARAMS.copy()
+ params.update(
+ {
+ "potential_gamma": 0.95,
+ "exit_potential_mode": "progressive_release",
+ "exit_potential_decay": 5.0, # should clamp to 1.0
+ "hold_potential_enabled": True,
+ "entry_additive_enabled": False,
+ "exit_additive_enabled": False,
+ }
+ )
+ current_pnl = 0.02
+ current_dur = 0.5
+ prev_potential = _compute_hold_potential(current_pnl, current_dur, params)
+ total_reward, shaping_reward, next_potential = apply_potential_shaping(
+ base_reward=0.0,
+ current_pnl=current_pnl,
+ current_duration_ratio=current_dur,
+ next_pnl=0.02,
+ next_duration_ratio=0.6,
+ is_terminal=True,
+ last_potential=prev_potential,
+ params=params,
+ )
+ self.assertAlmostEqualFloat(next_potential, 0.0, tolerance=1e-9)
+ self.assertAlmostEqualFloat(shaping_reward, -prev_potential, tolerance=1e-9)
+ self.assertAlmostEqualFloat(total_reward, shaping_reward, tolerance=1e-9)
+
+ def test_pbrs_spike_cancel_invariance(self):
+ """spike_cancel terminal shaping should be ≈ 0 (γ*(Φ/γ) - Φ)."""
+ params = self.DEFAULT_PARAMS.copy()
+ params.update(
+ {
+ "potential_gamma": 0.9,
+ "exit_potential_mode": "spike_cancel",
+ "hold_potential_enabled": True,
+ "entry_additive_enabled": False,
+ "exit_additive_enabled": False,
+ }
+ )
+ current_pnl = 0.015
+ current_dur = 0.4
+ prev_potential = _compute_hold_potential(current_pnl, current_dur, params)
+ # Use helper accessor to avoid union type issues
+ gamma = _get_float_param(params, "potential_gamma", 0.95)
+ expected_next = (
+ prev_potential / gamma if gamma not in (0.0, None) else prev_potential
+ )
+ total_reward, shaping_reward, next_potential = apply_potential_shaping(
+ base_reward=0.0,
+ current_pnl=current_pnl,
+ current_duration_ratio=current_dur,
+ next_pnl=0.016,
+ next_duration_ratio=0.45,
+ is_terminal=True,
+ last_potential=prev_potential,
+ params=params,
+ )
+ self.assertAlmostEqualFloat(next_potential, expected_next, tolerance=1e-9)
+ self.assertAlmostEqualFloat(shaping_reward, 0.0, tolerance=1e-9)
+ self.assertAlmostEqualFloat(total_reward, 0.0, tolerance=1e-9)
+
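These two tests pin down the terminal shaping algebra per exit mode. As a standalone cross-check (a minimal sketch with illustrative Φ and γ values, independent of the module under test), the terminal term Δ = γΦ' − Φ can be computed directly from the documented mode formulas:

```python
import math

def terminal_shaping(phi: float, gamma: float, mode: str, decay: float = 0.5) -> float:
    """Δ = γ·Φ' − Φ for each exit potential mode (illustrative re-derivation)."""
    if mode == "canonical":
        next_phi = 0.0                                      # Φ' = 0
    elif mode == "progressive_release":
        next_phi = phi * (1.0 - min(max(decay, 0.0), 1.0))  # decay clamped to [0, 1]
    elif mode == "spike_cancel":
        next_phi = phi / gamma                              # γ·Φ' = Φ, so Δ ≈ 0
    elif mode == "retain_previous":
        next_phi = phi
    else:
        raise ValueError(f"unknown mode: {mode}")
    return gamma * next_phi - phi

phi, gamma = 0.42, 0.9
assert math.isclose(terminal_shaping(phi, gamma, "canonical"), -phi)
assert math.isclose(terminal_shaping(phi, gamma, "progressive_release", decay=5.0), -phi)
assert math.isclose(terminal_shaping(phi, gamma, "spike_cancel"), 0.0, abs_tol=1e-12)
assert math.isclose(terminal_shaping(phi, gamma, "retain_previous"), (gamma - 1.0) * phi)
```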
def test_idle_penalty_fallback_and_proportionality(self):
"""Fallback & proportionality validation.
msg=f"Idle penalty proportionality mismatch (ratio={ratio})",
)
# Additional mid-range inference check (idle_duration between 1x and 2x trade duration)
- ctx_mid = dataclasses.replace(ctx_a, idle_duration=120, max_trade_duration=100)
+ ctx_mid = dataclasses.replace(
+ ctx_a, idle_duration=120, max_trade_duration=100
+ ) # Adjusted context for mid-range check
br_mid = calculate_reward(
ctx_mid,
params,
action_masking=True,
)
self.assertLess(br_mid.idle_penalty, 0.0)
- idle_penalty_scale = _get_param_float(params, "idle_penalty_scale", 0.5)
- idle_penalty_power = _get_param_float(params, "idle_penalty_power", 1.025)
+ idle_penalty_scale = _get_float_param(params, "idle_penalty_scale", 0.5)
+ idle_penalty_power = _get_float_param(params, "idle_penalty_power", 1.025)
# Internal factor may come from params (overrides provided base_factor argument)
- factor = _get_param_float(params, "base_factor", float(base_factor))
- idle_factor = factor * (profit_target * risk_reward_ratio) / 3.0
+ factor = _get_float_param(params, "base_factor", float(base_factor))
+ idle_factor = factor * (profit_target * risk_reward_ratio) / 4.0
observed_ratio = abs(br_mid.idle_penalty) / (idle_factor * idle_penalty_scale)
if observed_ratio > 0:
implied_D = 120 / (observed_ratio ** (1 / idle_penalty_power))
self.assertAlmostEqualFloat(
implied_D,
- 200.0,
- tolerance=12.0, # modest tolerance for float ops / rounding
- msg=f"Fallback denominator mismatch (implied={implied_D}, expected≈200, factor={factor})",
+ 400.0,
+ tolerance=20.0,
+ msg=f"Fallback denominator mismatch (implied={implied_D}, expected≈400, factor={factor})",
)
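
For reference, the fallback arithmetic this test inverts can be sketched end-to-end (assuming the penalty form the assertions imply, `|idle_penalty| = idle_factor * scale * (idle_duration / max_idle_duration)^power`, with the fallback `max_idle_duration = 4 * max_trade_duration`; the factor inputs below are illustrative):

```python
idle_penalty_scale, idle_penalty_power = 0.5, 1.025
base_factor, profit_target, risk_reward_ratio = 100.0, 0.03, 1.0

idle_factor = base_factor * (profit_target * risk_reward_ratio) / 4.0
max_idle_duration = 4 * 100  # fallback: DEFAULT_IDLE_DURATION_MULTIPLIER x max_trade_duration
penalty = -idle_factor * idle_penalty_scale * (120 / max_idle_duration) ** idle_penalty_power

# Inverting the observed ratio recovers the fallback denominator, as the test does:
observed_ratio = abs(penalty) / (idle_factor * idle_penalty_scale)
implied_D = 120 / (observed_ratio ** (1 / idle_penalty_power))
assert abs(implied_D - 400.0) < 1e-9
```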
def test_exit_factor_threshold_warning_non_capping(self):
params = self.DEFAULT_PARAMS.copy()
# Remove base_factor from params so that the function uses the provided argument (makes scaling observable)
params.pop("base_factor", None)
- exit_factor_threshold = _get_param_float(
+ exit_factor_threshold = _get_float_param(
params, "exit_factor_threshold", 10_000.0
)
duration_ratio=0.3,
params=test_params,
)
- self.assertTrue(
- np.isfinite(factor), f"Exit factor for {mode} should be finite"
- )
+ self.assertFinite(factor, name=f"exit_factor[{mode}]")
self.assertGreater(factor, 0, f"Exit factor for {mode} should be positive")
# Plateau+linear variant sanity check (grace region at 0.5)
ratios_observed.append(float(ratio))
# Monotonic non-decreasing (allow tiny float noise)
- for a, b in zip(ratios_observed, ratios_observed[1:]):
- self.assertGreaterEqual(
- b + 1e-12, a, f"Amplification not monotonic: {ratios_observed}"
- )
+ self.assertMonotonic(
+ ratios_observed,
+ non_decreasing=True,
+ tolerance=1e-12,
+ name="pnl_amplification_ratio",
+ )
asymptote = 1.0 + win_reward_factor
final_ratio = ratios_observed[-1]
# Expect to be very close to asymptote (tanh(0.5*(10-1)) ≈ 0.9997)
- if not np.isfinite(final_ratio):
- self.fail(f"Final ratio is not finite: {final_ratio}")
+ self.assertFinite(final_ratio, name="final_ratio")
self.assertLess(
abs(final_ratio - asymptote),
1e-3,
expected_ratios.append(expected)
# Compare each observed to expected within loose tolerance (model parity)
for obs, exp in zip(ratios_observed, expected_ratios):
- if not (np.isfinite(obs) and np.isfinite(exp)):
- self.fail(f"Non-finite observed/expected ratio: obs={obs}, exp={exp}")
+ self.assertFinite(obs, name="observed_ratio")
+ self.assertFinite(exp, name="expected_ratio")
self.assertLess(
abs(obs - exp),
5e-6,
)
def test_scale_invariance_and_decomposition(self):
- """Reward components should scale linearly with base_factor and total == sum of components.
+ """Core reward components scale ~ linearly with base_factor; total = core + shaping + additives.
Contract:
- R(base_factor * k) = k * R(base_factor) for each non-zero component.
+ For each non-zero core component C: C(base_factor * k) ≈ k * C(base_factor).
+ Decomposition uses total = (exit + idle + hold + invalid + shaping + entry_additive + exit_additive).
"""
params = self.DEFAULT_PARAMS.copy()
# Remove internal base_factor so the explicit argument is used
position=Positions.Neutral,
action=Actions.Neutral,
),
- # Holding penalty
+ # Hold penalty
RewardContext(
pnl=0.0,
trade_duration=80,
action_masking=True,
)
- # Strict decomposition: total must equal sum of components
+ # Decomposition including shaping + additives (environment always applies PBRS pipeline)
for br in (br1, br2):
comp_sum = (
br.exit_component
+ br.idle_penalty
- + br.holding_penalty
+ + br.hold_penalty
+ br.invalid_penalty
+ + br.shaping_reward
+ + br.entry_additive
+ + br.exit_additive
)
self.assertAlmostEqual(
br.total,
components1 = {
"exit_component": br1.exit_component,
"idle_penalty": br1.idle_penalty,
- "holding_penalty": br1.holding_penalty,
+ "hold_penalty": br1.hold_penalty,
"invalid_penalty": br1.invalid_penalty,
- "total": br1.total,
+ # Exclude shaping/additives from scale invariance check (some may have nonlinear dependence)
+ "total": br1.exit_component
+ + br1.idle_penalty
+ + br1.hold_penalty
+ + br1.invalid_penalty,
}
components2 = {
"exit_component": br2.exit_component,
"idle_penalty": br2.idle_penalty,
- "holding_penalty": br2.holding_penalty,
+ "hold_penalty": br2.hold_penalty,
"invalid_penalty": br2.invalid_penalty,
- "total": br2.total,
+ "total": br2.exit_component
+ + br2.idle_penalty
+ + br2.hold_penalty
+ + br2.invalid_penalty,
}
for key, v1 in components1.items():
v2 = components2[key]
self.assertIn("pnl", results)
for metric, (mean, ci_low, ci_high) in results.items():
- self.assertTrue(np.isfinite(mean), f"Mean for {metric} should be finite")
- self.assertTrue(
- np.isfinite(ci_low), f"CI low for {metric} should be finite"
- )
- self.assertTrue(
- np.isfinite(ci_high), f"CI high for {metric} should be finite"
- )
+ self.assertFinite(mean, name=f"mean[{metric}]")
+ self.assertFinite(ci_low, name=f"ci_low[{metric}]")
+ self.assertFinite(ci_high, name=f"ci_high[{metric}]")
self.assertLess(
ci_low, ci_high, f"CI bounds for {metric} should be ordered"
)
{
"reward_total": np.random.normal(0, 1, 300),
"reward_idle": np.where(idle_mask, np.random.normal(-1, 0.3, 300), 0.0),
- "reward_holding": np.where(
+ "reward_hold": np.where(
~idle_mask, np.random.normal(-0.5, 0.2, 300), 0.0
),
"reward_exit": np.random.normal(0.8, 0.6, 300),
# KL divergence must be >= 0
kl_key = f"{feature}_kl_divergence"
if kl_key in metrics:
- self.assertGreaterEqual(
- metrics[kl_key], 0, f"KL divergence for {feature} must be >= 0"
- )
+ self.assertDistanceMetric(metrics[kl_key], name=kl_key)
# JS distance must be in [0, 1]
js_key = f"{feature}_js_distance"
if js_key in metrics:
- js_val = metrics[js_key]
- self.assertGreaterEqual(
- js_val, 0, f"JS distance for {feature} must be >= 0"
- )
- self.assertLessEqual(
- js_val, 1, f"JS distance for {feature} must be <= 1"
- )
+ self.assertDistanceMetric(metrics[js_key], upper=1.0, name=js_key)
# Wasserstein must be >= 0
ws_key = f"{feature}_wasserstein"
if ws_key in metrics:
- self.assertGreaterEqual(
- metrics[ws_key],
- 0,
- f"Wasserstein distance for {feature} must be >= 0",
- )
+ self.assertDistanceMetric(metrics[ws_key], name=ws_key)
# KS statistic must be in [0, 1]
ks_stat_key = f"{feature}_ks_statistic"
if ks_stat_key in metrics:
- ks_val = metrics[ks_stat_key]
- self.assertGreaterEqual(
- ks_val, 0, f"KS statistic for {feature} must be >= 0"
- )
- self.assertLessEqual(
- ks_val, 1, f"KS statistic for {feature} must be <= 1"
+ self.assertDistanceMetric(
+ metrics[ks_stat_key], upper=1.0, name=ks_stat_key
)
# KS p-value must be in [0, 1]
ks_p_key = f"{feature}_ks_pvalue"
if ks_p_key in metrics:
- p_val = metrics[ks_p_key]
- self.assertGreaterEqual(
- p_val, 0, f"KS p-value for {feature} must be >= 0"
+ self.assertPValue(
+ metrics[ks_p_key], msg=f"KS p-value out of bounds for {feature}"
)
- self.assertLessEqual(p_val, 1, f"KS p-value for {feature} must be <= 1")
def test_heteroscedasticity_pnl_validation(self):
"""Test that PnL variance increases with trade duration (heteroscedasticity)."""
{
"reward_total": np.random.normal(0, 1, 300),
"reward_idle": np.random.normal(-1, 0.5, 300),
- "reward_holding": np.random.normal(-0.5, 0.3, 300),
+ "reward_hold": np.random.normal(-0.5, 0.3, 300),
"reward_exit": np.random.normal(1, 0.8, 300),
"pnl": np.random.normal(0.01, 0.02, 300),
"trade_duration": np.random.uniform(5, 150, 300),
# Skewness can be any real number, but should be finite
if f"{col}_skewness" in diagnostics:
skew = diagnostics[f"{col}_skewness"]
- self.assertTrue(
- np.isfinite(skew), f"Skewness for {col} should be finite"
- )
+ self.assertFinite(skew, name=f"skewness[{col}]")
# Kurtosis should be finite (can be negative for platykurtic distributions)
if f"{col}_kurtosis" in diagnostics:
kurt = diagnostics[f"{col}_kurtosis"]
- self.assertTrue(
- np.isfinite(kurt), f"Kurtosis for {col} should be finite"
- )
+ self.assertFinite(kurt, name=f"kurtosis[{col}]")
# Shapiro p-value must be in [0, 1]
if f"{col}_shapiro_pval" in diagnostics:
- p_val = diagnostics[f"{col}_shapiro_pval"]
- self.assertGreaterEqual(
- p_val, 0, f"Shapiro p-value for {col} must be >= 0"
- )
- self.assertLessEqual(
- p_val, 1, f"Shapiro p-value for {col} must be <= 1"
+ self.assertPValue(
+ diagnostics[f"{col}_shapiro_pval"],
+ msg=f"Shapiro p-value bounds for {col}",
)
# Test hypothesis tests results bounds
for test_name, result in hypothesis_results.items():
# All p-values must be in [0, 1]
if "p_value" in result:
- p_val = result["p_value"]
- self.assertGreaterEqual(
- p_val, 0, f"p-value for {test_name} must be >= 0"
+ self.assertPValue(
+ result["p_value"], msg=f"p-value bounds for {test_name}"
)
- self.assertLessEqual(p_val, 1, f"p-value for {test_name} must be <= 1")
# Effect size epsilon squared (ANOVA/Kruskal) must be finite and >= 0
if "effect_size_epsilon_sq" in result:
eps2 = result["effect_size_epsilon_sq"]
- self.assertTrue(
- np.isfinite(eps2),
- f"Effect size epsilon^2 for {test_name} should be finite",
- )
+ self.assertFinite(eps2, name=f"epsilon_sq[{test_name}]")
self.assertGreaterEqual(
eps2, 0.0, f"Effect size epsilon^2 for {test_name} must be >= 0"
)
# Rank-biserial correlation (Mann-Whitney) must be finite in [-1, 1]
if "effect_size_rank_biserial" in result:
rb = result["effect_size_rank_biserial"]
- self.assertTrue(
- np.isfinite(rb),
- f"Rank-biserial correlation for {test_name} should be finite",
- )
+ self.assertFinite(rb, name=f"rank_biserial[{test_name}]")
self.assertGreaterEqual(
rb, -1.0, f"Rank-biserial correlation for {test_name} must be >= -1"
)
# Generic correlation effect size (Spearman/Pearson) if present
if "rho" in result:
rho = result["rho"]
- if rho is not None and np.isfinite(rho):
+ if rho is not None:
+ self.assertFinite(rho, name=f"rho[{test_name}]")
self.assertGreaterEqual(
rho, -1.0, f"Correlation rho for {test_name} must be >= -1"
)
self.assertIn("significant_adj", res, f"Missing significant_adj in {name}")
p_raw = res["p_value"]
p_adj = res["p_value_adj"]
- # Bounds & ordering
- self.assertTrue(0 <= p_raw <= 1, f"Raw p-value out of bounds ({p_raw})")
- self.assertTrue(
- 0 <= p_adj <= 1, f"Adjusted p-value out of bounds ({p_adj})"
- )
+ self.assertPValue(p_raw, msg=f"Raw p-value out of bounds ({p_raw})")
+ self.assertPValue(p_adj, msg=f"Adjusted p-value out of bounds ({p_adj})")
# BH should not reduce p-value (non-decreasing) after monotonic enforcement
self.assertGreaterEqual(
p_adj,
# Optional: if effect sizes present, basic bounds
if "effect_size_epsilon_sq" in res:
eff = res["effect_size_epsilon_sq"]
- self.assertTrue(
- np.isfinite(eff), f"Effect size finite check failed for {name}"
- )
+ self.assertFinite(eff, name=f"effect_size[{name}]")
self.assertGreaterEqual(eff, 0, f"ε² should be >=0 for {name}")
if "effect_size_rank_biserial" in res:
rb = res["effect_size_rank_biserial"]
- self.assertTrue(
- np.isfinite(rb), f"Rank-biserial finite check failed for {name}"
- )
+ self.assertFinite(rb, name=f"rank_biserial[{name}]")
self.assertGreaterEqual(rb, -1, f"Rank-biserial lower bound {name}")
self.assertLessEqual(rb, 1, f"Rank-biserial upper bound {name}")
"reward_total",
"reward_invalid",
"reward_idle",
- "reward_holding",
+ "reward_hold",
"reward_exit",
]
for col in required_columns:
)
# Total should always be finite
- self.assertTrue(
- np.isfinite(breakdown.total),
- f"Reward total should be finite for {position}/{action}",
- )
+ self.assertFinite(breakdown.total, name="breakdown.total")
class TestBoundaryConditions(RewardSpaceTestBase):
action_masking=True,
)
- self.assertTrue(
- np.isfinite(breakdown.total),
- "Reward should be finite even with extreme parameters",
- )
+ self.assertFinite(breakdown.total, name="breakdown.total")
def test_different_exit_attenuation_modes(self):
"""Test different exit attenuation modes (legacy, sqrt, linear, power, half_life)."""
action_masking=True,
)
- self.assertTrue(
- np.isfinite(breakdown.exit_component),
- f"Exit component should be finite for mode {mode}",
- )
- self.assertTrue(
- np.isfinite(breakdown.total),
- f"Total reward should be finite for mode {mode}",
+ self.assertFinite(
+ breakdown.exit_component, name="breakdown.exit_component"
)
+ self.assertFinite(breakdown.total, name="breakdown.total")
class TestHelperFunctions(RewardSpaceTestBase):
"reward_idle": np.concatenate(
[np.zeros(150), np.random.normal(-1, 0.5, 50)]
),
- "reward_holding": np.concatenate(
+ "reward_hold": np.concatenate(
[np.zeros(150), np.random.normal(-0.5, 0.3, 50)]
),
"reward_exit": np.concatenate(
)
self.assertLess(breakdown.idle_penalty, 0, "Idle penalty should be negative")
- self.assertEqual(
- breakdown.total, breakdown.idle_penalty, "Total should equal idle penalty"
+ # Total now includes shaping/additives - require equality including those components.
+ self.assertAlmostEqualFloat(
+ breakdown.total,
+ breakdown.idle_penalty
+ + breakdown.shaping_reward
+ + breakdown.entry_additive
+ + breakdown.exit_additive,
+ tolerance=1e-9,
+ msg="Total should equal sum of components (idle + shaping/additives)",
)
- def test_holding_penalty_via_rewards(self):
- """Test holding penalty calculation via reward calculation."""
- # Create context that will trigger holding penalty
+ def test_hold_penalty_via_rewards(self):
+ """Test hold penalty calculation via reward calculation."""
+ # Create context that will trigger hold penalty
context = RewardContext(
pnl=0.01,
trade_duration=150,
action_masking=True,
)
- self.assertLess(
- breakdown.holding_penalty, 0, "Holding penalty should be negative"
- )
- self.assertEqual(
+ self.assertLess(breakdown.hold_penalty, 0, "Hold penalty should be negative")
+ self.assertAlmostEqualFloat(
breakdown.total,
- breakdown.holding_penalty,
- "Total should equal holding penalty",
+ breakdown.hold_penalty
+ + breakdown.shaping_reward
+ + breakdown.entry_additive
+ + breakdown.exit_additive,
+ tolerance=1e-9,
+ msg="Total should equal sum of components (hold + shaping/additives)",
)
def test_exit_reward_calculation(self):
self.assertLess(
breakdown.invalid_penalty, 0, "Invalid action should have negative penalty"
)
- self.assertEqual(
+ self.assertAlmostEqualFloat(
breakdown.total,
- breakdown.invalid_penalty,
- "Total should equal invalid penalty",
+ breakdown.invalid_penalty
+ + breakdown.shaping_reward
+ + breakdown.entry_additive
+ + breakdown.exit_additive,
+ tolerance=1e-9,
+ msg="Total should equal invalid penalty plus shaping/additives",
)
- def test_holding_penalty_zero_before_max_duration(self):
- """Test holding penalty logic: zero penalty before max_trade_duration."""
+ def test_hold_penalty_zero_before_max_duration(self):
+ """Test hold penalty logic: zero penalty before max_trade_duration."""
max_duration = 128
# Test cases: before, at, and after max_duration
for trade_duration, description in test_cases:
with self.subTest(duration=trade_duration, desc=description):
context = RewardContext(
- pnl=0.0, # Neutral PnL to isolate holding penalty
+ pnl=0.0, # Neutral PnL to isolate hold penalty
trade_duration=trade_duration,
idle_duration=0,
max_trade_duration=max_duration,
if duration_ratio < 1.0:
# Before max_duration: should be exactly 0.0
self.assertEqual(
- breakdown.holding_penalty,
+ breakdown.hold_penalty,
0.0,
- f"Holding penalty should be 0.0 {description} (ratio={duration_ratio:.2f})",
+ f"Hold penalty should be 0.0 {description} (ratio={duration_ratio:.2f})",
)
elif duration_ratio == 1.0:
# At max_duration: (1.0-1.0)^power = 0, so should be 0.0
self.assertEqual(
- breakdown.holding_penalty,
+ breakdown.hold_penalty,
0.0,
- f"Holding penalty should be 0.0 {description} (ratio={duration_ratio:.2f})",
+ f"Hold penalty should be 0.0 {description} (ratio={duration_ratio:.2f})",
)
else:
# After max_duration: should be negative
self.assertLess(
- breakdown.holding_penalty,
+ breakdown.hold_penalty,
0.0,
- f"Holding penalty should be negative {description} (ratio={duration_ratio:.2f})",
+ f"Hold penalty should be negative {description} (ratio={duration_ratio:.2f})",
)
- # Total should equal holding penalty (no other components active)
- self.assertEqual(
+ self.assertAlmostEqualFloat(
breakdown.total,
- breakdown.holding_penalty,
- f"Total should equal holding penalty {description}",
+ breakdown.hold_penalty
+ + breakdown.shaping_reward
+ + breakdown.entry_additive
+ + breakdown.exit_additive,
+ tolerance=1e-9,
+ msg=f"Total mismatch including shaping {description}",
)
- def test_holding_penalty_progressive_scaling(self):
- """Test that holding penalty scales progressively after max_duration."""
+ def test_hold_penalty_progressive_scaling(self):
+ """Test that hold penalty scales progressively after max_duration."""
max_duration = 100
durations = [150, 200, 300] # All > max_duration
penalties: list[float] = []
action_masking=True,
)
- penalties.append(breakdown.holding_penalty)
+ penalties.append(breakdown.hold_penalty)
# Penalties should be increasingly negative (monotonic decrease)
for i in range(1, len(penalties)):
short_allowed=True,
action_masking=True,
)
- self.assertTrue(
- np.isfinite(breakdown.exit_component), "Exit component must be finite"
- )
+ self.assertFinite(breakdown.exit_component, name="exit_component")
class TestRewardRobustness(RewardSpaceTestBase):
"""Tests implementing all prioritized robustness enhancements.
Covers:
- - Reward decomposition integrity (total == sum of active component exactly)
+ - Reward decomposition integrity (total == core components + shaping + additives)
- Exit factor monotonic attenuation per mode where mathematically expected
- Boundary parameter conditions (tau extremes, plateau grace edges, linear slope = 0)
- - Non-linear power tests for idle & holding penalties (power != 1)
+ - Non-linear power tests for idle & hold penalties (power != 1)
- Warning emission (exit_factor_threshold) without capping
"""
),
active="idle_penalty",
),
- # Holding penalty only
+ # Hold penalty only
dict(
ctx=RewardContext(
pnl=0.0,
position=Positions.Long,
action=Actions.Neutral,
),
- active="holding_penalty",
+ active="hold_penalty",
),
# Exit reward only (positive pnl)
dict(
components = [
br.invalid_penalty,
br.idle_penalty,
- br.holding_penalty,
+ br.hold_penalty,
br.exit_component,
]
non_zero = [
)
self.assertAlmostEqualFloat(
br.total,
- non_zero[0],
+ non_zero[0]
+ + br.shaping_reward
+ + br.entry_additive
+ + br.exit_additive,
tolerance=1e-9,
- msg=f"Total mismatch for {sc['active']}",
+ msg=f"Total mismatch including shaping for {sc['active']}",
)
def test_exit_factor_monotonic_attenuation(self):
self.assertIn("exit_power_tau", adjustments)
self.assertIn("min=", str(adjustments["exit_power_tau"]["reason"]))
- def test_idle_and_holding_penalty_power(self):
+ def test_idle_and_hold_penalty_power(self):
"""Test non-linear scaling when penalty powers != 1."""
params = self.DEFAULT_PARAMS.copy()
params["idle_penalty_power"] = 2.0
delta=0.8,
msg=f"Idle penalty quadratic scaling mismatch (ratio={ratio_quadratic})",
)
- # Holding penalty with power 2: durations just above threshold
- params["holding_penalty_power"] = 2.0
+ # Hold penalty with power 2: durations just above threshold
+ params["hold_penalty_power"] = 2.0
ctx_h1 = RewardContext(
pnl=0.0,
trade_duration=130,
action=Actions.Neutral,
)
ctx_h2 = dataclasses.replace(ctx_h1, trade_duration=140)
- # Compute baseline and comparison holding penalties
+ # Compute baseline and comparison hold penalties
br_h1 = calculate_reward(
ctx_h1,
params,
)
# Quadratic scaling: ((140-100)/(130-100))^2 = (40/30)^2 ≈ 1.777...
hold_ratio = 0.0
- if br_h1.holding_penalty != 0:
- hold_ratio = br_h2.holding_penalty / br_h1.holding_penalty
+ if br_h1.hold_penalty != 0:
+ hold_ratio = br_h2.hold_penalty / br_h1.hold_penalty
self.assertAlmostEqual(
abs(hold_ratio),
(40 / 30) ** 2,
delta=0.4,
- msg=f"Holding penalty quadratic scaling mismatch (ratio={hold_ratio})",
+ msg=f"Hold penalty quadratic scaling mismatch (ratio={hold_ratio})",
)
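The quadratic ratio asserted above is easy to verify in isolation (a sketch assuming the overtime term grows as `(duration_ratio - 1)^power` past `max_trade_duration`, which is the behavior the test exercises):

```python
max_duration, power = 100, 2.0

def overtime_term(trade_duration: int) -> float:
    # Hold penalty magnitude grows as (duration_ratio - 1)^power once overtime.
    return max(0.0, trade_duration / max_duration - 1.0) ** power

ratio = overtime_term(140) / overtime_term(130)
assert abs(ratio - (40 / 30) ** 2) < 1e-9  # ≈ 1.778
```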
def test_exit_factor_threshold_warning_emission(self):
self.assertIn("pnl", loaded_data.columns)
+class TestPBRSIntegration(RewardSpaceTestBase):
+ """Tests for PBRS (Potential-Based Reward Shaping) integration."""
+
+ def test_tanh_transform(self):
+ """tanh transform: bounded in [-1,1], symmetric."""
+ self.assertAlmostEqualFloat(apply_transform("tanh", 0.0), 0.0)
+ self.assertAlmostEqualFloat(apply_transform("tanh", 1.0), math.tanh(1.0))
+ self.assertAlmostEqualFloat(apply_transform("tanh", -1.0), math.tanh(-1.0))
+ self.assertTrue(abs(apply_transform("tanh", 100.0)) <= 1.0)
+ self.assertTrue(abs(apply_transform("tanh", -100.0)) <= 1.0)
+
+ def test_softsign_transform(self):
+ """softsign transform: x/(1+|x|) in (-1,1)."""
+ self.assertAlmostEqualFloat(apply_transform("softsign", 0.0), 0.0)
+ self.assertAlmostEqualFloat(apply_transform("softsign", 1.0), 0.5)
+ self.assertAlmostEqualFloat(apply_transform("softsign", -1.0), -0.5)
+ self.assertTrue(abs(apply_transform("softsign", 100.0)) < 1.0)
+ self.assertTrue(abs(apply_transform("softsign", -100.0)) < 1.0)
+
+ def test_arctan_transform(self):
+ """arctan transform: normalized (2/pi)atan(x) bounded [-1,1]."""
+ # Environment uses normalized arctan: (2/pi)*atan(x)
+ self.assertAlmostEqualFloat(apply_transform("arctan", 0.0), 0.0)
+ self.assertAlmostEqualFloat(
+ apply_transform("arctan", 1.0),
+ (2.0 / math.pi) * math.atan(1.0),
+ tolerance=1e-10,
+ )
+ self.assertTrue(abs(apply_transform("arctan", 100.0)) <= 1.0)
+ self.assertTrue(abs(apply_transform("arctan", -100.0)) <= 1.0)
+
+ def test_logistic_transform(self):
+ """logistic transform: 2σ(x)-1 in (-1,1)."""
+ # Environment logistic returns 2σ(x)-1 centered at 0 in (-1,1)
+ self.assertAlmostEqualFloat(apply_transform("logistic", 0.0), 0.0)
+ self.assertTrue(apply_transform("logistic", 100.0) > 0.99)
+ self.assertTrue(apply_transform("logistic", -100.0) < -0.99)
+ self.assertTrue(-1 < apply_transform("logistic", 10.0) < 1)
+ self.assertTrue(-1 < apply_transform("logistic", -10.0) < 1)
+
+ def test_clip_transform(self):
+ """clip transform: clamp to [-1,1]."""
+ self.assertAlmostEqualFloat(apply_transform("clip", 0.0), 0.0)
+ self.assertAlmostEqualFloat(apply_transform("clip", 0.5), 0.5)
+ self.assertAlmostEqualFloat(apply_transform("clip", 2.0), 1.0)
+ self.assertAlmostEqualFloat(apply_transform("clip", -2.0), -1.0)
+
+ def test_invalid_transform(self):
+ """Test error handling for invalid transforms."""
+ # Unknown transform names fall back to tanh (the fallback is logged)
+ self.assertAlmostEqualFloat(
+ apply_transform("invalid_transform", 1.0), math.tanh(1.0), tolerance=1e-9
+ )
+
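To compare the bounded transforms side by side, here is a standalone sketch that reimplements the formulas from the docstrings above (it does not import the module under test):

```python
import math

transforms = {
    "tanh": math.tanh,
    "softsign": lambda x: x / (1.0 + abs(x)),
    "arctan": lambda x: (2.0 / math.pi) * math.atan(x),
    "logistic": lambda x: 2.0 / (1.0 + math.exp(-x)) - 1.0,
    "asinh_norm": lambda x: x / math.hypot(1.0, x),
    "clip": lambda x: max(-1.0, min(1.0, x)),
}
for x in (0.25, 1.0, 3.0):
    row = {name: round(fn(x), 4) for name, fn in transforms.items()}
    print(x, row)
# All outputs stay in [-1, 1]; clip hits the bound exactly at |x| >= 1,
# while tanh saturates fastest among the smooth maps.
```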
+ def test_get_float_param(self):
+ """Test float parameter extraction."""
+ params = {"test_float": 1.5, "test_int": 2, "test_str": "hello"}
+ self.assertEqual(_get_float_param(params, "test_float", 0.0), 1.5)
+ self.assertEqual(_get_float_param(params, "test_int", 0.0), 2.0)
+ # Non-parseable string -> NaN fallback in tolerant parser
+ val_str = _get_float_param(params, "test_str", 0.0)
+ self.assertTrue(
+ isinstance(val_str, float) and math.isnan(val_str),
+ "Expected NaN for non-numeric string in _get_float_param",
+ )
+ self.assertEqual(_get_float_param(params, "missing", 3.14), 3.14)
+
+ def test_get_str_param(self):
+ """Test string parameter extraction."""
+ params = {"test_str": "hello", "test_int": 2}
+ self.assertEqual(_get_str_param(params, "test_str", "default"), "hello")
+ self.assertEqual(_get_str_param(params, "test_int", "default"), "default")
+ self.assertEqual(_get_str_param(params, "missing", "default"), "default")
+
+ def test_get_bool_param(self):
+ """Test boolean parameter extraction."""
+ params = {
+ "test_true": True,
+ "test_false": False,
+ "test_int": 1,
+ "test_str": "yes",
+ }
+ self.assertTrue(_get_bool_param(params, "test_true", False))
+ self.assertFalse(_get_bool_param(params, "test_false", True))
+ # Environment coerces typical truthy numeric/string values
+ self.assertTrue(_get_bool_param(params, "test_int", False))
+ self.assertTrue(_get_bool_param(params, "test_str", False))
+ self.assertFalse(_get_bool_param(params, "missing", False))
+
+ def test_hold_potential_basic(self):
+ """Test basic hold potential calculation."""
+ params = {
+ "hold_potential_enabled": True,
+ "hold_potential_scale": 1.0,
+ "hold_potential_gain": 1.0,
+ "hold_potential_transform_pnl": "tanh",
+ "hold_potential_transform_duration": "tanh",
+ }
+ val = _compute_hold_potential(0.5, 0.3, params)
+ self.assertFinite(val, name="hold_potential")
+
+ def test_entry_additive_disabled(self):
+ """Test entry additive when disabled."""
+ params = {"entry_additive_enabled": False}
+ val = _compute_entry_additive(0.5, 0.3, params)
+ self.assertEqual(val, 0.0)
+
+ def test_exit_additive_disabled(self):
+ """Test exit additive when disabled."""
+ params = {"exit_additive_enabled": False}
+ val = _compute_exit_additive(0.5, 0.3, params)
+ self.assertEqual(val, 0.0)
+
+ def test_exit_potential_canonical(self):
+ """Test exit potential in canonical mode."""
+ params = {"exit_potential_mode": "canonical"}
+ val = _compute_exit_potential(0.5, 0.3, params, last_potential=1.0)
+ self.assertEqual(val, 0.0)
+
+ def test_exit_potential_progressive_release(self):
+ """Progressive release: Φ' = Φ * (1 - decay)."""
+ params = {
+ "exit_potential_mode": "progressive_release",
+ "exit_potential_decay": 0.8,
+ }
+ # Expected: Φ' = Φ * (1 - decay) = 1 * (1 - 0.8) = 0.2
+ val = _compute_exit_potential(0.5, 0.3, params, last_potential=1.0)
+ self.assertAlmostEqual(val, 0.2)
+
+ def test_exit_potential_spike_cancel(self):
+ """Spike cancel: Φ' = Φ / γ (inversion)."""
+ params = {"exit_potential_mode": "spike_cancel", "potential_gamma": 0.95}
+ val = _compute_exit_potential(0.5, 0.3, params, last_potential=1.0)
+ self.assertAlmostEqual(val, 1.0 / 0.95, places=7)
+
+ def test_exit_potential_retain_previous(self):
+ """Test exit potential in retain previous mode."""
+ params = {"exit_potential_mode": "retain_previous"}
+ val = _compute_exit_potential(0.5, 0.3, params, last_potential=1.0)
+ self.assertEqual(val, 1.0)
+
+ def test_pbrs_terminal_canonical(self):
+ """Test PBRS behavior in canonical mode with terminal state."""
+ params = {
+ "potential_gamma": 0.95,
+ "hold_potential_enabled": True,
+ "hold_potential_scale": 1.0,
+ "hold_potential_gain": 1.0,
+ "hold_potential_transform_pnl": "tanh",
+ "hold_potential_transform_duration": "tanh",
+ "exit_potential_mode": "canonical",
+ "entry_additive_enabled": False,
+ "exit_additive_enabled": False,
+ }
+
+ current_pnl = 0.5
+ current_duration_ratio = 0.3
+ expected_current_potential = _compute_hold_potential(
+ current_pnl, current_duration_ratio, params
+ )
+
+ total_reward, shaping_reward, next_potential = apply_potential_shaping(
+ base_reward=100.0,
+ current_pnl=current_pnl,
+ current_duration_ratio=current_duration_ratio,
+ next_pnl=0.0,
+ next_duration_ratio=0.0,
+ is_terminal=True,
+ last_potential=0.0,
+ params=params,
+ )
+
+ # Terminal potential should be 0 in canonical mode
+ self.assertEqual(next_potential, 0.0)
+ # Shaping reward should be negative (releasing potential)
+ self.assertLess(shaping_reward, 0.0)
+ # Check exact formula: γΦ(s') - Φ(s) = 0.95 * 0 - expected_current_potential
+ expected_shaping = 0.95 * 0.0 - expected_current_potential
+ self.assertAlmostEqual(shaping_reward, expected_shaping, delta=1e-10)
+
+ def test_pbrs_invalid_gamma(self):
+ """Test PBRS with invalid gamma value."""
+ params = {"potential_gamma": 1.5, "hold_potential_enabled": True}
+ with self.assertRaises(ValueError):
+ apply_potential_shaping(
+ base_reward=0.0,
+ current_pnl=0.0,
+ current_duration_ratio=0.0,
+ next_pnl=0.0,
+ next_duration_ratio=0.0,
+ is_terminal=True,
+ last_potential=0.0,
+ params=params,
+ )
+
+ def test_calculate_reward_with_pbrs_integration(self):
+ """Test that PBRS parameters are properly integrated in defaults."""
+ # Test that PBRS parameters are in the default parameters
+ for param in PBRS_INTEGRATION_PARAMS:
+ self.assertIn(
+ param,
+ DEFAULT_MODEL_REWARD_PARAMETERS,
+ f"PBRS parameter {param} missing from defaults",
+ )
+
+ # Test basic PBRS function integration works
+ params = {"hold_potential_enabled": True, "hold_potential_scale": 1.0}
+ potential = _compute_hold_potential(0.1, 0.2, params)
+ self.assertFinite(potential, name="hold_potential")
+
+ def test_pbrs_default_parameters_completeness(self):
+ """Test that all required PBRS parameters have defaults."""
+ for param in PBRS_REQUIRED_PARAMS:
+ self.assertIn(
+ param,
+ DEFAULT_MODEL_REWARD_PARAMETERS,
+ f"Missing PBRS parameter: {param}",
+ )
+
+
if __name__ == "__main__":
# Configure test discovery and execution
loader = unittest.TestLoader()
"""
_LOG_2 = math.log(2.0)
+ DEFAULT_IDLE_DURATION_MULTIPLIER: int = 4
_action_masks_cache: Dict[Tuple[bool, float], NDArray[np.bool_]] = {}
def __init__(self, *args, **kwargs):
model_params: Dict[str, Any] = copy.deepcopy(self.model_training_parameters)
model_params.setdefault("seed", 42)
+ model_params.setdefault("gamma", 0.95)
if not self.hyperopt and self.lr_schedule:
lr = model_params.get("learning_rate", 0.0003)
else:
tensorboard_log_path = None
+ # Rebuild train and eval environments before training to sync model parameters
+ prices_train, prices_test = self.build_ohlc_price_dataframes(
+ dk.data_dictionary, dk.pair, dk
+ )
+ self.set_train_and_eval_environments(
+ dk.data_dictionary, prices_train, prices_test, dk
+ )
+
model = self.get_init_model(dk.pair)
if model is not None:
logger.info(
dtype=np.float32, copy=False
)
n = np_dataframe.shape[0]
- window_size: int = self.CONV_WIDTH
+ window_size: int = self.window_size
frame_stacking: int = self.frame_stacking
frame_stacking_enabled: bool = bool(frame_stacking) and frame_stacking > 1
inference_masking: bool = self.action_masking and self.inference_masking
seed: Optional[int] = None,
env_info: Optional[Dict[str, Any]] = None,
trial: Optional[Trial] = None,
+ model_params: Optional[Dict[str, Any]] = None,
) -> Tuple[VecEnv, VecEnv]:
if (
train_df is None
if trial is not None:
seed += trial.number
set_random_seed(seed)
- env_info = self.pack_env_dict(dk.pair) if env_info is None else env_info
+ env_info: Dict[str, Any] = (
+ self.pack_env_dict(dk.pair) if env_info is None else env_info
+ )
+ gamma: Optional[float] = None
+ best_trial_params: Optional[Dict[str, Any]] = None
+ if self.hyperopt:
+ best_trial_params = self.load_best_trial_params(
+ dk.pair if self.rl_config_optuna.get("per_pair", False) else None
+ )
+ if model_params and isinstance(model_params.get("gamma"), (int, float)):
+ gamma = model_params.get("gamma")
+ elif best_trial_params:
+ gamma = best_trial_params.get("gamma")
+ elif hasattr(self.model, "gamma") and isinstance(
+ self.model.gamma, (int, float)
+ ):
+ gamma = self.model.gamma
+ elif isinstance(self.get_model_params().get("gamma"), (int, float)):
+ gamma = self.get_model_params().get("gamma")
+ if gamma is not None:
+ # Align RL agent gamma with PBRS gamma for consistent discount factor
+ env_info["config"]["freqai"]["rl_config"]["model_reward_parameters"][
+ "potential_gamma"
+ ] = float(gamma)
env_prefix = f"trial_{trial.number}_" if trial is not None else ""
train_fns = [
else:
tensorboard_log_path = None
- train_env, eval_env = self._get_train_and_eval_environments(dk, trial=trial)
+ train_env, eval_env = self._get_train_and_eval_environments(
+ dk, trial=trial, model_params=params
+ )
model = self.MODELCLASS(
self.policy_type,
self.max_trade_duration_candles: int = self.rl_config.get(
"max_trade_duration_candles", 128
)
+ # === Constants ===
+ self.MIN_SOFTSIGN_SHARPNESS: float = 0.01
+ self.MAX_SOFTSIGN_SHARPNESS: float = 100.0
+ # === INTERNAL STATE ===
self._last_closed_position: Optional[Positions] = None
self._last_closed_trade_tick: int = 0
self._max_unrealized_profit: float = -np.inf
self._min_unrealized_profit: float = np.inf
+ self._last_potential: float = 0.0
+ # === PBRS INSTRUMENTATION ===
+ self._total_shaping_reward: float = 0.0
+ self._last_shaping_reward: float = 0.0
+ model_reward_parameters = self.rl_config.get("model_reward_parameters", {})
+ # === PBRS COMMON PARAMETERS ===
+ potential_gamma = model_reward_parameters.get("potential_gamma")
+ if potential_gamma is None:
+ logger.warning("potential_gamma not specified; defaulting to 0.95")
+ self._potential_gamma = 0.95
+ else:
+ self._potential_gamma = float(potential_gamma)
+ self._potential_softsign_sharpness: float = float(
+ model_reward_parameters.get("potential_softsign_sharpness", 1.0)
+ )
+ self._potential_softsign_sharpness = max(
+ self.MIN_SOFTSIGN_SHARPNESS,
+ min(self.MAX_SOFTSIGN_SHARPNESS, self._potential_softsign_sharpness),
+ )
+ # === EXIT POTENTIAL MODE ===
+ # exit_potential_mode options:
+ # 'canonical' -> Φ(s')=0 (baseline PBRS, preserves invariance)
+ # 'progressive_release' -> Φ(s')=Φ(s)*(1-decay_factor)
+ # 'spike_cancel' -> Φ(s')=Φ(s)/γ (Δ ≈ 0, cancels shaping)
+ # 'retain_previous' -> Φ(s')=Φ(s)
+ self._exit_potential_mode = str(
+ model_reward_parameters.get("exit_potential_mode", "canonical")
+ )
+ _allowed_exit_modes = {
+ "canonical",
+ "progressive_release",
+ "spike_cancel",
+ "retain_previous",
+ }
+ if self._exit_potential_mode not in _allowed_exit_modes:
+ logger.warning(
+ "Unknown exit_potential_mode '%s'; defaulting to 'canonical'",
+ self._exit_potential_mode,
+ )
+ self._exit_potential_mode = "canonical"
+ self._exit_potential_decay: float = float(
+ model_reward_parameters.get("exit_potential_decay", 0.5)
+ )
+ # === ENTRY ADDITIVE (non-PBRS additive term) ===
+ self._entry_additive_enabled: bool = bool(
+ model_reward_parameters.get("entry_additive_enabled", False)
+ )
+ self._entry_additive_scale: float = float(
+ model_reward_parameters.get("entry_additive_scale", 1.0)
+ )
+ self._entry_additive_gain: float = float(
+ model_reward_parameters.get("entry_additive_gain", 1.0)
+ )
+ self._entry_additive_transform_pnl: str = str(
+ model_reward_parameters.get("entry_additive_transform_pnl", "tanh")
+ )
+ self._entry_additive_transform_duration: str = str(
+ model_reward_parameters.get("entry_additive_transform_duration", "tanh")
+ )
+ # === HOLD POTENTIAL (PBRS function Φ) ===
+ self._hold_potential_enabled: bool = bool(
+ model_reward_parameters.get("hold_potential_enabled", True)
+ )
+ self._hold_potential_scale: float = float(
+ model_reward_parameters.get("hold_potential_scale", 1.0)
+ )
+ self._hold_potential_gain: float = float(
+ model_reward_parameters.get("hold_potential_gain", 1.0)
+ )
+ self._hold_potential_transform_pnl: str = str(
+ model_reward_parameters.get("hold_potential_transform_pnl", "tanh")
+ )
+ self._hold_potential_transform_duration: str = str(
+ model_reward_parameters.get("hold_potential_transform_duration", "tanh")
+ )
+ # === EXIT ADDITIVE (non-PBRS additive term) ===
+ self._exit_additive_enabled: bool = bool(
+ model_reward_parameters.get("exit_additive_enabled", False)
+ )
+ self._exit_additive_scale: float = float(
+ model_reward_parameters.get("exit_additive_scale", 1.0)
+ )
+ self._exit_additive_gain: float = float(
+ model_reward_parameters.get("exit_additive_gain", 1.0)
+ )
+ self._exit_additive_transform_pnl: str = str(
+ model_reward_parameters.get("exit_additive_transform_pnl", "tanh")
+ )
+ self._exit_additive_transform_duration: str = str(
+ model_reward_parameters.get("exit_additive_transform_duration", "tanh")
+ )
+ # === PBRS INVARIANCE CHECKS ===
+ if self._exit_potential_mode == "canonical":
+ if self._entry_additive_enabled or self._exit_additive_enabled:
+ if self._entry_additive_enabled:
+ logger.info(
+ "Disabling entry additive to preserve PBRS invariance (canonical mode)."
+ )
+ if self._exit_additive_enabled:
+ logger.info(
+ "Disabling exit additive to preserve PBRS invariance (canonical mode)."
+ )
+ self._entry_additive_enabled = False
+ self._exit_additive_enabled = False
+
+ def _get_next_position(self, action: int) -> Positions:
+ if action == Actions.Long_enter.value and self._position == Positions.Neutral:
+ return Positions.Long
+ if (
+ action == Actions.Short_enter.value
+ and self._position == Positions.Neutral
+ and self.can_short
+ ):
+ return Positions.Short
+ if action == Actions.Long_exit.value and self._position == Positions.Long:
+ return Positions.Neutral
+ if action == Actions.Short_exit.value and self._position == Positions.Short:
+ return Positions.Neutral
+ return self._position
+
+ def _get_next_transition_state(
+ self,
+ action: int,
+ trade_duration: float,
+ pnl: float,
+ ) -> Tuple[Positions, int, float]:
+ """Compute next transition state tuple."""
+ next_position = self._get_next_position(action)
+ # Entry
+ if self._position == Positions.Neutral and next_position in (
+ Positions.Long,
+ Positions.Short,
+ ):
+ return next_position, 0, 0.0
+ # Exit
+ if (
+ self._position in (Positions.Long, Positions.Short)
+ and next_position == Positions.Neutral
+ ):
+ return next_position, 0, 0.0
+ # Hold
+ if self._position in (Positions.Long, Positions.Short) and next_position in (
+ Positions.Long,
+ Positions.Short,
+ ):
+ return next_position, int(trade_duration) + 1, pnl
+ # Neutral self-loop
+ return next_position, 0, 0.0
+
+ def _is_invalid_pnl_target(self, pnl_target: float) -> bool:
+ """Check if pnl_target is invalid (negative or close to zero)."""
+ return pnl_target < 0.0 or np.isclose(pnl_target, 0.0)
+
+ def _compute_pnl_duration_signal(
+ self,
+ *,
+ enabled: bool,
+ require_position: bool,
+ position: Positions,
+ pnl: float,
+ pnl_target: float,
+ duration_ratio: float,
+ scale: float,
+ gain: float,
+ transform_pnl: str,
+ transform_duration: str,
+ ) -> float:
+ """Generic bounded bi-component signal combining PnL and duration.
+
+ Shared logic for:
+ - Hold potential Φ(s)
+ - Entry additive
+ - Exit additive
+
+ Parameters
+ ----------
+ enabled : bool
+ Whether this signal is active
+ require_position : bool
+ If True, only compute when position in (Long, Short)
+ position : Positions
+ Current position
+ pnl : float
+ PnL used for normalization
+ pnl_target : float
+ Target PnL normalizer (>0)
+ duration_ratio : float
+ Raw duration ratio
+ scale : float
+ Output scaling factor
+ gain : float
+ Gain multiplier before transform
+ transform_pnl : str
+ Transform name for PnL component
+ transform_duration : str
+ Transform name for duration component
+
+ Returns
+ -------
+ float
+ Bounded signal in [-scale, scale]
+ """
+ if not enabled:
+ return 0.0
+ if require_position and position not in (Positions.Long, Positions.Short):
+ return 0.0
+ if self._is_invalid_pnl_target(pnl_target):
+ return 0.0
+
+ duration_ratio = 0.0 if duration_ratio < 0.0 else duration_ratio
+ if duration_ratio > 1.0:
+ duration_ratio = 1.0
+
+ try:
+ pnl_ratio = pnl / pnl_target
+ except Exception:
+ return 0.0
+
+ pnl_term = self._potential_transform(transform_pnl, gain * pnl_ratio)
+ dur_term = self._potential_transform(transform_duration, gain * duration_ratio)
+ value = scale * 0.5 * (pnl_term + dur_term)
+ return float(value) if np.isfinite(value) else 0.0
+
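Numerically, the shared signal is just the mean of two bounded terms. A minimal sketch with illustrative inputs (tanh transforms, scale = gain = 1):

```python
import math

pnl, pnl_target = 0.02, 0.03
duration_ratio = 0.5
gain, scale = 1.0, 1.0

pnl_term = math.tanh(gain * (pnl / pnl_target))  # tanh(0.666...) ≈ 0.583
dur_term = math.tanh(gain * duration_ratio)      # tanh(0.5) ≈ 0.462
phi = scale * 0.5 * (pnl_term + dur_term)        # ≈ 0.523
assert -scale <= phi <= scale                    # bounded by construction
```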
+ def _compute_hold_potential(
+ self,
+ position: Positions,
+ duration_ratio: float,
+ pnl: float,
+ pnl_target: float,
+ ) -> float:
+ """Compute PBRS potential Φ(s) for position holding states.
+
+ See ``_apply_potential_shaping`` for complete PBRS documentation.
+ """
+ return self._compute_pnl_duration_signal(
+ enabled=self._hold_potential_enabled,
+ require_position=True,
+ position=position,
+ pnl=pnl,
+ pnl_target=pnl_target,
+ duration_ratio=duration_ratio,
+ scale=self._hold_potential_scale,
+ gain=self._hold_potential_gain,
+ transform_pnl=self._hold_potential_transform_pnl,
+ transform_duration=self._hold_potential_transform_duration,
+ )
+
+ def _compute_exit_additive(
+ self,
+ pnl: float,
+ pnl_target: float,
+ duration_ratio: float,
+ ) -> float:
+ """Compute exit additive reward for position exit transitions.
+
+ See ``_apply_potential_shaping`` for complete PBRS documentation.
+ """
+ return self._compute_pnl_duration_signal(
+ enabled=self._exit_additive_enabled,
+ require_position=False,
+ position=Positions.Neutral,
+ pnl=pnl,
+ pnl_target=pnl_target,
+ duration_ratio=duration_ratio,
+ scale=self._exit_additive_scale,
+ gain=self._exit_additive_gain,
+ transform_pnl=self._exit_additive_transform_pnl,
+ transform_duration=self._exit_additive_transform_duration,
+ )
+
+ def _compute_entry_additive(
+ self,
+ pnl: float,
+ pnl_target: float,
+ duration_ratio: float,
+ ) -> float:
+ """Compute entry additive reward for position entry transitions.
+
+ See ``_apply_potential_shaping`` for complete PBRS documentation.
+ """
+ return self._compute_pnl_duration_signal(
+ enabled=self._entry_additive_enabled,
+ require_position=False,
+ position=Positions.Neutral,
+ pnl=pnl,
+ pnl_target=pnl_target,
+ duration_ratio=duration_ratio,
+ scale=self._entry_additive_scale,
+ gain=self._entry_additive_gain,
+ transform_pnl=self._entry_additive_transform_pnl,
+ transform_duration=self._entry_additive_transform_duration,
+ )
+
+ def _potential_transform(self, name: str, x: float) -> float:
+ """Apply bounded transform function for potential and additive computations.
+
+ Provides numerical stability by mapping unbounded inputs to bounded outputs
+ using various smooth activation functions. Used in both PBRS potentials
+ and additive reward calculations.
+
+ Parameters
+ ----------
+ name : str
+ Transform function name: 'tanh', 'softsign', 'softsign_sharp',
+ 'arctan', 'logistic', 'asinh_norm', or 'clip'
+ x : float
+ Input value to transform
+
+ Returns
+ -------
+ float
+ Bounded output in [-1, 1]
+ """
+ if name == "tanh":
+ return math.tanh(x)
+
+ if name == "softsign":
+ ax = abs(x)
+ return x / (1.0 + ax)
+
+ if name == "softsign_sharp":
+ s = self._potential_softsign_sharpness
+ xs = s * x
+ ax = abs(xs)
+ return xs / (1.0 + ax)
+
+ if name == "arctan":
+ return (2.0 / math.pi) * math.atan(x)
+
+ if name == "logistic":
+ if x >= 0:
+ z = math.exp(-x) # z in (0,1]
+ return (1.0 - z) / (1.0 + z)
+ else:
+ z = math.exp(x) # z in (0,1]
+ return (z - 1.0) / (z + 1.0)
+
+ if name == "asinh_norm":
+ return x / math.hypot(1.0, x)
+
+ if name == "clip":
+ return max(-1.0, min(1.0, x))
+
+ logger.info("Unknown potential transform '%s'; falling back to tanh", name)
+ return math.tanh(x)
+
+ def _compute_exit_potential(self, prev_potential: float, gamma: float) -> float:
+ """Compute next potential Φ(s') for exit transitions based on exit potential mode.
+
+ See ``_apply_potential_shaping`` for complete PBRS documentation.
+ """
+ mode = self._exit_potential_mode
+ if mode == "canonical":
+ return 0.0
+ if mode == "progressive_release":
+ decay = self._exit_potential_decay
+ if not np.isfinite(decay) or decay < 0.0:
+ decay = 0.5
+ if decay > 1.0:
+ decay = 1.0
+ next_potential = prev_potential * (1.0 - decay)
+ elif mode == "spike_cancel":
+ if gamma <= 0.0 or not np.isfinite(gamma):
+ next_potential = prev_potential
+ else:
+ next_potential = prev_potential / gamma
+ elif mode == "retain_previous":
+ next_potential = prev_potential
+ else:
+ next_potential = 0.0
+ if not np.isfinite(next_potential):
+ next_potential = 0.0
+ return next_potential
+
+ def is_pbrs_invariant_mode(self) -> bool:
+ """Return True if current configuration preserves PBRS policy invariance.
+
+ PBRS policy invariance (Ng et al. 1999) requires:
+ 1. Canonical exit mode: Φ(terminal) = 0
+ 2. No path-dependent additives: entry_additive = exit_additive = 0
+
+ When True, the shaped policy π'(s) is guaranteed to be equivalent to
+ the policy π(s) learned with base rewards only.
+
+ Returns
+ -------
+ bool
+ True if configuration preserves theoretical PBRS invariance
+ """
+ return self._exit_potential_mode == "canonical" and not (
+ self._entry_additive_enabled or self._exit_additive_enabled
+ )
+
+ def _apply_potential_shaping(
+ self,
+ base_reward: float,
+ action: int,
+ trade_duration: float,
+ max_trade_duration: float,
+ pnl: float,
+ pnl_target: float,
+ ) -> float:
+ """Apply potential-based reward shaping (PBRS) following Ng et al. 1999.
+
+ Implements the canonical PBRS formula:
+
+ R'(s, a, s') = R_base(s, a, s') + γ Φ(s') - Φ(s)
+
+ Notation
+ --------
+ - R_base(s, a, s') : unshaped environment reward (code variable: ``base_reward``)
+ - Φ(s) : potential before transition (code: ``prev_potential`` / ``self._last_potential``)
+ - Φ(s') : potential after transition (computed per transition type)
+ - γ : shaping discount (``self._potential_gamma``)
+ - Δ(s,s') : shaping term = γ Φ(s') - Φ(s) (logged as ``shaping_reward`` per step)
+ - R'(s, a, s') : shaped reward delivered to the agent = R_base + Δ(s,s') + (additives if enabled)
+ - pnl_ratio : pnl / pnl_target (normalized profit component before transform)
+ - duration_ratio : trade_duration / max_trade_duration (clipped to [0,1] before transform)
+
+ PBRS Theory & Compliance
+ ------------------------
+ This implementation follows academic standards for potential-based reward shaping:
+ - Ng et al. 1999: Canonical formula with invariance guarantees
+ - Wiewiora et al. 2003: Terminal state handling (Φ(terminal)=0)
+ - Maintains policy invariance in canonical mode with proper terminal handling
+
+ Architecture & Transitions
+ --------------------------
+ Three mutually exclusive transition types:
+
+ 1. **Entry** (Neutral → Long/Short):
+ - Initialize potential Φ for next step: Φ(s') = hold_potential(next_state)
+ - PBRS shaping reward: γΦ(s') - Φ(s) where Φ(s)=0 (neutral has no potential)
+ - Optional entry additive (non-PBRS additive term, breaks invariance if used)
+
+ 2. **Hold** (Long/Short → Long/Short):
+ - Standard PBRS: γΦ(s') - Φ(s) where both potentials computed from hold_potential()
+ - Φ(s') accounts for updated PnL and trade duration progression
+
+ 3. **Exit** (Long/Short → Neutral):
+ - **Canonical mode**: Φ(terminal)=0, Δ(s,s') = -Φ(s)
+ - **Heuristic modes**: Φ(s') computed by _compute_exit_potential(), Δ(s,s') = γΦ(s')-Φ(s)
+ - Optional exit additive (non-PBRS additive term for trade quality summary)
+
+ Potential Function Φ(s)
+ -----------------------
+ Hold potential formula: Φ(s) = scale * 0.5 * [T_pnl(g*pnl_ratio) + T_dur(g*duration_ratio)]
+
+ **Bounded Transform Functions** (range [-1,1]):
+ - tanh: smooth saturation, tanh(x)
+ - softsign: x/(1+|x|), gentler than tanh
+ - softsign_sharp: softsign(sharpness*x), tunable steepness
+ - arctan: (2/π)*arctan(x), linear near origin
+ - logistic: 2σ(x)-1 where σ(x)=1/(1+e^(-x)), numerically stable implementation
+ - asinh_norm: x/√(1+x²), normalized asinh-like
+ - clip: hard clamp to [-1,1]
+
+ **Parameters**:
+ - gain g: sharpens (g>1) or softens (g<1) transform input
+ - scale: multiplies final potential value
+ - sharpness: affects softsign_sharp transform (must be >0)
+
+ Exit Potential Modes
+ --------------------
+ **canonical** (PBRS-compliant):
+ - Φ(s')=0 for all exit transitions
+ - Maintains theoretical invariance guarantees
+ - Shaping reward: γ·0-Φ(s) = -Φ(s)
+ - Entry/exit additives automatically disabled to preserve invariance
+
+ **progressive_release** (heuristic):
+ - Φ(s')=Φ(s)*(1-decay_factor), gradual decay
+ - Shaping reward: γΦ(s')-Φ(s) = γΦ(s)*(1-d)-Φ(s)
+
+ **spike_cancel** (heuristic):
+ - Φ(s')=Φ(s)/γ, aims for zero net shaping
+ - Shaping reward: γΦ(s')-Φ(s) = γ*(Φ(s)/γ)-Φ(s) = 0
+
+ **retain_previous** (heuristic):
+ - Φ(s')=Φ(s), full retention
+ - Shaping reward: (γ-1)Φ(s)
+
+ Additive Components & Path Dependence
+ -------------------------------------
+ **Entry/Exit Additive Terms**: Non-PBRS additive rewards that break invariance
+ - Entry additive: Applied at entry transitions, computed via _compute_entry_additive()
+ - Exit additive: Applied at exit transitions, computed via _compute_exit_additive()
+ - Neither additive persists in stored potential (maintains neutrality)
+
+ **Path Dependence**: Only canonical mode preserves PBRS invariance. Heuristic
+ exit modes introduce path dependence through non-zero terminal potentials.
+
+ Invariance & Validation
+ -----------------------
+ **Theoretical Guarantee**: In canonical mode the discounted shaping sum
+ ∑ γ^t Δ(s_t, s_{t+1}) telescopes to γ^T Φ(s_T) - Φ(s_0) = 0 over complete
+ episodes, since Φ(terminal) = 0 and episodes start from a zero-potential
+ neutral state. Entry/exit additives are automatically disabled in canonical
+ mode to preserve this invariance.
+
+ **Deviations from Theory**:
+ - Heuristic exit modes violate invariance
+ - Entry/exit additives break policy invariance
+ - Non-canonical modes may cause path-dependent learning
+
+ **Robustness**:
+ - Bounded transforms prevent potential explosion
+ - Finite value validation with fallback to 0
+ - Terminal state enforcement: Φ(s)=0 when terminated=True
+ - All transform functions are strictly bounded in [-1, 1], ensuring numerical stability
+
+ Parameters
+ ----------
+ base_reward : float
+ Original reward before shaping
+ action : int
+ Action taken leading to transition
+ trade_duration : float
+ Current trade duration in candles
+ max_trade_duration : float
+ Maximum allowed trade duration
+ pnl : float
+ Current position PnL
+ pnl_target : float
+ Target PnL for normalization
+
+ Returns
+ -------
+ float
+ Shaped reward R'(s,a,s') = R_base + Δ(s,s') + optional_additives
+
+ Notes
+ -----
+ - Use canonical mode for theoretical compliance
+ - Monitor the discounted sum ∑ γ^t Δ(s,s') for invariance validation (it telescopes to 0 over closed episodes in canonical mode)
+ - Heuristic exit modes are experimental and may affect convergence
+ - Transform validation removed from runtime (deferred to analysis tools)
+ - In canonical exit mode, Φ is reset to 0 at exit boundaries, ensuring the discounted shaping sum telescopes to zero over closed episodes
+ """
+ if not self._hold_potential_enabled and not (
+ self._entry_additive_enabled or self._exit_additive_enabled
+ ):
+ return base_reward
+ prev_potential = self._last_potential
+ next_position, next_trade_duration, next_pnl = self._get_next_transition_state(
+ action=action, trade_duration=trade_duration, pnl=pnl
+ )
+ if max_trade_duration <= 0:
+ next_duration_ratio = 0.0
+ else:
+ next_duration_ratio = next_trade_duration / max_trade_duration
+
+ is_entry = self._position == Positions.Neutral and next_position in (
+ Positions.Long,
+ Positions.Short,
+ )
+ is_exit = (
+ self._position in (Positions.Long, Positions.Short)
+ and next_position == Positions.Neutral
+ )
+ is_hold = self._position in (
+ Positions.Long,
+ Positions.Short,
+ ) and next_position in (Positions.Long, Positions.Short)
+
+ gamma = self._potential_gamma
+ if is_entry:
+ if self._hold_potential_enabled:
+ potential = self._compute_hold_potential(
+ next_position, next_duration_ratio, next_pnl, pnl_target
+ )
+ shaping_reward = gamma * potential - prev_potential
+ self._last_potential = potential
+ else:
+ shaping_reward = 0.0
+ self._last_potential = 0.0
+ entry_additive = self._compute_entry_additive(
+ pnl=next_pnl, pnl_target=pnl_target, duration_ratio=next_duration_ratio
+ )
+ self._last_shaping_reward = float(shaping_reward)
+ self._total_shaping_reward += float(shaping_reward)
+ return base_reward + shaping_reward + entry_additive
+ elif is_hold:
+ if self._hold_potential_enabled:
+ potential = self._compute_hold_potential(
+ next_position, next_duration_ratio, next_pnl, pnl_target
+ )
+ shaping_reward = gamma * potential - prev_potential
+ self._last_potential = potential
+ else:
+ shaping_reward = 0.0
+ self._last_potential = 0.0
+ self._last_shaping_reward = float(shaping_reward)
+ self._total_shaping_reward += float(shaping_reward)
+ return base_reward + shaping_reward
+ elif is_exit:
+ if self._exit_potential_mode == "canonical":
+ next_potential = 0.0
+ exit_shaping_reward = -prev_potential
+ else:
+ next_potential = self._compute_exit_potential(prev_potential, gamma)
+ exit_shaping_reward = gamma * next_potential - prev_potential
+
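+ # Non-PBRS exit additive; applied only outside invariant mode to keep ∑Δ = 0.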
+ exit_additive = 0.0
+ if self._exit_additive_enabled and not self.is_pbrs_invariant_mode():
+ duration_ratio = trade_duration / max(max_trade_duration, 1)
+ exit_additive = self._compute_exit_additive(
+ pnl, pnl_target, duration_ratio
+ )
+
+ exit_reward = exit_shaping_reward + exit_additive
+ self._last_potential = next_potential
+ self._last_shaping_reward = float(exit_shaping_reward)
+ self._total_shaping_reward += float(exit_shaping_reward)
+ return base_reward + exit_reward
+ else:
+ # Neutral self-loop
+ self._last_potential = 0.0
+ self._last_shaping_reward = 0.0
+ return base_reward
def _set_observation_space(self) -> None:
"""
self._last_closed_trade_tick: int = 0
self._max_unrealized_profit = -np.inf
self._min_unrealized_profit = np.inf
+ self._last_potential = 0.0
+ self._total_shaping_reward = 0.0
+ self._last_shaping_reward = 0.0
return observation, history
def _get_exit_factor(
return max(0.0, pnl_target_factor * efficiency_factor)
def calculate_reward(self, action: int) -> float:
- """
- An example reward function. This is the one function that users will likely
- wish to inject their own creativity into.
-
- Warning!
- This is function is a showcase of functionality designed to show as many possible
- environment control features as possible. It is also designed to run quickly
- on small computers. This is a benchmark, it is *not* for live production.
-
- :param action: int = The action made by the agent for the current candle.
- :return:
- float = the reward to give to the agent for current step (used for optimization
- of weights in NN)
+ """Compute per-step reward and apply potential-based reward shaping (PBRS).
+
+ Reward Pipeline:
+ 1. Invalid action penalty
+ 2. Idle penalty
+ 3. Hold overtime penalty
+ 4. Exit reward
+ 5. Default fallback (0.0 if no specific reward)
+ 6. PBRS application: R'(s,a,s') = R_base + Δ(s,s') + optional_additives
+
+ The final shaped reward is what the RL agent receives for learning.
+ In canonical PBRS mode, shaping leaves the optimal policy unchanged
+ (policy invariance): training is theoretically equivalent to using
+ base rewards only.
+
+ Parameters
+ ----------
+ action : int
+ Action index taken by the agent
+
+ Returns
+ -------
+ float
+ Shaped reward R'(s,a,s') = R_base + Δ(s,s') + optional_additives
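+
+ Notes
+ -----
+ - Branches are evaluated in order; steps 2-4 are guarded by
+ ``base_reward is None``, so only the first matching rule assigns the
+ base reward before PBRS shaping is applied.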
"""
model_reward_parameters = self.rl_config.get("model_reward_parameters", {})
+ base_reward: Optional[float] = None
- # first, penalize if the action is not valid
+ # 1. Invalid action
if not self.action_masking and not self._is_valid(action):
self.tensorboard_log("invalid", category="actions")
- return float(model_reward_parameters.get("invalid_action", -2.0))
-
- pnl = self.get_unrealized_profit()
- # mrr = self.get_most_recent_return()
- # mrp = self.get_most_recent_profit()
+ base_reward = float(model_reward_parameters.get("invalid_action", -2.0))
max_trade_duration = max(self.max_trade_duration_candles, 1)
trade_duration = self.get_trade_duration()
duration_ratio = trade_duration / max_trade_duration
-
base_factor = float(model_reward_parameters.get("base_factor", 100.0))
pnl_target = self.profit_aim * self.rr
- idle_factor = base_factor * pnl_target / 3.0
- holding_factor = idle_factor
-
- # # you can use feature values from dataframe
- # rsi_now = self.get_feature_value(
- # name="%-rsi",
- # period=8,
- # pair=self.pair,
- # timeframe=self.config.get("timeframe"),
- # raw=True,
- # )
-
- # # reward agent for entering trades when RSI is low
- # if (
- # action in (Actions.Long_enter.value, Actions.Short_enter.value)
- # and self._position == Positions.Neutral
- # ):
- # if rsi_now < 40:
- # factor = 40 / rsi_now
- # else:
- # factor = 1
- # return 25.0 * factor
-
- # discourage agent from sitting idle
- if action == Actions.Neutral.value and self._position == Positions.Neutral:
+ idle_factor = base_factor * pnl_target / 4.0
+ hold_factor = idle_factor
+
+ # 2. Idle penalty
+ if (
+ base_reward is None
+ and action == Actions.Neutral.value
+ and self._position == Positions.Neutral
+ ):
max_idle_duration = int(
model_reward_parameters.get(
- "max_idle_duration_candles", 2 * max_trade_duration
+ "max_idle_duration_candles",
+ ReforceXY.DEFAULT_IDLE_DURATION_MULTIPLIER * max_trade_duration,
)
)
idle_penalty_scale = float(
model_reward_parameters.get("idle_penalty_scale", 0.5)
)
idle_penalty_power = float(
model_reward_parameters.get("idle_penalty_power", 1.025)
)
idle_duration = self.get_idle_duration()
idle_duration_ratio = idle_duration / max(1, max_idle_duration)
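+ # Superlinear idle penalty: grows with idle_duration_ratio ** idle_penalty_power.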
- return (
+ base_reward = (
-idle_factor
* idle_penalty_scale
* idle_duration_ratio**idle_penalty_power
)
- # discourage agent from sitting in position
+ # 3. Hold overtime penalty
if (
- self._position in (Positions.Short, Positions.Long)
+ base_reward is None
+ and self._position in (Positions.Short, Positions.Long)
and action == Actions.Neutral.value
):
- holding_penalty_scale = float(
- model_reward_parameters.get("holding_penalty_scale", 0.25)
+ hold_penalty_scale = float(
+ model_reward_parameters.get("hold_penalty_scale", 0.25)
)
- holding_penalty_power = float(
- model_reward_parameters.get("holding_penalty_power", 1.025)
+ hold_penalty_power = float(
+ model_reward_parameters.get("hold_penalty_power", 1.025)
)
-
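+ # Overtime-only penalty: zero until duration_ratio exceeds 1.0.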
if duration_ratio < 1.0:
- return 0.0
-
- return (
- -holding_factor
- * holding_penalty_scale
- * (duration_ratio - 1.0) ** holding_penalty_power
- )
-
- # close long
- if action == Actions.Long_exit.value and self._position == Positions.Long:
- return pnl * self._get_exit_factor(base_factor, pnl, duration_ratio)
-
- # close short
- if action == Actions.Short_exit.value and self._position == Positions.Short:
- return pnl * self._get_exit_factor(base_factor, pnl, duration_ratio)
+ base_reward = 0.0
+ else:
+ base_reward = (
+ -hold_factor
+ * hold_penalty_scale
+ * (duration_ratio - 1.0) ** hold_penalty_power
+ )
- return 0.0
+ # 4. Exit rewards
+ pnl = self.get_unrealized_profit()
+ if (
+ base_reward is None
+ and action == Actions.Long_exit.value
+ and self._position == Positions.Long
+ ):
+ base_reward = pnl * self._get_exit_factor(base_factor, pnl, duration_ratio)
+ if (
+ base_reward is None
+ and action == Actions.Short_exit.value
+ and self._position == Positions.Short
+ ):
+ base_reward = pnl * self._get_exit_factor(base_factor, pnl, duration_ratio)
+
+ # 5. Default
+ if base_reward is None:
+ base_reward = 0.0
+
+ # 6. Potential-based reward shaping
+ return self._apply_potential_shaping(
+ base_reward=base_reward,
+ action=action,
+ trade_duration=trade_duration,
+ max_trade_duration=max_trade_duration,
+ pnl=pnl,
+ pnl_target=pnl_target,
+ )
def _get_observation(self) -> NDArray[np.float32]:
- """
- This may or may not be independent of action types, user can inherit
- this in their custom "MyRLEnv"
- """
start_idx = max(self._start_tick, self._current_tick - self.window_size)
end_idx = min(self._current_tick, len(self.signal_features))
features_window = self.signal_features.iloc[start_idx:end_idx]
"most_recent_return": round(self.get_most_recent_return(), 5),
"most_recent_profit": round(self.get_most_recent_profit(), 5),
"total_profit": round(self._total_profit, 5),
+ "potential": round(self._last_potential, 5),
+ "shaping_reward": round(self._last_shaping_reward, 5),
+ "total_shaping_reward": round(self._total_shaping_reward, 5),
"reward": round(reward, 5),
"total_reward": round(self.total_reward, 5),
+ "pbrs_invariant": self.is_pbrs_invariant_mode(),
"idle_duration": self.get_idle_duration(),
"trade_duration": self.get_trade_duration(),
"trade_count": int(len(self.trade_history) // 2),
}
self._update_history(info)
+ terminated = self.is_terminated()
+ if terminated:
+ # Enforce Φ(terminal)=0 for PBRS policy invariance (Ng et al., 1999)
+ self._last_potential = 0.0
return (
self._get_observation(),
reward,
- self.is_terminated(),
+ terminated,
self.is_truncated(),
info,
)
if regressor == "xgboost":
from xgboost import XGBRegressor
- if model_training_parameters.get("random_state") is None:
- model_training_parameters["random_state"] = 1
+ model_training_parameters.setdefault("random_state", 1)
if trial is not None:
model_training_parameters["random_state"] = (
elif regressor == "lightgbm":
from lightgbm import LGBMRegressor
- if model_training_parameters.get("seed") is None:
- model_training_parameters["seed"] = 1
+ model_training_parameters.setdefault("seed", 1)
if trial is not None:
model_training_parameters["seed"] = (