From 3f01976437bfda6a4cda2367cd0983ef060e0bdf Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 13 Oct 2025 23:39:10 +0200 Subject: [PATCH] chore(reforcexy): add dynamic potential based rewards (#6) MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * chore(reforcexy): add dynamic potential based rewards Signed-off-by: Jérôme Benoit * refactor(reforcexy): align tunables namespace Signed-off-by: Jérôme Benoit * refactor(reforcexy): factor out pnl_target validation Signed-off-by: Jérôme Benoit * refactor(reforcexy): cleanup PBRS integration Signed-off-by: Jérôme Benoit * refactor(reforcexy): factor out default idle duration multiplier Signed-off-by: Jérôme Benoit * chore: cleanup variable initialization Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * refactor(reforcexy): factor out PBRS next state definition Signed-off-by: Jérôme Benoit * chore: revert copilot incorrect suggestions Signed-off-by: Jérôme Benoit * refactor(reforcexy): PBRS integration code cleanups Signed-off-by: Jérôme Benoit * refactor(reforcexy): handle overtflow Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * refactor(reforcexy): avoid code duplication Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * refactor(reforcexy): add PBRS instrumentation Signed-off-by: Jérôme Benoit * refactor(reforcexy): factor out PBRS signal computation Signed-off-by: Jérôme Benoit * refactor(reforcexy): add check to PBRS logic Signed-off-by: Jérôme Benoit * chore: sync PBRS in RSA Signed-off-by: Jérôme Benoit * refactor(reforcexy): cleanup PBRS integration in RSA Signed-off-by: Jérôme Benoit * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * docs: add PBRS Signed-off-by: Jérôme Benoit * fix(reforcexy): ensure model gamma is transmitted to env Signed-off-by: Jérôme Benoit --------- Signed-off-by: Jérôme Benoit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- ReforceXY/reward_space_analysis/README.md | 65 +- .../reward_space_analysis.py | 959 +++++++++++++++--- .../test_reward_space_analysis.py | 736 ++++++++++---- ReforceXY/user_data/freqaimodels/ReforceXY.py | 831 +++++++++++++-- quickadapter/user_data/strategies/Utils.py | 6 +- 5 files changed, 2215 insertions(+), 382 deletions(-) diff --git a/ReforceXY/reward_space_analysis/README.md b/ReforceXY/reward_space_analysis/README.md index db7a2d6..36cd9a9 100644 --- a/ReforceXY/reward_space_analysis/README.md +++ b/ReforceXY/reward_space_analysis/README.md @@ -13,6 +13,7 @@ This tool helps you understand and validate how the ReforceXY reinforcement lear - ✅ Generate thousands of synthetic trading scenarios deterministically - ✅ Analyze reward distribution, feature importance & partial dependence - ✅ Built‑in invariant & statistical validation layers (fail‑fast) +- ✅ PBRS (Potential-Based Reward Shaping) integration with canonical invariance - ✅ Export reproducible artifacts (parameter hash + execution manifest) - ✅ Compare synthetic vs real trading data (distribution shift metrics) - ✅ Parameter bounds validation & automatic sanitization @@ -130,6 +131,12 @@ python reward_space_analysis.py \ --num_samples 30000 \ --params win_reward_factor=4.0 \ --output aggressive_rewards + +# Test PBRS potential shaping +python reward_space_analysis.py \ + --num_samples 30000 \ + --params hold_potential_enabled=true potential_gamma=0.9 exit_potential_mode=progressive_release \ + --output pbrs_analysis ``` 
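For a side-by-side baseline (the output directory name here is illustrative), the same analysis can be rerun with the default canonical exit mode:

```bash
# Baseline PBRS run in canonical mode (default), for comparison with pbrs_analysis
python reward_space_analysis.py \
    --num_samples 30000 \
    --params hold_potential_enabled=true exit_potential_mode=canonical \
    --output pbrs_canonical_baseline
```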
**Compare:** Reward distributions between runs in `statistical_analysis.md` @@ -262,10 +269,10 @@ _Idle penalty configuration:_ - `idle_penalty_scale` (default: 0.5) - Scale of idle penalty - `idle_penalty_power` (default: 1.025) - Power applied to idle penalty scaling -_Holding penalty configuration:_ +_Hold penalty configuration:_ -- `holding_penalty_scale` (default: 0.25) - Scale of holding penalty -- `holding_penalty_power` (default: 1.025) - Power applied to holding penalty scaling +- `hold_penalty_scale` (default: 0.25) - Scale of hold penalty +- `hold_penalty_power` (default: 1.025) - Power applied to hold penalty scaling _Exit attenuation configuration:_ @@ -307,6 +314,28 @@ _Profit factor configuration:_ - `win_reward_factor` (default: 2.0) - Asymptotic bonus multiplier for PnL above target. Raw `profit_target_factor` ∈ [1, 1 + win_reward_factor] (tanh bounds it); overall amplification may exceed this once multiplied by `efficiency_factor`. - `pnl_factor_beta` (default: 0.5) - Sensitivity of amplification around target +_PBRS (Potential-Based Reward Shaping) configuration:_ + +- `potential_gamma` (default: 0.95) - Discount factor γ for PBRS potential term (0 ≤ γ ≤ 1) +- `potential_softsign_sharpness` (default: 1.0) - Sharpness parameter for softsign_sharp transform (smaller = sharper) +- `exit_potential_mode` (default: canonical) - Exit potential mode: 'canonical' (Φ=0), 'progressive_release', 'spike_cancel', 'retain_previous' +- `exit_potential_decay` (default: 0.5) - Decay factor for progressive_release exit mode (0 ≤ decay ≤ 1) +- `hold_potential_enabled` (default: true) - Enable PBRS hold potential function Φ(s) +- `hold_potential_scale` (default: 1.0) - Scale factor for hold potential function +- `hold_potential_gain` (default: 1.0) - Gain factor applied before transforms in hold potential +- `hold_potential_transform_pnl` (default: tanh) - Transform function for PnL: tanh, softsign, softsign_sharp, arctan, logistic, asinh_norm, clip +- `hold_potential_transform_duration` (default: tanh) - Transform function for duration ratio +- `entry_additive_enabled` (default: false) - Enable entry additive reward (non-PBRS component) +- `entry_additive_scale` (default: 1.0) - Scale factor for entry additive reward +- `entry_additive_gain` (default: 1.0) - Gain factor for entry additive reward +- `entry_additive_transform_pnl` (default: tanh) - Transform function for PnL in entry additive +- `entry_additive_transform_duration` (default: tanh) - Transform function for duration ratio in entry additive +- `exit_additive_enabled` (default: false) - Enable exit additive reward (non-PBRS component) +- `exit_additive_scale` (default: 1.0) - Scale factor for exit additive reward +- `exit_additive_gain` (default: 1.0) - Gain factor for exit additive reward +- `exit_additive_transform_pnl` (default: tanh) - Transform function for PnL in exit additive +- `exit_additive_transform_duration` (default: tanh) - Transform function for duration ratio in exit additive + _Invariant / safety controls:_ - `check_invariants` (default: true) - Enable/disable runtime invariant & safety validations (simulation invariants, mathematical bounds, distribution checks). Set to `false` only for performance experiments; not recommended for production validation. 
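For orientation, here is a minimal sketch (assuming the default `tanh` transforms, canonical exit mode, and the default `potential_gamma=0.95`; the helper names are illustrative, not the tool's API) of how the PBRS parameters above combine into the shaping term γΦ(s′) − Φ(s):

```python
import math

def hold_potential(pnl: float, duration_ratio: float,
                   scale: float = 1.0, gain: float = 1.0) -> float:
    """Hold potential: Phi(s) = scale * 0.5 * [tanh(gain*pnl) + tanh(gain*duration_ratio)]."""
    return scale * 0.5 * (math.tanh(gain * pnl) + math.tanh(gain * duration_ratio))

def shaping_reward(current: tuple[float, float], nxt: tuple[float, float],
                   is_exit: bool, potential_gamma: float = 0.95) -> float:
    """PBRS term gamma*Phi(s') - Phi(s); canonical exits force Phi(s') = 0."""
    phi_s = hold_potential(*current)
    phi_next = 0.0 if is_exit else hold_potential(*nxt)  # exit_potential_mode=canonical
    return potential_gamma * phi_next - phi_s

# Open trade at +1% PnL and 40% of max duration, still holding at the next step
print(shaping_reward((0.01, 0.4), (0.012, 0.45), is_exit=False))
```

In canonical mode the terminal potential is forced to zero, which is the property the report's PBRS invariance check (∑ shaping_rewards ≈ 0) looks for.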
@@ -380,6 +409,12 @@ python reward_space_analysis.py \ --params win_reward_factor=3.0 idle_penalty_scale=1.5 \ --output sensitivity_test +# PBRS potential shaping analysis +python reward_space_analysis.py \ + --num_samples 40000 \ + --params hold_potential_enabled=true exit_potential_mode=spike_cancel potential_gamma=0.95 \ + --output pbrs_test + # Real vs synthetic comparison python reward_space_analysis.py \ --num_samples 100000 \ @@ -400,12 +435,14 @@ The analysis generates the following output files: - **Global Statistics** - Reward distributions and component activation rates - **Sample Representativity** - Coverage of critical market scenarios - **Component Analysis** - Relationships between rewards and conditions +- **PBRS Analysis** - Potential-based reward shaping component activation rates, statistics, and invariance validation - **Feature Importance** - Machine learning analysis of key drivers - **Statistical Validation** - Hypothesis tests, confidence intervals, normality + effect sizes - **Distribution Shift** - Real vs synthetic divergence (KL, JS, Wasserstein, KS) - **Diagnostics Validation Summary** - Pass/fail snapshot of all runtime checks - Consolidated pass/fail state of every validation layer (invariants, parameter bounds, bootstrap CIs, distribution metrics, diagnostics, hypothesis tests) + - PBRS invariance validation (canonical mode check: ∑shaping_rewards ≈ 0) ### Data Exports @@ -465,11 +502,22 @@ python reward_space_analysis.py \ --params exit_attenuation_mode=power exit_power_tau=0.5 efficiency_weight=0.8 \ --output custom_test -# Test aggressive holding penalties +# Test aggressive hold penalties +python reward_space_analysis.py \ + --num_samples 25000 \ + --params hold_penalty_scale=0.5 \ + --output aggressive_hold + +# Test PBRS configurations +python reward_space_analysis.py \ + --num_samples 25000 \ + --params hold_potential_enabled=true entry_additive_enabled=true exit_additive_enabled=false exit_potential_mode=canonical \ + --output pbrs_canonical + python reward_space_analysis.py \ --num_samples 25000 \ - --params holding_penalty_scale=0.5 \ - --output aggressive_holding + --params hold_potential_transform_pnl=softsign_sharp potential_softsign_sharpness=0.5 \ + --output pbrs_sharp_transforms ``` ### Real Data Comparison @@ -523,10 +571,11 @@ Always run the full suite after modifying reward logic or attenuation parameters | Statistical Validation | TestStatisticalValidation | Mathematical bounds, heteroscedasticity, invariants | | Boundary Conditions | TestBoundaryConditions | Extreme params & unknown mode fallback | | Helper Functions | TestHelperFunctions | Report writers, model analysis, utility conversions | -| Private Functions (via public API) | TestPrivateFunctions | Idle / holding / invalid penalties, exit scenarios | +| Private Functions (via public API) | TestPrivateFunctions | Idle / hold / invalid penalties, exit scenarios | | Robustness | TestRewardRobustness | Monotonic attenuation (where applicable), decomposition integrity, boundary regimes | | Parameter Validation | TestParameterValidation | Bounds clamping, warning threshold, penalty power scaling | | Continuity | TestContinuityPlateau | Plateau boundary continuity & small‑epsilon attenuation scaling | +| PBRS Integration | TestPBRSIntegration | Potential-based reward shaping, transforms, exit modes, canonical invariance | ### Test Architecture @@ -592,6 +641,8 @@ pytest -q test_reward_space_analysis.py::TestRewardAlignment - Review parameter overrides with `--params` - Check trading 
mode settings (spot vs margin/futures) - Verify `base_factor` matches your environment config +- Check PBRS settings: `hold_potential_enabled`, `exit_potential_mode`, and transform functions +- Review parameter adjustments in output logs for any automatic bound clamping ### Slow Execution diff --git a/ReforceXY/reward_space_analysis/reward_space_analysis.py b/ReforceXY/reward_space_analysis/reward_space_analysis.py index 0868703..a733a05 100644 --- a/ReforceXY/reward_space_analysis/reward_space_analysis.py +++ b/ReforceXY/reward_space_analysis/reward_space_analysis.py @@ -73,7 +73,7 @@ def _to_bool(value: Any) -> bool: return bool(text) -def _get_param_float( +def _get_float_param( params: RewardParams, key: str, default: RewardParamValue ) -> float: """Extract float parameter with type safety and default fallback.""" @@ -121,14 +121,40 @@ def _is_short_allowed(trading_mode: str) -> bool: # Mathematical constants pre-computed for performance _LOG_2 = math.log(2.0) +DEFAULT_IDLE_DURATION_MULTIPLIER = 4 RewardParamValue = Union[float, str, bool, None] RewardParams = Dict[str, RewardParamValue] +# Internal safe fallback helper for numeric failures (centralizes semantics) +def _fail_safely(reason: str) -> float: + """Return 0.0 on recoverable numeric failure (reason available for future debug hooks).""" + # NOTE: presently silent to preserve legacy behaviour; hook logging here if needed. + _ = reason + return 0.0 + + # Allowed exit attenuation modes ALLOWED_EXIT_MODES = {"legacy", "sqrt", "linear", "power", "half_life"} +# PBRS constants +ALLOWED_TRANSFORMS = { + "tanh", + "softsign", + "softsign_sharp", + "arctan", + "logistic", + "asinh_norm", + "clip", +} +ALLOWED_EXIT_POTENTIAL_MODES = { + "canonical", + "progressive_release", + "spike_cancel", + "retain_previous", +} + DEFAULT_MODEL_REWARD_PARAMETERS: RewardParams = { "invalid_action": -2.0, "base_factor": 100.0, @@ -137,9 +163,9 @@ DEFAULT_MODEL_REWARD_PARAMETERS: RewardParams = { "idle_penalty_power": 1.025, # Fallback: 2 * max_trade_duration_candles "max_idle_duration_candles": None, - # Holding keys (env defaults) - "holding_penalty_scale": 0.25, - "holding_penalty_power": 1.025, + # Hold keys (env defaults) + "hold_penalty_scale": 0.25, + "hold_penalty_power": 1.025, # Exit attenuation configuration (env default) "exit_attenuation_mode": "linear", "exit_plateau": True, @@ -156,6 +182,32 @@ DEFAULT_MODEL_REWARD_PARAMETERS: RewardParams = { # Invariant / safety controls (env defaults) "check_invariants": True, "exit_factor_threshold": 10000.0, + # === PBRS PARAMETERS === + # Potential-based reward shaping core parameters + # Discount factor γ for potential term (0 ≤ γ ≤ 1) + "potential_gamma": 0.95, + "potential_softsign_sharpness": 1.0, + # Exit potential modes: canonical | progressive_release | spike_cancel | retain_previous + "exit_potential_mode": "canonical", + "exit_potential_decay": 0.5, + # Hold potential (PBRS function Φ) + "hold_potential_enabled": True, + "hold_potential_scale": 1.0, + "hold_potential_gain": 1.0, + "hold_potential_transform_pnl": "tanh", + "hold_potential_transform_duration": "tanh", + # Entry additive (non-PBRS additive term) + "entry_additive_enabled": False, + "entry_additive_scale": 1.0, + "entry_additive_gain": 1.0, + "entry_additive_transform_pnl": "tanh", + "entry_additive_transform_duration": "tanh", + # Exit additive (non-PBRS additive term) + "exit_additive_enabled": False, + "exit_additive_scale": 1.0, + "exit_additive_gain": 1.0, + "exit_additive_transform_pnl": "tanh", + 
"exit_additive_transform_duration": "tanh", } DEFAULT_MODEL_REWARD_PARAMETERS_HELP: Dict[str, str] = { @@ -164,8 +216,8 @@ DEFAULT_MODEL_REWARD_PARAMETERS_HELP: Dict[str, str] = { "idle_penalty_power": "Power applied to idle penalty scaling.", "idle_penalty_scale": "Scale of idle penalty.", "max_idle_duration_candles": "Maximum idle duration candles before full idle penalty scaling.", - "holding_penalty_scale": "Scale of holding penalty.", - "holding_penalty_power": "Power applied to holding penalty scaling.", + "hold_penalty_scale": "Scale of hold penalty.", + "hold_penalty_power": "Power applied to hold penalty scaling.", "exit_attenuation_mode": "Attenuation kernel (legacy|sqrt|linear|power|half_life).", "exit_plateau": "Enable plateau. If true, full strength until grace boundary then apply attenuation.", "exit_plateau_grace": "Grace boundary duration ratio for plateau (full strength until this boundary).", @@ -178,6 +230,26 @@ DEFAULT_MODEL_REWARD_PARAMETERS_HELP: Dict[str, str] = { "pnl_factor_beta": "Sensitivity of amplification around target.", "check_invariants": "Boolean flag (true/false) to enable runtime invariant & safety checks.", "exit_factor_threshold": "If |exit factor| exceeds this threshold, emit warning.", + # PBRS parameters + "potential_gamma": "Discount factor γ for PBRS potential-based reward shaping (0 ≤ γ ≤ 1).", + "potential_softsign_sharpness": "Sharpness parameter for softsign_sharp transform (smaller = sharper).", + "exit_potential_mode": "Exit potential mode: 'canonical' (Φ=0), 'progressive_release', 'spike_cancel', 'retain_previous'.", + "exit_potential_decay": "Decay factor for progressive_release exit mode (0 ≤ decay ≤ 1).", + "hold_potential_enabled": "Enable PBRS hold potential function Φ(s).", + "hold_potential_scale": "Scale factor for hold potential function.", + "hold_potential_gain": "Gain factor applied before transforms in hold potential.", + "hold_potential_transform_pnl": "Transform function for PnL in hold potential: tanh, softsign, softsign_sharp, arctan, logistic, asinh_norm, clip.", + "hold_potential_transform_duration": "Transform function for duration ratio in hold potential.", + "entry_additive_enabled": "Enable entry additive reward (non-PBRS component).", + "entry_additive_scale": "Scale factor for entry additive reward.", + "entry_additive_gain": "Gain factor for entry additive reward.", + "entry_additive_transform_pnl": "Transform function for PnL in entry additive.", + "entry_additive_transform_duration": "Transform function for duration ratio in entry additive.", + "exit_additive_enabled": "Enable exit additive reward (non-PBRS component).", + "exit_additive_scale": "Scale factor for exit additive reward.", + "exit_additive_gain": "Gain factor for exit additive reward.", + "exit_additive_transform_pnl": "Transform function for PnL in exit additive.", + "exit_additive_transform_duration": "Transform function for duration ratio in exit additive.", } @@ -192,8 +264,8 @@ _PARAMETER_BOUNDS: Dict[str, Dict[str, float]] = { "idle_penalty_power": {"min": 0.0}, "idle_penalty_scale": {"min": 0.0}, "max_idle_duration_candles": {"min": 0.0}, - "holding_penalty_scale": {"min": 0.0}, - "holding_penalty_power": {"min": 0.0}, + "hold_penalty_scale": {"min": 0.0}, + "hold_penalty_power": {"min": 0.0}, "exit_linear_slope": {"min": 0.0}, "exit_plateau_grace": {"min": 0.0}, "exit_power_tau": {"min": 1e-6, "max": 1.0}, # open (0,1] approximated @@ -202,6 +274,17 @@ _PARAMETER_BOUNDS: Dict[str, Dict[str, float]] = { "efficiency_center": {"min": 0.0, 
"max": 1.0}, "win_reward_factor": {"min": 0.0}, "pnl_factor_beta": {"min": 1e-6}, + # PBRS parameter bounds + "potential_gamma": {"min": 0.0, "max": 1.0}, + # Softsign sharpness: only lower bound enforced (upper bound limited implicitly by transform stability) + "potential_softsign_sharpness": {"min": 1e-6}, + "exit_potential_decay": {"min": 0.0, "max": 1.0}, + "hold_potential_scale": {"min": 0.0}, + "hold_potential_gain": {"min": 0.0}, + "entry_additive_scale": {"min": 0.0}, + "entry_additive_gain": {"min": 0.0}, + "exit_additive_scale": {"min": 0.0}, + "exit_additive_gain": {"min": 0.0}, } @@ -233,6 +316,27 @@ def validate_reward_parameters( """ sanitized = dict(params) adjustments: Dict[str, Dict[str, Any]] = {} + # Normalize boolean-like parameters explicitly to avoid inconsistent types + _bool_keys = [ + "check_invariants", + "hold_potential_enabled", + "entry_additive_enabled", + "exit_additive_enabled", + ] + for bkey in _bool_keys: + if bkey in sanitized: + original_val = sanitized[bkey] + coerced = _to_bool(original_val) + if coerced is not original_val: + sanitized[bkey] = coerced + adjustments.setdefault( + bkey, + { + "original": original_val, + "adjusted": coerced, + "reason": "bool_coerce", + }, + ) for key, bounds in _PARAMETER_BOUNDS.items(): if key not in sanitized: continue @@ -272,10 +376,10 @@ def _normalize_and_validate_mode(params: RewardParams) -> None: - If the key is absent or value is ``None``: leave untouched (upstream defaults will inject 'linear'). """ - exit_attenuation_mode = params.get("exit_attenuation_mode") - if exit_attenuation_mode is None: + if "exit_attenuation_mode" not in params: return - exit_attenuation_mode = str(exit_attenuation_mode) + + exit_attenuation_mode = _get_str_param(params, "exit_attenuation_mode", "linear") if exit_attenuation_mode not in ALLOWED_EXIT_MODES: params["exit_attenuation_mode"] = "linear" @@ -312,6 +416,41 @@ def add_tunable_cli_args(parser: argparse.ArgumentParser) -> None: default=None, help=help_text, ) + elif key == "exit_potential_mode": + parser.add_argument( + f"--{key}", + type=str, + choices=sorted(ALLOWED_EXIT_POTENTIAL_MODES), + default=None, + help=help_text, + ) + elif key in [ + "hold_potential_transform_pnl", + "hold_potential_transform_duration", + "entry_additive_transform_pnl", + "entry_additive_transform_duration", + "exit_additive_transform_pnl", + "exit_additive_transform_duration", + ]: + parser.add_argument( + f"--{key}", + type=str, + choices=sorted(ALLOWED_TRANSFORMS), + default=None, + help=help_text, + ) + elif key in [ + "hold_potential_enabled", + "entry_additive_enabled", + "exit_additive_enabled", + ]: + parser.add_argument( + f"--{key}", + type=int, + choices=[0, 1], + default=None, + help=help_text, + ) else: # Map numerics to float; leave strings as str if isinstance(default, (int, float)): @@ -339,8 +478,14 @@ class RewardBreakdown: total: float = 0.0 invalid_penalty: float = 0.0 idle_penalty: float = 0.0 - holding_penalty: float = 0.0 + hold_penalty: float = 0.0 exit_component: float = 0.0 + # PBRS components + shaping_reward: float = 0.0 + entry_additive: float = 0.0 + exit_additive: float = 0.0 + current_potential: float = 0.0 + next_potential: float = 0.0 def _get_exit_factor( @@ -357,7 +502,7 @@ def _get_exit_factor( Assumptions: - ``_normalize_and_validate_mode`` has already run (invalid modes replaced by 'linear'). - ``exit_attenuation_mode`` is therefore either a member of ``ALLOWED_EXIT_MODES`` or 'linear'. 
- - All numeric tunables are accessed through ``_get_param_float`` for safety. + - All numeric tunables are accessed through ``_get_float_param`` for safety. Algorithm steps: 1. Finiteness & non-negative guard on inputs. @@ -372,19 +517,19 @@ def _get_exit_factor( or not np.isfinite(pnl) or not np.isfinite(duration_ratio) ): - return 0.0 + return _fail_safely("non_finite_exit_factor_inputs") # Guard: duration ratio should never be negative if duration_ratio < 0.0: duration_ratio = 0.0 - exit_attenuation_mode = str(params.get("exit_attenuation_mode", "linear")) - exit_plateau = _to_bool(params.get("exit_plateau", True)) + exit_attenuation_mode = _get_str_param(params, "exit_attenuation_mode", "linear") + exit_plateau = _get_bool_param(params, "exit_plateau", True) - exit_plateau_grace = _get_param_float(params, "exit_plateau_grace", 1.0) + exit_plateau_grace = _get_float_param(params, "exit_plateau_grace", 1.0) if exit_plateau_grace < 0.0: exit_plateau_grace = 1.0 - exit_linear_slope = _get_param_float(params, "exit_linear_slope", 1.0) + exit_linear_slope = _get_float_param(params, "exit_linear_slope", 1.0) if exit_linear_slope < 0.0: exit_linear_slope = 1.0 @@ -398,7 +543,7 @@ def _get_exit_factor( return f / (1.0 + exit_linear_slope * dr) def _power_kernel(f: float, dr: float) -> float: - tau = _get_param_float( + tau = _get_float_param( params, "exit_power_tau", DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_power_tau", 0.5), @@ -410,7 +555,7 @@ def _get_exit_factor( return f / math.pow(1.0 + dr, alpha) def _half_life_kernel(f: float, dr: float) -> float: - hl = _get_param_float(params, "exit_half_life", 0.5) + hl = _get_float_param(params, "exit_half_life", 0.5) if hl <= 0.0: hl = 0.5 return f * math.pow(2.0, -dr / hl) @@ -448,13 +593,13 @@ def _get_exit_factor( base_factor *= pnl_factor # Invariant & safety checks - if _to_bool(params.get("check_invariants", True)): + if _get_bool_param(params, "check_invariants", True): if not np.isfinite(base_factor): - return 0.0 + return _fail_safely("non_finite_exit_factor_after_kernel") if base_factor < 0.0 and pnl >= 0.0: # Clamp: avoid negative amplification on non-negative pnl base_factor = 0.0 - exit_factor_threshold = _get_param_float( + exit_factor_threshold = _get_float_param( params, "exit_factor_threshold", DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_factor_threshold", 10000.0), @@ -479,16 +624,16 @@ def _get_pnl_factor( pnl = context.pnl if not np.isfinite(pnl) or not np.isfinite(profit_target): - return 0.0 + return _fail_safely("non_finite_pnl_or_target") profit_target_factor = 1.0 if profit_target > 0.0 and pnl > profit_target: - win_reward_factor = _get_param_float( + win_reward_factor = _get_float_param( params, "win_reward_factor", DEFAULT_MODEL_REWARD_PARAMETERS.get("win_reward_factor", 2.0), ) - pnl_factor_beta = _get_param_float( + pnl_factor_beta = _get_float_param( params, "pnl_factor_beta", DEFAULT_MODEL_REWARD_PARAMETERS.get("pnl_factor_beta", 0.5), @@ -499,12 +644,12 @@ def _get_pnl_factor( ) efficiency_factor = 1.0 - efficiency_weight = _get_param_float( + efficiency_weight = _get_float_param( params, "efficiency_weight", DEFAULT_MODEL_REWARD_PARAMETERS.get("efficiency_weight", 1.0), ) - efficiency_center = _get_param_float( + efficiency_center = _get_float_param( params, "efficiency_center", DEFAULT_MODEL_REWARD_PARAMETERS.get("efficiency_center", 0.5), @@ -550,12 +695,12 @@ def _idle_penalty( context: RewardContext, idle_factor: float, params: RewardParams ) -> float: """Mirror the environment's idle penalty behaviour.""" - 
idle_penalty_scale = _get_param_float( + idle_penalty_scale = _get_float_param( params, "idle_penalty_scale", DEFAULT_MODEL_REWARD_PARAMETERS.get("idle_penalty_scale", 0.5), ) - idle_penalty_power = _get_param_float( + idle_penalty_power = _get_float_param( params, "idle_penalty_power", DEFAULT_MODEL_REWARD_PARAMETERS.get("idle_penalty_power", 1.025), @@ -571,42 +716,44 @@ def _idle_penalty( max_idle_duration_candles = params.get("max_idle_duration_candles") if max_idle_duration_candles is None: - max_idle_duration = 2 * max_trade_duration_candles + max_idle_duration = ( + DEFAULT_IDLE_DURATION_MULTIPLIER * max_trade_duration_candles + ) else: try: max_idle_duration = int(max_idle_duration_candles) except (TypeError, ValueError): - max_idle_duration = 2 * max_trade_duration_candles + max_idle_duration = ( + DEFAULT_IDLE_DURATION_MULTIPLIER * max_trade_duration_candles + ) idle_duration_ratio = context.idle_duration / max(1, max_idle_duration) return -idle_factor * idle_penalty_scale * idle_duration_ratio**idle_penalty_power -def _holding_penalty( - context: RewardContext, holding_factor: float, params: RewardParams +def _hold_penalty( + context: RewardContext, hold_factor: float, params: RewardParams ) -> float: - """Mirror the environment's holding penalty behaviour.""" - holding_penalty_scale = _get_param_float( + """Mirror the environment's hold penalty behaviour.""" + hold_penalty_scale = _get_float_param( params, - "holding_penalty_scale", - DEFAULT_MODEL_REWARD_PARAMETERS.get("holding_penalty_scale", 0.25), + "hold_penalty_scale", + DEFAULT_MODEL_REWARD_PARAMETERS.get("hold_penalty_scale", 0.25), ) - holding_penalty_power = _get_param_float( + hold_penalty_power = _get_float_param( params, - "holding_penalty_power", - DEFAULT_MODEL_REWARD_PARAMETERS.get("holding_penalty_power", 1.025), + "hold_penalty_power", + DEFAULT_MODEL_REWARD_PARAMETERS.get("hold_penalty_power", 1.025), ) duration_ratio = _compute_duration_ratio( context.trade_duration, context.max_trade_duration ) if duration_ratio < 1.0: - return 0.0 + return _fail_safely("hold_penalty_duration_ratio_lt_1") return ( - -holding_factor - * holding_penalty_scale - * (duration_ratio - 1.0) ** holding_penalty_power + -hold_factor * hold_penalty_scale * (duration_ratio - 1.0) ** hold_penalty_power ) @@ -635,6 +782,7 @@ def calculate_reward( *, short_allowed: bool, action_masking: bool, + previous_potential: float = 0.0, ) -> RewardBreakdown: breakdown = RewardBreakdown() @@ -644,63 +792,106 @@ def calculate_reward( short_allowed=short_allowed, ) if not is_valid and not action_masking: - breakdown.invalid_penalty = _get_param_float(params, "invalid_action", -2.0) + breakdown.invalid_penalty = _get_float_param(params, "invalid_action", -2.0) breakdown.total = breakdown.invalid_penalty return breakdown - factor = _get_param_float(params, "base_factor", base_factor) + factor = _get_float_param(params, "base_factor", base_factor) if "profit_target" in params: - profit_target = _get_param_float(params, "profit_target", float(profit_target)) + profit_target = _get_float_param(params, "profit_target", float(profit_target)) if "risk_reward_ratio" in params: - risk_reward_ratio = _get_param_float( + risk_reward_ratio = _get_float_param( params, "risk_reward_ratio", float(risk_reward_ratio) ) # Scale profit target by risk-reward ratio (reward multiplier) # E.g., profit_target=0.03, RR=2.0 → profit_target_final=0.06 profit_target_final = profit_target * risk_reward_ratio - idle_factor = factor * profit_target_final / 3.0 + idle_factor = factor 
* profit_target_final / 4.0 pnl_factor = _get_pnl_factor(params, context, profit_target_final) - holding_factor = idle_factor + hold_factor = idle_factor - if context.action == Actions.Neutral and context.position == Positions.Neutral: - breakdown.idle_penalty = _idle_penalty(context, idle_factor, params) - breakdown.total = breakdown.idle_penalty - return breakdown + # Base reward calculation (existing logic) + base_reward = 0.0 - if ( + if context.action == Actions.Neutral and context.position == Positions.Neutral: + base_reward = _idle_penalty(context, idle_factor, params) + breakdown.idle_penalty = base_reward + elif ( context.position in (Positions.Long, Positions.Short) and context.action == Actions.Neutral ): - breakdown.holding_penalty = _holding_penalty(context, holding_factor, params) - breakdown.total = breakdown.holding_penalty - return breakdown + base_reward = _hold_penalty(context, hold_factor, params) + breakdown.hold_penalty = base_reward + elif context.action == Actions.Long_exit and context.position == Positions.Long: + base_reward = _compute_exit_reward(factor, pnl_factor, context, params) + breakdown.exit_component = base_reward + elif context.action == Actions.Short_exit and context.position == Positions.Short: + base_reward = _compute_exit_reward(factor, pnl_factor, context, params) + breakdown.exit_component = base_reward + else: + base_reward = 0.0 + + # === PBRS INTEGRATION === + # Determine state transitions for PBRS + current_pnl = context.pnl if context.position != Positions.Neutral else 0.0 + current_duration_ratio = ( + context.trade_duration / context.max_trade_duration + if context.position != Positions.Neutral and context.max_trade_duration > 0 + else 0.0 + ) - if context.action == Actions.Long_exit and context.position == Positions.Long: - exit_reward = _compute_exit_reward( - factor, - pnl_factor, - context, - params, + # Simulate next state for PBRS calculation + is_terminal = context.action in (Actions.Long_exit, Actions.Short_exit) + + # For terminal transitions, next state is neutral (PnL=0, duration=0) + if is_terminal: + next_pnl = 0.0 + next_duration_ratio = 0.0 + else: + # For non-terminal, use current values (simplified simulation) + next_pnl = current_pnl + next_duration_ratio = current_duration_ratio + + # Apply PBRS if any PBRS parameters are enabled + pbrs_enabled = ( + _get_bool_param(params, "hold_potential_enabled", True) + or _get_bool_param(params, "entry_additive_enabled", False) + or _get_bool_param(params, "exit_additive_enabled", False) + ) + + if pbrs_enabled: + total_reward, shaping_reward, next_potential = apply_potential_shaping( + base_reward=base_reward, + current_pnl=current_pnl, + current_duration_ratio=current_duration_ratio, + next_pnl=next_pnl, + next_duration_ratio=next_duration_ratio, + is_terminal=is_terminal, + last_potential=previous_potential, + params=params, ) - breakdown.exit_component = exit_reward - breakdown.total = exit_reward - return breakdown - if context.action == Actions.Short_exit and context.position == Positions.Short: - exit_reward = _compute_exit_reward( - factor, - pnl_factor, - context, - params, + # Update breakdown with PBRS components + breakdown.shaping_reward = shaping_reward + breakdown.current_potential = _compute_hold_potential( + current_pnl, current_duration_ratio, params ) - breakdown.exit_component = exit_reward - breakdown.total = exit_reward - return breakdown + breakdown.next_potential = next_potential + breakdown.entry_additive = _compute_entry_additive( + current_pnl, 
current_duration_ratio, params + ) + breakdown.exit_additive = ( + _compute_exit_additive(next_pnl, next_duration_ratio, params) + if is_terminal + else 0.0 + ) + breakdown.total = total_reward + else: + breakdown.total = base_reward - breakdown.total = 0.0 return breakdown @@ -754,7 +945,7 @@ def simulate_samples( ) -> pd.DataFrame: rng = random.Random(seed) short_allowed = _is_short_allowed(trading_mode) - action_masking = _to_bool(params.get("action_masking", True)) + action_masking = _get_bool_param(params, "action_masking", True) samples: list[Dict[str, float]] = [] for _ in range(num_samples): if short_allowed: @@ -851,8 +1042,14 @@ def simulate_samples( "reward_total": breakdown.total, "reward_invalid": breakdown.invalid_penalty, "reward_idle": breakdown.idle_penalty, - "reward_holding": breakdown.holding_penalty, + "reward_hold": breakdown.hold_penalty, "reward_exit": breakdown.exit_component, + # PBRS components + "reward_shaping": breakdown.shaping_reward, + "reward_entry_additive": breakdown.entry_additive, + "reward_exit_additive": breakdown.exit_additive, + "current_potential": breakdown.current_potential, + "next_potential": breakdown.next_potential, "is_invalid": float(breakdown.invalid_penalty != 0.0), } ) @@ -945,13 +1142,21 @@ def _compute_summary_stats(df: pd.DataFrame) -> Dict[str, Any]: ["count", "mean", "std", "min", "max"] ) component_share = df[ - ["reward_invalid", "reward_idle", "reward_holding", "reward_exit"] + [ + "reward_invalid", + "reward_idle", + "reward_hold", + "reward_exit", + "reward_shaping", + "reward_entry_additive", + "reward_exit_additive", + ] ].apply(lambda col: (col != 0).mean()) components = [ "reward_invalid", "reward_idle", - "reward_holding", + "reward_hold", "reward_exit", "reward_total", ] @@ -1019,18 +1224,18 @@ def _compute_relationship_stats( pnl_bins = np.linspace(pnl_min, pnl_max, 13) idle_stats = _binned_stats(df, "idle_duration", "reward_idle", idle_bins) - holding_stats = _binned_stats(df, "trade_duration", "reward_holding", trade_bins) + hold_stats = _binned_stats(df, "trade_duration", "reward_hold", trade_bins) exit_stats = _binned_stats(df, "pnl", "reward_exit", pnl_bins) idle_stats = idle_stats.round(6) - holding_stats = holding_stats.round(6) + hold_stats = hold_stats.round(6) exit_stats = exit_stats.round(6) correlation_fields = [ "reward_total", "reward_invalid", "reward_idle", - "reward_holding", + "reward_hold", "reward_exit", "pnl", "trade_duration", @@ -1040,7 +1245,7 @@ def _compute_relationship_stats( return { "idle_stats": idle_stats, - "holding_stats": holding_stats, + "hold_stats": hold_stats, "exit_stats": exit_stats, "correlation": correlation, } @@ -1074,7 +1279,7 @@ def _compute_representativity_stats( duration_overage_share = float((df["duration_ratio"] > 1.0).mean()) idle_activated = float((df["reward_idle"] != 0).mean()) - holding_activated = float((df["reward_holding"] != 0).mean()) + hold_activated = float((df["reward_hold"] != 0).mean()) exit_activated = float((df["reward_exit"] != 0).mean()) return { @@ -1086,7 +1291,7 @@ def _compute_representativity_stats( "pnl_extreme": pnl_extreme, "duration_overage_share": duration_overage_share, "idle_activated": idle_activated, - "holding_activated": holding_activated, + "hold_activated": hold_activated, "exit_activated": exit_activated, } @@ -1288,7 +1493,7 @@ def load_real_episodes(path: Path, *, enforce_columns: bool = True) -> pd.DataFr numeric_optional = { "reward_exit", "reward_idle", - "reward_holding", + "reward_hold", "reward_invalid", "duration_ratio", 
"idle_ratio", @@ -1858,7 +2063,7 @@ def build_argument_parser() -> argparse.ArgumentParser: nargs="*", default=[], metavar="KEY=VALUE", - help="Override reward parameters, e.g. holding_penalty_scale=0.5", + help="Override reward parameters, e.g. hold_penalty_scale=0.5", ) # Dynamically add CLI options for all tunables add_tunable_cli_args(parser) @@ -1965,7 +2170,7 @@ def write_complete_statistical_analysis( metrics_for_ci = [ "reward_total", "reward_idle", - "reward_holding", + "reward_hold", "reward_exit", "pnl", ] @@ -2060,7 +2265,7 @@ def write_complete_statistical_analysis( f"| Duration overage (>1.0) | {representativity_stats['duration_overage_share']:.1%} |\n" ) f.write( - f"| Extreme PnL (|pnl|≥0.14) | {representativity_stats['pnl_extreme']:.1%} |\n" + f"| Extreme PnL (\\|pnl\\|≥0.14) | {representativity_stats['pnl_extreme']:.1%} |\n" ) f.write("\n") @@ -2068,9 +2273,7 @@ def write_complete_statistical_analysis( f.write("| Component | Activation Rate |\n") f.write("|-----------|----------------|\n") f.write(f"| Idle penalty | {representativity_stats['idle_activated']:.1%} |\n") - f.write( - f"| Holding penalty | {representativity_stats['holding_activated']:.1%} |\n" - ) + f.write(f"| Hold penalty | {representativity_stats['hold_activated']:.1%} |\n") f.write(f"| Exit reward | {representativity_stats['exit_activated']:.1%} |\n") f.write("\n") @@ -2090,14 +2293,14 @@ def write_complete_statistical_analysis( idle_df.index.name = "bin" f.write(_df_to_md(idle_df, index_name=idle_df.index.name, ndigits=6)) - f.write("### 3.2 Holding Penalty vs Trade Duration\n\n") - if relationship_stats["holding_stats"].empty: - f.write("_No holding samples present._\n\n") + f.write("### 3.2 Hold Penalty vs Trade Duration\n\n") + if relationship_stats["hold_stats"].empty: + f.write("_No hold samples present._\n\n") else: - holding_df = relationship_stats["holding_stats"].copy() - if holding_df.index.name is None: - holding_df.index.name = "bin" - f.write(_df_to_md(holding_df, index_name=holding_df.index.name, ndigits=6)) + hold_df = relationship_stats["hold_stats"].copy() + if hold_df.index.name is None: + hold_df.index.name = "bin" + f.write(_df_to_md(hold_df, index_name=hold_df.index.name, ndigits=6)) f.write("### 3.3 Exit Reward vs PnL\n\n") if relationship_stats["exit_stats"].empty: @@ -2115,6 +2318,62 @@ def write_complete_statistical_analysis( corr_df.index.name = "feature" f.write(_df_to_md(corr_df, index_name=corr_df.index.name, ndigits=4)) + # Section 3.5: PBRS Analysis + f.write("### 3.5 PBRS (Potential-Based Reward Shaping) Analysis\n\n") + + # Check if PBRS components are present in the data + pbrs_components = [ + "reward_shaping", + "reward_entry_additive", + "reward_exit_additive", + ] + pbrs_present = all(col in df.columns for col in pbrs_components) + + if pbrs_present: + # PBRS activation rates + pbrs_activation = {} + for comp in pbrs_components: + pbrs_activation[comp.replace("reward_", "")] = (df[comp] != 0).mean() + + f.write("**PBRS Component Activation Rates:**\n\n") + f.write("| Component | Activation Rate | Description |\n") + f.write("|-----------|-----------------|-------------|\n") + f.write( + f"| Shaping (Φ) | {pbrs_activation['shaping']:.1%} | Potential-based reward shaping |\n" + ) + f.write( + f"| Entry Additive | {pbrs_activation['entry_additive']:.1%} | Non-PBRS entry reward |\n" + ) + f.write( + f"| Exit Additive | {pbrs_activation['exit_additive']:.1%} | Non-PBRS exit reward |\n" + ) + f.write("\n") + + # PBRS statistics + f.write("**PBRS Component 
Statistics:**\n\n") + pbrs_stats = df[pbrs_components].describe( + percentiles=[0.1, 0.25, 0.5, 0.75, 0.9] + ) + pbrs_stats_df = pbrs_stats.round( + 6 + ).T # Transpose to make it DataFrame-compatible + pbrs_stats_df.index.name = "component" + f.write(_df_to_md(pbrs_stats_df, index_name="component", ndigits=6)) + + # PBRS invariance check (canonical mode) + total_shaping = df["reward_shaping"].sum() + if abs(total_shaping) < 1e-6: + f.write( + "✅ **PBRS Invariance:** Total shaping reward ≈ 0 (canonical mode preserved)\n\n" + ) + else: + f.write( + f"❌ **PBRS Invariance:** Total shaping reward = {total_shaping:.6f} (non-canonical behavior)\n\n" + ) + + else: + f.write("_PBRS components not present in this analysis._\n\n") + # Section 4: Feature Importance Analysis f.write("---\n\n") f.write("## 4. Feature Importance\n\n") @@ -2157,7 +2416,7 @@ def write_complete_statistical_analysis( f.write(f"- p-value: {h['p_value']:.4g}\n") if "p_value_adj" in h: f.write( - f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅' if h['significant_adj'] else '❌'} (α=0.05)\n" + f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n" ) f.write(f"- 95% CI: [{h['ci_95'][0]:.4f}, {h['ci_95'][1]:.4f}]\n") f.write(f"- Sample size: {h['n_samples']:,}\n") @@ -2174,7 +2433,7 @@ def write_complete_statistical_analysis( f.write(f"- p-value: {h['p_value']:.4g}\n") if "p_value_adj" in h: f.write( - f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅' if h['significant_adj'] else '❌'} (α=0.05)\n" + f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n" ) f.write(f"- Effect size (ε²): {h['effect_size_epsilon_sq']:.4f}\n") f.write(f"- Number of groups: {h['n_groups']}\n") @@ -2185,13 +2444,13 @@ def write_complete_statistical_analysis( if "pnl_sign_reward_difference" in hypothesis_tests: h = hypothesis_tests["pnl_sign_reward_difference"] - f.write("#### 5.1.4 Positive vs Negative PnL Comparison\n\n") + f.write("#### 5.1.3 Positive vs Negative PnL Comparison\n\n") f.write(f"**Test Method:** {h['test']}\n\n") f.write(f"- U-statistic: **{h['statistic']:.4f}**\n") f.write(f"- p-value: {h['p_value']:.4g}\n") if "p_value_adj" in h: f.write( - f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅' if h['significant_adj'] else '❌'} (α=0.05)\n" + f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n" ) f.write(f"- Median (PnL+): {h['median_pnl_positive']:.4f}\n") f.write(f"- Median (PnL-): {h['median_pnl_negative']:.4f}\n") @@ -2276,9 +2535,11 @@ def write_complete_statistical_analysis( f.write("**Interpretation Guide:**\n\n") f.write("| Metric | Threshold | Meaning |\n") f.write("|--------|-----------|--------|\n") - f.write("| KL Divergence | < 0.3 | ✅ Good representativeness |\n") - f.write("| JS Distance | < 0.2 | ✅ Similar distributions |\n") - f.write("| KS p-value | > 0.05 | ✅ No significant difference |\n\n") + f.write("| KL Divergence | < 0.3 | ✅ Yes: Good representativeness |\n") + f.write("| JS Distance | < 0.2 | ✅ Yes: Similar distributions |\n") + f.write( + "| KS p-value | > 0.05 | ✅ Yes: No significant difference |\n\n" + ) # Footer f.write("---\n\n") @@ -2291,7 +2552,7 @@ def write_complete_statistical_analysis( "2. **Sample Representativity** - Coverage of critical market scenarios\n" ) f.write( - "3. **Component Analysis** - Relationships between rewards and conditions\n" + "3. 
**Component Analysis** - Relationships between rewards and conditions (including PBRS)\n" ) f.write( "4. **Feature Importance** - Machine learning analysis of key drivers\n" @@ -2326,12 +2587,19 @@ def main() -> None: # Early parameter validation (moved before simulation for alignment with docs) params_validated, adjustments = validate_reward_parameters(params) params = params_validated + if adjustments: + # Compact adjustments summary (param: original->adjusted [reason]) + adj_lines = [ + f" - {k}: {v['original']} -> {v['adjusted']} ({v['reason']})" + for k, v in adjustments.items() + ] + print("Parameter adjustments applied:\n" + "\n".join(adj_lines)) # Normalize attenuation mode _normalize_and_validate_mode(params) - base_factor = _get_param_float(params, "base_factor", float(args.base_factor)) - profit_target = _get_param_float(params, "profit_target", float(args.profit_target)) - risk_reward_ratio = _get_param_float( + base_factor = _get_float_param(params, "base_factor", float(args.base_factor)) + profit_target = _get_float_param(params, "profit_target", float(args.profit_target)) + risk_reward_ratio = _get_float_param( params, "risk_reward_ratio", float(args.risk_reward_ratio) ) @@ -2465,5 +2733,470 @@ def main() -> None: print(f"Artifacts saved to: {args.output.resolve()}") +# === PBRS TRANSFORM FUNCTIONS === + + +def _apply_transform_tanh(value: float, scale: float = 1.0) -> float: + """tanh(scale*value) ∈ (-1,1).""" + return float(np.tanh(scale * value)) + + +def _apply_transform_softsign(value: float, scale: float = 1.0) -> float: + """softsign: x/(1+|x|) with x=scale*value.""" + x = scale * value + return float(x / (1.0 + abs(x))) + + +def _apply_transform_softsign_sharp( + value: float, scale: float = 1.0, sharpness: float = 1.0 +) -> float: + """softsign_sharp: x/(sharpness+|x|) with x=scale*value (smaller sharpness = steeper).""" + x = scale * value + return float(x / (sharpness + abs(x))) + + +def _apply_transform_arctan(value: float, scale: float = 1.0) -> float: + """arctan normalized: (2/pi)*atan(scale*value) ∈ (-1,1).""" + x = scale * value + return float((2.0 / math.pi) * math.atan(x)) + + +def _apply_transform_logistic(value: float, scale: float = 1.0) -> float: + """Overflow‑safe logistic transform mapped to (-1,1): 2σ(kx)−1 where k=scale.""" + x = scale * value + try: + if x >= 0: + z = math.exp(-x) # z in (0,1] + return float((1.0 - z) / (1.0 + z)) + else: + z = math.exp(x) # z in (0,1] + return float((z - 1.0) / (z + 1.0)) + except OverflowError: + return 1.0 if x > 0 else -1.0 + + +def _apply_transform_asinh_norm(value: float, scale: float = 1.0) -> float: + """Normalized asinh: x / sqrt(1 + x²) producing range (-1,1).""" + scaled = scale * value + return float(scaled / math.hypot(1.0, scaled)) + + +def _apply_transform_clip(value: float, scale: float = 1.0) -> float: + """clip(scale*value) to [-1,1].""" + return float(np.clip(scale * value, -1.0, 1.0)) + + +def apply_transform(transform_name: str, value: float, **kwargs: Any) -> float: + """Apply named transform; unknown names fallback to tanh with warning.""" + transforms = { + "tanh": _apply_transform_tanh, + "softsign": _apply_transform_softsign, + "softsign_sharp": _apply_transform_softsign_sharp, + "arctan": _apply_transform_arctan, + "logistic": _apply_transform_logistic, + "asinh_norm": _apply_transform_asinh_norm, + "clip": _apply_transform_clip, + } + + if transform_name not in transforms: + warnings.warn( + f"Unknown potential transform '{transform_name}'; falling back to tanh", + category=UserWarning, 
+ stacklevel=2, + ) + return _apply_transform_tanh(value, **kwargs) + + return transforms[transform_name](value, **kwargs) + + +# === PBRS HELPER FUNCTIONS === + + +def _get_potential_gamma(params: RewardParams) -> float: + """Return potential_gamma with fallback (missing/invalid -> 0.95 + warning).""" + value = params.get("potential_gamma", None) + + if value is None: + warnings.warn( + "potential_gamma not found in config, using default value of 0.95. " + "This parameter controls the discount factor for PBRS potential shaping.", + category=UserWarning, + stacklevel=2, + ) + return 0.95 + + if isinstance(value, (int, float)): + return float(value) + + warnings.warn( + f"Invalid potential_gamma value: {value}. Using default 0.95. " + "Expected numeric value in [0, 1].", + category=UserWarning, + stacklevel=2, + ) + return 0.95 + + +def _get_str_param(params: RewardParams, key: str, default: str) -> str: + """Extract string parameter with type safety.""" + value = params.get(key, default) + if isinstance(value, str): + return value + return default + + +def _get_bool_param(params: RewardParams, key: str, default: bool) -> bool: + """Extract boolean parameter with type safety.""" + value = params.get(key, default) + try: + return _to_bool(value) + except Exception: + return bool(default) + + +# === PBRS IMPLEMENTATION === + + +def _compute_hold_potential( + pnl: float, duration_ratio: float, params: RewardParams +) -> float: + """ + Compute PBRS hold potential: Φ(s) = scale * 0.5 * [T_pnl(g*pnl_ratio) + T_dur(g*duration_ratio)]. + + This implements the canonical PBRS potential function from Ng et al. (1999): + R'(s,a,s') = R_base(s,a,s') + γΦ(s') - Φ(s) + + Args: + pnl: Current profit/loss ratio + duration_ratio: Current duration as fraction of max_trade_duration + params: Reward parameters containing PBRS configuration + + Returns: + Potential value Φ(s) + """ + if not _get_bool_param(params, "hold_potential_enabled", True): + return _fail_safely("hold_potential_disabled") + + scale = _get_float_param(params, "hold_potential_scale", 1.0) + gain = _get_float_param(params, "hold_potential_gain", 1.0) + transform_pnl = _get_str_param(params, "hold_potential_transform_pnl", "tanh") + transform_duration = _get_str_param( + params, "hold_potential_transform_duration", "tanh" + ) + sharpness = _get_float_param(params, "potential_softsign_sharpness", 1.0) + + # Apply transforms + if transform_pnl == "softsign_sharp": + t_pnl = apply_transform(transform_pnl, gain * pnl, sharpness=sharpness) + else: + t_pnl = apply_transform(transform_pnl, gain * pnl) + + if transform_duration == "softsign_sharp": + t_dur = apply_transform( + transform_duration, gain * duration_ratio, sharpness=sharpness + ) + else: + t_dur = apply_transform(transform_duration, gain * duration_ratio) + + potential = scale * 0.5 * (t_pnl + t_dur) + + # Validate numerical safety + if not np.isfinite(potential): + return _fail_safely("non_finite_hold_potential") + + return float(potential) + + +def _compute_entry_additive( + pnl: float, duration_ratio: float, params: RewardParams +) -> float: + """ + Compute entry additive reward (non-PBRS component). 
+ + Args: + pnl: Current profit/loss ratio + duration_ratio: Current duration as fraction of max_trade_duration + params: Reward parameters + + Returns: + Entry additive reward + """ + if not _get_bool_param(params, "entry_additive_enabled", False): + return _fail_safely("entry_additive_disabled") + + scale = _get_float_param(params, "entry_additive_scale", 1.0) + gain = _get_float_param(params, "entry_additive_gain", 1.0) + transform_pnl = _get_str_param(params, "entry_additive_transform_pnl", "tanh") + transform_duration = _get_str_param( + params, "entry_additive_transform_duration", "tanh" + ) + sharpness = _get_float_param(params, "potential_softsign_sharpness", 1.0) + + # Apply transforms + if transform_pnl == "softsign_sharp": + t_pnl = apply_transform(transform_pnl, gain * pnl, sharpness=sharpness) + else: + t_pnl = apply_transform(transform_pnl, gain * pnl) + + if transform_duration == "softsign_sharp": + t_dur = apply_transform( + transform_duration, gain * duration_ratio, sharpness=sharpness + ) + else: + t_dur = apply_transform(transform_duration, gain * duration_ratio) + + additive = scale * 0.5 * (t_pnl + t_dur) + + # Validate numerical safety + if not np.isfinite(additive): + return _fail_safely("non_finite_entry_additive") + + return float(additive) + + +def _compute_exit_additive( + pnl: float, duration_ratio: float, params: RewardParams +) -> float: + """ + Compute exit additive reward (non-PBRS component). + + Args: + pnl: Final profit/loss ratio at exit + duration_ratio: Final duration as fraction of max_trade_duration + params: Reward parameters + + Returns: + Exit additive reward + """ + if not _get_bool_param(params, "exit_additive_enabled", False): + return _fail_safely("exit_additive_disabled") + + scale = _get_float_param(params, "exit_additive_scale", 1.0) + gain = _get_float_param(params, "exit_additive_gain", 1.0) + transform_pnl = _get_str_param(params, "exit_additive_transform_pnl", "tanh") + transform_duration = _get_str_param( + params, "exit_additive_transform_duration", "tanh" + ) + sharpness = _get_float_param(params, "potential_softsign_sharpness", 1.0) + + # Apply transforms + if transform_pnl == "softsign_sharp": + t_pnl = apply_transform(transform_pnl, gain * pnl, sharpness=sharpness) + else: + t_pnl = apply_transform(transform_pnl, gain * pnl) + + if transform_duration == "softsign_sharp": + t_dur = apply_transform( + transform_duration, gain * duration_ratio, sharpness=sharpness + ) + else: + t_dur = apply_transform(transform_duration, gain * duration_ratio) + + additive = scale * 0.5 * (t_pnl + t_dur) + + # Validate numerical safety + if not np.isfinite(additive): + return _fail_safely("non_finite_exit_additive") + + return float(additive) + + +def _compute_exit_potential( + pnl: float, duration_ratio: float, params: RewardParams, last_potential: float = 0.0 +) -> float: + """Compute next potential Φ(s') for closing/exit transitions. + + Mirrors the original environment semantics: + - canonical: Φ' = 0.0 + - progressive_release: Φ' = Φ * (1 - decay) with decay clamped to [0,1] + - spike_cancel: Φ' = Φ / γ (neutralizes shaping spike ≈ 0 net effect) if γ>0 else Φ + - retain_previous: Φ' = Φ + Invalid modes fall back to canonical. + Any non-finite resulting potential is coerced to 0.0. 
+ """ + mode = _get_str_param(params, "exit_potential_mode", "canonical") + if mode == "canonical": + return _fail_safely("canonical_exit_potential") + + if mode == "progressive_release": + decay = _get_float_param(params, "exit_potential_decay", 0.5) + if not np.isfinite(decay) or decay < 0.0: + decay = 0.5 + if decay > 1.0: + decay = 1.0 + next_potential = last_potential * (1.0 - decay) + elif mode == "spike_cancel": + gamma = _get_potential_gamma(params) + if gamma > 0.0 and np.isfinite(gamma): + next_potential = last_potential / gamma + else: + next_potential = last_potential + elif mode == "retain_previous": + next_potential = last_potential + else: + next_potential = _fail_safely("invalid_exit_potential_mode") + + if not np.isfinite(next_potential): + next_potential = _fail_safely("non_finite_next_exit_potential") + return float(next_potential) + + +def apply_potential_shaping( + base_reward: float, + current_pnl: float, + current_duration_ratio: float, + next_pnl: float, + next_duration_ratio: float, + is_terminal: bool, + last_potential: float, + params: RewardParams, +) -> tuple[float, float, float]: + """ + Apply PBRS potential-based reward shaping following Ng et al. (1999). + + Implements: R'(s,a,s') = R_base(s,a,s') + γΦ(s') - Φ(s) + + This function computes the complete PBRS transformation including: + - Hold potential: Φ(s) based on current state features + - Closing potential: Φ(s') with mode-specific terminal handling + - Entry/exit potentials: Non-PBRS additive components + - Gamma discounting: Standard RL discount factor + - Invariance guarantees: Optimal policy preservation + + Theory: + PBRS maintains optimal policy invariance by ensuring that the potential-based + shaping reward is the difference of a potential function. Terminal states + require special handling to preserve this property. + + Args: + base_reward: Base environment reward R_base(s,a,s') + current_pnl: Current state PnL ratio + current_duration_ratio: Current state duration ratio + next_pnl: Next state PnL ratio + next_duration_ratio: Next state duration ratio + is_terminal: Whether next state is terminal + last_potential: Previous potential for closing mode calculations + params: Reward parameters containing PBRS configuration + + Returns: + tuple[total_reward, shaping_reward, next_potential]: + - total_reward: R_base + R_shaping + additives + - shaping_reward: Pure PBRS component γΦ(s') - Φ(s) + - next_potential: Φ(s') for next iteration + + Raises: + ValueError: If gamma is invalid or numerical issues detected + """ + # Enforce PBRS invariance (auto-disable additives in canonical mode) + params = _enforce_pbrs_invariance(params) + + # Validate gamma (with None handling matching original environment) + gamma = _get_potential_gamma(params) + if not (0.0 <= gamma <= 1.0): + raise ValueError(f"Invalid gamma: {gamma}. 
Must be in [0, 1]") + + # Compute current potential Φ(s) + current_potential = _compute_hold_potential( + current_pnl, current_duration_ratio, params + ) + + # Compute next potential Φ(s') + if is_terminal: + next_potential = _compute_exit_potential( + next_pnl, next_duration_ratio, params, last_potential + ) + else: + next_potential = _compute_hold_potential(next_pnl, next_duration_ratio, params) + + # PBRS shaping reward: γΦ(s') - Φ(s) + shaping_reward = gamma * next_potential - current_potential + + # Compute additive components (non-PBRS) + entry_additive = _compute_entry_additive( + current_pnl, current_duration_ratio, params + ) + exit_additive = ( + _compute_exit_additive(next_pnl, next_duration_ratio, params) + if is_terminal + else 0.0 + ) + + # Invariance diagnostic + _log_pbrs_invariance_warning(params) + + # Total reward + total_reward = base_reward + shaping_reward + entry_additive + exit_additive + + # Numerical validation & normalization of tiny shaping + if not np.isfinite(total_reward): + return float(base_reward), 0.0, 0.0 + if np.isclose(shaping_reward, 0.0): + shaping_reward = 0.0 + return float(total_reward), float(shaping_reward), float(next_potential) + + +def _enforce_pbrs_invariance(params: RewardParams) -> RewardParams: + """Enforce PBRS invariance by auto-disabling additives in canonical mode. + + Matches original environment behavior: canonical mode automatically + disables entry/exit additives to preserve theoretical invariance. + + PBRS invariance (Ng et al. 1999) requires: + - canonical exit_potential_mode (terminal Φ=0) + - No path-dependent additive reward components enabled. + + Returns modified params dict with invariance enforced. + """ + mode = _get_str_param(params, "exit_potential_mode", "canonical") + if mode == "canonical": + # Make a copy to avoid mutating input + enforced_params = dict(params) + entry_enabled = _get_bool_param(params, "entry_additive_enabled", False) + exit_enabled = _get_bool_param(params, "exit_additive_enabled", False) + + if entry_enabled: + warnings.warn( + "Disabling entry additive to preserve PBRS invariance (canonical mode).", + category=UserWarning, + stacklevel=2, + ) + enforced_params["entry_additive_enabled"] = False + + if exit_enabled: + warnings.warn( + "Disabling exit additive to preserve PBRS invariance (canonical mode).", + category=UserWarning, + stacklevel=2, + ) + enforced_params["exit_additive_enabled"] = False + + return enforced_params + return params + + +def _log_pbrs_invariance_warning(params: RewardParams) -> None: + """Log an informational message if invariance conditions are violated. + + PBRS invariance (Ng et al. 1999) requires: + - canonical exit_potential_mode (terminal Φ=0) + - No path-dependent additive reward components enabled. + This mirrors original environment diagnostic behavior. 
+ """ + mode = _get_str_param(params, "exit_potential_mode", "canonical") + if mode == "canonical": + if _get_bool_param(params, "entry_additive_enabled", False) or _get_bool_param( + params, "exit_additive_enabled", False + ): + warnings.warn( + ( + "PBRS invariance relaxed: canonical mode with additive components enabled " + f"(entry_additive_enabled={_get_bool_param(params, 'entry_additive_enabled', False)}, " + f"exit_additive_enabled={_get_bool_param(params, 'exit_additive_enabled', False)})" + ), + category=UserWarning, + stacklevel=2, + ) + + if __name__ == "__main__": main() diff --git a/ReforceXY/reward_space_analysis/test_reward_space_analysis.py b/ReforceXY/reward_space_analysis/test_reward_space_analysis.py index 7f8d890..e5f2843 100644 --- a/ReforceXY/reward_space_analysis/test_reward_space_analysis.py +++ b/ReforceXY/reward_space_analysis/test_reward_space_analysis.py @@ -18,10 +18,21 @@ import tempfile import unittest import warnings from pathlib import Path +from typing import Iterable, Sequence import numpy as np import pandas as pd +# Central PBRS parameter lists +PBRS_INTEGRATION_PARAMS = [ + "potential_gamma", + "hold_potential_enabled", + "hold_potential_scale", + "entry_additive_enabled", + "exit_additive_enabled", +] +PBRS_REQUIRED_PARAMS = PBRS_INTEGRATION_PARAMS + ["exit_potential_mode"] + # Import functions to test try: from reward_space_analysis import ( @@ -29,9 +40,17 @@ try: Actions, Positions, RewardContext, + _compute_entry_additive, + _compute_exit_additive, + _compute_exit_potential, + _compute_hold_potential, + _get_bool_param, _get_exit_factor, - _get_param_float, + _get_float_param, _get_pnl_factor, + _get_str_param, + apply_potential_shaping, + apply_transform, bootstrap_confidence_intervals, build_argument_parser, calculate_reward, @@ -50,7 +69,13 @@ except ImportError as e: class RewardSpaceTestBase(unittest.TestCase): - """Base class with common test utilities.""" + """Base class with common test utilities. 
+ + Central tolerance policy (avoid per-test commentary): + - Generic numeric equality: 1e-6 + - Component decomposition / identity: 1e-9 + - Monotonic attenuation allowance: +1e-9 drift + """ @classmethod def setUpClass(cls): @@ -77,14 +102,14 @@ class RewardSpaceTestBase(unittest.TestCase): def assertAlmostEqualFloat( self, - first: float, - second: float, + first: float | int, + second: float | int, tolerance: float = 1e-6, msg: str | None = None, ) -> None: """Absolute tolerance compare with explicit failure and finite check.""" - if not (np.isfinite(first) and np.isfinite(second)): - self.fail(msg or f"Non-finite comparison (a={first}, b={second})") + self.assertFinite(first, name="a") + self.assertFinite(second, name="b") diff = abs(first - second) if diff > tolerance: self.fail( @@ -92,6 +117,93 @@ class RewardSpaceTestBase(unittest.TestCase): or f"Difference {diff} exceeds tolerance {tolerance} (a={first}, b={second})" ) + # --- Statistical bounds helpers (factorize redundancy) --- + def assertPValue(self, value: float | int, msg: str = "") -> None: + """Assert a p-value is finite and within [0,1].""" + self.assertFinite(value, name="p-value") + self.assertGreaterEqual(value, 0.0, msg or f"p-value < 0: {value}") + self.assertLessEqual(value, 1.0, msg or f"p-value > 1: {value}") + + def assertDistanceMetric( + self, + value: float | int, + *, + non_negative: bool = True, + upper: float | None = None, + name: str = "metric", + ) -> None: + """Generic distance/divergence bounds: finite, optional non-negativity and optional upper bound.""" + self.assertFinite(value, name=name) + if non_negative: + self.assertGreaterEqual(value, 0.0, f"{name} negative: {value}") + if upper is not None: + self.assertLessEqual(value, upper, f"{name} > {upper}: {value}") + + def assertEffectSize( + self, + value: float | int, + *, + lower: float = -1.0, + upper: float = 1.0, + name: str = "effect size", + ) -> None: + """Assert effect size within symmetric interval and finite.""" + self.assertFinite(value, name=name) + self.assertGreaterEqual(value, lower, f"{name} < {lower}: {value}") + self.assertLessEqual(value, upper, f"{name} > {upper}: {value}") + + def assertFinite(self, value: float | int, name: str = "value") -> None: + """Assert scalar is finite.""" + if not np.isfinite(value): # low-level base check to avoid recursion + self.fail(f"{name} not finite: {value}") + + def assertMonotonic( + self, + seq: Sequence[float | int] | Iterable[float | int], + *, + non_increasing: bool | None = None, + non_decreasing: bool | None = None, + tolerance: float = 0.0, + name: str = "sequence", + ) -> None: + """Assert a sequence is monotonic under specified direction. + + Provide exactly one of non_increasing/non_decreasing=True. + tolerance allows tiny positive drift in expected monotone direction. 
+ """ + data = list(seq) + if len(data) < 2: + return + if (non_increasing and non_decreasing) or ( + not non_increasing and not non_decreasing + ): + self.fail("Specify exactly one monotonic direction") + for a, b in zip(data, data[1:]): + if non_increasing: + if b > a + tolerance: + self.fail(f"{name} not non-increasing at pair ({a}, {b})") + elif non_decreasing: + if b + tolerance < a: + self.fail(f"{name} not non-decreasing at pair ({a}, {b})") + + def assertWithin( + self, + value: float | int, + low: float | int, + high: float | int, + *, + name: str = "value", + inclusive: bool = True, + ) -> None: + """Assert that value is within [low, high] (inclusive) or (low, high) if inclusive=False.""" + self.assertFinite(value, name=name) + if inclusive: + self.assertGreaterEqual(value, low, f"{name} < {low}") + self.assertLessEqual(value, high, f"{name} > {high}") + else: + self.assertGreater(value, low, f"{name} <= {low}") + self.assertLess(value, high, f"{name} >= {high}") + class TestIntegration(RewardSpaceTestBase): """Integration tests for CLI and file outputs.""" @@ -241,13 +353,19 @@ class TestStatisticalCoherence(RewardSpaceTestBase): # Values should be finite and reasonable for metric_name, value in metrics.items(): if "pnl" in metric_name: - self.assertTrue(np.isfinite(value), f"{metric_name} should be finite") + # All metrics must be finite; selected metrics must be non-negative if any( - suffix in metric_name for suffix in ["js_distance", "ks_statistic"] + suffix in metric_name + for suffix in [ + "js_distance", + "ks_statistic", + "wasserstein", + "kl_divergence", + ] ): - self.assertGreaterEqual( - value, 0, f"{metric_name} should be non-negative" - ) + self.assertDistanceMetric(value, name=metric_name) + else: + self.assertFinite(value, name=metric_name) def test_distribution_shift_identity_null_metrics(self): """Identical distributions should yield (near) zero shift metrics.""" @@ -289,7 +407,7 @@ class TestStatisticalCoherence(RewardSpaceTestBase): "Idle durations should be non-negative", ) - # Idle rewards should generally be negative (penalty for holding) + # Idle rewards should generally be negative (penalty for hold) negative_rewards = (idle_rew < 0).sum() total_rewards = len(idle_rew) negative_ratio = negative_rewards / total_rewards @@ -321,9 +439,7 @@ class TestStatisticalCoherence(RewardSpaceTestBase): for suffix in expected_suffixes: key = f"{prefix}{suffix}" if key in diagnostics: - self.assertTrue( - np.isfinite(diagnostics[key]), f"{key} should be finite" - ) + self.assertFinite(diagnostics[key], name=key) class TestRewardAlignment(RewardSpaceTestBase): @@ -354,7 +470,7 @@ class TestRewardAlignment(RewardSpaceTestBase): # Should return valid breakdown self.assertIsInstance(breakdown.total, (int, float)) - self.assertTrue(np.isfinite(breakdown.total)) + self.assertFinite(breakdown.total, name="breakdown.total") # Exit reward should be positive for profitable trade self.assertGreater( @@ -384,7 +500,7 @@ class TestRewardAlignment(RewardSpaceTestBase): pnl_factor = _get_pnl_factor(params, ctx, profit_target) # Expect no efficiency modulation: factor should be >= 0 and close to 1.0 - self.assertTrue(np.isfinite(pnl_factor)) + self.assertFinite(pnl_factor, name="pnl_factor") self.assertAlmostEqualFloat(pnl_factor, 1.0, tolerance=1e-6) def test_max_idle_duration_candles_logic(self): @@ -436,6 +552,70 @@ class TestRewardAlignment(RewardSpaceTestBase): f"Expected less severe penalty with larger max_idle_duration_candles (large={breakdown_large.idle_penalty}, 
small={breakdown_small.idle_penalty})", ) + def test_pbrs_progressive_release_decay_clamped(self): + """progressive_release with decay>1 must clamp to 1 so Φ' = 0 and Δ = -Φ_prev.""" + params = self.DEFAULT_PARAMS.copy() + params.update( + { + "potential_gamma": 0.95, + "exit_potential_mode": "progressive_release", + "exit_potential_decay": 5.0, # should clamp to 1.0 + "hold_potential_enabled": True, + "entry_additive_enabled": False, + "exit_additive_enabled": False, + } + ) + current_pnl = 0.02 + current_dur = 0.5 + prev_potential = _compute_hold_potential(current_pnl, current_dur, params) + total_reward, shaping_reward, next_potential = apply_potential_shaping( + base_reward=0.0, + current_pnl=current_pnl, + current_duration_ratio=current_dur, + next_pnl=0.02, + next_duration_ratio=0.6, + is_terminal=True, + last_potential=prev_potential, + params=params, + ) + self.assertAlmostEqualFloat(next_potential, 0.0, tolerance=1e-9) + self.assertAlmostEqualFloat(shaping_reward, -prev_potential, tolerance=1e-9) + self.assertAlmostEqualFloat(total_reward, shaping_reward, tolerance=1e-9) + + def test_pbrs_spike_cancel_invariance(self): + """spike_cancel terminal shaping should be ≈ 0 (γ*(Φ/γ) - Φ).""" + params = self.DEFAULT_PARAMS.copy() + params.update( + { + "potential_gamma": 0.9, + "exit_potential_mode": "spike_cancel", + "hold_potential_enabled": True, + "entry_additive_enabled": False, + "exit_additive_enabled": False, + } + ) + current_pnl = 0.015 + current_dur = 0.4 + prev_potential = _compute_hold_potential(current_pnl, current_dur, params) + # Use helper accessor to avoid union type issues + gamma = _get_float_param(params, "potential_gamma", 0.95) + expected_next = ( + prev_potential / gamma if gamma not in (0.0, None) else prev_potential + ) + total_reward, shaping_reward, next_potential = apply_potential_shaping( + base_reward=0.0, + current_pnl=current_pnl, + current_duration_ratio=current_dur, + next_pnl=0.016, + next_duration_ratio=0.45, + is_terminal=True, + last_potential=prev_potential, + params=params, + ) + self.assertAlmostEqualFloat(next_potential, expected_next, tolerance=1e-9) + self.assertAlmostEqualFloat(shaping_reward, 0.0, tolerance=1e-9) + self.assertAlmostEqualFloat(total_reward, 0.0, tolerance=1e-9) + def test_idle_penalty_fallback_and_proportionality(self): """Fallback & proportionality validation. 
@@ -499,7 +679,9 @@ class TestRewardAlignment(RewardSpaceTestBase): msg=f"Idle penalty proportionality mismatch (ratio={ratio})", ) # Additional mid-range inference check (idle_duration between 1x and 2x trade duration) - ctx_mid = dataclasses.replace(ctx_a, idle_duration=120, max_trade_duration=100) + ctx_mid = dataclasses.replace( + ctx_a, idle_duration=120, max_trade_duration=100 + ) # Adjusted context for mid-range check br_mid = calculate_reward( ctx_mid, params, @@ -510,19 +692,19 @@ class TestRewardAlignment(RewardSpaceTestBase): action_masking=True, ) self.assertLess(br_mid.idle_penalty, 0.0) - idle_penalty_scale = _get_param_float(params, "idle_penalty_scale", 0.5) - idle_penalty_power = _get_param_float(params, "idle_penalty_power", 1.025) + idle_penalty_scale = _get_float_param(params, "idle_penalty_scale", 0.5) + idle_penalty_power = _get_float_param(params, "idle_penalty_power", 1.025) # Internal factor may come from params (overrides provided base_factor argument) - factor = _get_param_float(params, "base_factor", float(base_factor)) - idle_factor = factor * (profit_target * risk_reward_ratio) / 3.0 + factor = _get_float_param(params, "base_factor", float(base_factor)) + idle_factor = factor * (profit_target * risk_reward_ratio) / 4.0 observed_ratio = abs(br_mid.idle_penalty) / (idle_factor * idle_penalty_scale) if observed_ratio > 0: implied_D = 120 / (observed_ratio ** (1 / idle_penalty_power)) self.assertAlmostEqualFloat( implied_D, - 200.0, - tolerance=12.0, # modest tolerance for float ops / rounding - msg=f"Fallback denominator mismatch (implied={implied_D}, expected≈200, factor={factor})", + 400.0, + tolerance=20.0, + msg=f"Fallback denominator mismatch (implied={implied_D}, expected≈400, factor={factor})", ) def test_exit_factor_threshold_warning_non_capping(self): @@ -534,7 +716,7 @@ class TestRewardAlignment(RewardSpaceTestBase): params = self.DEFAULT_PARAMS.copy() # Remove base_factor from params so that the function uses the provided argument (makes scaling observable) params.pop("base_factor", None) - exit_factor_threshold = _get_param_float( + exit_factor_threshold = _get_float_param( params, "exit_factor_threshold", 10_000.0 ) @@ -607,9 +789,7 @@ class TestRewardAlignment(RewardSpaceTestBase): duration_ratio=0.3, params=test_params, ) - self.assertTrue( - np.isfinite(factor), f"Exit factor for {mode} should be finite" - ) + self.assertFinite(factor, name=f"exit_factor[{mode}]") self.assertGreater(factor, 0, f"Exit factor for {mode} should be positive") # Plateau+linear variant sanity check (grace region at 0.5) @@ -807,16 +987,17 @@ class TestRewardAlignment(RewardSpaceTestBase): ratios_observed.append(float(ratio)) # Monotonic non-decreasing (allow tiny float noise) - for a, b in zip(ratios_observed, ratios_observed[1:]): - self.assertGreaterEqual( - b + 1e-12, a, f"Amplification not monotonic: {ratios_observed}" - ) + self.assertMonotonic( + ratios_observed, + non_decreasing=True, + tolerance=1e-12, + name="pnl_amplification_ratio", + ) asymptote = 1.0 + win_reward_factor final_ratio = ratios_observed[-1] # Expect to be very close to asymptote (tanh(0.5*(10-1)) ≈ 0.9997) - if not np.isfinite(final_ratio): - self.fail(f"Final ratio is not finite: {final_ratio}") + self.assertFinite(final_ratio, name="final_ratio") self.assertLess( abs(final_ratio - asymptote), 1e-3, @@ -831,8 +1012,8 @@ class TestRewardAlignment(RewardSpaceTestBase): expected_ratios.append(expected) # Compare each observed to expected within loose tolerance (model parity) for obs, exp in 
zip(ratios_observed, expected_ratios): - if not (np.isfinite(obs) and np.isfinite(exp)): - self.fail(f"Non-finite observed/expected ratio: obs={obs}, exp={exp}") + self.assertFinite(obs, name="observed_ratio") + self.assertFinite(exp, name="expected_ratio") self.assertLess( abs(obs - exp), 5e-6, @@ -840,10 +1021,11 @@ class TestRewardAlignment(RewardSpaceTestBase): ) def test_scale_invariance_and_decomposition(self): - """Reward components should scale linearly with base_factor and total == sum of components. + """Core reward components scale ~ linearly with base_factor; total = core + shaping + additives. Contract: - R(base_factor * k) = k * R(base_factor) for each non-zero component. + For each non-zero core component C: C(base_factor * k) ≈ k * C(base_factor). + Decomposition uses total = (exit + idle + hold + invalid + shaping + entry_additive + exit_additive). """ params = self.DEFAULT_PARAMS.copy() # Remove internal base_factor so the explicit argument is used @@ -887,7 +1069,7 @@ class TestRewardAlignment(RewardSpaceTestBase): position=Positions.Neutral, action=Actions.Neutral, ), - # Holding penalty + # Hold penalty RewardContext( pnl=0.0, trade_duration=80, @@ -921,13 +1103,16 @@ class TestRewardAlignment(RewardSpaceTestBase): action_masking=True, ) - # Strict decomposition: total must equal sum of components + # Decomposition including shaping + additives (environment always applies PBRS pipeline) for br in (br1, br2): comp_sum = ( br.exit_component + br.idle_penalty - + br.holding_penalty + + br.hold_penalty + br.invalid_penalty + + br.shaping_reward + + br.entry_additive + + br.exit_additive ) self.assertAlmostEqual( br.total, @@ -940,16 +1125,23 @@ class TestRewardAlignment(RewardSpaceTestBase): components1 = { "exit_component": br1.exit_component, "idle_penalty": br1.idle_penalty, - "holding_penalty": br1.holding_penalty, + "hold_penalty": br1.hold_penalty, "invalid_penalty": br1.invalid_penalty, - "total": br1.total, + # Exclude shaping/additives from scale invariance check (some may have nonlinear dependence) + "total": br1.exit_component + + br1.idle_penalty + + br1.hold_penalty + + br1.invalid_penalty, } components2 = { "exit_component": br2.exit_component, "idle_penalty": br2.idle_penalty, - "holding_penalty": br2.holding_penalty, + "hold_penalty": br2.hold_penalty, "invalid_penalty": br2.invalid_penalty, - "total": br2.total, + "total": br2.exit_component + + br2.idle_penalty + + br2.hold_penalty + + br2.invalid_penalty, } for key, v1 in components1.items(): v2 = components2[key] @@ -1064,13 +1256,9 @@ class TestPublicAPI(RewardSpaceTestBase): self.assertIn("pnl", results) for metric, (mean, ci_low, ci_high) in results.items(): - self.assertTrue(np.isfinite(mean), f"Mean for {metric} should be finite") - self.assertTrue( - np.isfinite(ci_low), f"CI low for {metric} should be finite" - ) - self.assertTrue( - np.isfinite(ci_high), f"CI high for {metric} should be finite" - ) + self.assertFinite(mean, name=f"mean[{metric}]") + self.assertFinite(ci_low, name=f"ci_low[{metric}]") + self.assertFinite(ci_high, name=f"ci_high[{metric}]") self.assertLess( ci_low, ci_high, f"CI bounds for {metric} should be ordered" ) @@ -1089,7 +1277,7 @@ class TestPublicAPI(RewardSpaceTestBase): { "reward_total": np.random.normal(0, 1, 300), "reward_idle": np.where(idle_mask, np.random.normal(-1, 0.3, 300), 0.0), - "reward_holding": np.where( + "reward_hold": np.where( ~idle_mask, np.random.normal(-0.5, 0.2, 300), 0.0 ), "reward_exit": np.random.normal(0.8, 0.6, 300), @@ -1208,49 +1396,31 @@ 
class TestStatisticalValidation(RewardSpaceTestBase): # KL divergence must be >= 0 kl_key = f"{feature}_kl_divergence" if kl_key in metrics: - self.assertGreaterEqual( - metrics[kl_key], 0, f"KL divergence for {feature} must be >= 0" - ) + self.assertDistanceMetric(metrics[kl_key], name=kl_key) # JS distance must be in [0, 1] js_key = f"{feature}_js_distance" if js_key in metrics: - js_val = metrics[js_key] - self.assertGreaterEqual( - js_val, 0, f"JS distance for {feature} must be >= 0" - ) - self.assertLessEqual( - js_val, 1, f"JS distance for {feature} must be <= 1" - ) + self.assertDistanceMetric(metrics[js_key], upper=1.0, name=js_key) # Wasserstein must be >= 0 ws_key = f"{feature}_wasserstein" if ws_key in metrics: - self.assertGreaterEqual( - metrics[ws_key], - 0, - f"Wasserstein distance for {feature} must be >= 0", - ) + self.assertDistanceMetric(metrics[ws_key], name=ws_key) # KS statistic must be in [0, 1] ks_stat_key = f"{feature}_ks_statistic" if ks_stat_key in metrics: - ks_val = metrics[ks_stat_key] - self.assertGreaterEqual( - ks_val, 0, f"KS statistic for {feature} must be >= 0" - ) - self.assertLessEqual( - ks_val, 1, f"KS statistic for {feature} must be <= 1" + self.assertDistanceMetric( + metrics[ks_stat_key], upper=1.0, name=ks_stat_key ) # KS p-value must be in [0, 1] ks_p_key = f"{feature}_ks_pvalue" if ks_p_key in metrics: - p_val = metrics[ks_p_key] - self.assertGreaterEqual( - p_val, 0, f"KS p-value for {feature} must be >= 0" + self.assertPValue( + metrics[ks_p_key], msg=f"KS p-value out of bounds for {feature}" ) - self.assertLessEqual(p_val, 1, f"KS p-value for {feature} must be <= 1") def test_heteroscedasticity_pnl_validation(self): """Test that PnL variance increases with trade duration (heteroscedasticity).""" @@ -1390,7 +1560,7 @@ class TestStatisticalValidation(RewardSpaceTestBase): { "reward_total": np.random.normal(0, 1, 300), "reward_idle": np.random.normal(-1, 0.5, 300), - "reward_holding": np.random.normal(-0.5, 0.3, 300), + "reward_hold": np.random.normal(-0.5, 0.3, 300), "reward_exit": np.random.normal(1, 0.8, 300), "pnl": np.random.normal(0.01, 0.02, 300), "trade_duration": np.random.uniform(5, 150, 300), @@ -1406,25 +1576,18 @@ class TestStatisticalValidation(RewardSpaceTestBase): # Skewness can be any real number, but should be finite if f"{col}_skewness" in diagnostics: skew = diagnostics[f"{col}_skewness"] - self.assertTrue( - np.isfinite(skew), f"Skewness for {col} should be finite" - ) + self.assertFinite(skew, name=f"skewness[{col}]") # Kurtosis should be finite (can be negative for platykurtic distributions) if f"{col}_kurtosis" in diagnostics: kurt = diagnostics[f"{col}_kurtosis"] - self.assertTrue( - np.isfinite(kurt), f"Kurtosis for {col} should be finite" - ) + self.assertFinite(kurt, name=f"kurtosis[{col}]") # Shapiro p-value must be in [0, 1] if f"{col}_shapiro_pval" in diagnostics: - p_val = diagnostics[f"{col}_shapiro_pval"] - self.assertGreaterEqual( - p_val, 0, f"Shapiro p-value for {col} must be >= 0" - ) - self.assertLessEqual( - p_val, 1, f"Shapiro p-value for {col} must be <= 1" + self.assertPValue( + diagnostics[f"{col}_shapiro_pval"], + msg=f"Shapiro p-value bounds for {col}", ) # Test hypothesis tests results bounds @@ -1433,28 +1596,20 @@ class TestStatisticalValidation(RewardSpaceTestBase): for test_name, result in hypothesis_results.items(): # All p-values must be in [0, 1] if "p_value" in result: - p_val = result["p_value"] - self.assertGreaterEqual( - p_val, 0, f"p-value for {test_name} must be >= 0" + 
self.assertPValue( + result["p_value"], msg=f"p-value bounds for {test_name}" ) - self.assertLessEqual(p_val, 1, f"p-value for {test_name} must be <= 1") # Effect size epsilon squared (ANOVA/Kruskal) must be finite and >= 0 if "effect_size_epsilon_sq" in result: eps2 = result["effect_size_epsilon_sq"] - self.assertTrue( - np.isfinite(eps2), - f"Effect size epsilon^2 for {test_name} should be finite", - ) + self.assertFinite(eps2, name=f"epsilon_sq[{test_name}]") self.assertGreaterEqual( eps2, 0.0, f"Effect size epsilon^2 for {test_name} must be >= 0" ) # Rank-biserial correlation (Mann-Whitney) must be finite in [-1, 1] if "effect_size_rank_biserial" in result: rb = result["effect_size_rank_biserial"] - self.assertTrue( - np.isfinite(rb), - f"Rank-biserial correlation for {test_name} should be finite", - ) + self.assertFinite(rb, name=f"rank_biserial[{test_name}]") self.assertGreaterEqual( rb, -1.0, f"Rank-biserial correlation for {test_name} must be >= -1" ) @@ -1464,7 +1619,8 @@ class TestStatisticalValidation(RewardSpaceTestBase): # Generic correlation effect size (Spearman/Pearson) if present if "rho" in result: rho = result["rho"] - if rho is not None and np.isfinite(rho): + if rho is not None: + self.assertFinite(rho, name=f"rho[{test_name}]") self.assertGreaterEqual( rho, -1.0, f"Correlation rho for {test_name} must be >= -1" ) @@ -1501,11 +1657,8 @@ class TestStatisticalValidation(RewardSpaceTestBase): self.assertIn("significant_adj", res, f"Missing significant_adj in {name}") p_raw = res["p_value"] p_adj = res["p_value_adj"] - # Bounds & ordering - self.assertTrue(0 <= p_raw <= 1, f"Raw p-value out of bounds ({p_raw})") - self.assertTrue( - 0 <= p_adj <= 1, f"Adjusted p-value out of bounds ({p_adj})" - ) + self.assertPValue(p_raw, msg=f"Raw p-value out of bounds ({p_raw})") + self.assertPValue(p_adj, msg=f"Adjusted p-value out of bounds ({p_adj})") # BH should not reduce p-value (non-decreasing) after monotonic enforcement self.assertGreaterEqual( p_adj, @@ -1522,15 +1675,11 @@ class TestStatisticalValidation(RewardSpaceTestBase): # Optional: if effect sizes present, basic bounds if "effect_size_epsilon_sq" in res: eff = res["effect_size_epsilon_sq"] - self.assertTrue( - np.isfinite(eff), f"Effect size finite check failed for {name}" - ) + self.assertFinite(eff, name=f"effect_size[{name}]") self.assertGreaterEqual(eff, 0, f"ε² should be >=0 for {name}") if "effect_size_rank_biserial" in res: rb = res["effect_size_rank_biserial"] - self.assertTrue( - np.isfinite(rb), f"Rank-biserial finite check failed for {name}" - ) + self.assertFinite(rb, name=f"rank_biserial[{name}]") self.assertGreaterEqual(rb, -1, f"Rank-biserial lower bound {name}") self.assertLessEqual(rb, 1, f"Rank-biserial upper bound {name}") @@ -1582,7 +1731,7 @@ class TestStatisticalValidation(RewardSpaceTestBase): "reward_total", "reward_invalid", "reward_idle", - "reward_holding", + "reward_hold", "reward_exit", ] for col in required_columns: @@ -1636,10 +1785,7 @@ class TestStatisticalValidation(RewardSpaceTestBase): ) # Total should always be finite - self.assertTrue( - np.isfinite(breakdown.total), - f"Reward total should be finite for {position}/{action}", - ) + self.assertFinite(breakdown.total, name="breakdown.total") class TestBoundaryConditions(RewardSpaceTestBase): @@ -1673,10 +1819,7 @@ class TestBoundaryConditions(RewardSpaceTestBase): action_masking=True, ) - self.assertTrue( - np.isfinite(breakdown.total), - "Reward should be finite even with extreme parameters", - ) + self.assertFinite(breakdown.total, 
name="breakdown.total") def test_different_exit_attenuation_modes(self): """Test different exit attenuation modes (legacy, sqrt, linear, power, half_life).""" @@ -1708,14 +1851,10 @@ class TestBoundaryConditions(RewardSpaceTestBase): action_masking=True, ) - self.assertTrue( - np.isfinite(breakdown.exit_component), - f"Exit component should be finite for mode {mode}", - ) - self.assertTrue( - np.isfinite(breakdown.total), - f"Total reward should be finite for mode {mode}", + self.assertFinite( + breakdown.exit_component, name="breakdown.exit_component" ) + self.assertFinite(breakdown.total, name="breakdown.total") class TestHelperFunctions(RewardSpaceTestBase): @@ -1788,7 +1927,7 @@ class TestHelperFunctions(RewardSpaceTestBase): "reward_idle": np.concatenate( [np.zeros(150), np.random.normal(-1, 0.5, 50)] ), - "reward_holding": np.concatenate( + "reward_hold": np.concatenate( [np.zeros(150), np.random.normal(-0.5, 0.3, 50)] ), "reward_exit": np.concatenate( @@ -1889,13 +2028,20 @@ class TestPrivateFunctions(RewardSpaceTestBase): ) self.assertLess(breakdown.idle_penalty, 0, "Idle penalty should be negative") - self.assertEqual( - breakdown.total, breakdown.idle_penalty, "Total should equal idle penalty" + # Total now includes shaping/additives - require equality including those components. + self.assertAlmostEqualFloat( + breakdown.total, + breakdown.idle_penalty + + breakdown.shaping_reward + + breakdown.entry_additive + + breakdown.exit_additive, + tolerance=1e-9, + msg="Total should equal sum of components (idle + shaping/additives)", ) - def test_holding_penalty_via_rewards(self): - """Test holding penalty calculation via reward calculation.""" - # Create context that will trigger holding penalty + def test_hold_penalty_via_rewards(self): + """Test hold penalty calculation via reward calculation.""" + # Create context that will trigger hold penalty context = RewardContext( pnl=0.01, trade_duration=150, @@ -1917,13 +2063,15 @@ class TestPrivateFunctions(RewardSpaceTestBase): action_masking=True, ) - self.assertLess( - breakdown.holding_penalty, 0, "Holding penalty should be negative" - ) - self.assertEqual( + self.assertLess(breakdown.hold_penalty, 0, "Hold penalty should be negative") + self.assertAlmostEqualFloat( breakdown.total, - breakdown.holding_penalty, - "Total should equal holding penalty", + breakdown.hold_penalty + + breakdown.shaping_reward + + breakdown.entry_additive + + breakdown.exit_additive, + tolerance=1e-9, + msg="Total should equal sum of components (hold + shaping/additives)", ) def test_exit_reward_calculation(self): @@ -1995,14 +2143,18 @@ class TestPrivateFunctions(RewardSpaceTestBase): self.assertLess( breakdown.invalid_penalty, 0, "Invalid action should have negative penalty" ) - self.assertEqual( + self.assertAlmostEqualFloat( breakdown.total, - breakdown.invalid_penalty, - "Total should equal invalid penalty", + breakdown.invalid_penalty + + breakdown.shaping_reward + + breakdown.entry_additive + + breakdown.exit_additive, + tolerance=1e-9, + msg="Total should equal invalid penalty plus shaping/additives", ) - def test_holding_penalty_zero_before_max_duration(self): - """Test holding penalty logic: zero penalty before max_trade_duration.""" + def test_hold_penalty_zero_before_max_duration(self): + """Test hold penalty logic: zero penalty before max_trade_duration.""" max_duration = 128 # Test cases: before, at, and after max_duration @@ -2017,7 +2169,7 @@ class TestPrivateFunctions(RewardSpaceTestBase): for trade_duration, description in test_cases: with 
self.subTest(duration=trade_duration, desc=description): context = RewardContext( - pnl=0.0, # Neutral PnL to isolate holding penalty + pnl=0.0, # Neutral PnL to isolate hold penalty trade_duration=trade_duration, idle_duration=0, max_trade_duration=max_duration, @@ -2042,34 +2194,37 @@ class TestPrivateFunctions(RewardSpaceTestBase): if duration_ratio < 1.0: # Before max_duration: should be exactly 0.0 self.assertEqual( - breakdown.holding_penalty, + breakdown.hold_penalty, 0.0, - f"Holding penalty should be 0.0 {description} (ratio={duration_ratio:.2f})", + f"Hold penalty should be 0.0 {description} (ratio={duration_ratio:.2f})", ) elif duration_ratio == 1.0: # At max_duration: (1.0-1.0)^power = 0, so should be 0.0 self.assertEqual( - breakdown.holding_penalty, + breakdown.hold_penalty, 0.0, - f"Holding penalty should be 0.0 {description} (ratio={duration_ratio:.2f})", + f"Hold penalty should be 0.0 {description} (ratio={duration_ratio:.2f})", ) else: # After max_duration: should be negative self.assertLess( - breakdown.holding_penalty, + breakdown.hold_penalty, 0.0, - f"Holding penalty should be negative {description} (ratio={duration_ratio:.2f})", + f"Hold penalty should be negative {description} (ratio={duration_ratio:.2f})", ) - # Total should equal holding penalty (no other components active) - self.assertEqual( + self.assertAlmostEqualFloat( breakdown.total, - breakdown.holding_penalty, - f"Total should equal holding penalty {description}", + breakdown.hold_penalty + + breakdown.shaping_reward + + breakdown.entry_additive + + breakdown.exit_additive, + tolerance=1e-9, + msg=f"Total mismatch including shaping {description}", ) - def test_holding_penalty_progressive_scaling(self): - """Test that holding penalty scales progressively after max_duration.""" + def test_hold_penalty_progressive_scaling(self): + """Test that hold penalty scales progressively after max_duration.""" max_duration = 100 durations = [150, 200, 300] # All > max_duration penalties: list[float] = [] @@ -2096,7 +2251,7 @@ class TestPrivateFunctions(RewardSpaceTestBase): action_masking=True, ) - penalties.append(breakdown.holding_penalty) + penalties.append(breakdown.hold_penalty) # Penalties should be increasingly negative (monotonic decrease) for i in range(1, len(penalties)): @@ -2134,19 +2289,17 @@ class TestPrivateFunctions(RewardSpaceTestBase): short_allowed=True, action_masking=True, ) - self.assertTrue( - np.isfinite(breakdown.exit_component), "Exit component must be finite" - ) + self.assertFinite(breakdown.exit_component, name="exit_component") class TestRewardRobustness(RewardSpaceTestBase): """Tests implementing all prioritized robustness enhancements. 
Covers: - - Reward decomposition integrity (total == sum of active component exactly) + - Reward decomposition integrity (total == core components + shaping + additives) - Exit factor monotonic attenuation per mode where mathematically expected - Boundary parameter conditions (tau extremes, plateau grace edges, linear slope = 0) - - Non-linear power tests for idle & holding penalties (power != 1) + - Non-linear power tests for idle & hold penalties (power != 1) - Warning emission (exit_factor_threshold) without capping """ @@ -2192,7 +2345,7 @@ class TestRewardRobustness(RewardSpaceTestBase): ), active="idle_penalty", ), - # Holding penalty only + # Hold penalty only dict( ctx=RewardContext( pnl=0.0, @@ -2204,7 +2357,7 @@ class TestRewardRobustness(RewardSpaceTestBase): position=Positions.Long, action=Actions.Neutral, ), - active="holding_penalty", + active="hold_penalty", ), # Exit reward only (positive pnl) dict( @@ -2242,7 +2395,7 @@ class TestRewardRobustness(RewardSpaceTestBase): components = [ br.invalid_penalty, br.idle_penalty, - br.holding_penalty, + br.hold_penalty, br.exit_component, ] non_zero = [ @@ -2255,9 +2408,12 @@ class TestRewardRobustness(RewardSpaceTestBase): ) self.assertAlmostEqualFloat( br.total, - non_zero[0], + non_zero[0] + + br.shaping_reward + + br.entry_additive + + br.exit_additive, tolerance=1e-9, - msg=f"Total mismatch for {sc['active']}", + msg=f"Total mismatch including shaping for {sc['active']}", ) def test_exit_factor_monotonic_attenuation(self): @@ -2510,7 +2666,7 @@ class TestParameterValidation(RewardSpaceTestBase): self.assertIn("exit_power_tau", adjustments) self.assertIn("min=", str(adjustments["exit_power_tau"]["reason"])) - def test_idle_and_holding_penalty_power(self): + def test_idle_and_hold_penalty_power(self): """Test non-linear scaling when penalty powers != 1.""" params = self.DEFAULT_PARAMS.copy() params["idle_penalty_power"] = 2.0 @@ -2557,8 +2713,8 @@ class TestParameterValidation(RewardSpaceTestBase): delta=0.8, msg=f"Idle penalty quadratic scaling mismatch (ratio={ratio_quadratic})", ) - # Holding penalty with power 2: durations just above threshold - params["holding_penalty_power"] = 2.0 + # Hold penalty with power 2: durations just above threshold + params["hold_penalty_power"] = 2.0 ctx_h1 = RewardContext( pnl=0.0, trade_duration=130, @@ -2570,7 +2726,7 @@ class TestParameterValidation(RewardSpaceTestBase): action=Actions.Neutral, ) ctx_h2 = dataclasses.replace(ctx_h1, trade_duration=140) - # Compute baseline and comparison holding penalties + # Compute baseline and comparison hold penalties br_h1 = calculate_reward( ctx_h1, params, @@ -2591,13 +2747,13 @@ class TestParameterValidation(RewardSpaceTestBase): ) # Quadratic scaling: ((140-100)/(130-100))^2 = (40/30)^2 ≈ 1.777... 
hold_ratio = 0.0 - if br_h1.holding_penalty != 0: - hold_ratio = br_h2.holding_penalty / br_h1.holding_penalty + if br_h1.hold_penalty != 0: + hold_ratio = br_h2.hold_penalty / br_h1.hold_penalty self.assertAlmostEqual( abs(hold_ratio), (40 / 30) ** 2, delta=0.4, - msg=f"Holding penalty quadratic scaling mismatch (ratio={hold_ratio})", + msg=f"Hold penalty quadratic scaling mismatch (ratio={hold_ratio})", ) def test_exit_factor_threshold_warning_emission(self): @@ -2862,6 +3018,226 @@ class TestLoadRealEpisodes(RewardSpaceTestBase): self.assertIn("pnl", loaded_data.columns) +class TestPBRSIntegration(RewardSpaceTestBase): + """Tests for PBRS (Potential-Based Reward Shaping) integration.""" + + def test_tanh_transform(self): + """tanh transform: bounded in [-1,1], symmetric.""" + self.assertAlmostEqualFloat(apply_transform("tanh", 0.0), 0.0) + self.assertAlmostEqualFloat(apply_transform("tanh", 1.0), math.tanh(1.0)) + self.assertAlmostEqualFloat(apply_transform("tanh", -1.0), math.tanh(-1.0)) + self.assertTrue(abs(apply_transform("tanh", 100.0)) <= 1.0) + self.assertTrue(abs(apply_transform("tanh", -100.0)) <= 1.0) + + def test_softsign_transform(self): + """softsign transform: x/(1+|x|) in (-1,1).""" + self.assertAlmostEqualFloat(apply_transform("softsign", 0.0), 0.0) + self.assertAlmostEqualFloat(apply_transform("softsign", 1.0), 0.5) + self.assertAlmostEqualFloat(apply_transform("softsign", -1.0), -0.5) + self.assertTrue(abs(apply_transform("softsign", 100.0)) < 1.0) + self.assertTrue(abs(apply_transform("softsign", -100.0)) < 1.0) + + def test_arctan_transform(self): + """arctan transform: normalized (2/pi)atan(x) bounded [-1,1].""" + # Environment uses normalized arctan: (2/pi)*atan(x) + self.assertAlmostEqualFloat(apply_transform("arctan", 0.0), 0.0) + self.assertAlmostEqualFloat( + apply_transform("arctan", 1.0), + (2.0 / math.pi) * math.atan(1.0), + tolerance=1e-10, + ) + self.assertTrue(abs(apply_transform("arctan", 100.0)) <= 1.0) + self.assertTrue(abs(apply_transform("arctan", -100.0)) <= 1.0) + + def test_logistic_transform(self): + """logistic transform: 2σ(x)-1 in (-1,1).""" + # Environment logistic returns 2σ(x)-1 centered at 0 in (-1,1) + self.assertAlmostEqualFloat(apply_transform("logistic", 0.0), 0.0) + self.assertTrue(apply_transform("logistic", 100.0) > 0.99) + self.assertTrue(apply_transform("logistic", -100.0) < -0.99) + self.assertTrue(-1 < apply_transform("logistic", 10.0) < 1) + self.assertTrue(-1 < apply_transform("logistic", -10.0) < 1) + + def test_clip_transform(self): + """clip transform: clamp to [-1,1].""" + self.assertAlmostEqualFloat(apply_transform("clip", 0.0), 0.0) + self.assertAlmostEqualFloat(apply_transform("clip", 0.5), 0.5) + self.assertAlmostEqualFloat(apply_transform("clip", 2.0), 1.0) + self.assertAlmostEqualFloat(apply_transform("clip", -2.0), -1.0) + + def test_invalid_transform(self): + """Test error handling for invalid transforms.""" + # Environment falls back silently to tanh + self.assertAlmostEqualFloat( + apply_transform("invalid_transform", 1.0), math.tanh(1.0), tolerance=1e-9 + ) + + def test_get_float_param(self): + """Test float parameter extraction.""" + params = {"test_float": 1.5, "test_int": 2, "test_str": "hello"} + self.assertEqual(_get_float_param(params, "test_float", 0.0), 1.5) + self.assertEqual(_get_float_param(params, "test_int", 0.0), 2.0) + # Non parseable string -> NaN fallback in tolerant parser + val_str = _get_float_param(params, "test_str", 0.0) + if isinstance(val_str, float) and math.isnan(val_str): + pass + 
else: + self.fail("Expected NaN for non-numeric string in _get_float_param") + self.assertEqual(_get_float_param(params, "missing", 3.14), 3.14) + + def test_get_str_param(self): + """Test string parameter extraction.""" + params = {"test_str": "hello", "test_int": 2} + self.assertEqual(_get_str_param(params, "test_str", "default"), "hello") + self.assertEqual(_get_str_param(params, "test_int", "default"), "default") + self.assertEqual(_get_str_param(params, "missing", "default"), "default") + + def test_get_bool_param(self): + """Test boolean parameter extraction.""" + params = { + "test_true": True, + "test_false": False, + "test_int": 1, + "test_str": "yes", + } + self.assertTrue(_get_bool_param(params, "test_true", False)) + self.assertFalse(_get_bool_param(params, "test_false", True)) + # Environment coerces typical truthy numeric/string values + self.assertTrue(_get_bool_param(params, "test_int", False)) + self.assertTrue(_get_bool_param(params, "test_str", False)) + self.assertFalse(_get_bool_param(params, "missing", False)) + + def test_hold_potential_basic(self): + """Test basic hold potential calculation.""" + params = { + "hold_potential_enabled": True, + "hold_potential_scale": 1.0, + "hold_potential_gain": 1.0, + "hold_potential_transform_pnl": "tanh", + "hold_potential_transform_duration": "tanh", + } + val = _compute_hold_potential(0.5, 0.3, params) + self.assertFinite(val, name="hold_potential") + + def test_entry_additive_disabled(self): + """Test entry additive when disabled.""" + params = {"entry_additive_enabled": False} + val = _compute_entry_additive(0.5, 0.3, params) + self.assertEqual(val, 0.0) + + def test_exit_additive_disabled(self): + """Test exit additive when disabled.""" + params = {"exit_additive_enabled": False} + val = _compute_exit_additive(0.5, 0.3, params) + self.assertEqual(val, 0.0) + + def test_exit_potential_canonical(self): + """Test exit potential in canonical mode.""" + params = {"exit_potential_mode": "canonical"} + val = _compute_exit_potential(0.5, 0.3, params, last_potential=1.0) + self.assertEqual(val, 0.0) + + def test_exit_potential_progressive_release(self): + """Progressive release: Φ' = Φ * (1 - decay).""" + params = { + "exit_potential_mode": "progressive_release", + "exit_potential_decay": 0.8, + } + # Expected: Φ' = Φ * (1 - decay) = 1 * (1 - 0.8) = 0.2 + val = _compute_exit_potential(0.5, 0.3, params, last_potential=1.0) + self.assertAlmostEqual(val, 0.2) + + def test_exit_potential_spike_cancel(self): + """Spike cancel: Φ' = Φ / γ (inversion).""" + params = {"exit_potential_mode": "spike_cancel", "potential_gamma": 0.95} + val = _compute_exit_potential(0.5, 0.3, params, last_potential=1.0) + self.assertAlmostEqual(val, 1.0 / 0.95, places=7) + + def test_exit_potential_retain_previous(self): + """Test exit potential in retain previous mode.""" + params = {"exit_potential_mode": "retain_previous"} + val = _compute_exit_potential(0.5, 0.3, params, last_potential=1.0) + self.assertEqual(val, 1.0) + + def test_pbrs_terminal_canonical(self): + """Test PBRS behavior in canonical mode with terminal state.""" + params = { + "potential_gamma": 0.95, + "hold_potential_enabled": True, + "hold_potential_scale": 1.0, + "hold_potential_gain": 1.0, + "hold_potential_transform_pnl": "tanh", + "hold_potential_transform_duration": "tanh", + "exit_potential_mode": "canonical", + "entry_additive_enabled": False, + "exit_additive_enabled": False, + } + + current_pnl = 0.5 + current_duration_ratio = 0.3 + expected_current_potential = 
_compute_hold_potential( + current_pnl, current_duration_ratio, params + ) + + total_reward, shaping_reward, next_potential = apply_potential_shaping( + base_reward=100.0, + current_pnl=current_pnl, + current_duration_ratio=current_duration_ratio, + next_pnl=0.0, + next_duration_ratio=0.0, + is_terminal=True, + last_potential=0.0, + params=params, + ) + + # Terminal potential should be 0 in canonical mode + self.assertEqual(next_potential, 0.0) + # Shaping reward should be negative (releasing potential) + self.assertTrue(shaping_reward < 0) + # Check exact formula: γΦ(s') - Φ(s) = 0.95 * 0 - expected_current_potential + expected_shaping = 0.95 * 0.0 - expected_current_potential + self.assertAlmostEqual(shaping_reward, expected_shaping, delta=1e-10) + + def test_pbrs_invalid_gamma(self): + """Test PBRS with invalid gamma value.""" + params = {"potential_gamma": 1.5, "hold_potential_enabled": True} + with self.assertRaises(ValueError): + apply_potential_shaping( + base_reward=0.0, + current_pnl=0.0, + current_duration_ratio=0.0, + next_pnl=0.0, + next_duration_ratio=0.0, + is_terminal=True, + last_potential=0.0, + params=params, + ) + + def test_calculate_reward_with_pbrs_integration(self): + """Test that PBRS parameters are properly integrated in defaults.""" + # Test that PBRS parameters are in the default parameters + for param in PBRS_INTEGRATION_PARAMS: + self.assertIn( + param, + DEFAULT_MODEL_REWARD_PARAMETERS, + f"PBRS parameter {param} missing from defaults", + ) + + # Test basic PBRS function integration works + params = {"hold_potential_enabled": True, "hold_potential_scale": 1.0} + potential = _compute_hold_potential(0.1, 0.2, params) + self.assertFinite(potential, name="hold_potential") + + def test_pbrs_default_parameters_completeness(self): + """Test that all required PBRS parameters have defaults.""" + for param in PBRS_REQUIRED_PARAMS: + self.assertIn( + param, + DEFAULT_MODEL_REWARD_PARAMETERS, + f"Missing PBRS parameter: {param}", + ) + + if __name__ == "__main__": # Configure test discovery and execution loader = unittest.TestLoader() diff --git a/ReforceXY/user_data/freqaimodels/ReforceXY.py b/ReforceXY/user_data/freqaimodels/ReforceXY.py index a23a61c..cb9e9ca 100644 --- a/ReforceXY/user_data/freqaimodels/ReforceXY.py +++ b/ReforceXY/user_data/freqaimodels/ReforceXY.py @@ -116,6 +116,7 @@ class ReforceXY(BaseReinforcementLearningModel): """ _LOG_2 = math.log(2.0) + DEFAULT_IDLE_DURATION_MULTIPLIER: int = 4 _action_masks_cache: Dict[Tuple[bool, float], NDArray[np.bool_]] = {} def __init__(self, *args, **kwargs): @@ -354,6 +355,7 @@ class ReforceXY(BaseReinforcementLearningModel): model_params: Dict[str, Any] = copy.deepcopy(self.model_training_parameters) model_params.setdefault("seed", 42) + model_params.setdefault("gamma", 0.95) if not self.hyperopt and self.lr_schedule: lr = model_params.get("learning_rate", 0.0003) @@ -655,6 +657,14 @@ class ReforceXY(BaseReinforcementLearningModel): else: tensorboard_log_path = None + # Rebuild train and eval environments before training to sync model parameters + prices_train, prices_test = self.build_ohlc_price_dataframes( + dk.data_dictionary, dk.pair, dk + ) + self.set_train_and_eval_environments( + dk.data_dictionary, prices_train, prices_test, dk + ) + model = self.get_init_model(dk.pair) if model is not None: logger.info( @@ -722,7 +732,7 @@ class ReforceXY(BaseReinforcementLearningModel): dtype=np.float32, copy=False ) n = np_dataframe.shape[0] - window_size: int = self.CONV_WIDTH + window_size: int = self.window_size 
frame_stacking: int = self.frame_stacking frame_stacking_enabled: bool = bool(frame_stacking) and frame_stacking > 1 inference_masking: bool = self.action_masking and self.inference_masking @@ -1079,6 +1089,7 @@ class ReforceXY(BaseReinforcementLearningModel): seed: Optional[int] = None, env_info: Optional[Dict[str, Any]] = None, trial: Optional[Trial] = None, + model_params: Optional[Dict[str, Any]] = None, ) -> Tuple[VecEnv, VecEnv]: if ( train_df is None @@ -1095,7 +1106,30 @@ class ReforceXY(BaseReinforcementLearningModel): if trial is not None: seed += trial.number set_random_seed(seed) - env_info = self.pack_env_dict(dk.pair) if env_info is None else env_info + env_info: Dict[str, Any] = ( + self.pack_env_dict(dk.pair) if env_info is None else env_info + ) + gamma: Optional[float] = None + best_trial_params: Optional[Dict[str, Any]] = None + if self.hyperopt: + best_trial_params = self.load_best_trial_params( + dk.pair if self.rl_config_optuna.get("per_pair", False) else None + ) + if model_params and isinstance(model_params.get("gamma"), (int, float)): + gamma = model_params.get("gamma") + elif best_trial_params: + gamma = best_trial_params.get("gamma") + elif hasattr(self.model, "gamma") and isinstance( + self.model.gamma, (int, float) + ): + gamma = self.model.gamma + elif isinstance(self.get_model_params().get("gamma"), (int, float)): + gamma = self.get_model_params().get("gamma") + if gamma is not None: + # Align RL agent gamma with PBRS gamma for consistent discount factor + env_info["config"]["freqai"]["rl_config"]["model_reward_parameters"][ + "potential_gamma" + ] = float(gamma) env_prefix = f"trial_{trial.number}_" if trial is not None else "" train_fns = [ @@ -1217,7 +1251,9 @@ class ReforceXY(BaseReinforcementLearningModel): else: tensorboard_log_path = None - train_env, eval_env = self._get_train_and_eval_environments(dk, trial=trial) + train_env, eval_env = self._get_train_and_eval_environments( + dk, trial=trial, model_params=params + ) model = self.MODELCLASS( self.policy_type, @@ -1327,10 +1363,632 @@ class MyRLEnv(Base5ActionRLEnv): self.max_trade_duration_candles: int = self.rl_config.get( "max_trade_duration_candles", 128 ) + # === Constants === + self.MIN_SOFTSIGN_SHARPNESS: float = 0.01 + self.MAX_SOFTSIGN_SHARPNESS: float = 100.0 + # === INTERNAL STATE === self._last_closed_position: Optional[Positions] = None self._last_closed_trade_tick: int = 0 self._max_unrealized_profit: float = -np.inf self._min_unrealized_profit: float = np.inf + self._last_potential: float = 0.0 + # === PBRS INSTRUMENTATION === + self._total_shaping_reward: float = 0.0 + self._last_shaping_reward: float = 0.0 + model_reward_parameters = self.rl_config.get("model_reward_parameters", {}) + # === PBRS COMMON PARAMETERS === + potential_gamma = model_reward_parameters.get("potential_gamma") + if potential_gamma is None: + logger.warning("potential_gamma not specified; defaulting to 0.95") + self._potential_gamma = 0.95 + else: + self._potential_gamma = float(potential_gamma) + self._potential_softsign_sharpness: float = float( + model_reward_parameters.get("potential_softsign_sharpness", 1.0) + ) + self._potential_softsign_sharpness = max( + self.MIN_SOFTSIGN_SHARPNESS, + min(self.MAX_SOFTSIGN_SHARPNESS, self._potential_softsign_sharpness), + ) + # === EXIT POTENTIAL MODE === + # exit_potential_mode options: + # 'canonical' -> Φ(s')=0 (baseline PBRS, preserves invariance) + # 'progressive_release' -> Φ(s')=Φ(s)*(1-decay_factor) + # 'spike_cancel' -> Φ(s')=Φ(s)/γ (Δ ≈ 0, cancels shaping) + # 
'retain_previous' -> Φ(s')=Φ(s) + self._exit_potential_mode = str( + model_reward_parameters.get("exit_potential_mode", "canonical") + ) + _allowed_exit_modes = { + "canonical", + "progressive_release", + "spike_cancel", + "retain_previous", + } + if self._exit_potential_mode not in _allowed_exit_modes: + logger.warning( + "Unknown exit_potential_mode '%s'; defaulting to 'canonical'", + self._exit_potential_mode, + ) + self._exit_potential_mode = "canonical" + self._exit_potential_decay: float = float( + model_reward_parameters.get("exit_potential_decay", 0.5) + ) + # === ENTRY ADDITIVE (non-PBRS additive term) === + self._entry_additive_enabled: bool = bool( + model_reward_parameters.get("entry_additive_enabled", False) + ) + self._entry_additive_scale: float = float( + model_reward_parameters.get("entry_additive_scale", 1.0) + ) + self._entry_additive_gain: float = float( + model_reward_parameters.get("entry_additive_gain", 1.0) + ) + self._entry_additive_transform_pnl: str = str( + model_reward_parameters.get("entry_additive_transform_pnl", "tanh") + ) + self._entry_additive_transform_duration: str = str( + model_reward_parameters.get("entry_additive_transform_duration", "tanh") + ) + # === HOLD POTENTIAL (PBRS function Φ) === + self._hold_potential_enabled: bool = bool( + model_reward_parameters.get("hold_potential_enabled", True) + ) + self._hold_potential_scale: float = float( + model_reward_parameters.get("hold_potential_scale", 1.0) + ) + self._hold_potential_gain: float = float( + model_reward_parameters.get("hold_potential_gain", 1.0) + ) + self._hold_potential_transform_pnl: str = str( + model_reward_parameters.get("hold_potential_transform_pnl", "tanh") + ) + self._hold_potential_transform_duration: str = str( + model_reward_parameters.get("hold_potential_transform_duration", "tanh") + ) + # === EXIT ADDITIVE (non-PBRS additive term) === + self._exit_additive_enabled: bool = bool( + model_reward_parameters.get("exit_additive_enabled", False) + ) + self._exit_additive_scale: float = float( + model_reward_parameters.get("exit_additive_scale", 1.0) + ) + self._exit_additive_gain: float = float( + model_reward_parameters.get("exit_additive_gain", 1.0) + ) + self._exit_additive_transform_pnl: str = str( + model_reward_parameters.get("exit_additive_transform_pnl", "tanh") + ) + self._exit_additive_transform_duration: str = str( + model_reward_parameters.get("exit_additive_transform_duration", "tanh") + ) + # === PBRS INVARIANCE CHECKS === + if self._exit_potential_mode == "canonical": + if self._entry_additive_enabled or self._exit_additive_enabled: + if self._entry_additive_enabled: + logger.info( + "Disabling entry additive to preserve PBRS invariance (canonical mode)." + ) + if self._exit_additive_enabled: + logger.info( + "Disabling exit additive to preserve PBRS invariance (canonical mode)." 
+ ) + self._entry_additive_enabled = False + self._exit_additive_enabled = False + + def _get_next_position(self, action: int) -> Positions: + if action == Actions.Long_enter.value and self._position == Positions.Neutral: + return Positions.Long + if ( + action == Actions.Short_enter.value + and self._position == Positions.Neutral + and self.can_short + ): + return Positions.Short + if action == Actions.Long_exit.value and self._position == Positions.Long: + return Positions.Neutral + if action == Actions.Short_exit.value and self._position == Positions.Short: + return Positions.Neutral + return self._position + + def _get_next_transition_state( + self, + action: int, + trade_duration: float, + pnl: float, + ) -> Tuple[Positions, int, float]: + """Compute next transition state tuple.""" + next_position = self._get_next_position(action) + # Entry + if self._position == Positions.Neutral and next_position in ( + Positions.Long, + Positions.Short, + ): + return next_position, 0, 0.0 + # Exit + if ( + self._position in (Positions.Long, Positions.Short) + and next_position == Positions.Neutral + ): + return next_position, 0, 0.0 + # Hold + if self._position in (Positions.Long, Positions.Short) and next_position in ( + Positions.Long, + Positions.Short, + ): + return next_position, int(trade_duration) + 1, pnl + # Neutral self-loop + return next_position, 0, 0.0 + + def _is_invalid_pnl_target(self, pnl_target: float) -> bool: + """Check if pnl_target is invalid (negative or close to zero).""" + return pnl_target < 0.0 or np.isclose(pnl_target, 0.0) + + def _compute_pnl_duration_signal( + self, + *, + enabled: bool, + require_position: bool, + position: Positions, + pnl: float, + pnl_target: float, + duration_ratio: float, + scale: float, + gain: float, + transform_pnl: str, + transform_duration: str, + ) -> float: + """Generic bounded bi-component signal combining PnL and duration. + + Shared logic for: + - Hold potential Φ(s) + - Entry additive + - Exit additive + + Parameters + ---------- + enabled : bool + Whether this signal is active + require_position : bool + If True, only compute when position in (Long, Short) + position : Positions + Current position + pnl : float + PnL used for normalization + pnl_target : float + Target PnL normalizer (>0) + duration_ratio : float + Raw duration ratio + scale : float + Output scaling factor + gain : float + Gain multiplier before transform + transform_pnl : str + Transform name for PnL component + transform_duration : str + Transform name for duration component + + Returns + ------- + float + Bounded signal in [-scale, scale] + """ + if not enabled: + return 0.0 + if require_position and position not in (Positions.Long, Positions.Short): + return 0.0 + if self._is_invalid_pnl_target(pnl_target): + return 0.0 + + duration_ratio = 0.0 if duration_ratio < 0.0 else duration_ratio + if duration_ratio > 1.0: + duration_ratio = 1.0 + + try: + pnl_ratio = pnl / pnl_target + except Exception: + return 0.0 + + pnl_term = self._potential_transform(transform_pnl, gain * pnl_ratio) + dur_term = self._potential_transform(transform_duration, gain * duration_ratio) + value = scale * 0.5 * (pnl_term + dur_term) + return float(value) if np.isfinite(value) else 0.0 + + def _compute_hold_potential( + self, + position: Positions, + duration_ratio: float, + pnl: float, + pnl_target: float, + ) -> float: + """Compute PBRS potential Φ(s) for position holding states. + + See ``_apply_potential_shaping`` for complete PBRS documentation. 
+ """ + return self._compute_pnl_duration_signal( + enabled=self._hold_potential_enabled, + require_position=True, + position=position, + pnl=pnl, + pnl_target=pnl_target, + duration_ratio=duration_ratio, + scale=self._hold_potential_scale, + gain=self._hold_potential_gain, + transform_pnl=self._hold_potential_transform_pnl, + transform_duration=self._hold_potential_transform_duration, + ) + + def _compute_exit_additive( + self, + pnl: float, + pnl_target: float, + duration_ratio: float, + ) -> float: + """Compute exit additive reward for position exit transitions. + + See ``_apply_potential_shaping`` for complete PBRS documentation. + """ + return self._compute_pnl_duration_signal( + enabled=self._exit_additive_enabled, + require_position=False, + position=Positions.Neutral, + pnl=pnl, + pnl_target=pnl_target, + duration_ratio=duration_ratio, + scale=self._exit_additive_scale, + gain=self._exit_additive_gain, + transform_pnl=self._exit_additive_transform_pnl, + transform_duration=self._exit_additive_transform_duration, + ) + + def _compute_entry_additive( + self, + pnl: float, + pnl_target: float, + duration_ratio: float, + ) -> float: + """Compute entry additive reward for position entry transitions. + + See ``_apply_potential_shaping`` for complete PBRS documentation. + """ + return self._compute_pnl_duration_signal( + enabled=self._entry_additive_enabled, + require_position=False, + position=Positions.Neutral, + pnl=pnl, + pnl_target=pnl_target, + duration_ratio=duration_ratio, + scale=self._entry_additive_scale, + gain=self._entry_additive_gain, + transform_pnl=self._entry_additive_transform_pnl, + transform_duration=self._entry_additive_transform_duration, + ) + + def _potential_transform(self, name: str, x: float) -> float: + """Apply bounded transform function for potential and additive computations. + + Provides numerical stability by mapping unbounded inputs to bounded outputs + using various smooth activation functions. Used in both PBRS potentials + and additive reward calculations. + + Parameters + ---------- + name : str + Transform function name: 'tanh', 'softsign', 'softsign_sharp', + 'arctan', 'logistic', 'asinh_norm', or 'clip' + x : float + Input value to transform + + Returns + ------- + float + Bounded output in [-1, 1] + """ + if name == "tanh": + return math.tanh(x) + + if name == "softsign": + ax = abs(x) + return x / (1.0 + ax) + + if name == "softsign_sharp": + s = self._potential_softsign_sharpness + xs = s * x + ax = abs(xs) + return xs / (1.0 + ax) + + if name == "arctan": + return (2.0 / math.pi) * math.atan(x) + + if name == "logistic": + if x >= 0: + z = math.exp(-x) # z in (0,1] + return (1.0 - z) / (1.0 + z) + else: + z = math.exp(x) # z in (0,1] + return (z - 1.0) / (z + 1.0) + + if name == "asinh_norm": + return x / math.hypot(1.0, x) + + if name == "clip": + return max(-1.0, min(1.0, x)) + + logger.info("Unknown potential transform '%s'; falling back to tanh", name) + return math.tanh(x) + + def _compute_exit_potential(self, prev_potential: float, gamma: float) -> float: + """Compute next potential Φ(s') for exit transitions based on exit potential mode. + + See ``_apply_potential_shaping`` for complete PBRS documentation. 
+ """ + mode = self._exit_potential_mode + if mode == "canonical": + return 0.0 + if mode == "progressive_release": + decay = self._exit_potential_decay + if not np.isfinite(decay) or decay < 0.0: + decay = 0.5 + if decay > 1.0: + decay = 1.0 + next_potential = prev_potential * (1.0 - decay) + elif mode == "spike_cancel": + if gamma <= 0.0 or not np.isfinite(gamma): + next_potential = prev_potential + else: + next_potential = prev_potential / gamma + elif mode == "retain_previous": + next_potential = prev_potential + else: + next_potential = 0.0 + if not np.isfinite(next_potential): + next_potential = 0.0 + return next_potential + + def is_pbrs_invariant_mode(self) -> bool: + """Return True if current configuration preserves PBRS policy invariance. + + PBRS policy invariance (Ng et al. 1999) requires: + 1. Canonical exit mode: Φ(terminal) = 0 + 2. No path-dependent additives: entry_additive = exit_additive = 0 + + When True, the shaped policy π'(s) is guaranteed to be equivalent to + the policy π(s) learned with base rewards only. + + Returns + ------- + bool + True if configuration preserves theoretical PBRS invariance + """ + return self._exit_potential_mode == "canonical" and not ( + self._entry_additive_enabled or self._exit_additive_enabled + ) + + def _apply_potential_shaping( + self, + base_reward: float, + action: int, + trade_duration: float, + max_trade_duration: float, + pnl: float, + pnl_target: float, + ) -> float: + """Apply potential-based reward shaping (PBRS) following Ng et al. 1999. + + Implements the canonical PBRS formula: + + R'(s, a, s') = R_base(s, a, s') + γ Φ(s') - Φ(s) + + Notation + -------- + - R_base(s, a, s') : unshaped environment reward (code variable: ``base_reward``) + - Φ(s) : potential before transition (code: ``prev_potential`` / ``self._last_potential``) + - Φ(s') : potential after transition (computed per transition type) + - γ : shaping discount (``self._potential_gamma``) + - Δ(s,s') : shaping term = γ Φ(s') - Φ(s) (logged as ``shaping_reward`` per step) + - R'(s, a, s') : shaped reward delivered to the agent = R_base + Δ(s,s') + (additives if enabled) + - pnl_ratio : pnl / pnl_target (normalized profit component before transform) + - duration_ratio : trade_duration / max_trade_duration (clipped to [0,1] before transform) + + PBRS Theory & Compliance + ------------------------ + This implementation follows academic standards for potential-based reward shaping: + - Ng et al. 1999: Canonical formula with invariance guarantees + - Wiewiora et al. 2003: Terminal state handling (Φ(terminal)=0) + - Maintains policy invariance in canonical mode with proper terminal handling + + Architecture & Transitions + -------------------------- + Three mutually exclusive transition types: + + 1. **Entry** (Neutral → Long/Short): + - Initialize potential Φ for next step: Φ(s') = hold_potential(next_state) + - PBRS shaping reward: γΦ(s') - Φ(s) where Φ(s)=0 (neutral has no potential) + - Optional entry additive (non-PBRS additive term, breaks invariance if used) + + 2. **Hold** (Long/Short → Long/Short): + - Standard PBRS: γΦ(s') - Φ(s) where both potentials computed from hold_potential() + - Φ(s') accounts for updated PnL and trade duration progression + + 3. 
**Exit** (Long/Short → Neutral): + - **Canonical mode**: Φ(terminal)=0, Δ(s,s') = -Φ(s) + - **Heuristic modes**: Φ(s') computed by _compute_exit_potential(), Δ(s,s') = γΦ(s')-Φ(s) + - Optional exit additive (non-PBRS additive term for trade quality summary) + + Potential Function Φ(s) + ----------------------- + Hold potential formula: Φ(s) = scale * 0.5 * [T_pnl(g*pnl_ratio) + T_dur(g*duration_ratio)] + + **Bounded Transform Functions** (range [-1,1]): + - tanh: smooth saturation, tanh(x) + - softsign: x/(1+|x|), gentler than tanh + - softsign_sharp: softsign(sharpness*x), tunable steepness + - arctan: (2/π)*arctan(x), linear near origin + - logistic: 2σ(x)-1 where σ(x)=1/(1+e^(-x)), numerically stable implementation + - asinh_norm: x/√(1+x²), normalized asinh-like + - clip: hard clamp to [-1,1] + + **Parameters**: + - gain g: sharpens (g>1) or softens (g<1) transform input + - scale: multiplies final potential value + - sharpness: affects softsign_sharp transform (must be >0) + + Exit Potential Modes + -------------------- + **canonical** (PBRS-compliant): + - Φ(s')=0 for all exit transitions + - Maintains theoretical invariance guarantees + - Shaping reward: γ·0-Φ(s) = -Φ(s) + - Entry/exit additives automatically disabled to preserve invariance + + **progressive_release** (heuristic): + - Φ(s')=Φ(s)*(1-decay_factor), gradual decay + - Shaping reward: γΦ(s')-Φ(s) = γΦ(s)*(1-d)-Φ(s) + + **spike_cancel** (heuristic): + - Φ(s')=Φ(s)/γ, aims for zero net shaping + - Shaping reward: γΦ(s')-Φ(s) = γ*(Φ(s)/γ)-Φ(s) = 0 + + **retain_previous** (heuristic): + - Φ(s')=Φ(s), full retention + - Shaping reward: (γ-1)Φ(s) + + Additive Components & Path Dependence + ------------------------------------ + **Entry/Exit Additive Terms**: Non-PBRS additive rewards that break invariance + - Entry additive: Applied at entry transitions, computed via _compute_entry_additive() + - Exit additive: Applied at exit transitions, computed via _compute_exit_additive() + - Neither additive persists in stored potential (maintains neutrality) + + **Path Dependence**: Only canonical mode preserves PBRS invariance. Heuristic + exit modes introduce path dependence through non-zero terminal potentials. + + Invariance & Validation + ----------------------- + **Theoretical Guarantee**: In canonical mode, ∑ Δ(s,s') = 0 over + complete episodes due to Φ(terminal)=0. Entry/exit additives are automatically + disabled in canonical mode to preserve this invariance. 
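+
+        **Worked Example** (illustrative values): with γ=0.95 and a stored potential
+        Φ(s)=0.50 at the exit step, the shaping term Δ(s,s') = γΦ(s')-Φ(s) per mode is:
+
+        - canonical: Φ(s')=0 → Δ = -0.50
+        - progressive_release (decay=0.5): Φ(s')=0.25 → Δ = 0.95*0.25 - 0.50 = -0.2625
+        - spike_cancel: Φ(s')=0.50/0.95 ≈ 0.5263 → Δ ≈ 0.0
+        - retain_previous: Φ(s')=0.50 → Δ = (0.95-1)*0.50 = -0.025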
+ + **Deviations from Theory**: + - Heuristic exit modes violate invariance + - Entry/exit additives break policy invariance + - Non-canonical modes may cause path-dependent learning + + **Robustness**: + - Bounded transforms prevent potential explosion + - Finite value validation with fallback to 0 + - Terminal state enforcement: Φ(s)=0 when terminated=True + - All transform functions are strictly bounded in [-1, 1], ensuring numerical stability + + Parameters + ---------- + base_reward : float + Original reward before shaping + action : int + Action taken leading to transition + trade_duration : float + Current trade duration in candles + max_trade_duration : float + Maximum allowed trade duration + pnl : float + Current position PnL + pnl_target : float + Target PnL for normalization + + Returns + ------- + float + Shaped reward R'(s,a,s') = R_base + Δ(s,s') + optional_additives + + Notes + ----- + - Use canonical mode for theoretical compliance + - Monitor ∑Δ(s,s') for invariance validation (should sum to 0 over episodes) + - Heuristic exit modes are experimental and may affect convergence + - Transform validation removed from runtime (deferred to analysis tools) + - In canonical exit mode, Φ is reset to 0 at exit boundaries, ensuring telescoping cancellation (∑Δ=0) over closed episodes + """ + if not self._hold_potential_enabled and not ( + self._entry_additive_enabled or self._exit_additive_enabled + ): + return base_reward + prev_potential = self._last_potential + next_position, next_trade_duration, next_pnl = self._get_next_transition_state( + action=action, trade_duration=trade_duration, pnl=pnl + ) + if max_trade_duration <= 0: + next_duration_ratio = 0.0 + else: + next_duration_ratio = next_trade_duration / max_trade_duration + + is_entry = self._position == Positions.Neutral and next_position in ( + Positions.Long, + Positions.Short, + ) + is_exit = ( + self._position in (Positions.Long, Positions.Short) + and next_position == Positions.Neutral + ) + is_hold = self._position in ( + Positions.Long, + Positions.Short, + ) and next_position in (Positions.Long, Positions.Short) + + gamma = self._potential_gamma + if is_entry: + if self._hold_potential_enabled: + potential = self._compute_hold_potential( + next_position, next_duration_ratio, next_pnl, pnl_target + ) + shaping_reward = gamma * potential - prev_potential + self._last_potential = potential + else: + shaping_reward = 0.0 + self._last_potential = 0.0 + entry_additive = self._compute_entry_additive( + pnl=next_pnl, pnl_target=pnl_target, duration_ratio=next_duration_ratio + ) + self._last_shaping_reward = float(shaping_reward) + self._total_shaping_reward += float(shaping_reward) + return base_reward + shaping_reward + entry_additive + elif is_hold: + if self._hold_potential_enabled: + potential = self._compute_hold_potential( + next_position, next_duration_ratio, next_pnl, pnl_target + ) + shaping_reward = gamma * potential - prev_potential + self._last_potential = potential + else: + shaping_reward = 0.0 + self._last_potential = 0.0 + self._last_shaping_reward = float(shaping_reward) + self._total_shaping_reward += float(shaping_reward) + return base_reward + shaping_reward + elif is_exit: + if self._exit_potential_mode == "canonical": + next_potential = 0.0 + exit_shaping_reward = -prev_potential + else: + next_potential = self._compute_exit_potential(prev_potential, gamma) + exit_shaping_reward = gamma * next_potential - prev_potential + + exit_additive = 0.0 + if self._exit_additive_enabled and not 
self.is_pbrs_invariant_mode(): + duration_ratio = trade_duration / max(max_trade_duration, 1) + exit_additive = self._compute_exit_additive( + pnl, pnl_target, duration_ratio + ) + + exit_reward = exit_shaping_reward + exit_additive + self._last_potential = next_potential + self._last_shaping_reward = float(exit_shaping_reward) + self._total_shaping_reward += float(exit_shaping_reward) + return base_reward + exit_reward + else: + # Neutral self-loop + self._last_potential = 0.0 + self._last_shaping_reward = 0.0 + return base_reward def _set_observation_space(self) -> None: """ @@ -1376,6 +2034,9 @@ class MyRLEnv(Base5ActionRLEnv): self._last_closed_trade_tick: int = 0 self._max_unrealized_profit = -np.inf self._min_unrealized_profit = np.inf + self._last_potential = 0.0 + self._total_shaping_reward = 0.0 + self._last_shaping_reward = 0.0 return observation, history def _get_exit_factor( @@ -1542,65 +2203,56 @@ class MyRLEnv(Base5ActionRLEnv): return max(0.0, pnl_target_factor * efficiency_factor) def calculate_reward(self, action: int) -> float: - """ - An example reward function. This is the one function that users will likely - wish to inject their own creativity into. - - Warning! - This is function is a showcase of functionality designed to show as many possible - environment control features as possible. It is also designed to run quickly - on small computers. This is a benchmark, it is *not* for live production. - - :param action: int = The action made by the agent for the current candle. - :return: - float = the reward to give to the agent for current step (used for optimization - of weights in NN) + """Compute per-step reward and apply potential-based reward shaping (PBRS). + + Reward Pipeline: + 1. Invalid action penalty + 2. Idle penalty + 3. Hold overtime penalty + 4. Exit reward + 5. Default fallback (0.0 if no specific reward) + 6. PBRS application: R'(s,a,s') = R_base + Δ(s,s') + optional_additives + + The final shaped reward is what the RL agent receives for learning. + In canonical PBRS mode, the learned policy is theoretically equivalent + to training on base rewards only (policy invariance). + + Parameters + ---------- + action : int + Action index taken by the agent + + Returns + ------- + float + Shaped reward R'(s,a,s') = R_base + Δ(s,s') + optional_additives """ model_reward_parameters = self.rl_config.get("model_reward_parameters", {}) + base_reward: Optional[float] = None - # first, penalize if the action is not valid + # 1. 
Invalid action
         if not self.action_masking and not self._is_valid(action):
             self.tensorboard_log("invalid", category="actions")
-            return float(model_reward_parameters.get("invalid_action", -2.0))
-
-        pnl = self.get_unrealized_profit()
-        # mrr = self.get_most_recent_return()
-        # mrp = self.get_most_recent_profit()
+            base_reward = float(model_reward_parameters.get("invalid_action", -2.0))
 
         max_trade_duration = max(self.max_trade_duration_candles, 1)
         trade_duration = self.get_trade_duration()
         duration_ratio = trade_duration / max_trade_duration
         base_factor = float(model_reward_parameters.get("base_factor", 100.0))
         pnl_target = self.profit_aim * self.rr
-        idle_factor = base_factor * pnl_target / 3.0
-        holding_factor = idle_factor
-
-        # # you can use feature values from dataframe
-        # rsi_now = self.get_feature_value(
-        #     name="%-rsi",
-        #     period=8,
-        #     pair=self.pair,
-        #     timeframe=self.config.get("timeframe"),
-        #     raw=True,
-        # )
-
-        # # reward agent for entering trades when RSI is low
-        # if (
-        #     action in (Actions.Long_enter.value, Actions.Short_enter.value)
-        #     and self._position == Positions.Neutral
-        # ):
-        #     if rsi_now < 40:
-        #         factor = 40 / rsi_now
-        #     else:
-        #         factor = 1
-        #     return 25.0 * factor
-
-        # discourage agent from sitting idle
-        if action == Actions.Neutral.value and self._position == Positions.Neutral:
+        idle_factor = base_factor * pnl_target / 4.0
+        hold_factor = idle_factor
+
+        # 2. Idle penalty
+        if (
+            base_reward is None
+            and action == Actions.Neutral.value
+            and self._position == Positions.Neutral
+        ):
             max_idle_duration = int(
                 model_reward_parameters.get(
-                    "max_idle_duration_candles", 2 * max_trade_duration
+                    "max_idle_duration_candles",
+                    ReforceXY.DEFAULT_IDLE_DURATION_MULTIPLIER * max_trade_duration,
                 )
             )
             idle_penalty_scale = float(
@@ -1611,48 +2263,63 @@ class MyRLEnv(Base5ActionRLEnv):
             )
             idle_duration = self.get_idle_duration()
             idle_duration_ratio = idle_duration / max(1, max_idle_duration)
-            return (
+            base_reward = (
                 -idle_factor
                 * idle_penalty_scale
                 * idle_duration_ratio**idle_penalty_power
             )
 
-        # discourage agent from sitting in position
+        # 3. Hold overtime penalty
         if (
-            self._position in (Positions.Short, Positions.Long)
+            base_reward is None
+            and self._position in (Positions.Short, Positions.Long)
             and action == Actions.Neutral.value
         ):
-            holding_penalty_scale = float(
-                model_reward_parameters.get("holding_penalty_scale", 0.25)
+            hold_penalty_scale = float(
+                model_reward_parameters.get("hold_penalty_scale", 0.25)
             )
-            holding_penalty_power = float(
-                model_reward_parameters.get("holding_penalty_power", 1.025)
+            hold_penalty_power = float(
+                model_reward_parameters.get("hold_penalty_power", 1.025)
             )
-            if duration_ratio < 1.0:
-                return 0.0
-
-            return (
-                -holding_factor
-                * holding_penalty_scale
-                * (duration_ratio - 1.0) ** holding_penalty_power
-            )
-
-        # close long
-        if action == Actions.Long_exit.value and self._position == Positions.Long:
-            return pnl * self._get_exit_factor(base_factor, pnl, duration_ratio)
-
-        # close short
-        if action == Actions.Short_exit.value and self._position == Positions.Short:
-            return pnl * self._get_exit_factor(base_factor, pnl, duration_ratio)
+            if duration_ratio < 1.0:
+                base_reward = 0.0
+            else:
+                base_reward = (
+                    -hold_factor
+                    * hold_penalty_scale
+                    * (duration_ratio - 1.0) ** hold_penalty_power
+                )
 
-        return 0.0
+        # 4. 
Exit rewards + pnl = self.get_unrealized_profit() + if ( + base_reward is None + and action == Actions.Long_exit.value + and self._position == Positions.Long + ): + base_reward = pnl * self._get_exit_factor(base_factor, pnl, duration_ratio) + if ( + base_reward is None + and action == Actions.Short_exit.value + and self._position == Positions.Short + ): + base_reward = pnl * self._get_exit_factor(base_factor, pnl, duration_ratio) + + # 5. Default + if base_reward is None: + base_reward = 0.0 + + # 6. Potential-based reward shaping + return self._apply_potential_shaping( + base_reward=base_reward, + action=action, + trade_duration=trade_duration, + max_trade_duration=max_trade_duration, + pnl=pnl, + pnl_target=pnl_target, + ) def _get_observation(self) -> NDArray[np.float32]: - """ - This may or may not be independent of action types, user can inherit - this in their custom "MyRLEnv" - """ start_idx = max(self._start_tick, self._current_tick - self.window_size) end_idx = min(self._current_tick, len(self.signal_features)) features_window = self.signal_features.iloc[start_idx:end_idx] @@ -1762,17 +2429,25 @@ class MyRLEnv(Base5ActionRLEnv): "most_recent_return": round(self.get_most_recent_return(), 5), "most_recent_profit": round(self.get_most_recent_profit(), 5), "total_profit": round(self._total_profit, 5), + "potential": round(self._last_potential, 5), + "shaping_reward": round(self._last_shaping_reward, 5), + "total_shaping_reward": round(self._total_shaping_reward, 5), "reward": round(reward, 5), "total_reward": round(self.total_reward, 5), + "pbrs_invariant": self.is_pbrs_invariant_mode(), "idle_duration": self.get_idle_duration(), "trade_duration": self.get_trade_duration(), "trade_count": int(len(self.trade_history) // 2), } self._update_history(info) + terminated = self.is_terminated() + if terminated: + # Enforce Φ(terminal)=0 for PBRS invariance (Wiewiora et al. 2003) + self._last_potential = 0.0 return ( self._get_observation(), reward, - self.is_terminated(), + terminated, self.is_truncated(), info, ) diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py index c8801a3..74dc2e9 100644 --- a/quickadapter/user_data/strategies/Utils.py +++ b/quickadapter/user_data/strategies/Utils.py @@ -774,8 +774,7 @@ def fit_regressor( if regressor == "xgboost": from xgboost import XGBRegressor - if model_training_parameters.get("random_state") is None: - model_training_parameters["random_state"] = 1 + model_training_parameters.setdefault("random_state", 1) if trial is not None: model_training_parameters["random_state"] = ( @@ -799,8 +798,7 @@ def fit_regressor( elif regressor == "lightgbm": from lightgbm import LGBMRegressor - if model_training_parameters.get("seed") is None: - model_training_parameters["seed"] = 1 + model_training_parameters.setdefault("seed", 1) if trial is not None: model_training_parameters["seed"] = ( -- 2.43.0
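
For readers who want to sanity-check the PBRS arithmetic outside the patch, here is a minimal standalone sketch assuming a tanh-based potential and illustrative parameter values; `potential` and `shaping_term` are hypothetical helpers written for this note, not the ReforceXY API.

```python
# Illustrative PBRS sketch (not part of the patch): bounded potential over
# (pnl, duration) and the shaping term gamma * Phi(s') - Phi(s).
import math


def potential(pnl: float, pnl_target: float, duration_ratio: float,
              scale: float = 1.0, gain: float = 1.0) -> float:
    """Phi(s) = scale * 0.5 * (tanh(gain*pnl_ratio) + tanh(gain*duration_ratio))."""
    if pnl_target <= 0.0:
        return 0.0
    duration_ratio = min(max(duration_ratio, 0.0), 1.0)
    return scale * 0.5 * (
        math.tanh(gain * pnl / pnl_target) + math.tanh(gain * duration_ratio)
    )


def shaping_term(prev_phi: float, next_phi: float, gamma: float = 0.95) -> float:
    """PBRS shaping reward Delta(s, s') = gamma * Phi(s') - Phi(s)."""
    return gamma * next_phi - prev_phi


if __name__ == "__main__":
    gamma = 0.95
    phi_after_entry = potential(pnl=0.0, pnl_target=0.03, duration_ratio=0.0)      # 0.0
    phi_while_holding = potential(pnl=0.02, pnl_target=0.03, duration_ratio=0.5)   # ~0.52
    print("entry:", shaping_term(0.0, phi_after_entry, gamma))
    print("hold :", shaping_term(phi_after_entry, phi_while_holding, gamma))
    # Canonical exit: Phi(terminal) = 0, so the shaping term is -Phi(s).
    print("exit :", shaping_term(phi_while_holding, 0.0, gamma))
```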