| `exit_potential_decay` | 0.5 | Decay for progressive_release |
| `hold_potential_enabled` | true | Enable hold potential Φ |
-PBRS invariance holds when: `exit_potential_mode=canonical` AND
-`entry_additive_enabled=false` AND `exit_additive_enabled=false`. Under this
-condition the algorithm enforces zero-sum shaping: if the summed shaping term
-deviates by more than 1e-6 (`PBRS_INVARIANCE_TOL`), a uniform drift correction
-subtracts the mean shaping offset across invariant samples.
+PBRS invariance holds when: `exit_potential_mode=canonical`.
+
+In canonical mode, the entry/exit additive terms are suppressed even if the
+corresponding `*_additive_enabled` flags are set.
+
+Note: PBRS telescoping/zero-sum shaping is a property of coherent trajectories
+(episodes). `simulate_samples()` generates synthetic trajectories (state carried
+across samples) and does not apply any drift correction in post-processing.
#### Hold Potential Transforms
### PBRS Configuration
-Canonical mode enforces zero-sum shaping (Φ terminal ≈ 0) for theoretical
-invariance. Non-canonical modes or additives modify this behavior. Choose
-canonical for standard PBRS compliance; use non-canonical when specific shaping
-behavior is required.
+Canonical mode enforces terminal release (Φ terminal ≈ 0) and suppresses
+entry/exit additive terms.
+
+Non-canonical exit modes can introduce non-zero terminal shaping; enable
+additives only when you want those extra terms to contribute.
### Real Data Comparison
DEFAULT_MODEL_REWARD_PARAMETERS: RewardParams = {
"invalid_action": -2.0,
"base_factor": 100.0,
- # Idle penalty (env defaults)
+ # Idle penalty defaults
"idle_penalty_scale": 0.5,
"idle_penalty_power": 1.025,
"max_trade_duration_candles": 128,
# Fallback: DEFAULT_IDLE_DURATION_MULTIPLIER * max_trade_duration_candles
"max_idle_duration_candles": None,
- # Hold penalty (env defaults)
+ # Hold penalty defaults
"hold_penalty_scale": 0.25,
"hold_penalty_power": 1.025,
- # Exit attenuation (env default)
+ # Exit attenuation defaults
"exit_attenuation_mode": "linear",
"exit_plateau": True,
"exit_plateau_grace": 1.0,
"exit_linear_slope": 1.0,
"exit_power_tau": 0.5,
"exit_half_life": 0.5,
- # Efficiency factor (env defaults)
+ # Efficiency factor defaults
"efficiency_weight": 1.0,
"efficiency_center": 0.5,
- # Profit factor (env defaults)
+ # Profit factor defaults
"win_reward_factor": 2.0,
"pnl_factor_beta": 0.5,
- # Invariant / safety (env defaults)
+ # Invariant / safety defaults
"check_invariants": True,
"exit_factor_threshold": 1000.0,
# === PBRS PARAMETERS ===
def _idle_penalty(context: RewardContext, idle_factor: float, params: RewardParams) -> float:
- """Mirror the environment's idle penalty behavior."""
+ """Compute idle penalty."""
idle_penalty_scale = _get_float_param(
params,
"idle_penalty_scale",
def _hold_penalty(context: RewardContext, hold_factor: float, params: RewardParams) -> float:
- """Mirror the environment's hold penalty behavior."""
+ """Compute hold penalty."""
hold_penalty_scale = _get_float_param(
params,
"hold_penalty_scale",
*,
short_allowed: bool,
action_masking: bool,
- previous_potential: float = np.nan,
+ prev_potential: float = np.nan,
) -> RewardBreakdown:
breakdown = RewardBreakdown()
else:
base_reward = 0.0
+ breakdown.base_reward = base_reward
+
# === PBRS INTEGRATION ===
current_pnl = context.pnl if context.position != Positions.Neutral else 0.0
next_duration_ratio = current_duration_ratio
# Apply PBRS only if enabled and not neutral self-loop
- pbrs_enabled = (
- _get_bool_param(
- params,
- "hold_potential_enabled",
- bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("hold_potential_enabled", True)),
- )
- or _get_bool_param(
+ exit_mode = _get_str_param(
+ params,
+ "exit_potential_mode",
+ str(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_potential_mode", "canonical")),
+ )
+
+ hold_potential_enabled = _get_bool_param(
+ params,
+ "hold_potential_enabled",
+ bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("hold_potential_enabled", True)),
+ )
+ entry_additive_enabled = (
+ False
+ if exit_mode == "canonical"
+ else _get_bool_param(
params,
"entry_additive_enabled",
bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("entry_additive_enabled", False)),
)
- or _get_bool_param(
+ )
+ exit_additive_enabled = (
+ False
+ if exit_mode == "canonical"
+ else _get_bool_param(
params,
"exit_additive_enabled",
bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_additive_enabled", False)),
)
)
- if pbrs_enabled and not is_neutral:
- # Compute Φ(s) for the current state to preserve telescoping semantics Δ = γ·Φ(s') − Φ(s)
- current_potential = _compute_hold_potential(
- current_pnl, pnl_target, current_duration_ratio, params
- )
- if not np.isfinite(current_potential):
- current_potential = 0.0
+ pbrs_enabled = bool(hold_potential_enabled or entry_additive_enabled or exit_additive_enabled)
- last_potential = (
- float(previous_potential)
- if np.isfinite(previous_potential)
- else float(current_potential)
- )
+ if pbrs_enabled:
+ # Stored potential carried across steps.
+ prev_potential = float(prev_potential) if np.isfinite(prev_potential) else 0.0
+
+ if is_neutral:
+ # Neutral self-loop keeps stored potential unchanged.
+ breakdown.prev_potential = prev_potential
+ breakdown.next_potential = prev_potential
+ breakdown.total = base_reward
+ return breakdown
total_reward, reward_shaping, next_potential, pbrs_delta, entry_additive, exit_additive = (
apply_potential_shaping(
next_duration_ratio=next_duration_ratio,
is_exit=is_exit,
is_entry=is_entry,
- previous_potential=current_potential,
- last_potential=last_potential,
+ prev_potential=prev_potential,
params=params,
)
)
breakdown.reward_shaping = reward_shaping
- breakdown.prev_potential = current_potential
+ breakdown.prev_potential = prev_potential
breakdown.next_potential = next_potential
breakdown.entry_additive = entry_additive
breakdown.exit_additive = exit_additive
- breakdown.base_reward = base_reward
breakdown.pbrs_delta = pbrs_delta
- # In canonical mode with additives disabled, this should be ~0
breakdown.invariance_correction = reward_shaping - pbrs_delta
breakdown.total = total_reward
- else:
- breakdown.total = base_reward
+ return breakdown
+
+ breakdown.total = base_reward
return breakdown
pnl_base_std: float,
pnl_duration_vol_scale: float,
) -> pd.DataFrame:
- """Simulate synthetic samples for reward analysis."""
+ """Simulate synthetic samples for reward analysis.
+
+ The synthetic generator produces a *coherent trajectory* (state carried across samples)
+ so PJRS/PBRS stored-potential mechanics can be exercised realistically.
+
+ Notes
+ -----
+ - PnL is a state variable while in position (may be non-zero on holds).
+ - Neutral states always have pnl=0.
+ - Realized PnL appears on the exit step (position still Long/Short).
+ """
+
rng = random.Random(seed)
max_trade_duration_candles = _get_int_param(
params,
)
short_allowed = _is_short_allowed(trading_mode)
action_masking = _get_bool_param(params, "action_masking", True)
+
# Theoretical PBRS invariance flag
exit_mode = _get_str_param(
params,
"exit_potential_mode",
str(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_potential_mode", "canonical")),
)
- entry_enabled = _get_bool_param(
+ entry_enabled_raw = _get_bool_param(
params,
"entry_additive_enabled",
bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("entry_additive_enabled", False)),
)
- exit_enabled = _get_bool_param(
+ exit_enabled_raw = _get_bool_param(
params,
"exit_additive_enabled",
bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_additive_enabled", False)),
)
+
+ entry_enabled = bool(entry_enabled_raw) if exit_mode != "canonical" else False
+ exit_enabled = bool(exit_enabled_raw) if exit_mode != "canonical" else False
pbrs_invariant = bool(exit_mode == "canonical" and not (entry_enabled or exit_enabled))
- samples: list[Dict[str, float]] = []
- last_potential: float = 0.0
- for _ in range(num_samples):
- if short_allowed:
- position_choices = [
- Positions.Neutral,
- Positions.Long,
- Positions.Short,
- ]
- position_weights = [0.45, 0.3, 0.25]
- else:
- position_choices = [Positions.Neutral, Positions.Long]
- position_weights = [0.6, 0.4]
- position = rng.choices(position_choices, weights=position_weights, k=1)[0]
- action = _sample_action(position, rng, short_allowed=short_allowed)
+ max_idle_duration_candles = get_max_idle_duration_candles(
+ params, max_trade_duration_candles=max_trade_duration_candles
+ )
+ max_trade_duration_cap = int(max_trade_duration_candles * max_duration_ratio)
- if position == Positions.Neutral:
- trade_duration = 0
- max_idle_duration_candles = get_max_idle_duration_candles(
- params, max_trade_duration_candles=max_trade_duration_candles
- )
- idle_duration = int(rng.uniform(0, max_idle_duration_candles))
- else:
- trade_duration = int(rng.uniform(1, max_trade_duration_candles * max_duration_ratio))
- trade_duration = max(1, trade_duration)
- idle_duration = 0
+ samples: list[Dict[str, float]] = []
+ prev_potential: float = 0.0
- # Only exit actions should have non-zero PnL
- pnl = 0.0 # Initialize as zero for all actions
+ # Stateful trajectory variables
+ position = Positions.Neutral
+ trade_duration = 0
+ idle_duration = 0
+ pnl = 0.0
+ max_unrealized_profit = 0.0
+ min_unrealized_profit = 0.0
- # Generate PnL only for exit actions (Long_exit=2, Short_exit=4)
- if action in (Actions.Long_exit, Actions.Short_exit):
+ for _ in range(num_samples):
+ # Simulate market movement while in position (PnL as a state variable)
+ if position in (Positions.Long, Positions.Short):
duration_ratio = _compute_duration_ratio(trade_duration, max_trade_duration_candles)
-
- # PnL variance scales with duration for more realistic heteroscedasticity
pnl_std = pnl_base_std * (1.0 + pnl_duration_vol_scale * duration_ratio)
- pnl = rng.gauss(0.0, pnl_std)
- if position == Positions.Long:
- pnl += 0.005 * duration_ratio
- elif position == Positions.Short:
- pnl -= 0.005 * duration_ratio
+ step_delta = rng.gauss(0.0, pnl_std)
- # Clip PnL to realistic range
- pnl = min(max(-0.15, pnl), 0.15)
+ # Small directional drift so signals aren't perfectly symmetric.
+ drift = 0.001 * duration_ratio
+ if position == Positions.Long:
+ step_delta += drift
+ else:
+ step_delta -= drift
- if position == Positions.Neutral:
+ pnl = min(max(-0.15, pnl + step_delta), 0.15)
+ max_unrealized_profit = max(max_unrealized_profit, pnl)
+ min_unrealized_profit = min(min_unrealized_profit, pnl)
+ else:
+ pnl = 0.0
max_unrealized_profit = 0.0
min_unrealized_profit = 0.0
- else:
- # Unrealized profit bounds
- span = abs(rng.gauss(0.0, 0.015))
- # max >= pnl >= min by construction
- max_unrealized_profit = pnl + abs(rng.gauss(0.0, span))
- min_unrealized_profit = pnl - abs(rng.gauss(0.0, span))
+
+ action = _sample_action(position, rng, short_allowed=short_allowed)
context = RewardContext(
pnl=pnl,
risk_reward_ratio,
short_allowed=short_allowed,
action_masking=action_masking,
- previous_potential=last_potential,
+ prev_potential=prev_potential,
)
+ prev_potential = breakdown.next_potential
- last_potential = breakdown.next_potential
-
- max_idle_duration_candles = get_max_idle_duration_candles(params)
idle_ratio = context.idle_duration / max(1, max_idle_duration_candles)
-
samples.append(
{
"pnl": context.pnl,
}
)
- df = pd.DataFrame(samples)
+ # Transition state
+ if position == Positions.Neutral:
+ if action == Actions.Neutral:
+ idle_duration = min(idle_duration + 1, max_idle_duration_candles)
+ elif action == Actions.Long_enter:
+ position = Positions.Long
+ trade_duration = 0
+ idle_duration = 0
+ elif action == Actions.Short_enter and short_allowed:
+ position = Positions.Short
+ trade_duration = 0
+ idle_duration = 0
+ else:
+ idle_duration = 0
+ if action == Actions.Neutral:
+ trade_duration = min(trade_duration + 1, max_trade_duration_cap)
+ elif action in (Actions.Long_exit, Actions.Short_exit):
+ position = Positions.Neutral
+ trade_duration = 0
+ idle_duration = 0
- # Enforce PBRS invariance: zero-sum shaping under canonical mode and no additives
- try:
- exit_mode = _get_str_param(
- params,
- "exit_potential_mode",
- str(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_potential_mode", "canonical")),
- )
- entry_enabled = _get_bool_param(
- params,
- "entry_additive_enabled",
- bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("entry_additive_enabled", False)),
- )
- exit_enabled = _get_bool_param(
- params,
- "exit_additive_enabled",
- bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_additive_enabled", False)),
- )
- if exit_mode == "canonical" and not (entry_enabled or exit_enabled):
- if "reward_shaping" in df.columns:
- total_shaping = float(df["reward_shaping"].sum())
- if abs(total_shaping) > PBRS_INVARIANCE_TOL:
- # Drift correction distributes a constant offset across invariant samples
- n_invariant = (
- int(df["pbrs_invariant"].sum())
- if "pbrs_invariant" in df.columns
- else int(len(df))
- )
- drift = total_shaping / max(1, n_invariant)
- df.loc[:, "reward_shaping"] = df["reward_shaping"] - drift
- df.attrs["reward_params"] = dict(params)
- except Exception:
- # Graceful fallback (no invariance enforcement on failure)
- pass
+ df = pd.DataFrame(samples)
+ df.attrs["reward_params"] = dict(params)
# Validate critical algorithmic invariants
_validate_simulation_invariants(df)
def _validate_simulation_invariants(df: pd.DataFrame) -> None:
- """Fail fast if simulation violates PnL or action invariants."""
- # INVARIANT 1: PnL Conservation - Total PnL must equal sum of exit PnL
- total_pnl = df["pnl"].sum()
- exit_action_mask = df["action"].isin([2.0, 4.0])
- exit_pnl_sum = df.loc[exit_action_mask, "pnl"].sum()
-
- # Tolerances from INTERNAL_GUARDS to handle backend/OS numeric epsilons
- tol_pnl = float(INTERNAL_GUARDS.get("sim_pnl_conservation_tol", 1e-10))
+ """Fail fast if simulation violates action/state invariants."""
+
eps_pnl = float(INTERNAL_GUARDS.get("sim_zero_pnl_epsilon", 1e-12))
eps_reward = float(INTERNAL_GUARDS.get("sim_zero_reward_epsilon", 1e-12))
thr_extreme = float(INTERNAL_GUARDS.get("sim_extreme_pnl_threshold", 0.2))
- pnl_diff = abs(total_pnl - exit_pnl_sum)
- if pnl_diff > tol_pnl:
- raise AssertionError(
- f"PnL INVARIANT VIOLATION: Total PnL ({total_pnl:.6f}) != "
- f"Exit PnL sum ({exit_pnl_sum:.6f}), difference = {pnl_diff:.2e}"
- )
-
- # INVARIANT 2: PnL Exclusivity - Only exit actions should have non-zero PnL
- non_zero_pnl_actions = set(df[df["pnl"].abs() > eps_pnl]["action"].unique())
- valid_exit_actions = {2.0, 4.0}
- invalid_actions = non_zero_pnl_actions - valid_exit_actions
- if invalid_actions:
- raise AssertionError(
- f"PnL EXCLUSIVITY VIOLATION: Non-exit actions {invalid_actions} have non-zero PnL"
- )
-
- # INVARIANT 3: Exit Reward Consistency - Non-zero exit rewards require non-zero PnL
- inconsistent_exits = df[(df["pnl"].abs() <= eps_pnl) & (df["reward_exit"].abs() > eps_reward)]
- if len(inconsistent_exits) > 0:
- raise AssertionError(
- f"EXIT REWARD INCONSISTENCY: {len(inconsistent_exits)} actions have "
- f"zero PnL but non-zero exit reward"
- )
-
- # INVARIANT 4: Action-Position Compatibility
- # Validate that exit actions match positions
- long_exits = df[
- (df["action"] == 2.0) & (df["position"] != 1.0)
- ] # Long_exit but not Long position
- short_exits = df[
- (df["action"] == 4.0) & (df["position"] != 0.0)
- ] # Short_exit but not Short position
-
+ # INVARIANT 1: Action-position compatibility
+ long_exits = df[(df["action"] == 2.0) & (df["position"] != 1.0)]
+ short_exits = df[(df["action"] == 4.0) & (df["position"] != 0.0)]
if len(long_exits) > 0:
raise AssertionError(
f"ACTION-POSITION INCONSISTENCY: {len(long_exits)} Long_exit actions "
f"without Long position"
)
-
if len(short_exits) > 0:
raise AssertionError(
f"ACTION-POSITION INCONSISTENCY: {len(short_exits)} Short_exit actions "
f"without Short position"
)
- # INVARIANT 5: Duration Logic - Neutral positions should have trade_duration = 0
+ long_entries = df[(df["action"] == 1.0) & (df["position"] != 0.5)]
+ short_entries = df[(df["action"] == 3.0) & (df["position"] != 0.5)]
+ if len(long_entries) > 0:
+ raise AssertionError(
+ f"ACTION-POSITION INCONSISTENCY: {len(long_entries)} Long_enter actions "
+ f"without Neutral position"
+ )
+ if len(short_entries) > 0:
+ raise AssertionError(
+ f"ACTION-POSITION INCONSISTENCY: {len(short_entries)} Short_enter actions "
+ f"without Neutral position"
+ )
+
+ # INVARIANT 2: Duration logic
neutral_with_trade = df[(df["position"] == 0.5) & (df["trade_duration"] > 0)]
if len(neutral_with_trade) > 0:
raise AssertionError(
f"with non-zero trade_duration"
)
- # INVARIANT 6: Bounded Values - Check realistic bounds
- extreme_pnl = df[(df["pnl"].abs() > thr_extreme)] # Beyond reasonable range
+ inpos_with_idle = df[(df["position"] != 0.5) & (df["idle_duration"] > 0)]
+ if len(inpos_with_idle) > 0:
+ raise AssertionError(
+ f"DURATION LOGIC VIOLATION: {len(inpos_with_idle)} In-position samples "
+ f"with idle_duration > 0"
+ )
+
+ # INVARIANT 3: Neutral states have zero PnL (simulation design)
+ neutral_with_pnl = df[(df["position"] == 0.5) & (df["pnl"].abs() > eps_pnl)]
+ if len(neutral_with_pnl) > 0:
+ raise AssertionError(
+ f"PNL LOGIC VIOLATION: {len(neutral_with_pnl)} Neutral positions with non-zero pnl"
+ )
+
+ # INVARIANT 4: Exit rewards only appear on exit actions
+ non_exit_with_exit_reward = df[
+ (~df["action"].isin([2.0, 4.0])) & (df["reward_exit"].abs() > eps_reward)
+ ]
+ if len(non_exit_with_exit_reward) > 0:
+ raise AssertionError(
+ f"EXIT REWARD INCONSISTENCY: {len(non_exit_with_exit_reward)} non-exit actions "
+ f"have non-zero exit reward"
+ )
+
+ # INVARIANT 5: Bounded values
+ extreme_pnl = df[(df["pnl"].abs() > thr_extreme)]
if len(extreme_pnl) > 0:
- max_abs_pnl = df["pnl"].abs().max()
+ max_abs_pnl = float(df["pnl"].abs().max())
raise AssertionError(
f"BOUNDS VIOLATION: {len(extreme_pnl)} samples with extreme PnL, "
f"max |PnL| = {max_abs_pnl:.6f}"
)
-def _compute_exit_potential(last_potential: float, params: RewardParams) -> float:
- """Exit potential per mode (canonical/non_canonical -> 0; others transform Φ)."""
+def _compute_exit_potential(prev_potential: float, params: RewardParams) -> float:
+ """Exit potential per mode (canonical/non_canonical -> 0; others transform Φ(prev))."""
mode = _get_str_param(
params,
"exit_potential_mode",
stacklevel=2,
)
decay = 1.0
- next_potential = last_potential * (1.0 - decay)
+ next_potential = prev_potential * (1.0 - decay)
elif mode == "spike_cancel":
gamma = _get_potential_gamma(params)
if gamma <= 0.0 or not np.isfinite(gamma):
- next_potential = last_potential
+ next_potential = prev_potential
else:
- next_potential = last_potential / gamma
+ next_potential = prev_potential / gamma
elif mode == "retain_previous":
- next_potential = last_potential
+ next_potential = prev_potential
else:
_warn_unknown_mode(
"exit_potential_mode",
next_pnl: float,
next_duration_ratio: float,
params: RewardParams,
+ *,
is_exit: bool = False,
is_entry: bool = False,
- previous_potential: float = np.nan,
- last_potential: Optional[float] = None,
+ prev_potential: float,
) -> tuple[float, float, float, float, float, float]:
- """Compute shaped reward with explicit PBRS semantics.
+ """Compute shaped reward using PBRS.
Returns
-------
tuple[float, float, float, float, float, float]
(reward, reward_shaping, next_potential, pbrs_delta, entry_additive, exit_additive)
- where pbrs_delta = gamma * next_potential - prev_term is the pure PBRS component.
+ where pbrs_delta = gamma * next_potential - prev_potential is the pure PBRS component.
Notes
-----
- Shaping Δ = γ·Φ(next) − Φ(prev).
- - previous_potential:
- Previously computed Φ(s) for the prior transition. When provided and finite, it
- is used as Φ(prev) in Δ; otherwise Φ(prev) is derived from the current state.
- - last_potential:
- Potential used to compute terminal Φ′ at exit via _compute_exit_potential().
- Fallback logic: if last_potential is None or non-finite, then last_potential := previous_potential
- (or the derived prev term) to preserve telescoping semantics.
+ - Φ(prev) must be provided explicitly as the stored potential carried across steps.
+ Non-finite values of Φ(prev) are coerced to 0.0 before use.
+ - Exit potential modes compute Φ(next) from Φ(prev).
- Entry additive is applied only on entry transitions (based on next_* metrics).
- Exit additive is applied only on exit transitions (based on current_* metrics).
- - Canonical invariance: when exit_potential_mode == 'canonical' and additives are disabled,
- the telescoping sum ensures Σ reward_shaping ≈ 0 across a complete episode.
+
+ Note
+ ----
+ Canonical mode zeroes the entry/exit additive contributions inside this helper.
+ This helper intentionally does not mutate `params`.
"""
- params = _enforce_pbrs_invariance(params)
gamma = _get_potential_gamma(params)
- # Use provided previous_potential when finite; otherwise derive from current state
- prev_term = (
- float(previous_potential)
- if np.isfinite(previous_potential)
- else _compute_hold_potential(current_pnl, pnl_target, current_duration_ratio, params)
+ prev_potential = float(prev_potential) if np.isfinite(prev_potential) else 0.0
+
+ exit_mode = _get_str_param(
+ params,
+ "exit_potential_mode",
+ str(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_potential_mode", "canonical")),
+ )
+ canonical_mode = exit_mode == "canonical"
+
+ hold_potential_enabled = _get_bool_param(
+ params,
+ "hold_potential_enabled",
+ bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("hold_potential_enabled", True)),
)
- if not np.isfinite(prev_term):
- prev_term = 0.0
if is_exit:
- last_potential = (
- float(last_potential)
- if (last_potential is not None and np.isfinite(last_potential))
- else float(prev_term)
- )
- next_potential = _compute_exit_potential(last_potential, params)
+ next_potential = _compute_exit_potential(prev_potential, params)
+ # PBRS shaping Δ = γ·Φ(next) − Φ(prev)
+ pbrs_delta = gamma * next_potential - prev_potential
+ reward_shaping = pbrs_delta
else:
- next_potential = _compute_hold_potential(next_pnl, pnl_target, next_duration_ratio, params)
-
- # PBRS shaping Δ = γ·Φ(next) − Φ(prev)
- pbrs_delta = gamma * next_potential - float(prev_term)
- reward_shaping = pbrs_delta
+ # When hold potential is disabled, force Φ(next)=0 and emit no PBRS shaping on entry/hold.
+ if not hold_potential_enabled:
+ next_potential = 0.0
+ pbrs_delta = 0.0
+ reward_shaping = 0.0
+ else:
+ next_potential = _compute_hold_potential(
+ next_pnl, pnl_target, next_duration_ratio, params
+ )
+ # PBRS shaping Δ = γ·Φ(next) − Φ(prev)
+ pbrs_delta = gamma * next_potential - prev_potential
+ reward_shaping = pbrs_delta
# Non-PBRS additives
- cand_entry_add = _compute_entry_additive(next_pnl, pnl_target, next_duration_ratio, params)
- cand_exit_add = _compute_exit_additive(current_pnl, pnl_target, current_duration_ratio, params)
+ if canonical_mode:
+ entry_additive = 0.0
+ exit_additive = 0.0
+ else:
+ cand_entry_add = _compute_entry_additive(next_pnl, pnl_target, next_duration_ratio, params)
+ cand_exit_add = _compute_exit_additive(
+ current_pnl, pnl_target, current_duration_ratio, params
+ )
- entry_additive = cand_entry_add if is_entry else 0.0
- exit_additive = cand_exit_add if is_exit else 0.0
+ entry_additive = cand_entry_add if is_entry else 0.0
+ exit_additive = cand_exit_add if is_exit else 0.0
reward = base_reward + reward_shaping + entry_additive + exit_additive
if not np.isfinite(reward):
return float(base_reward), 0.0, 0.0, 0.0, 0.0, 0.0
- if np.isclose(reward_shaping, 0.0):
- reward_shaping = 0.0
- pbrs_delta = 0.0
+
return (
float(reward),
float(reward_shaping),
)
-def _enforce_pbrs_invariance(params: RewardParams) -> RewardParams:
- """Disable entry/exit additives once in canonical PBRS to preserve invariance."""
- mode = _get_str_param(
- params,
- "exit_potential_mode",
- str(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_potential_mode", "canonical")),
- )
- if mode != "canonical":
- return params
- if params.get("_pbrs_invariance_applied"):
- return params
- entry_enabled = _get_bool_param(
- params,
- "entry_additive_enabled",
- bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("entry_additive_enabled", False)),
- )
- exit_enabled = _get_bool_param(
- params,
- "exit_additive_enabled",
- bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_additive_enabled", False)),
- )
- # Strict canonical enforcement
- if entry_enabled:
- warnings.warn(
- "Disabling entry additive to preserve PBRS invariance (canonical mode).",
- RewardDiagnosticsWarning,
- stacklevel=2,
- )
- params["entry_additive_enabled"] = False
- if exit_enabled:
- warnings.warn(
- "Disabling exit additive to preserve PBRS invariance (canonical mode).",
- RewardDiagnosticsWarning,
- stacklevel=2,
- )
- params["exit_additive_enabled"] = False
- params["_pbrs_invariance_applied"] = True
- return params
-
-
def _compute_bi_component(
kind: str,
pnl: float,
"--base_factor",
type=float,
default=100.0,
- help="Base reward factor used inside the environment (default: 100).",
+ help="Base reward scaling factor (default: 100).",
)
parser.add_argument(
"--profit_aim",
fp = " ".join(line.strip().split())[:160]
warnings_breakdown[fp] = warnings_breakdown.get(fp, 0) + 1
- # Collect environment + reproducibility metadata
+ # Collect reproducibility metadata
def _git_hash() -> Optional[str]:
try:
proc = subprocess.run(
- Assert: next_potential ≈ 0 within TOLERANCE.IDENTITY_RELAXED
2. Part B: Shaping recovery verification
- - Verify: reward_shaping ≈ -gamma * last_potential
- - Checks proper potential recovery mechanism
+ - Verify: reward_shaping ≈ -prev_potential (canonical exit)
+ - Checks proper potential release at terminal
- 3. Part C: Cumulative drift analysis
+ 3. Part C: Cumulative shaping magnitude
- Track cumulative shaping over 100-episode sequence
- - Assert: Bounded drift (no systematic bias accumulation)
+ - Assert: Bounded magnitude (no systematic bias accumulation)
**Setup:**
- Exit modes: [progressive_release, spike_cancel, canonical]
**Assertions:**
- Terminal potential: |next_potential| < TOLERANCE.IDENTITY_RELAXED
- - Shaping recovery: |shaping + gamma*last_pot| < TOLERANCE.IDENTITY_RELAXED
- - Cumulative drift: |sum(shaping)| < 10 * TOLERANCE.IDENTITY_RELAXED
+ - Shaping recovery: |shaping + prev_potential| < TOLERANCE.IDENTITY_RELAXED
+ - Cumulative sum bound: |sum(shaping)| < 10 * TOLERANCE.IDENTITY_RELAXED
**Tolerance rationale:**
- IDENTITY_RELAXED: PBRS calculations involve gamma discounting,
The suite enforces:
- Reward component mathematics & transform correctness
-- PBRS invariance mechanics (canonical drift correction, near-zero
+- PBRS shaping mechanics (canonical exit semantics, near-zero
classification)
- Robustness under extreme / invalid parameter settings
- Statistical metrics integrity (bootstrap, constant distributions)
| robustness-negative-grace-clamp-103 | robustness | Negative exit_plateau_grace clamps to 0.0 w/ warning | robustness/test_robustness.py:555 | |
| robustness-invalid-power-tau-104 | robustness | Invalid power tau falls back alpha=1.0 w/ warning | robustness/test_robustness.py:592 | |
| robustness-near-zero-half-life-105 | robustness | Near-zero half life yields no attenuation (factor≈base) | robustness/test_robustness.py:621 | |
-| pbrs-canonical-drift-correction-106 | pbrs | Canonical drift correction enforces near zero-sum shaping | pbrs/test_pbrs.py:449 | Multi-path: extension fallback (475), comparison path (517) |
+| pbrs-canonical-exit-semantic-106 | pbrs | Canonical exit uses shaping=-prev_potential and next_potential=0.0 | pbrs/test_pbrs.py:449 | Uses stored potential across steps; no drift correction applied |
| pbrs-canonical-near-zero-report-116 | pbrs | Canonical near-zero cumulative shaping classification | pbrs/test_pbrs.py:748 | Full report classification |
| statistics-partial-deps-skip-107 | statistics | skip_partial_dependence => empty PD structures | statistics/test_statistics.py:28 | Docstring line |
| helpers-duplicate-rows-drop-108 | helpers | Duplicate rows dropped w/ warning counting removals | helpers/test_utilities.py:26 | Docstring line |
Expect a single directory path. Examples:
```shell
-grep -R "drift_correction" -n .
grep -R "near_zero" -n .
+grep -R "pbrs_delta" -n .
```
## Coverage Parity Notes
"is_exit": False,
}
_t0, s0, _n0, _pbrs0, _entry0, _exit0 = apply_potential_shaping(
- last_potential=0.0, params=base, **ctx
+ prev_potential=0.0, params=base, **ctx
)
t1, s1, _n1, _pbrs1, _entry1, _exit1 = apply_potential_shaping(
- last_potential=0.0, params=with_add, **ctx
+ prev_potential=0.0, params=with_add, **ctx
)
self.assertFinite(t1)
self.assertFinite(s1)
risk_reward_ratio=1.0,
short_allowed=True,
action_masking=True,
- previous_potential=np.nan,
+ prev_potential=np.nan,
)
assert math.isfinite(breakdown.prev_potential)
assert math.isfinite(breakdown.next_potential)
TOLERANCE,
)
from ..helpers import (
- assert_non_canonical_shaping_exceeds,
- assert_pbrs_canonical_sum_within_tolerance,
assert_pbrs_invariance_report_classification,
assert_relaxed_multi_reason_aggregation,
build_validation_case,
next_duration_ratio=0.0,
is_exit=True,
is_entry=False,
- last_potential=0.789,
+ prev_potential=prev_potential,
params=params,
)
self.assertAlmostEqualFloat(next_potential, 0.0, tolerance=TOLERANCE.IDENTITY_RELAXED)
next_duration_ratio=0.0,
is_exit=True,
is_entry=False,
- last_potential=prev_potential,
+ prev_potential=prev_potential,
params=params,
)
self.assertAlmostEqualFloat(
)
self.assertNearZero(reward_shaping, atol=TOLERANCE.IDENTITY_RELAXED)
- # ---------------- Invariance sum checks (simulate_samples) ---------------- #
+ # ---------------- Invariance flags (simulate_samples) ---------------- #
- def test_canonical_invariance_flag_and_sum(self):
- """Canonical mode + no additives -> invariant flags True and Σ shaping ≈ 0."""
+ def test_canonical_invariance_flag(self):
+ """Canonical mode + no additives -> invariant flag True per-sample.
+
+ Note: `simulate_samples()` generates synthetic trajectories (coherent episodes).
+ This test only verifies the per-sample invariance flag and numeric stability; it does not
+ assert any telescoping/zero-sum property for the shaping term.
+ """
params = self.base_params(
exit_potential_mode="canonical",
)
unique_flags = set(df["pbrs_invariant"].unique().tolist())
self.assertEqual(unique_flags, {True}, f"Unexpected invariant flags: {unique_flags}")
- total_shaping = float(df["reward_shaping"].sum())
- assert_pbrs_canonical_sum_within_tolerance(self, total_shaping, PBRS_INVARIANCE_TOL)
+ self.assertTrue(np.isfinite(df["reward_shaping"]).all())
+ self.assertLessEqual(float(df["reward_shaping"].abs().max()), PBRS.MAX_ABS_SHAPING)
def test_non_canonical_flag_false_and_sum_nonzero(self):
- """Non-canonical mode -> invariant flags False and Σ shaping significantly non-zero."""
+ """Non-canonical mode -> invariant flags False and Σ shaping non-zero."""
params = self.base_params(
exit_potential_mode="progressive_release",
)
unique_flags = set(df["pbrs_invariant"].unique().tolist())
self.assertEqual(unique_flags, {False}, f"Unexpected invariant flags: {unique_flags}")
- total_shaping = float(df["reward_shaping"].sum())
- assert_non_canonical_shaping_exceeds(self, total_shaping, PBRS_INVARIANCE_TOL * 10)
+ abs_sum = float(df["reward_shaping"].abs().sum())
+ self.assertGreater(
+ abs_sum,
+ PBRS_INVARIANCE_TOL * 2,
+ f"Expected non-trivial shaping magnitude (got {abs_sum})",
+ )
# ---------------- Additives and canonical path mechanics ---------------- #
)
self.assertEqual(float(val_exit), 0.0)
+ def test_hold_potential_disabled_forces_zero_potential_on_entry(self):
+ """hold_potential_enabled=False: entry sets Φ(next)=0 and no shaping."""
+ params = self.base_params(
+ hold_potential_enabled=False,
+ exit_potential_mode="canonical",
+ entry_additive_enabled=False,
+ exit_additive_enabled=False,
+ potential_gamma=0.93,
+ )
+ (
+ total,
+ reward_shaping,
+ next_potential,
+ pbrs_delta,
+ entry_additive,
+ exit_additive,
+ ) = apply_potential_shaping(
+ base_reward=0.25,
+ current_pnl=0.0,
+ pnl_target=PARAMS.PROFIT_AIM * PARAMS.RISK_REWARD_RATIO,
+ current_duration_ratio=0.0,
+ next_pnl=0.01,
+ next_duration_ratio=0.0,
+ is_exit=False,
+ is_entry=True,
+ prev_potential=0.42,
+ params=params,
+ )
+ self.assertPlacesEqual(next_potential, 0.0, places=TOLERANCE.DECIMAL_PLACES_STRICT)
+ self.assertNearZero(reward_shaping, atol=TOLERANCE.IDENTITY_STRICT)
+ self.assertNearZero(pbrs_delta, atol=TOLERANCE.IDENTITY_STRICT)
+ self.assertNearZero(entry_additive, atol=TOLERANCE.IDENTITY_STRICT)
+ self.assertNearZero(exit_additive, atol=TOLERANCE.IDENTITY_STRICT)
+ self.assertAlmostEqualFloat(
+ total,
+ 0.25,
+ tolerance=TOLERANCE.IDENTITY_STRICT,
+ msg="Entry shaping must be suppressed when hold potential disabled",
+ )
+
+ def test_hold_potential_disabled_forces_zero_potential_on_hold(self):
+ """hold_potential_enabled=False: hold sets Φ(next)=0 and no shaping."""
+ params = self.base_params(
+ hold_potential_enabled=False,
+ exit_potential_mode="canonical",
+ entry_additive_enabled=False,
+ exit_additive_enabled=False,
+ potential_gamma=0.93,
+ )
+ (
+ total,
+ reward_shaping,
+ next_potential,
+ pbrs_delta,
+ _entry_additive,
+ _exit_additive,
+ ) = apply_potential_shaping(
+ base_reward=-0.1,
+ current_pnl=0.02,
+ pnl_target=PARAMS.PROFIT_AIM * PARAMS.RISK_REWARD_RATIO,
+ current_duration_ratio=0.4,
+ next_pnl=0.02,
+ next_duration_ratio=0.41,
+ is_exit=False,
+ is_entry=False,
+ prev_potential=0.5,
+ params=params,
+ )
+ self.assertPlacesEqual(next_potential, 0.0, places=TOLERANCE.DECIMAL_PLACES_STRICT)
+ self.assertNearZero(reward_shaping, atol=TOLERANCE.IDENTITY_STRICT)
+ self.assertNearZero(pbrs_delta, atol=TOLERANCE.IDENTITY_STRICT)
+ self.assertAlmostEqualFloat(
+ total,
+ -0.1,
+ tolerance=TOLERANCE.IDENTITY_STRICT,
+ msg="Hold shaping must be suppressed when hold potential disabled",
+ )
+
def test_exit_potential_canonical(self):
- """Verifies canonical exit resets potential and auto-disables additives."""
+ """Verifies canonical exit resets potential (no params mutation)."""
params = self.base_params(
exit_potential_mode="canonical",
hold_potential_enabled=True,
- entry_additive_enabled=True,
- exit_additive_enabled=True,
+ entry_additive_enabled=False,
+ exit_additive_enabled=False,
)
+ params_before = dict(params)
+
base_reward = 0.25
current_pnl = 0.05
current_duration_ratio = 0.4
next_duration_ratio=next_duration_ratio,
is_exit=True,
is_entry=False,
- last_potential=0.789,
+ prev_potential=0.789,
params=params,
)
)
- self.assertIn("_pbrs_invariance_applied", params)
- self.assertFalse(
- params["entry_additive_enabled"],
- "Entry additive should be auto-disabled in canonical mode",
- )
- self.assertFalse(
- params["exit_additive_enabled"],
- "Exit additive should be auto-disabled in canonical mode",
- )
+
+ self.assertEqual(params, params_before, "apply_potential_shaping must not mutate params")
self.assertPlacesEqual(next_potential, 0.0, places=TOLERANCE.DECIMAL_PLACES_STRICT)
- current_potential = _compute_hold_potential(
- current_pnl,
- PARAMS.PROFIT_AIM * PARAMS.RISK_REWARD_RATIO,
- current_duration_ratio,
- {"hold_potential_enabled": True, "hold_potential_scale": 1.0},
- )
- self.assertAlmostEqual(shaping, -current_potential, delta=TOLERANCE.IDENTITY_RELAXED)
+ self.assertAlmostEqual(shaping, -0.789, delta=TOLERANCE.IDENTITY_RELAXED)
residual = total - base_reward - shaping
self.assertAlmostEqual(residual, 0.0, delta=TOLERANCE.IDENTITY_RELAXED)
self.assertTrue(np.isfinite(total))
- def test_pbrs_invariance_internal_flag_set(self):
- """Verifies canonical path sets _pbrs_invariance_applied flag (idempotent)."""
+ def test_canonical_mode_suppresses_additives_even_if_enabled(self):
+ """Verifies canonical mode forces entry/exit additive terms to zero."""
params = self.base_params(
exit_potential_mode="canonical",
hold_potential_enabled=True,
entry_additive_enabled=True,
exit_additive_enabled=True,
+ entry_additive_scale=10.0,
+ exit_additive_scale=10.0,
)
- terminal_next_potentials, shaping_values = self._canonical_sweep(params)
- _t1, _s1, _n1, _pbrs_delta, _entry_additive, _exit_additive = apply_potential_shaping(
+
+ (
+ _total_entry,
+ _shaping_entry,
+ _next_potential_entry,
+ _pbrs_delta_entry,
+ entry_additive,
+ exit_additive_entry,
+ ) = apply_potential_shaping(
base_reward=0.0,
- current_pnl=0.05,
+ current_pnl=0.0,
pnl_target=PARAMS.PROFIT_AIM * PARAMS.RISK_REWARD_RATIO,
- current_duration_ratio=0.3,
- next_pnl=0.0,
+ current_duration_ratio=0.0,
+ next_pnl=0.02,
next_duration_ratio=0.0,
- is_exit=True,
- is_entry=False,
- last_potential=0.4,
+ is_exit=False,
+ is_entry=True,
+ prev_potential=0.0,
params=params,
)
- self.assertIn("_pbrs_invariance_applied", params)
- self.assertFalse(params["entry_additive_enabled"])
- self.assertFalse(params["exit_additive_enabled"])
- if terminal_next_potentials:
- self.assertTrue(all((abs(p) < PBRS.TERMINAL_TOL for p in terminal_next_potentials)))
- max_abs = max((abs(v) for v in shaping_values)) if shaping_values else 0.0
- self.assertLessEqual(max_abs, PBRS.MAX_ABS_SHAPING)
- state_after = (params["entry_additive_enabled"], params["exit_additive_enabled"])
- _t2, _s2, _n2, _pbrs_delta2, _entry_additive2, _exit_additive2 = apply_potential_shaping(
+ self.assertNearZero(entry_additive, atol=TOLERANCE.IDENTITY_STRICT)
+ self.assertNearZero(exit_additive_entry, atol=TOLERANCE.IDENTITY_STRICT)
+
+ (
+ _total_exit,
+ _shaping_exit,
+ _next_potential_exit,
+ _pbrs_delta_exit,
+ entry_additive_exit,
+ exit_additive,
+ ) = apply_potential_shaping(
base_reward=0.0,
current_pnl=0.02,
pnl_target=PARAMS.PROFIT_AIM * PARAMS.RISK_REWARD_RATIO,
- current_duration_ratio=0.1,
+ current_duration_ratio=0.5,
next_pnl=0.0,
next_duration_ratio=0.0,
is_exit=True,
is_entry=False,
- last_potential=0.1,
+ prev_potential=0.4,
params=params,
)
- self.assertEqual(
- state_after, (params["entry_additive_enabled"], params["exit_additive_enabled"])
+ self.assertNearZero(entry_additive_exit, atol=TOLERANCE.IDENTITY_STRICT)
+ self.assertNearZero(exit_additive, atol=TOLERANCE.IDENTITY_STRICT)
+
+ def test_canonical_sweep_does_not_require_param_enforcement(self):
+ """Verifies canonical sweep runs without mutating params."""
+ params = self.base_params(
+ exit_potential_mode="canonical",
+ hold_potential_enabled=True,
+ entry_additive_enabled=False,
+ exit_additive_enabled=False,
)
+ params_before = dict(params)
+ terminal_next_potentials, shaping_values = self._canonical_sweep(params)
+ self.assertEqual(params, params_before)
+ if terminal_next_potentials:
+ self.assertTrue(all((abs(p) < PBRS.TERMINAL_TOL for p in terminal_next_potentials)))
+ max_abs = max((abs(v) for v in shaping_values)) if shaping_values else 0.0
+ self.assertLessEqual(max_abs, PBRS.MAX_ABS_SHAPING)
def test_progressive_release_negative_decay_clamped(self):
"""Verifies negative decay clamping: next potential equals last potential."""
exit_potential_decay=-0.75,
hold_potential_enabled=True,
)
- last_potential = 0.42
+ prev_potential = 0.42
total, shaping, next_potential, _pbrs_delta, _entry_additive, _exit_additive = (
apply_potential_shaping(
base_reward=0.0,
next_pnl=0.0,
next_duration_ratio=0.0,
is_exit=True,
- last_potential=last_potential,
+ prev_potential=prev_potential,
params=params,
)
)
self.assertPlacesEqual(
- next_potential, last_potential, places=TOLERANCE.DECIMAL_PLACES_STRICT
+ next_potential, prev_potential, places=TOLERANCE.DECIMAL_PLACES_STRICT
)
gamma_raw = DEFAULT_MODEL_REWARD_PARAMETERS.get("potential_gamma", 0.95)
gamma_fallback = 0.95 if gamma_raw is None else gamma_raw
gamma = float(gamma_fallback)
except Exception:
gamma = 0.95
- self.assertLessEqual(abs(shaping - gamma * last_potential), TOLERANCE.GENERIC_EQ)
+    # PBRS shaping Δ = γ·Φ(next) − Φ(prev); here Φ(next)=Φ(prev) because the negative decay is clamped to 0 (no release).
+ self.assertLessEqual(
+ abs(shaping - ((gamma - 1.0) * prev_potential)),
+ TOLERANCE.GENERIC_EQ,
+ )
self.assertPlacesEqual(total, shaping, places=TOLERANCE.DECIMAL_PLACES_STRICT)
def test_potential_gamma_nan_fallback(self):
next_pnl=0.035,
next_duration_ratio=0.25,
is_exit=False,
- last_potential=0.0,
+ prev_potential=0.0,
params=params_nan,
)
params_ref = self.base_params(potential_gamma=default_gamma, hold_potential_enabled=True)
next_pnl=0.035,
next_duration_ratio=0.25,
is_exit=False,
- last_potential=0.0,
+ prev_potential=0.0,
params=params_ref,
)
self.assertLess(
self.assertLess(cumulative, -TOLERANCE.NEGLIGIBLE)
self.assertGreater(abs(cumulative), 10 * TOLERANCE.IDENTITY_RELAXED)
- # ---------------- Drift correction invariants (simulate_samples) ---------------- #
+ def test_exit_step_shaping_matches_exit_step_rules(self):
+ """Exit step: shaping uses stored prev_potential.
- # Owns invariant: pbrs-canonical-drift-correction-106
- def test_pbrs_106_canonical_drift_correction_zero_sum(self):
- """Invariant 106: canonical mode enforces near zero-sum shaping (drift correction)."""
+ For canonical mode, next_potential must be 0 and shaping_delta = -prev_potential.
+ """
params = self.base_params(
exit_potential_mode="canonical",
exit_additive_enabled=False,
potential_gamma=0.94,
)
- df = simulate_samples(
- params={**params, "max_trade_duration_candles": 100},
- num_samples=SCENARIOS.SAMPLE_SIZE_MEDIUM,
- seed=SEEDS.BASE,
- base_factor=PARAMS.BASE_FACTOR,
- profit_aim=PARAMS.PROFIT_AIM,
- risk_reward_ratio=PARAMS.RISK_REWARD_RATIO,
- max_duration_ratio=2.0,
- trading_mode="margin",
- pnl_base_std=PARAMS.PNL_STD,
- pnl_duration_vol_scale=PARAMS.PNL_DUR_VOL_SCALE,
+ prev_potential = 0.42
+ (
+ _total_reward,
+ reward_shaping,
+ next_potential,
+ pbrs_delta,
+ _entry_additive,
+ _exit_additive,
+ ) = apply_potential_shaping(
+ base_reward=0.0,
+ current_pnl=0.012,
+ pnl_target=PARAMS.PROFIT_AIM * PARAMS.RISK_REWARD_RATIO,
+ current_duration_ratio=0.3,
+ next_pnl=0.0,
+ next_duration_ratio=0.0,
+ is_exit=True,
+ is_entry=False,
+ prev_potential=prev_potential,
+ params=params,
)
- total_shaping = float(df["reward_shaping"].sum())
- assert_pbrs_canonical_sum_within_tolerance(self, total_shaping, PBRS_INVARIANCE_TOL)
- flags = set(df["pbrs_invariant"].unique().tolist())
- self.assertEqual(flags, {True}, f"Unexpected invariance flags canonical: {flags}")
-
- # Owns invariant (extension path): pbrs-canonical-drift-correction-106
- def test_pbrs_106_canonical_drift_correction_exception_fallback(self):
- """Invariant 106 (extension): exception path graceful fallback."""
- params = self.base_params(
- exit_potential_mode="canonical",
- hold_potential_enabled=True,
- entry_additive_enabled=False,
- exit_additive_enabled=False,
- potential_gamma=0.91,
+ self.assertPlacesEqual(next_potential, 0.0, places=TOLERANCE.DECIMAL_PLACES_STRICT)
+ self.assertAlmostEqualFloat(
+ reward_shaping,
+ -prev_potential,
+ tolerance=TOLERANCE.IDENTITY_RELAXED,
+ msg="Canonical exit shaping should be -prev_potential",
+ )
+ self.assertAlmostEqualFloat(
+ pbrs_delta,
+ -prev_potential,
+ tolerance=TOLERANCE.IDENTITY_RELAXED,
+ msg="Canonical exit PBRS delta should be -prev_potential",
)
- original_sum = pd.DataFrame.sum
- def boom(self, *args, **kwargs): # noqa: D401
- if isinstance(self, pd.DataFrame) and "reward_shaping" in self.columns:
- raise RuntimeError("forced drift correction failure")
- return original_sum(self, *args, **kwargs)
+ def test_simulate_samples_retains_signals_in_canonical_mode(self):
+ """simulate_samples() is not drift-corrected; it must not force Σ shaping ~ 0."""
- pd.DataFrame.sum = boom
- try:
- df_exc = simulate_samples(
- params={**params, "max_trade_duration_candles": 120},
- num_samples=250,
- seed=SEEDS.PBRS_INVARIANCE_2,
- base_factor=PARAMS.BASE_FACTOR,
- profit_aim=PARAMS.PROFIT_AIM,
- risk_reward_ratio=PARAMS.RISK_REWARD_RATIO,
- max_duration_ratio=2.0,
- trading_mode="margin",
- pnl_base_std=PARAMS.PNL_STD,
- pnl_duration_vol_scale=PARAMS.PNL_DUR_VOL_SCALE,
- )
- finally:
- pd.DataFrame.sum = original_sum
- flags_exc = set(df_exc["pbrs_invariant"].unique().tolist())
- self.assertEqual(flags_exc, {True})
- # Column presence and successful completion are primary guarantees under fallback.
- self.assertTrue("reward_shaping" in df_exc.columns)
- self.assertIn("reward_shaping", df_exc.columns)
-
- # Owns invariant (comparison path): pbrs-canonical-drift-correction-106
- def test_pbrs_106_canonical_drift_correction_uniform_offset(self):
- """Canonical drift correction reduces Σ shaping below tolerance vs non-canonical."""
-
- params_can = self.base_params(
+ params = self.base_params(
exit_potential_mode="canonical",
hold_potential_enabled=True,
entry_additive_enabled=False,
exit_additive_enabled=False,
potential_gamma=0.92,
)
- df_can = simulate_samples(
- params={**params_can, "max_trade_duration_candles": 120},
+ df = simulate_samples(
+ params={**params, "max_trade_duration_candles": 120},
num_samples=SCENARIOS.SAMPLE_SIZE_MEDIUM,
seed=SEEDS.PBRS_TERMINAL,
base_factor=PARAMS.BASE_FACTOR,
pnl_base_std=PARAMS.PNL_STD,
pnl_duration_vol_scale=PARAMS.PNL_DUR_VOL_SCALE,
)
- params_non = self.base_params(
- exit_potential_mode="retain_previous",
- hold_potential_enabled=True,
- entry_additive_enabled=False,
- exit_additive_enabled=False,
- potential_gamma=0.92,
- )
- df_non = simulate_samples(
- params={**params_non, "max_trade_duration_candles": 120},
- num_samples=SCENARIOS.SAMPLE_SIZE_MEDIUM,
- seed=SEEDS.PBRS_TERMINAL,
- base_factor=PARAMS.BASE_FACTOR,
- profit_aim=PARAMS.PROFIT_AIM,
- risk_reward_ratio=PARAMS.RISK_REWARD_RATIO,
- max_duration_ratio=2.0,
- trading_mode="margin",
- pnl_base_std=PARAMS.PNL_STD,
- pnl_duration_vol_scale=PARAMS.PNL_DUR_VOL_SCALE,
+ abs_sum = float(df["reward_shaping"].abs().sum())
+ self.assertTrue(np.isfinite(abs_sum))
+ self.assertLessEqual(float(df["reward_shaping"].abs().max()), PBRS.MAX_ABS_SHAPING)
+ # Even with trajectories, Σ can partially cancel; use L1 magnitude instead.
+ self.assertGreater(
+ abs_sum,
+ PBRS_INVARIANCE_TOL,
+ "Expected non-trivial shaping magnitudes for canonical mode",
)
- total_can = float(df_can["reward_shaping"].sum())
- total_non = float(df_non["reward_shaping"].sum())
- self.assertLess(abs(total_can), abs(total_non) + TOLERANCE.IDENTITY_RELAXED)
- assert_pbrs_canonical_sum_within_tolerance(self, total_can, PBRS_INVARIANCE_TOL)
- invariant_mask = df_can["pbrs_invariant"]
- if bool(getattr(invariant_mask, "any", lambda: False)()):
- corrected_values = df_can.loc[invariant_mask, "reward_shaping"].to_numpy()
- mean_corrected = float(np.mean(corrected_values))
- self.assertLess(abs(mean_corrected), TOLERANCE.IDENTITY_RELAXED)
- spread = float(np.max(corrected_values) - np.min(corrected_values))
- self.assertLess(spread, PBRS.MAX_ABS_SHAPING)
# ---------------- Statistical shape invariance ---------------- #
next_pnl=0.025,
next_duration_ratio=0.35,
is_exit=False,
- last_potential=0.0,
+ prev_potential=0.0,
params=params,
)
)
params, "potential_gamma", DEFAULT_MODEL_REWARD_PARAMETERS.get("potential_gamma", 0.95)
)
rng = np.random.default_rng(321)
- last_potential = 0.0
+ prev_potential = 0.0
telescoping_sum = 0.0
max_abs_step = 0.0
steps = 0
next_pnl=next_pnl,
next_duration_ratio=next_dur,
is_exit=is_exit,
- last_potential=last_potential,
+ prev_potential=prev_potential,
params=params,
)
)
- inc = gamma * next_potential - last_potential
+ inc = gamma * next_potential - prev_potential
telescoping_sum += inc
if abs(inc) > max_abs_step:
max_abs_step = abs(inc)
steps += 1
if is_exit:
- last_potential = 0.0
+ prev_potential = 0.0
else:
- last_potential = next_potential
+ prev_potential = next_potential
mean_drift = telescoping_sum / max(1, steps)
self.assertLess(
abs(mean_drift),
exit_potential_decay=0.25,
)
rng = np.random.default_rng(321)
- last_potential = 0.0
+ prev_potential = 0.0
shaping_sum = 0.0
for _ in range(SCENARIOS.MONTE_CARLO_ITERATIONS):
next_pnl=next_pnl,
next_duration_ratio=next_dur,
is_exit=is_exit,
- last_potential=last_potential,
+ prev_potential=prev_potential,
params=params,
)
)
shaping_sum += shap
- last_potential = 0.0 if is_exit else next_pot
+ prev_potential = 0.0 if is_exit else next_pot
self.assertGreater(
abs(shaping_sum),
PBRS_INVARIANCE_TOL * 50,
# Owns invariant: robustness-exit-pnl-only-117 (robustness category)
def test_pnl_invariant_exit_only(self):
- """Invariant: only exit actions have non-zero PnL (robustness category)."""
+ """Invariant: PnL only non-zero while in position.
+
+ The simulator uses coherent trajectories, so PnL is a state variable during
+    holds and entries; however, Neutral samples must have pnl == 0.
+ """
df = simulate_samples(
params=self.base_params(max_trade_duration_candles=50),
num_samples=200,
pnl_base_std=PARAMS.PNL_STD,
pnl_duration_vol_scale=PARAMS.PNL_DUR_VOL_SCALE,
)
- total_pnl = df["pnl"].sum()
- exit_mask = df["reward_exit"] != 0
- exit_pnl_sum = df.loc[exit_mask, "pnl"].sum()
- self.assertAlmostEqual(
- total_pnl,
- exit_pnl_sum,
- places=TOLERANCE.DECIMAL_PLACES_STANDARD,
- msg="PnL invariant violation: total PnL != sum of exit PnL",
- )
- non_zero_pnl_actions = set(np.unique(df[df["pnl"].abs() > np.finfo(float).eps]["action"]))
- expected_exit_actions = {2.0, 4.0}
- self.assertTrue(
- non_zero_pnl_actions.issubset(expected_exit_actions),
- f"Non-exit actions have PnL: {non_zero_pnl_actions - expected_exit_actions}",
+ neutral_mask = df["position"] == float(Positions.Neutral.value)
+ non_zero_neutral_pnl = df.loc[neutral_mask, "pnl"].abs().max()
+ self.assertLessEqual(
+ float(non_zero_neutral_pnl),
+ np.finfo(float).eps,
+ msg="PnL invariant violation: neutral states must have pnl == 0",
)
- invalid_combinations = df[
- (df["pnl"].abs() <= np.finfo(float).eps) & (df["reward_exit"] != 0)
- ]
- self.assertEqual(len(invalid_combinations), 0)
def test_exit_factor_comprehensive(self):
"""Comprehensive exit factor test: mathematical correctness and monotonic attenuation."""
iters = iterations or self.PBRS_SWEEP_ITER
term_p = terminal_prob or self.PBRS_TERMINAL_PROB
rng = np.random.default_rng(seed)
- last_potential = 0.0
+ prev_potential = 0.0
terminal_next: list[float] = []
shaping_vals: list[float] = []
current_pnl = 0.0
next_duration_ratio=next_dur,
is_exit=is_exit,
is_entry=False,
- last_potential=last_potential,
+ prev_potential=prev_potential,
params=params,
)
)
shaping_vals.append(shap_val)
if is_exit:
terminal_next.append(next_pot)
- last_potential = 0.0
+ prev_potential = 0.0
current_pnl = 0.0
current_dur = 0.0
else:
- last_potential = next_pot
+ prev_potential = next_pot
current_pnl = next_pnl
current_dur = next_dur
return (terminal_next, shaping_vals)