From: Jérôme Benoit Date: Sun, 21 Dec 2025 17:35:10 +0000 (+0100) Subject: fix(pbrs): canonical mode disables additives; migrate prev_potential API X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=fbb75c1a877b9354db574223e4cf7c661d70af16;p=freqai-strategies.git fix(pbrs): canonical mode disables additives; migrate prev_potential API --- diff --git a/ReforceXY/reward_space_analysis/README.md b/ReforceXY/reward_space_analysis/README.md index b41054a..3b00794 100644 --- a/ReforceXY/reward_space_analysis/README.md +++ b/ReforceXY/reward_space_analysis/README.md @@ -321,11 +321,14 @@ where `kernel_function` depends on `exit_attenuation_mode`. See [Exit Attenuatio | `exit_potential_decay` | 0.5 | Decay for progressive_release | | `hold_potential_enabled` | true | Enable hold potential Φ | -PBRS invariance holds when: `exit_potential_mode=canonical` AND -`entry_additive_enabled=false` AND `exit_additive_enabled=false`. Under this -condition the algorithm enforces zero-sum shaping: if the summed shaping term -deviates by more than 1e-6 (`PBRS_INVARIANCE_TOL`), a uniform drift correction -subtracts the mean shaping offset across invariant samples. +PBRS invariance holds when: `exit_potential_mode=canonical`. + +In canonical mode, the entry/exit additive terms are suppressed even if the +corresponding `*_additive_enabled` flags are set. + +Note: PBRS telescoping/zero-sum shaping is a property of coherent trajectories +(episodes). `simulate_samples()` generates synthetic trajectories (state carried +across samples) and does not apply any drift correction in post-processing. #### Hold Potential Transforms @@ -536,10 +539,11 @@ configuration. ### PBRS Configuration -Canonical mode enforces zero-sum shaping (Φ terminal ≈ 0) for theoretical -invariance. Non-canonical modes or additives modify this behavior. Choose -canonical for standard PBRS compliance; use non-canonical when specific shaping -behavior is required. +Canonical mode enforces terminal release (Φ terminal ≈ 0) and suppresses +entry/exit additive terms. + +Non-canonical exit modes can introduce non-zero terminal shaping; enable +additives only when you want those extra terms to contribute. 
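+
+A minimal sketch of the canonical shaping step (illustrative values only;
+`phi_prev` stands for the stored potential Φ carried from the previous step):
+
+```python
+gamma = 0.95      # potential_gamma
+phi_prev = 0.42   # stored potential carried from the previous step
+phi_next = 0.0    # canonical exit releases the potential
+pbrs_delta = gamma * phi_next - phi_prev  # Δ = γ·Φ(next) − Φ(prev) = -0.42
+```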
### Real Data Comparison diff --git a/ReforceXY/reward_space_analysis/reward_space_analysis.py b/ReforceXY/reward_space_analysis/reward_space_analysis.py index 575dad2..822ab66 100644 --- a/ReforceXY/reward_space_analysis/reward_space_analysis.py +++ b/ReforceXY/reward_space_analysis/reward_space_analysis.py @@ -109,29 +109,29 @@ ALLOWED_EXIT_POTENTIAL_MODES = { DEFAULT_MODEL_REWARD_PARAMETERS: RewardParams = { "invalid_action": -2.0, "base_factor": 100.0, - # Idle penalty (env defaults) + # Idle penalty defaults "idle_penalty_scale": 0.5, "idle_penalty_power": 1.025, "max_trade_duration_candles": 128, # Fallback: DEFAULT_IDLE_DURATION_MULTIPLIER * max_trade_duration_candles "max_idle_duration_candles": None, - # Hold penalty (env defaults) + # Hold penalty defaults "hold_penalty_scale": 0.25, "hold_penalty_power": 1.025, - # Exit attenuation (env default) + # Exit attenuation defaults "exit_attenuation_mode": "linear", "exit_plateau": True, "exit_plateau_grace": 1.0, "exit_linear_slope": 1.0, "exit_power_tau": 0.5, "exit_half_life": 0.5, - # Efficiency factor (env defaults) + # Efficiency factor defaults "efficiency_weight": 1.0, "efficiency_center": 0.5, - # Profit factor (env defaults) + # Profit factor defaults "win_reward_factor": 2.0, "pnl_factor_beta": 0.5, - # Invariant / safety (env defaults) + # Invariant / safety defaults "check_invariants": True, "exit_factor_threshold": 1000.0, # === PBRS PARAMETERS === @@ -988,7 +988,7 @@ def _is_valid_action( def _idle_penalty(context: RewardContext, idle_factor: float, params: RewardParams) -> float: - """Mirror the environment's idle penalty behavior.""" + """Compute idle penalty.""" idle_penalty_scale = _get_float_param( params, "idle_penalty_scale", @@ -1005,7 +1005,7 @@ def _idle_penalty(context: RewardContext, idle_factor: float, params: RewardPara def _hold_penalty(context: RewardContext, hold_factor: float, params: RewardParams) -> float: - """Mirror the environment's hold penalty behavior.""" + """Compute hold penalty.""" hold_penalty_scale = _get_float_param( params, "hold_penalty_scale", @@ -1065,7 +1065,7 @@ def calculate_reward( *, short_allowed: bool, action_masking: bool, - previous_potential: float = np.nan, + prev_potential: float = np.nan, ) -> RewardBreakdown: breakdown = RewardBreakdown() @@ -1129,6 +1129,8 @@ def calculate_reward( else: base_reward = 0.0 + breakdown.base_reward = base_reward + # === PBRS INTEGRATION === current_pnl = context.pnl if context.position != Positions.Neutral else 0.0 @@ -1173,37 +1175,48 @@ def calculate_reward( next_duration_ratio = current_duration_ratio # Apply PBRS only if enabled and not neutral self-loop - pbrs_enabled = ( - _get_bool_param( - params, - "hold_potential_enabled", - bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("hold_potential_enabled", True)), - ) - or _get_bool_param( + exit_mode = _get_str_param( + params, + "exit_potential_mode", + str(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_potential_mode", "canonical")), + ) + + hold_potential_enabled = _get_bool_param( + params, + "hold_potential_enabled", + bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("hold_potential_enabled", True)), + ) + entry_additive_enabled = ( + False + if exit_mode == "canonical" + else _get_bool_param( params, "entry_additive_enabled", bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("entry_additive_enabled", False)), ) - or _get_bool_param( + ) + exit_additive_enabled = ( + False + if exit_mode == "canonical" + else _get_bool_param( params, "exit_additive_enabled", 
bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_additive_enabled", False)),
        )
    )
 
-    if pbrs_enabled and not is_neutral:
-        # Compute Φ(s) for the current state to preserve telescoping semantics Δ = γ·Φ(s') − Φ(s)
-        current_potential = _compute_hold_potential(
-            current_pnl, pnl_target, current_duration_ratio, params
-        )
-        if not np.isfinite(current_potential):
-            current_potential = 0.0
+    pbrs_enabled = bool(hold_potential_enabled or entry_additive_enabled or exit_additive_enabled)
 
-        last_potential = (
-            float(previous_potential)
-            if np.isfinite(previous_potential)
-            else float(current_potential)
-        )
+    if pbrs_enabled:
+        # Stored potential carried across steps.
+        prev_potential = float(prev_potential) if np.isfinite(prev_potential) else 0.0
+
+        if is_neutral:
+            # Neutral self-loop keeps stored potential unchanged.
+            breakdown.prev_potential = prev_potential
+            breakdown.next_potential = prev_potential
+            breakdown.total = base_reward
+            return breakdown
 
         total_reward, reward_shaping, next_potential, pbrs_delta, entry_additive, exit_additive = (
             apply_potential_shaping(
@@ -1215,24 +1228,22 @@ def calculate_reward(
                 next_duration_ratio=next_duration_ratio,
                 is_exit=is_exit,
                 is_entry=is_entry,
-                previous_potential=current_potential,
-                last_potential=last_potential,
+                prev_potential=prev_potential,
                 params=params,
             )
         )
 
         breakdown.reward_shaping = reward_shaping
-        breakdown.prev_potential = current_potential
+        breakdown.prev_potential = prev_potential
         breakdown.next_potential = next_potential
         breakdown.entry_additive = entry_additive
         breakdown.exit_additive = exit_additive
-        breakdown.base_reward = base_reward
         breakdown.pbrs_delta = pbrs_delta
-        # In canonical mode with additives disabled, this should be ~0
        breakdown.invariance_correction = reward_shaping - pbrs_delta
         breakdown.total = total_reward
-    else:
-        breakdown.total = base_reward
+        return breakdown
+
+    breakdown.total = base_reward
 
     return breakdown
 
 
@@ -1284,7 +1295,18 @@ def simulate_samples(
     pnl_base_std: float,
     pnl_duration_vol_scale: float,
 ) -> pd.DataFrame:
-    """Simulate synthetic samples for reward analysis."""
+    """Simulate synthetic samples for reward analysis.
+
+    The synthetic generator produces a *coherent trajectory* (state carried across samples)
+    so PBRS stored-potential mechanics can be exercised realistically.
+
+    Notes
+    -----
+    - PnL is a state variable while in position (may be non-zero on holds).
+    - Neutral states always have pnl=0.
+    - Realized PnL appears on the exit step (position still Long/Short).
+ """ + rng = random.Random(seed) max_trade_duration_candles = _get_int_param( params, @@ -1293,78 +1315,67 @@ def simulate_samples( ) short_allowed = _is_short_allowed(trading_mode) action_masking = _get_bool_param(params, "action_masking", True) + # Theoretical PBRS invariance flag exit_mode = _get_str_param( params, "exit_potential_mode", str(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_potential_mode", "canonical")), ) - entry_enabled = _get_bool_param( + entry_enabled_raw = _get_bool_param( params, "entry_additive_enabled", bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("entry_additive_enabled", False)), ) - exit_enabled = _get_bool_param( + exit_enabled_raw = _get_bool_param( params, "exit_additive_enabled", bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_additive_enabled", False)), ) + + entry_enabled = bool(entry_enabled_raw) if exit_mode != "canonical" else False + exit_enabled = bool(exit_enabled_raw) if exit_mode != "canonical" else False pbrs_invariant = bool(exit_mode == "canonical" and not (entry_enabled or exit_enabled)) - samples: list[Dict[str, float]] = [] - last_potential: float = 0.0 - for _ in range(num_samples): - if short_allowed: - position_choices = [ - Positions.Neutral, - Positions.Long, - Positions.Short, - ] - position_weights = [0.45, 0.3, 0.25] - else: - position_choices = [Positions.Neutral, Positions.Long] - position_weights = [0.6, 0.4] - position = rng.choices(position_choices, weights=position_weights, k=1)[0] - action = _sample_action(position, rng, short_allowed=short_allowed) + max_idle_duration_candles = get_max_idle_duration_candles( + params, max_trade_duration_candles=max_trade_duration_candles + ) + max_trade_duration_cap = int(max_trade_duration_candles * max_duration_ratio) - if position == Positions.Neutral: - trade_duration = 0 - max_idle_duration_candles = get_max_idle_duration_candles( - params, max_trade_duration_candles=max_trade_duration_candles - ) - idle_duration = int(rng.uniform(0, max_idle_duration_candles)) - else: - trade_duration = int(rng.uniform(1, max_trade_duration_candles * max_duration_ratio)) - trade_duration = max(1, trade_duration) - idle_duration = 0 + samples: list[Dict[str, float]] = [] + prev_potential: float = 0.0 - # Only exit actions should have non-zero PnL - pnl = 0.0 # Initialize as zero for all actions + # Stateful trajectory variables + position = Positions.Neutral + trade_duration = 0 + idle_duration = 0 + pnl = 0.0 + max_unrealized_profit = 0.0 + min_unrealized_profit = 0.0 - # Generate PnL only for exit actions (Long_exit=2, Short_exit=4) - if action in (Actions.Long_exit, Actions.Short_exit): + for _ in range(num_samples): + # Simulate market movement while in position (PnL as a state variable) + if position in (Positions.Long, Positions.Short): duration_ratio = _compute_duration_ratio(trade_duration, max_trade_duration_candles) - - # PnL variance scales with duration for more realistic heteroscedasticity pnl_std = pnl_base_std * (1.0 + pnl_duration_vol_scale * duration_ratio) - pnl = rng.gauss(0.0, pnl_std) - if position == Positions.Long: - pnl += 0.005 * duration_ratio - elif position == Positions.Short: - pnl -= 0.005 * duration_ratio + step_delta = rng.gauss(0.0, pnl_std) - # Clip PnL to realistic range - pnl = min(max(-0.15, pnl), 0.15) + # Small directional drift so signals aren't perfectly symmetric. 
+ drift = 0.001 * duration_ratio + if position == Positions.Long: + step_delta += drift + else: + step_delta -= drift - if position == Positions.Neutral: + pnl = min(max(-0.15, pnl + step_delta), 0.15) + max_unrealized_profit = max(max_unrealized_profit, pnl) + min_unrealized_profit = min(min_unrealized_profit, pnl) + else: + pnl = 0.0 max_unrealized_profit = 0.0 min_unrealized_profit = 0.0 - else: - # Unrealized profit bounds - span = abs(rng.gauss(0.0, 0.015)) - # max >= pnl >= min by construction - max_unrealized_profit = pnl + abs(rng.gauss(0.0, span)) - min_unrealized_profit = pnl - abs(rng.gauss(0.0, span)) + + action = _sample_action(position, rng, short_allowed=short_allowed) context = RewardContext( pnl=pnl, @@ -1384,14 +1395,11 @@ def simulate_samples( risk_reward_ratio, short_allowed=short_allowed, action_masking=action_masking, - previous_potential=last_potential, + prev_potential=prev_potential, ) + prev_potential = breakdown.next_potential - last_potential = breakdown.next_potential - - max_idle_duration_candles = get_max_idle_duration_candles(params) idle_ratio = context.idle_duration / max(1, max_idle_duration_candles) - samples.append( { "pnl": context.pnl, @@ -1423,41 +1431,29 @@ def simulate_samples( } ) - df = pd.DataFrame(samples) + # Transition state + if position == Positions.Neutral: + if action == Actions.Neutral: + idle_duration = min(idle_duration + 1, max_idle_duration_candles) + elif action == Actions.Long_enter: + position = Positions.Long + trade_duration = 0 + idle_duration = 0 + elif action == Actions.Short_enter and short_allowed: + position = Positions.Short + trade_duration = 0 + idle_duration = 0 + else: + idle_duration = 0 + if action == Actions.Neutral: + trade_duration = min(trade_duration + 1, max_trade_duration_cap) + elif action in (Actions.Long_exit, Actions.Short_exit): + position = Positions.Neutral + trade_duration = 0 + idle_duration = 0 - # Enforce PBRS invariance: zero-sum shaping under canonical mode and no additives - try: - exit_mode = _get_str_param( - params, - "exit_potential_mode", - str(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_potential_mode", "canonical")), - ) - entry_enabled = _get_bool_param( - params, - "entry_additive_enabled", - bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("entry_additive_enabled", False)), - ) - exit_enabled = _get_bool_param( - params, - "exit_additive_enabled", - bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_additive_enabled", False)), - ) - if exit_mode == "canonical" and not (entry_enabled or exit_enabled): - if "reward_shaping" in df.columns: - total_shaping = float(df["reward_shaping"].sum()) - if abs(total_shaping) > PBRS_INVARIANCE_TOL: - # Drift correction distributes a constant offset across invariant samples - n_invariant = ( - int(df["pbrs_invariant"].sum()) - if "pbrs_invariant" in df.columns - else int(len(df)) - ) - drift = total_shaping / max(1, n_invariant) - df.loc[:, "reward_shaping"] = df["reward_shaping"] - drift - df.attrs["reward_params"] = dict(params) - except Exception: - # Graceful fallback (no invariance enforcement on failure) - pass + df = pd.DataFrame(samples) + df.attrs["reward_params"] = dict(params) # Validate critical algorithmic invariants _validate_simulation_invariants(df) @@ -1466,64 +1462,40 @@ def simulate_samples( def _validate_simulation_invariants(df: pd.DataFrame) -> None: - """Fail fast if simulation violates PnL or action invariants.""" - # INVARIANT 1: PnL Conservation - Total PnL must equal sum of exit PnL - total_pnl = df["pnl"].sum() - exit_action_mask = 
df["action"].isin([2.0, 4.0]) - exit_pnl_sum = df.loc[exit_action_mask, "pnl"].sum() - - # Tolerances from INTERNAL_GUARDS to handle backend/OS numeric epsilons - tol_pnl = float(INTERNAL_GUARDS.get("sim_pnl_conservation_tol", 1e-10)) + """Fail fast if simulation violates action/state invariants.""" + eps_pnl = float(INTERNAL_GUARDS.get("sim_zero_pnl_epsilon", 1e-12)) eps_reward = float(INTERNAL_GUARDS.get("sim_zero_reward_epsilon", 1e-12)) thr_extreme = float(INTERNAL_GUARDS.get("sim_extreme_pnl_threshold", 0.2)) - pnl_diff = abs(total_pnl - exit_pnl_sum) - if pnl_diff > tol_pnl: - raise AssertionError( - f"PnL INVARIANT VIOLATION: Total PnL ({total_pnl:.6f}) != " - f"Exit PnL sum ({exit_pnl_sum:.6f}), difference = {pnl_diff:.2e}" - ) - - # INVARIANT 2: PnL Exclusivity - Only exit actions should have non-zero PnL - non_zero_pnl_actions = set(df[df["pnl"].abs() > eps_pnl]["action"].unique()) - valid_exit_actions = {2.0, 4.0} - invalid_actions = non_zero_pnl_actions - valid_exit_actions - if invalid_actions: - raise AssertionError( - f"PnL EXCLUSIVITY VIOLATION: Non-exit actions {invalid_actions} have non-zero PnL" - ) - - # INVARIANT 3: Exit Reward Consistency - Non-zero exit rewards require non-zero PnL - inconsistent_exits = df[(df["pnl"].abs() <= eps_pnl) & (df["reward_exit"].abs() > eps_reward)] - if len(inconsistent_exits) > 0: - raise AssertionError( - f"EXIT REWARD INCONSISTENCY: {len(inconsistent_exits)} actions have " - f"zero PnL but non-zero exit reward" - ) - - # INVARIANT 4: Action-Position Compatibility - # Validate that exit actions match positions - long_exits = df[ - (df["action"] == 2.0) & (df["position"] != 1.0) - ] # Long_exit but not Long position - short_exits = df[ - (df["action"] == 4.0) & (df["position"] != 0.0) - ] # Short_exit but not Short position - + # INVARIANT 1: Action-position compatibility + long_exits = df[(df["action"] == 2.0) & (df["position"] != 1.0)] + short_exits = df[(df["action"] == 4.0) & (df["position"] != 0.0)] if len(long_exits) > 0: raise AssertionError( f"ACTION-POSITION INCONSISTENCY: {len(long_exits)} Long_exit actions " f"without Long position" ) - if len(short_exits) > 0: raise AssertionError( f"ACTION-POSITION INCONSISTENCY: {len(short_exits)} Short_exit actions " f"without Short position" ) - # INVARIANT 5: Duration Logic - Neutral positions should have trade_duration = 0 + long_entries = df[(df["action"] == 1.0) & (df["position"] != 0.5)] + short_entries = df[(df["action"] == 3.0) & (df["position"] != 0.5)] + if len(long_entries) > 0: + raise AssertionError( + f"ACTION-POSITION INCONSISTENCY: {len(long_entries)} Long_enter actions " + f"without Neutral position" + ) + if len(short_entries) > 0: + raise AssertionError( + f"ACTION-POSITION INCONSISTENCY: {len(short_entries)} Short_enter actions " + f"without Neutral position" + ) + + # INVARIANT 2: Duration logic neutral_with_trade = df[(df["position"] == 0.5) & (df["trade_duration"] > 0)] if len(neutral_with_trade) > 0: raise AssertionError( @@ -1531,10 +1503,34 @@ def _validate_simulation_invariants(df: pd.DataFrame) -> None: f"with non-zero trade_duration" ) - # INVARIANT 6: Bounded Values - Check realistic bounds - extreme_pnl = df[(df["pnl"].abs() > thr_extreme)] # Beyond reasonable range + inpos_with_idle = df[(df["position"] != 0.5) & (df["idle_duration"] > 0)] + if len(inpos_with_idle) > 0: + raise AssertionError( + f"DURATION LOGIC VIOLATION: {len(inpos_with_idle)} In-position samples " + f"with idle_duration > 0" + ) + + # INVARIANT 3: Neutral states have zero PnL (simulation 
design)
+    neutral_with_pnl = df[(df["position"] == 0.5) & (df["pnl"].abs() > eps_pnl)]
+    if len(neutral_with_pnl) > 0:
+        raise AssertionError(
+            f"PNL LOGIC VIOLATION: {len(neutral_with_pnl)} Neutral positions with non-zero pnl"
+        )
+
+    # INVARIANT 4: Exit rewards only appear on exit actions
+    non_exit_with_exit_reward = df[
+        (~df["action"].isin([2.0, 4.0])) & (df["reward_exit"].abs() > eps_reward)
+    ]
+    if len(non_exit_with_exit_reward) > 0:
+        raise AssertionError(
+            f"EXIT REWARD INCONSISTENCY: {len(non_exit_with_exit_reward)} non-exit actions "
+            f"have non-zero exit reward"
+        )
+
+    # INVARIANT 5: Bounded values
+    extreme_pnl = df[(df["pnl"].abs() > thr_extreme)]
     if len(extreme_pnl) > 0:
-        max_abs_pnl = df["pnl"].abs().max()
+        max_abs_pnl = float(df["pnl"].abs().max())
         raise AssertionError(
             f"BOUNDS VIOLATION: {len(extreme_pnl)} samples with extreme PnL, "
             f"max |PnL| = {max_abs_pnl:.6f}"
@@ -2837,8 +2833,8 @@ def _compute_exit_additive(
     )
 
 
-def _compute_exit_potential(last_potential: float, params: RewardParams) -> float:
-    """Exit potential per mode (canonical/non_canonical -> 0; others transform Φ)."""
+def _compute_exit_potential(prev_potential: float, params: RewardParams) -> float:
+    """Exit potential per mode (canonical/non_canonical -> 0; others transform Φ(prev))."""
     mode = _get_str_param(
         params,
         "exit_potential_mode",
@@ -2867,15 +2863,15 @@ def _compute_exit_potential(last_potential: float, params: RewardParams) -> floa
                 stacklevel=2,
             )
             decay = 1.0
-        next_potential = last_potential * (1.0 - decay)
+        next_potential = prev_potential * (1.0 - decay)
     elif mode == "spike_cancel":
         gamma = _get_potential_gamma(params)
         if gamma <= 0.0 or not np.isfinite(gamma):
-            next_potential = last_potential
+            next_potential = prev_potential
         else:
-            next_potential = last_potential / gamma
+            next_potential = prev_potential / gamma
     elif mode == "retain_previous":
-        next_potential = last_potential
+        next_potential = prev_potential
     else:
         _warn_unknown_mode(
             "exit_potential_mode",
@@ -2899,73 +2895,86 @@ def apply_potential_shaping(
     next_pnl: float,
     next_duration_ratio: float,
     params: RewardParams,
+    *,
     is_exit: bool = False,
     is_entry: bool = False,
-    previous_potential: float = np.nan,
-    last_potential: Optional[float] = None,
+    prev_potential: float,
 ) -> tuple[float, float, float, float, float, float]:
-    """Compute shaped reward with explicit PBRS semantics.
+    """Compute shaped reward using PBRS.
 
     Returns
     -------
     tuple[float, float, float, float, float, float]
         (reward, reward_shaping, next_potential, pbrs_delta, entry_additive, exit_additive)
-        where pbrs_delta = gamma * next_potential - prev_term is the pure PBRS component.
+        where pbrs_delta = gamma * next_potential - prev_potential is the pure PBRS component.
 
     Notes
     -----
     - Shaping Δ = γ·Φ(next) − Φ(prev).
-    - previous_potential:
-        Previously computed Φ(s) for the prior transition. When provided and finite, it
-        is used as Φ(prev) in Δ; otherwise Φ(prev) is derived from the current state.
-    - last_potential:
-        Potential used to compute terminal Φ′ at exit via _compute_exit_potential().
-        Fallback logic: if last_potential is None or non-finite, then last_potential := previous_potential
-        (or the derived prev term) to preserve telescoping semantics.
+    - Φ(prev) must be provided explicitly as the stored potential carried across steps.
+    - Exit potential modes compute Φ(next) from Φ(prev).
     - Entry additive is applied only on entry transitions (based on next_* metrics).
- Exit additive is applied only on exit transitions (based on current_* metrics).
-    - Canonical invariance: when exit_potential_mode == 'canonical' and additives are disabled,
-      the telescoping sum ensures Σ reward_shaping ≈ 0 across a complete episode.
+    - Canonical mode forces entry/exit additive terms to zero inside this helper.
+    - This helper does not mutate `params`.
     """
-    params = _enforce_pbrs_invariance(params)
     gamma = _get_potential_gamma(params)
 
-    # Use provided previous_potential when finite; otherwise derive from current state
-    prev_term = (
-        float(previous_potential)
-        if np.isfinite(previous_potential)
-        else _compute_hold_potential(current_pnl, pnl_target, current_duration_ratio, params)
+    prev_potential = float(prev_potential) if np.isfinite(prev_potential) else 0.0
+
+    exit_mode = _get_str_param(
+        params,
+        "exit_potential_mode",
+        str(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_potential_mode", "canonical")),
+    )
+    canonical_mode = exit_mode == "canonical"
+
+    hold_potential_enabled = _get_bool_param(
+        params,
+        "hold_potential_enabled",
+        bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("hold_potential_enabled", True)),
     )
-    if not np.isfinite(prev_term):
-        prev_term = 0.0
 
     if is_exit:
-        last_potential = (
-            float(last_potential)
-            if (last_potential is not None and np.isfinite(last_potential))
-            else float(prev_term)
-        )
-        next_potential = _compute_exit_potential(last_potential, params)
+        next_potential = _compute_exit_potential(prev_potential, params)
+        # PBRS shaping Δ = γ·Φ(next) − Φ(prev)
+        pbrs_delta = gamma * next_potential - prev_potential
+        reward_shaping = pbrs_delta
     else:
-        next_potential = _compute_hold_potential(next_pnl, pnl_target, next_duration_ratio, params)
-
-    # PBRS shaping Δ = γ·Φ(next) − Φ(prev)
-    pbrs_delta = gamma * next_potential - float(prev_term)
-    reward_shaping = pbrs_delta
+        # When hold potential is disabled, force Φ(next)=0 and emit no PBRS shaping on entry/hold.
+ if not hold_potential_enabled: + next_potential = 0.0 + pbrs_delta = 0.0 + reward_shaping = 0.0 + else: + next_potential = _compute_hold_potential( + next_pnl, pnl_target, next_duration_ratio, params + ) + # PBRS shaping Δ = γ·Φ(next) − Φ(prev) + pbrs_delta = gamma * next_potential - prev_potential + reward_shaping = pbrs_delta # Non-PBRS additives - cand_entry_add = _compute_entry_additive(next_pnl, pnl_target, next_duration_ratio, params) - cand_exit_add = _compute_exit_additive(current_pnl, pnl_target, current_duration_ratio, params) + if canonical_mode: + entry_additive = 0.0 + exit_additive = 0.0 + else: + cand_entry_add = _compute_entry_additive(next_pnl, pnl_target, next_duration_ratio, params) + cand_exit_add = _compute_exit_additive( + current_pnl, pnl_target, current_duration_ratio, params + ) - entry_additive = cand_entry_add if is_entry else 0.0 - exit_additive = cand_exit_add if is_exit else 0.0 + entry_additive = cand_entry_add if is_entry else 0.0 + exit_additive = cand_exit_add if is_exit else 0.0 reward = base_reward + reward_shaping + entry_additive + exit_additive if not np.isfinite(reward): return float(base_reward), 0.0, 0.0, 0.0, 0.0, 0.0 - if np.isclose(reward_shaping, 0.0): - reward_shaping = 0.0 - pbrs_delta = 0.0 + return ( float(reward), float(reward_shaping), @@ -2976,46 +2985,6 @@ def apply_potential_shaping( ) -def _enforce_pbrs_invariance(params: RewardParams) -> RewardParams: - """Disable entry/exit additives once in canonical PBRS to preserve invariance.""" - mode = _get_str_param( - params, - "exit_potential_mode", - str(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_potential_mode", "canonical")), - ) - if mode != "canonical": - return params - if params.get("_pbrs_invariance_applied"): - return params - entry_enabled = _get_bool_param( - params, - "entry_additive_enabled", - bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("entry_additive_enabled", False)), - ) - exit_enabled = _get_bool_param( - params, - "exit_additive_enabled", - bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_additive_enabled", False)), - ) - # Strict canonical enforcement - if entry_enabled: - warnings.warn( - "Disabling entry additive to preserve PBRS invariance (canonical mode).", - RewardDiagnosticsWarning, - stacklevel=2, - ) - params["entry_additive_enabled"] = False - if exit_enabled: - warnings.warn( - "Disabling exit additive to preserve PBRS invariance (canonical mode).", - RewardDiagnosticsWarning, - stacklevel=2, - ) - params["exit_additive_enabled"] = False - params["_pbrs_invariance_applied"] = True - return params - - def _compute_bi_component( kind: str, pnl: float, @@ -3098,7 +3067,7 @@ def build_argument_parser() -> argparse.ArgumentParser: "--base_factor", type=float, default=100.0, - help="Base reward factor used inside the environment (default: 100).", + help="Base reward scaling factor (default: 100).", ) parser.add_argument( "--profit_aim", diff --git a/ReforceXY/reward_space_analysis/test_reward_space_analysis_cli.py b/ReforceXY/reward_space_analysis/test_reward_space_analysis_cli.py index 071bb41..4569cd5 100644 --- a/ReforceXY/reward_space_analysis/test_reward_space_analysis_cli.py +++ b/ReforceXY/reward_space_analysis/test_reward_space_analysis_cli.py @@ -466,7 +466,7 @@ def main(): fp = " ".join(line.strip().split())[:160] warnings_breakdown[fp] = warnings_breakdown.get(fp, 0) + 1 - # Collect environment + reproducibility metadata + # Collect reproducibility metadata def _git_hash() -> Optional[str]: try: proc = subprocess.run( diff --git 
a/ReforceXY/reward_space_analysis/tests/.docstring_template.md b/ReforceXY/reward_space_analysis/tests/.docstring_template.md index d36f611..0551814 100644 --- a/ReforceXY/reward_space_analysis/tests/.docstring_template.md +++ b/ReforceXY/reward_space_analysis/tests/.docstring_template.md @@ -99,12 +99,12 @@ def test_pbrs_terminal_state_comprehensive(self): - Assert: next_potential ≈ 0 within TOLERANCE.IDENTITY_RELAXED 2. Part B: Shaping recovery verification - - Verify: reward_shaping ≈ -gamma * last_potential - - Checks proper potential recovery mechanism + - Verify: reward_shaping ≈ -prev_potential (canonical exit) + - Checks proper potential release at terminal - 3. Part C: Cumulative drift analysis + 3. Part C: Cumulative shaping magnitude - Track cumulative shaping over 100-episode sequence - - Assert: Bounded drift (no systematic bias accumulation) + - Assert: Bounded magnitude (no systematic bias accumulation) **Setup:** - Exit modes: [progressive_release, spike_cancel, canonical] @@ -114,8 +114,8 @@ def test_pbrs_terminal_state_comprehensive(self): **Assertions:** - Terminal potential: |next_potential| < TOLERANCE.IDENTITY_RELAXED - - Shaping recovery: |shaping + gamma*last_pot| < TOLERANCE.IDENTITY_RELAXED - - Cumulative drift: |sum(shaping)| < 10 * TOLERANCE.IDENTITY_RELAXED + - Shaping recovery: |shaping + prev_potential| < TOLERANCE.IDENTITY_RELAXED + - Cumulative sum bound: |sum(shaping)| < 10 * TOLERANCE.IDENTITY_RELAXED **Tolerance rationale:** - IDENTITY_RELAXED: PBRS calculations involve gamma discounting, diff --git a/ReforceXY/reward_space_analysis/tests/README.md b/ReforceXY/reward_space_analysis/tests/README.md index d55daf6..e20f7c1 100644 --- a/ReforceXY/reward_space_analysis/tests/README.md +++ b/ReforceXY/reward_space_analysis/tests/README.md @@ -8,7 +8,7 @@ policies, maintenance workflows, and full coverage mapping. 
The suite enforces: - Reward component mathematics & transform correctness -- PBRS invariance mechanics (canonical drift correction, near-zero +- PBRS shaping mechanics (canonical exit semantics, near-zero classification) - Robustness under extreme / invalid parameter settings - Statistical metrics integrity (bootstrap, constant distributions) @@ -183,7 +183,7 @@ Columns: | robustness-negative-grace-clamp-103 | robustness | Negative exit_plateau_grace clamps to 0.0 w/ warning | robustness/test_robustness.py:555 | | | robustness-invalid-power-tau-104 | robustness | Invalid power tau falls back alpha=1.0 w/ warning | robustness/test_robustness.py:592 | | | robustness-near-zero-half-life-105 | robustness | Near-zero half life yields no attenuation (factor≈base) | robustness/test_robustness.py:621 | | -| pbrs-canonical-drift-correction-106 | pbrs | Canonical drift correction enforces near zero-sum shaping | pbrs/test_pbrs.py:449 | Multi-path: extension fallback (475), comparison path (517) | +| pbrs-canonical-exit-semantic-106 | pbrs | Canonical exit uses shaping=-prev_potential and next_potential=0.0 | pbrs/test_pbrs.py:449 | Uses stored potential across steps; no drift correction applied | | pbrs-canonical-near-zero-report-116 | pbrs | Canonical near-zero cumulative shaping classification | pbrs/test_pbrs.py:748 | Full report classification | | statistics-partial-deps-skip-107 | statistics | skip_partial_dependence => empty PD structures | statistics/test_statistics.py:28 | Docstring line | | helpers-duplicate-rows-drop-108 | helpers | Duplicate rows dropped w/ warning counting removals | helpers/test_utilities.py:26 | Docstring line | @@ -291,8 +291,8 @@ grep -R "" -n . Expect a single directory path. Examples: ```shell -grep -R "drift_correction" -n . grep -R "near_zero" -n . +grep -R "pbrs_delta" -n . 
``` ## Coverage Parity Notes diff --git a/ReforceXY/reward_space_analysis/tests/components/test_additives.py b/ReforceXY/reward_space_analysis/tests/components/test_additives.py index a06302b..ae16fed 100644 --- a/ReforceXY/reward_space_analysis/tests/components/test_additives.py +++ b/ReforceXY/reward_space_analysis/tests/components/test_additives.py @@ -72,10 +72,10 @@ class TestAdditivesDeterministicContribution(RewardSpaceTestBase): "is_exit": False, } _t0, s0, _n0, _pbrs0, _entry0, _exit0 = apply_potential_shaping( - last_potential=0.0, params=base, **ctx + prev_potential=0.0, params=base, **ctx ) t1, s1, _n1, _pbrs1, _entry1, _exit1 = apply_potential_shaping( - last_potential=0.0, params=with_add, **ctx + prev_potential=0.0, params=with_add, **ctx ) self.assertFinite(t1) self.assertFinite(s1) diff --git a/ReforceXY/reward_space_analysis/tests/helpers/test_internal_branches.py b/ReforceXY/reward_space_analysis/tests/helpers/test_internal_branches.py index 4581fc1..6cb1339 100644 --- a/ReforceXY/reward_space_analysis/tests/helpers/test_internal_branches.py +++ b/ReforceXY/reward_space_analysis/tests/helpers/test_internal_branches.py @@ -95,7 +95,7 @@ def test_calculate_reward_unrealized_pnl_hold_path(): risk_reward_ratio=1.0, short_allowed=True, action_masking=True, - previous_potential=np.nan, + prev_potential=np.nan, ) assert math.isfinite(breakdown.prev_potential) assert math.isfinite(breakdown.next_potential) diff --git a/ReforceXY/reward_space_analysis/tests/pbrs/test_pbrs.py b/ReforceXY/reward_space_analysis/tests/pbrs/test_pbrs.py index 9932e57..2c69ec8 100644 --- a/ReforceXY/reward_space_analysis/tests/pbrs/test_pbrs.py +++ b/ReforceXY/reward_space_analysis/tests/pbrs/test_pbrs.py @@ -33,8 +33,6 @@ from ..constants import ( TOLERANCE, ) from ..helpers import ( - assert_non_canonical_shaping_exceeds, - assert_pbrs_canonical_sum_within_tolerance, assert_pbrs_invariance_report_classification, assert_relaxed_multi_reason_aggregation, build_validation_case, @@ -89,7 +87,7 @@ class TestPBRS(RewardSpaceTestBase): next_duration_ratio=0.0, is_exit=True, is_entry=False, - last_potential=0.789, + prev_potential=prev_potential, params=params, ) self.assertAlmostEqualFloat(next_potential, 0.0, tolerance=TOLERANCE.IDENTITY_RELAXED) @@ -137,7 +135,7 @@ class TestPBRS(RewardSpaceTestBase): next_duration_ratio=0.0, is_exit=True, is_entry=False, - last_potential=prev_potential, + prev_potential=prev_potential, params=params, ) self.assertAlmostEqualFloat( @@ -145,10 +143,15 @@ class TestPBRS(RewardSpaceTestBase): ) self.assertNearZero(reward_shaping, atol=TOLERANCE.IDENTITY_RELAXED) - # ---------------- Invariance sum checks (simulate_samples) ---------------- # + # ---------------- Invariance flags (simulate_samples) ---------------- # - def test_canonical_invariance_flag_and_sum(self): - """Canonical mode + no additives -> invariant flags True and Σ shaping ≈ 0.""" + def test_canonical_invariance_flag(self): + """Canonical mode + no additives -> invariant flag True per-sample. + + Note: `simulate_samples()` generates synthetic trajectories (coherent episodes). + This test only verifies the per-sample invariance flag and numeric stability; it does not + assert any telescoping/zero-sum property for the shaping term. 
+ """ params = self.base_params( exit_potential_mode="canonical", @@ -170,11 +173,11 @@ class TestPBRS(RewardSpaceTestBase): ) unique_flags = set(df["pbrs_invariant"].unique().tolist()) self.assertEqual(unique_flags, {True}, f"Unexpected invariant flags: {unique_flags}") - total_shaping = float(df["reward_shaping"].sum()) - assert_pbrs_canonical_sum_within_tolerance(self, total_shaping, PBRS_INVARIANCE_TOL) + self.assertTrue(np.isfinite(df["reward_shaping"]).all()) + self.assertLessEqual(float(df["reward_shaping"].abs().max()), PBRS.MAX_ABS_SHAPING) def test_non_canonical_flag_false_and_sum_nonzero(self): - """Non-canonical mode -> invariant flags False and Σ shaping significantly non-zero.""" + """Non-canonical mode -> invariant flags False and Σ shaping non-zero.""" params = self.base_params( exit_potential_mode="progressive_release", @@ -197,8 +200,12 @@ class TestPBRS(RewardSpaceTestBase): ) unique_flags = set(df["pbrs_invariant"].unique().tolist()) self.assertEqual(unique_flags, {False}, f"Unexpected invariant flags: {unique_flags}") - total_shaping = float(df["reward_shaping"].sum()) - assert_non_canonical_shaping_exceeds(self, total_shaping, PBRS_INVARIANCE_TOL * 10) + abs_sum = float(df["reward_shaping"].abs().sum()) + self.assertGreater( + abs_sum, + PBRS_INVARIANCE_TOL * 2, + f"Expected non-trivial shaping magnitude (got {abs_sum})", + ) # ---------------- Additives and canonical path mechanics ---------------- # @@ -215,14 +222,94 @@ class TestPBRS(RewardSpaceTestBase): ) self.assertEqual(float(val_exit), 0.0) + def test_hold_potential_disabled_forces_zero_potential_on_entry(self): + """hold_potential_enabled=False: entry sets Φ(next)=0 and no shaping.""" + params = self.base_params( + hold_potential_enabled=False, + exit_potential_mode="canonical", + entry_additive_enabled=False, + exit_additive_enabled=False, + potential_gamma=0.93, + ) + ( + total, + reward_shaping, + next_potential, + pbrs_delta, + entry_additive, + exit_additive, + ) = apply_potential_shaping( + base_reward=0.25, + current_pnl=0.0, + pnl_target=PARAMS.PROFIT_AIM * PARAMS.RISK_REWARD_RATIO, + current_duration_ratio=0.0, + next_pnl=0.01, + next_duration_ratio=0.0, + is_exit=False, + is_entry=True, + prev_potential=0.42, + params=params, + ) + self.assertPlacesEqual(next_potential, 0.0, places=TOLERANCE.DECIMAL_PLACES_STRICT) + self.assertNearZero(reward_shaping, atol=TOLERANCE.IDENTITY_STRICT) + self.assertNearZero(pbrs_delta, atol=TOLERANCE.IDENTITY_STRICT) + self.assertNearZero(entry_additive, atol=TOLERANCE.IDENTITY_STRICT) + self.assertNearZero(exit_additive, atol=TOLERANCE.IDENTITY_STRICT) + self.assertAlmostEqualFloat( + total, + 0.25, + tolerance=TOLERANCE.IDENTITY_STRICT, + msg="Entry shaping must be suppressed when hold potential disabled", + ) + + def test_hold_potential_disabled_forces_zero_potential_on_hold(self): + """hold_potential_enabled=False: hold sets Φ(next)=0 and no shaping.""" + params = self.base_params( + hold_potential_enabled=False, + exit_potential_mode="canonical", + entry_additive_enabled=False, + exit_additive_enabled=False, + potential_gamma=0.93, + ) + ( + total, + reward_shaping, + next_potential, + pbrs_delta, + _entry_additive, + _exit_additive, + ) = apply_potential_shaping( + base_reward=-0.1, + current_pnl=0.02, + pnl_target=PARAMS.PROFIT_AIM * PARAMS.RISK_REWARD_RATIO, + current_duration_ratio=0.4, + next_pnl=0.02, + next_duration_ratio=0.41, + is_exit=False, + is_entry=False, + prev_potential=0.5, + params=params, + ) + self.assertPlacesEqual(next_potential, 0.0, 
places=TOLERANCE.DECIMAL_PLACES_STRICT) + self.assertNearZero(reward_shaping, atol=TOLERANCE.IDENTITY_STRICT) + self.assertNearZero(pbrs_delta, atol=TOLERANCE.IDENTITY_STRICT) + self.assertAlmostEqualFloat( + total, + -0.1, + tolerance=TOLERANCE.IDENTITY_STRICT, + msg="Hold shaping must be suppressed when hold potential disabled", + ) + def test_exit_potential_canonical(self): - """Verifies canonical exit resets potential and auto-disables additives.""" + """Verifies canonical exit resets potential (no params mutation).""" params = self.base_params( exit_potential_mode="canonical", hold_potential_enabled=True, - entry_additive_enabled=True, - exit_additive_enabled=True, + entry_additive_enabled=False, + exit_additive_enabled=False, ) + params_before = dict(params) + base_reward = 0.25 current_pnl = 0.05 current_duration_ratio = 0.4 @@ -238,75 +325,88 @@ class TestPBRS(RewardSpaceTestBase): next_duration_ratio=next_duration_ratio, is_exit=True, is_entry=False, - last_potential=0.789, + prev_potential=0.789, params=params, ) ) - self.assertIn("_pbrs_invariance_applied", params) - self.assertFalse( - params["entry_additive_enabled"], - "Entry additive should be auto-disabled in canonical mode", - ) - self.assertFalse( - params["exit_additive_enabled"], - "Exit additive should be auto-disabled in canonical mode", - ) + + self.assertEqual(params, params_before, "apply_potential_shaping must not mutate params") self.assertPlacesEqual(next_potential, 0.0, places=TOLERANCE.DECIMAL_PLACES_STRICT) - current_potential = _compute_hold_potential( - current_pnl, - PARAMS.PROFIT_AIM * PARAMS.RISK_REWARD_RATIO, - current_duration_ratio, - {"hold_potential_enabled": True, "hold_potential_scale": 1.0}, - ) - self.assertAlmostEqual(shaping, -current_potential, delta=TOLERANCE.IDENTITY_RELAXED) + self.assertAlmostEqual(shaping, -0.789, delta=TOLERANCE.IDENTITY_RELAXED) residual = total - base_reward - shaping self.assertAlmostEqual(residual, 0.0, delta=TOLERANCE.IDENTITY_RELAXED) self.assertTrue(np.isfinite(total)) - def test_pbrs_invariance_internal_flag_set(self): - """Verifies canonical path sets _pbrs_invariance_applied flag (idempotent).""" + def test_canonical_mode_suppresses_additives_even_if_enabled(self): + """Verifies canonical mode forces entry/exit additive terms to zero.""" params = self.base_params( exit_potential_mode="canonical", hold_potential_enabled=True, entry_additive_enabled=True, exit_additive_enabled=True, + entry_additive_scale=10.0, + exit_additive_scale=10.0, ) - terminal_next_potentials, shaping_values = self._canonical_sweep(params) - _t1, _s1, _n1, _pbrs_delta, _entry_additive, _exit_additive = apply_potential_shaping( + + ( + _total_entry, + _shaping_entry, + _next_potential_entry, + _pbrs_delta_entry, + entry_additive, + exit_additive_entry, + ) = apply_potential_shaping( base_reward=0.0, - current_pnl=0.05, + current_pnl=0.0, pnl_target=PARAMS.PROFIT_AIM * PARAMS.RISK_REWARD_RATIO, - current_duration_ratio=0.3, - next_pnl=0.0, + current_duration_ratio=0.0, + next_pnl=0.02, next_duration_ratio=0.0, - is_exit=True, - is_entry=False, - last_potential=0.4, + is_exit=False, + is_entry=True, + prev_potential=0.0, params=params, ) - self.assertIn("_pbrs_invariance_applied", params) - self.assertFalse(params["entry_additive_enabled"]) - self.assertFalse(params["exit_additive_enabled"]) - if terminal_next_potentials: - self.assertTrue(all((abs(p) < PBRS.TERMINAL_TOL for p in terminal_next_potentials))) - max_abs = max((abs(v) for v in shaping_values)) if shaping_values else 0.0 - 
self.assertLessEqual(max_abs, PBRS.MAX_ABS_SHAPING) - state_after = (params["entry_additive_enabled"], params["exit_additive_enabled"]) - _t2, _s2, _n2, _pbrs_delta2, _entry_additive2, _exit_additive2 = apply_potential_shaping( + self.assertNearZero(entry_additive, atol=TOLERANCE.IDENTITY_STRICT) + self.assertNearZero(exit_additive_entry, atol=TOLERANCE.IDENTITY_STRICT) + + ( + _total_exit, + _shaping_exit, + _next_potential_exit, + _pbrs_delta_exit, + entry_additive_exit, + exit_additive, + ) = apply_potential_shaping( base_reward=0.0, current_pnl=0.02, pnl_target=PARAMS.PROFIT_AIM * PARAMS.RISK_REWARD_RATIO, - current_duration_ratio=0.1, + current_duration_ratio=0.5, next_pnl=0.0, next_duration_ratio=0.0, is_exit=True, is_entry=False, - last_potential=0.1, + prev_potential=0.4, params=params, ) - self.assertEqual( - state_after, (params["entry_additive_enabled"], params["exit_additive_enabled"]) + self.assertNearZero(entry_additive_exit, atol=TOLERANCE.IDENTITY_STRICT) + self.assertNearZero(exit_additive, atol=TOLERANCE.IDENTITY_STRICT) + + def test_canonical_sweep_does_not_require_param_enforcement(self): + """Verifies canonical sweep runs without mutating params.""" + params = self.base_params( + exit_potential_mode="canonical", + hold_potential_enabled=True, + entry_additive_enabled=False, + exit_additive_enabled=False, ) + params_before = dict(params) + terminal_next_potentials, shaping_values = self._canonical_sweep(params) + self.assertEqual(params, params_before) + if terminal_next_potentials: + self.assertTrue(all((abs(p) < PBRS.TERMINAL_TOL for p in terminal_next_potentials))) + max_abs = max((abs(v) for v in shaping_values)) if shaping_values else 0.0 + self.assertLessEqual(max_abs, PBRS.MAX_ABS_SHAPING) def test_progressive_release_negative_decay_clamped(self): """Verifies negative decay clamping: next potential equals last potential.""" @@ -315,7 +415,7 @@ class TestPBRS(RewardSpaceTestBase): exit_potential_decay=-0.75, hold_potential_enabled=True, ) - last_potential = 0.42 + prev_potential = 0.42 total, shaping, next_potential, _pbrs_delta, _entry_additive, _exit_additive = ( apply_potential_shaping( base_reward=0.0, @@ -325,12 +425,12 @@ class TestPBRS(RewardSpaceTestBase): next_pnl=0.0, next_duration_ratio=0.0, is_exit=True, - last_potential=last_potential, + prev_potential=prev_potential, params=params, ) ) self.assertPlacesEqual( - next_potential, last_potential, places=TOLERANCE.DECIMAL_PLACES_STRICT + next_potential, prev_potential, places=TOLERANCE.DECIMAL_PLACES_STRICT ) gamma_raw = DEFAULT_MODEL_REWARD_PARAMETERS.get("potential_gamma", 0.95) gamma_fallback = 0.95 if gamma_raw is None else gamma_raw @@ -338,7 +438,11 @@ class TestPBRS(RewardSpaceTestBase): gamma = float(gamma_fallback) except Exception: gamma = 0.95 - self.assertLessEqual(abs(shaping - gamma * last_potential), TOLERANCE.GENERIC_EQ) + # PBRS shaping Δ = γ·Φ(next) − Φ(prev). Here Φ(next)=Φ(prev) since decay clamps to 0. 
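+        # Hence Δ reduces to (γ − 1)·Φ(prev), which the assertion below verifies.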
+ self.assertLessEqual( + abs(shaping - ((gamma - 1.0) * prev_potential)), + TOLERANCE.GENERIC_EQ, + ) self.assertPlacesEqual(total, shaping, places=TOLERANCE.DECIMAL_PLACES_STRICT) def test_potential_gamma_nan_fallback(self): @@ -354,7 +458,7 @@ class TestPBRS(RewardSpaceTestBase): next_pnl=0.035, next_duration_ratio=0.25, is_exit=False, - last_potential=0.0, + prev_potential=0.0, params=params_nan, ) params_ref = self.base_params(potential_gamma=default_gamma, hold_potential_enabled=True) @@ -366,7 +470,7 @@ class TestPBRS(RewardSpaceTestBase): next_pnl=0.035, next_duration_ratio=0.25, is_exit=False, - last_potential=0.0, + prev_potential=0.0, params=params_ref, ) self.assertLess( @@ -494,11 +598,11 @@ class TestPBRS(RewardSpaceTestBase): self.assertLess(cumulative, -TOLERANCE.NEGLIGIBLE) self.assertGreater(abs(cumulative), 10 * TOLERANCE.IDENTITY_RELAXED) - # ---------------- Drift correction invariants (simulate_samples) ---------------- # + def test_exit_step_shaping_matches_exit_step_rules(self): + """Exit step: shaping uses stored prev_potential. - # Owns invariant: pbrs-canonical-drift-correction-106 - def test_pbrs_106_canonical_drift_correction_zero_sum(self): - """Invariant 106: canonical mode enforces near zero-sum shaping (drift correction).""" + For canonical mode, next_potential must be 0 and shaping_delta = -prev_potential. + """ params = self.base_params( exit_potential_mode="canonical", @@ -507,75 +611,52 @@ class TestPBRS(RewardSpaceTestBase): exit_additive_enabled=False, potential_gamma=0.94, ) - df = simulate_samples( - params={**params, "max_trade_duration_candles": 100}, - num_samples=SCENARIOS.SAMPLE_SIZE_MEDIUM, - seed=SEEDS.BASE, - base_factor=PARAMS.BASE_FACTOR, - profit_aim=PARAMS.PROFIT_AIM, - risk_reward_ratio=PARAMS.RISK_REWARD_RATIO, - max_duration_ratio=2.0, - trading_mode="margin", - pnl_base_std=PARAMS.PNL_STD, - pnl_duration_vol_scale=PARAMS.PNL_DUR_VOL_SCALE, + prev_potential = 0.42 + ( + _total_reward, + reward_shaping, + next_potential, + pbrs_delta, + _entry_additive, + _exit_additive, + ) = apply_potential_shaping( + base_reward=0.0, + current_pnl=0.012, + pnl_target=PARAMS.PROFIT_AIM * PARAMS.RISK_REWARD_RATIO, + current_duration_ratio=0.3, + next_pnl=0.0, + next_duration_ratio=0.0, + is_exit=True, + is_entry=False, + prev_potential=prev_potential, + params=params, ) - total_shaping = float(df["reward_shaping"].sum()) - assert_pbrs_canonical_sum_within_tolerance(self, total_shaping, PBRS_INVARIANCE_TOL) - flags = set(df["pbrs_invariant"].unique().tolist()) - self.assertEqual(flags, {True}, f"Unexpected invariance flags canonical: {flags}") - - # Owns invariant (extension path): pbrs-canonical-drift-correction-106 - def test_pbrs_106_canonical_drift_correction_exception_fallback(self): - """Invariant 106 (extension): exception path graceful fallback.""" - params = self.base_params( - exit_potential_mode="canonical", - hold_potential_enabled=True, - entry_additive_enabled=False, - exit_additive_enabled=False, - potential_gamma=0.91, + self.assertPlacesEqual(next_potential, 0.0, places=TOLERANCE.DECIMAL_PLACES_STRICT) + self.assertAlmostEqualFloat( + reward_shaping, + -prev_potential, + tolerance=TOLERANCE.IDENTITY_RELAXED, + msg="Canonical exit shaping should be -prev_potential", + ) + self.assertAlmostEqualFloat( + pbrs_delta, + -prev_potential, + tolerance=TOLERANCE.IDENTITY_RELAXED, + msg="Canonical exit PBRS delta should be -prev_potential", ) - original_sum = pd.DataFrame.sum - def boom(self, *args, **kwargs): # noqa: D401 - if isinstance(self, 
pd.DataFrame) and "reward_shaping" in self.columns: - raise RuntimeError("forced drift correction failure") - return original_sum(self, *args, **kwargs) + def test_simulate_samples_retains_signals_in_canonical_mode(self): + """simulate_samples() is not drift-corrected; it must not force Σ shaping ~ 0.""" - pd.DataFrame.sum = boom - try: - df_exc = simulate_samples( - params={**params, "max_trade_duration_candles": 120}, - num_samples=250, - seed=SEEDS.PBRS_INVARIANCE_2, - base_factor=PARAMS.BASE_FACTOR, - profit_aim=PARAMS.PROFIT_AIM, - risk_reward_ratio=PARAMS.RISK_REWARD_RATIO, - max_duration_ratio=2.0, - trading_mode="margin", - pnl_base_std=PARAMS.PNL_STD, - pnl_duration_vol_scale=PARAMS.PNL_DUR_VOL_SCALE, - ) - finally: - pd.DataFrame.sum = original_sum - flags_exc = set(df_exc["pbrs_invariant"].unique().tolist()) - self.assertEqual(flags_exc, {True}) - # Column presence and successful completion are primary guarantees under fallback. - self.assertTrue("reward_shaping" in df_exc.columns) - self.assertIn("reward_shaping", df_exc.columns) - - # Owns invariant (comparison path): pbrs-canonical-drift-correction-106 - def test_pbrs_106_canonical_drift_correction_uniform_offset(self): - """Canonical drift correction reduces Σ shaping below tolerance vs non-canonical.""" - - params_can = self.base_params( + params = self.base_params( exit_potential_mode="canonical", hold_potential_enabled=True, entry_additive_enabled=False, exit_additive_enabled=False, potential_gamma=0.92, ) - df_can = simulate_samples( - params={**params_can, "max_trade_duration_candles": 120}, + df = simulate_samples( + params={**params, "max_trade_duration_candles": 120}, num_samples=SCENARIOS.SAMPLE_SIZE_MEDIUM, seed=SEEDS.PBRS_TERMINAL, base_factor=PARAMS.BASE_FACTOR, @@ -586,36 +667,15 @@ class TestPBRS(RewardSpaceTestBase): pnl_base_std=PARAMS.PNL_STD, pnl_duration_vol_scale=PARAMS.PNL_DUR_VOL_SCALE, ) - params_non = self.base_params( - exit_potential_mode="retain_previous", - hold_potential_enabled=True, - entry_additive_enabled=False, - exit_additive_enabled=False, - potential_gamma=0.92, - ) - df_non = simulate_samples( - params={**params_non, "max_trade_duration_candles": 120}, - num_samples=SCENARIOS.SAMPLE_SIZE_MEDIUM, - seed=SEEDS.PBRS_TERMINAL, - base_factor=PARAMS.BASE_FACTOR, - profit_aim=PARAMS.PROFIT_AIM, - risk_reward_ratio=PARAMS.RISK_REWARD_RATIO, - max_duration_ratio=2.0, - trading_mode="margin", - pnl_base_std=PARAMS.PNL_STD, - pnl_duration_vol_scale=PARAMS.PNL_DUR_VOL_SCALE, + abs_sum = float(df["reward_shaping"].abs().sum()) + self.assertTrue(np.isfinite(abs_sum)) + self.assertLessEqual(float(df["reward_shaping"].abs().max()), PBRS.MAX_ABS_SHAPING) + # Even with trajectories, Σ can partially cancel; use L1 magnitude instead. 
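+        # A strictly positive L1 sum shows shaping was emitted rather than zeroed out.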
+ self.assertGreater( + abs_sum, + PBRS_INVARIANCE_TOL, + "Expected non-trivial shaping magnitudes for canonical mode", ) - total_can = float(df_can["reward_shaping"].sum()) - total_non = float(df_non["reward_shaping"].sum()) - self.assertLess(abs(total_can), abs(total_non) + TOLERANCE.IDENTITY_RELAXED) - assert_pbrs_canonical_sum_within_tolerance(self, total_can, PBRS_INVARIANCE_TOL) - invariant_mask = df_can["pbrs_invariant"] - if bool(getattr(invariant_mask, "any", lambda: False)()): - corrected_values = df_can.loc[invariant_mask, "reward_shaping"].to_numpy() - mean_corrected = float(np.mean(corrected_values)) - self.assertLess(abs(mean_corrected), TOLERANCE.IDENTITY_RELAXED) - spread = float(np.max(corrected_values) - np.min(corrected_values)) - self.assertLess(spread, PBRS.MAX_ABS_SHAPING) # ---------------- Statistical shape invariance ---------------- # @@ -697,7 +757,7 @@ class TestPBRS(RewardSpaceTestBase): next_pnl=0.025, next_duration_ratio=0.35, is_exit=False, - last_potential=0.0, + prev_potential=0.0, params=params, ) ) @@ -718,7 +778,7 @@ class TestPBRS(RewardSpaceTestBase): params, "potential_gamma", DEFAULT_MODEL_REWARD_PARAMETERS.get("potential_gamma", 0.95) ) rng = np.random.default_rng(321) - last_potential = 0.0 + prev_potential = 0.0 telescoping_sum = 0.0 max_abs_step = 0.0 steps = 0 @@ -737,19 +797,19 @@ class TestPBRS(RewardSpaceTestBase): next_pnl=next_pnl, next_duration_ratio=next_dur, is_exit=is_exit, - last_potential=last_potential, + prev_potential=prev_potential, params=params, ) ) - inc = gamma * next_potential - last_potential + inc = gamma * next_potential - prev_potential telescoping_sum += inc if abs(inc) > max_abs_step: max_abs_step = abs(inc) steps += 1 if is_exit: - last_potential = 0.0 + prev_potential = 0.0 else: - last_potential = next_potential + prev_potential = next_potential mean_drift = telescoping_sum / max(1, steps) self.assertLess( abs(mean_drift), @@ -773,7 +833,7 @@ class TestPBRS(RewardSpaceTestBase): exit_potential_decay=0.25, ) rng = np.random.default_rng(321) - last_potential = 0.0 + prev_potential = 0.0 shaping_sum = 0.0 for _ in range(SCENARIOS.MONTE_CARLO_ITERATIONS): @@ -789,12 +849,12 @@ class TestPBRS(RewardSpaceTestBase): next_pnl=next_pnl, next_duration_ratio=next_dur, is_exit=is_exit, - last_potential=last_potential, + prev_potential=prev_potential, params=params, ) ) shaping_sum += shap - last_potential = 0.0 if is_exit else next_pot + prev_potential = 0.0 if is_exit else next_pot self.assertGreater( abs(shaping_sum), PBRS_INVARIANCE_TOL * 50, diff --git a/ReforceXY/reward_space_analysis/tests/robustness/test_robustness.py b/ReforceXY/reward_space_analysis/tests/robustness/test_robustness.py index 21f0292..ed2ad5b 100644 --- a/ReforceXY/reward_space_analysis/tests/robustness/test_robustness.py +++ b/ReforceXY/reward_space_analysis/tests/robustness/test_robustness.py @@ -134,7 +134,11 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): # Owns invariant: robustness-exit-pnl-only-117 (robustness category) def test_pnl_invariant_exit_only(self): - """Invariant: only exit actions have non-zero PnL (robustness category).""" + """Invariant: PnL only non-zero while in position. + + The simulator uses coherent trajectories, so PnL is a state variable during + holds and entries; however Neutral samples must have pnl == 0. 
+ """ df = simulate_samples( params=self.base_params(max_trade_duration_candles=50), num_samples=200, @@ -147,25 +151,13 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): pnl_base_std=PARAMS.PNL_STD, pnl_duration_vol_scale=PARAMS.PNL_DUR_VOL_SCALE, ) - total_pnl = df["pnl"].sum() - exit_mask = df["reward_exit"] != 0 - exit_pnl_sum = df.loc[exit_mask, "pnl"].sum() - self.assertAlmostEqual( - total_pnl, - exit_pnl_sum, - places=TOLERANCE.DECIMAL_PLACES_STANDARD, - msg="PnL invariant violation: total PnL != sum of exit PnL", - ) - non_zero_pnl_actions = set(np.unique(df[df["pnl"].abs() > np.finfo(float).eps]["action"])) - expected_exit_actions = {2.0, 4.0} - self.assertTrue( - non_zero_pnl_actions.issubset(expected_exit_actions), - f"Non-exit actions have PnL: {non_zero_pnl_actions - expected_exit_actions}", + neutral_mask = df["position"] == float(Positions.Neutral.value) + non_zero_neutral_pnl = df.loc[neutral_mask, "pnl"].abs().max() + self.assertLessEqual( + float(non_zero_neutral_pnl), + np.finfo(float).eps, + msg="PnL invariant violation: neutral states must have pnl == 0", ) - invalid_combinations = df[ - (df["pnl"].abs() <= np.finfo(float).eps) & (df["reward_exit"] != 0) - ] - self.assertEqual(len(invalid_combinations), 0) def test_exit_factor_comprehensive(self): """Comprehensive exit factor test: mathematical correctness and monotonic attenuation.""" diff --git a/ReforceXY/reward_space_analysis/tests/test_base.py b/ReforceXY/reward_space_analysis/tests/test_base.py index 43a0c87..79fc075 100644 --- a/ReforceXY/reward_space_analysis/tests/test_base.py +++ b/ReforceXY/reward_space_analysis/tests/test_base.py @@ -104,7 +104,7 @@ class RewardSpaceTestBase(unittest.TestCase): iters = iterations or self.PBRS_SWEEP_ITER term_p = terminal_prob or self.PBRS_TERMINAL_PROB rng = np.random.default_rng(seed) - last_potential = 0.0 + prev_potential = 0.0 terminal_next: list[float] = [] shaping_vals: list[float] = [] current_pnl = 0.0 @@ -124,18 +124,18 @@ class RewardSpaceTestBase(unittest.TestCase): next_duration_ratio=next_dur, is_exit=is_exit, is_entry=False, - last_potential=last_potential, + prev_potential=prev_potential, params=params, ) ) shaping_vals.append(shap_val) if is_exit: terminal_next.append(next_pot) - last_potential = 0.0 + prev_potential = 0.0 current_pnl = 0.0 current_dur = 0.0 else: - last_potential = next_pot + prev_potential = next_pot current_pnl = next_pnl current_dur = next_dur return (terminal_next, shaping_vals)