The exit factor is computed as:
-`exit_factor` = `base_factor `× `time_attenuation_coefficient` × `pnl_coefficient`
-where:
-`pnl_coefficient` = `pnl_target_coefficient` × `efficiency_coefficient`
+`exit_factor` = `base_factor` × `pnl_target_coefficient` × `efficiency_coefficient` × `time_attenuation_coefficient`
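+
+A quick numeric sketch of the multiplicative composition (hand-picked,
+hypothetical coefficient values, not defaults):
+
+```python
+base_factor = 1.5
+pnl_target_coefficient = 1.2
+efficiency_coefficient = 0.9
+time_attenuation_coefficient = 0.8
+
+exit_factor = (
+    base_factor
+    * pnl_target_coefficient
+    * efficiency_coefficient
+    * time_attenuation_coefficient
+)  # = 1.296
+```
+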
##### PnL Target
| `hold_potential_transform_pnl` | tanh | PnL transform |
| `hold_potential_transform_duration` | tanh | Duration transform |
+**Hold Potential Formula:**
+
+The hold potential combines PnL and duration signals with an asymmetric duration
+multiplier for loss-side holds:
+
+```
+Φ_hold(s) = scale · 0.5 · [T_pnl(g·r_pnl) + sign(r_pnl)·m_dur·T_dur(g·r_dur)]
+```
+
+where:
+
+- `r_pnl = pnl / pnl_target`
+- `r_dur = clamp(duration_ratio, 0, 1)`
+- `g = hold_potential_gain`
+- `T_pnl`, `T_dur` = configured transforms
+- `m_dur = 1.0` if `r_pnl >= 0` (profit side)
+- `m_dur = risk_reward_ratio` if `r_pnl < 0` (loss side)
+
+The loss-side duration multiplier (`m_dur = risk_reward_ratio`) scales the
+duration penalty when holding losing positions, encouraging faster exits from
+losses compared to symmetric treatment.
+
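+A minimal sketch of the asymmetry, assuming `tanh` transforms and illustrative
+values (`hold_potential` below is a local stand-in, not the library function):
+
+```python
+import numpy as np
+
+def hold_potential(pnl_ratio, dur_ratio, rr, scale=1.0, gain=1.0):
+    # Loss-side holds get their duration term amplified by rr.
+    m_dur = 1.0 if pnl_ratio >= 0 else rr
+    t_pnl = np.tanh(gain * pnl_ratio)
+    t_dur = np.tanh(gain * np.clip(dur_ratio, 0.0, 1.0))
+    return scale * 0.5 * (t_pnl + np.sign(pnl_ratio) * m_dur * t_dur)
+
+# Same |PnL| and duration, opposite sign: the losing hold is pushed further
+# negative, nudging the policy toward exiting losses sooner.
+print(hold_potential(0.5, 0.6, rr=2.0))   # ≈ +0.500 (m_dur = 1.0)
+print(hold_potential(-0.5, 0.6, rr=2.0))  # ≈ -0.768 (m_dur = 2.0)
+```
+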
#### Entry Additive (Optional)
| Parameter | Default | Description |
`--params` wins on conflicts.
**Simulation-only keys** (not allowed in `--params`): `num_samples`, `seed`,
`trading_mode`, `max_duration_ratio`, `out_dir`, `stats_seed`, `pnl_base_std`,
`pnl_duration_vol_scale`, `real_episodes`, `unrealized_pnl`,
`strict_diagnostics`, `strict_validation`, `bootstrap_resamples`,
`skip_feature_analysis`, `skip_partial_dependence`, `rf_n_jobs`, `perm_n_jobs`,
`pvalue_adjust`.
**Hybrid simulation scalars** allowed in `--params`: `profit_aim`,
`risk_reward_ratio`, `action_masking`.
**Reward tunables** (tunable via either direct flag or `--params`) correspond to
is_entry=is_entry,
prev_potential=prev_potential,
params=params,
+ risk_reward_ratio=risk_reward_ratio,
)
)
return float(pnl)
+def _loss_duration_multiplier(pnl_ratio: float, risk_reward_ratio: float) -> float:
+ """Return duration multiplier for loss-side holds."""
+
+ if not np.isfinite(pnl_ratio) or pnl_ratio >= 0.0:
+ return 1.0
+ if not np.isfinite(risk_reward_ratio) or risk_reward_ratio <= 0.0:
+ return 1.0
+ return float(risk_reward_ratio)
+
+
def _compute_hold_potential(
pnl: float,
pnl_target: float,
duration_ratio: float,
+ risk_reward_ratio: float,
params: RewardParams,
) -> float:
- """Compute PBRS hold potential Φ(s) = scale · 0.5 · [T_pnl(g · pnl_ratio) + sign(pnl_ratio) · T_dur(g · duration_ratio)].
-
- Args:
- pnl: Current unrealized profit/loss
- pnl_target: Target profit threshold (pnl_target = profit_aim × risk_reward_ratio)
- duration_ratio: Trade duration relative to target duration
- params: Reward configuration parameters
-
- Returns:
- float: Hold potential value (0.0 if disabled or invalid)
- """
+ """Compute PBRS hold potential Φ(s)."""
if not _get_bool_param(
params,
"hold_potential_enabled",
bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("hold_potential_enabled", True)),
):
return _fail_safely("hold_potential_disabled")
+
return _compute_bi_component(
kind="hold_potential",
pnl=pnl,
transform_pnl_key="hold_potential_transform_pnl",
transform_dur_key="hold_potential_transform_duration",
non_finite_key="non_finite_hold_potential",
+ risk_reward_ratio=risk_reward_ratio,
)
next_duration_ratio: float,
params: RewardParams,
*,
+ risk_reward_ratio: float,
is_exit: bool = False,
is_entry: bool = False,
prev_potential: float,
Canonical PBRS Formula
----------------------
- R'(s,a,s') = R(s,a,s') + γ·Φ(s') - Φ(s)
+    R'(s,a,s') = R(s,a,s') + Δ(s,a,s')
where:
Δ(s,a,s') = γ·Φ(s') - Φ(s) (PBRS shaping term)
- Notation
- --------
- **States & Actions:**
- s : current state
- s' : next state
- a : action
-
- **Reward Components:**
- R(s,a,s') : base reward
- R'(s,a,s') : shaped reward
- Δ(s,a,s') : PBRS shaping term = γ·Φ(s') - Φ(s)
-
- **Potential Function:**
- Φ(s) : potential at state s
- γ : discount factor for shaping (gamma)
-
- **State Variables:**
- r_pnl : pnl / pnl_target (PnL ratio)
- r_dur : duration / max_duration (duration ratio, clamp [0,1])
- g : gain parameter
- T_x : transform function (tanh, softsign, etc.)
-
- **Potential Formula:**
- Φ(s) = scale · 0.5 · [T_pnl(g·r_pnl) + sgn(r_pnl)·T_dur(g·r_dur)]
-
- PBRS Theory & Compliance
- ------------------------
- - Ng et al. 1999: potential-based shaping preserves optimal policy
- - Wiewiora et al. 2003: terminal states must have Φ(terminal) = 0
- - Invariance holds ONLY in canonical mode with additives disabled
- - Theorem: Canonical + no additives ⇒ Σ_t γ^t·Δ_t = 0 over episodes
-
- Architecture & Transitions
- --------------------------
- **Three mutually exclusive transition types:**
-
- 1. **Entry** (Neutral → Long/Short):
- - Φ(s) = 0 (neutral state has no potential)
- - Φ(s') = hold_potential(s')
- - Δ(s,a,s') = γ·Φ(s') - 0 = γ·Φ(s')
- - Optional entry additive (breaks invariance)
-
- 2. **Hold** (Long/Short → Long/Short):
- - Φ(s) = hold_potential(s)
- - Φ(s') = hold_potential(s')
- - Δ(s,a,s') = γ·Φ(s') - Φ(s)
- - Φ(s') reflects updated PnL and duration
-
- 3. **Exit** (Long/Short → Neutral):
- - Φ(s) = hold_potential(s)
- - Φ(s') depends on exit_potential_mode:
- * **canonical**: Φ(s') = 0 → Δ = -Φ(s)
- * **heuristic**: Φ(s') = f(Φ(s)) → Δ = γ·Φ(s') - Φ(s)
- - Optional exit additive (breaks invariance)
-
- Exit Potential Modes
- --------------------
- **canonical** (PBRS-compliant):
- Φ(s') = 0
- Δ = γ·0 - Φ(s) = -Φ(s)
- Additives disabled automatically
-
- **non_canonical**:
- Φ(s') = 0
- Δ = -Φ(s)
- Additives allowed (breaks invariance)
-
- **progressive_release** (heuristic):
- Φ(s') = Φ(s)·(1 - d) where d = decay_factor
- Δ = γ·Φ(s)·(1-d) - Φ(s)
-
- **spike_cancel** (heuristic):
- Φ(s') = Φ(s)/γ
- Δ = γ·(Φ(s)/γ) - Φ(s) = 0
-
- **retain_previous** (heuristic):
- Φ(s') = Φ(s)
- Δ = γ·Φ(s) - Φ(s) = (γ-1)·Φ(s)
-
- Additive Terms (Non-PBRS)
- --------------------------
- Entry and exit additives are **optional bonuses** that break PBRS invariance:
- - Entry additive: applied on Neutral→Long/Short transitions
- - Exit additive: applied on Long/Short→Neutral transitions
- - These do NOT persist in Φ(s) storage
-
- Invariance & Validation
- -----------------------
- **Theoretical Guarantee:**
- Canonical + no additives ⇒ Σ_t γ^t·Δ_t = 0
- (Φ(start) = Φ(end) = 0)
-
- **Deviations from Theory:**
- - Heuristic exit modes violate invariance
- - Entry/exit additives break policy invariance
- - Non-canonical modes introduce path dependence
-
- **Robustness:**
- - All transforms bounded: |T_x| ≤ 1
- - Validation: |Φ(s)| ≤ scale
- - Bounds: |Δ(s,a,s')| ≤ (1+γ)·scale
- - Terminal enforcement: Φ(s) = 0 when terminated
-
- Implementation Details
+ Hold Potential Formula
----------------------
- This is a stateless pure function for analysis and testing:
- - All state (Φ(s), γ, configuration) passed explicitly as parameters
- - Returns diagnostic values (next_potential, pbrs_delta) for inspection
- - Does not mutate any inputs
- - Suitable for batch processing and unit testing
-
- For production RL environment use, see ReforceXY._compute_pbrs_components()
- which wraps this logic with stateful management (self._last_potential, etc.)
-
- Parameters
- ----------
- current_pnl : float
- Current state s PnL
- pnl_target : float
- Target PnL for ratio normalization: r_pnl = pnl / pnl_target
- current_duration_ratio : float
- Current state s duration ratio [0,1]: r_dur = duration / max_duration
- next_pnl : float
- Next state s' PnL (after action)
- next_duration_ratio : float
- Next state s' duration ratio [0,1]
- params : RewardParams
- Configuration dictionary with keys:
- - potential_gamma: γ (shaping discount factor)
- - exit_potential_mode: "canonical" | "non_canonical" | heuristic modes
- - hold_potential_enabled: enable/disable hold potential computation
- - entry_additive_enabled, exit_additive_enabled: enable non-PBRS additives
- - hold_potential_scale, hold_potential_gain, transforms, etc.
- is_exit : bool, optional
- True if this is an exit transition (Long/Short → Neutral)
- is_entry : bool, optional
- True if this is an entry transition (Neutral → Long/Short)
- prev_potential : float
- Φ(s) - potential at current state s (must be passed explicitly)
-
- Returns
- -------
- tuple[float, float, float, float, float]
- (reward_shaping, next_potential, pbrs_delta, entry_additive, exit_additive)
-
- - reward_shaping: Δ(s,a,s') = γ·Φ(s') - Φ(s), the PBRS shaping term
- - next_potential: Φ(s') for next step (caller must store this)
- - pbrs_delta: same as reward_shaping (diagnostic/compatibility)
- - entry_additive: optional non-PBRS entry bonus (0.0 if disabled or not entry)
- - exit_additive: optional non-PBRS exit bonus (0.0 if disabled or not exit)
-
- Notes
- -----
- **State Management:**
- - Caller is responsible for storing Φ(s') (returned as next_potential)
- - No internal state; pure function
-
- **Configuration:**
- - All parameters read from params dict
- - Use DEFAULT_MODEL_REWARD_PARAMETERS for defaults
-
- **Recommendations:**
- - Use canonical mode for policy-invariant shaping
- - Monitor Σ_t γ^t·Δ_t ≈ 0 per episode in canonical mode
- - Disable additives to preserve theoretical PBRS guarantees
-
- **Validation:**
- - Returns (0,0,0,0,0) if any output is non-finite
- - Transform bounds ensure |Φ| ≤ scale
-
- See Also
- --------
- ReforceXY._compute_pbrs_components : Stateful wrapper for RL environment
- apply_potential_shaping : Deprecated wrapper that adds base_reward
+ Let:
+ r_pnl = pnl / pnl_target
+ r_dur = clamp(duration_ratio, 0, 1)
+ g = gain
+ T_pnl, T_dur = configured bounded transforms
+ m_dur = 1.0 if r_pnl >= 0 else loss_duration_multiplier(r_pnl, risk_reward_ratio)
+
+ Then:
+ Φ_hold(s) = scale · 0.5 · [T_pnl(g·r_pnl) + sign(r_pnl)·m_dur·T_dur(g·r_dur)]
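+
+    Worked example (illustrative values; tanh transforms, scale = 1, g = 1,
+    risk_reward_ratio = 2):
+        r_pnl = -1.0, r_dur = 0.5  =>  m_dur = 2
+        Φ_hold = 0.5·[tanh(-1.0) + (-1)·2·tanh(0.5)] ≈ -0.843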
"""
gamma = _get_potential_gamma(params)
reward_shaping = 0.0
else:
next_potential = _compute_hold_potential(
- next_pnl, pnl_target, next_duration_ratio, params
+ next_pnl,
+ pnl_target,
+ next_duration_ratio,
+ risk_reward_ratio,
+ params,
)
pbrs_delta = gamma * next_potential - prev_potential
reward_shaping = pbrs_delta
next_duration_ratio: float,
params: RewardParams,
*,
+ risk_reward_ratio: float,
is_exit: bool = False,
is_entry: bool = False,
prev_potential: float,
next_pnl,
next_duration_ratio,
params,
+ risk_reward_ratio=risk_reward_ratio,
is_exit=is_exit,
is_entry=is_entry,
prev_potential=prev_potential,
transform_pnl_key: str,
transform_dur_key: str,
non_finite_key: str,
+ *,
+ risk_reward_ratio: Optional[float] = None,
) -> float:
"""Generic helper for (pnl, duration) bi-component transforms."""
if not (np.isfinite(pnl) and np.isfinite(pnl_target) and np.isfinite(duration_ratio)):
transform_pnl = _get_str_param(params, transform_pnl_key, "tanh")
transform_duration = _get_str_param(params, transform_dur_key, "tanh")
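+    # Asymmetric loss-side duration scaling; fall back to the symmetric
+    # multiplier (1.0) when no valid risk_reward_ratio is supplied.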
+ duration_multiplier = 1.0
+ if risk_reward_ratio is not None:
+ duration_multiplier = _loss_duration_multiplier(pnl_ratio, risk_reward_ratio)
+ if not np.isfinite(duration_multiplier) or duration_multiplier < 0.0:
+ duration_multiplier = 1.0
+
t_pnl = apply_transform(transform_pnl, gain * pnl_ratio)
t_dur = apply_transform(transform_duration, gain * duration_ratio)
- value = scale * 0.5 * (t_pnl + np.sign(pnl_ratio) * t_dur)
+ value = scale * 0.5 * (t_pnl + np.sign(pnl_ratio) * duration_multiplier * t_dur)
if not np.isfinite(value):
return _fail_safely(non_finite_key)
return float(value)
"current_duration_ratio": 0.2,
"next_pnl": 0.012,
"next_duration_ratio": 0.25,
+ "risk_reward_ratio": PARAMS.RISK_REWARD_RATIO,
"is_entry": True,
"is_exit": False,
}
"hold_potential_transform_duration": "tanh",
}
val = _compute_hold_potential(
- 0.5, PARAMS.PROFIT_AIM * PARAMS.RISK_REWARD_RATIO, 0.3, params
+ 0.5,
+ PARAMS.PROFIT_AIM * PARAMS.RISK_REWARD_RATIO,
+ 0.3,
+ PARAMS.RISK_REWARD_RATIO,
+ params,
)
self.assertFinite(val, name="hold_potential")
current_dur = 0.5
profit_aim = PARAMS.PROFIT_AIM
prev_potential = _compute_hold_potential(
- current_pnl, profit_aim * PARAMS.RISK_REWARD_RATIO, current_dur, params
+ current_pnl,
+ profit_aim * PARAMS.RISK_REWARD_RATIO,
+ current_dur,
+ PARAMS.RISK_REWARD_RATIO,
+ params,
)
(
_total_reward,
current_duration_ratio=current_dur,
next_pnl=0.0,
next_duration_ratio=0.0,
+ risk_reward_ratio=PARAMS.RISK_REWARD_RATIO,
is_exit=True,
is_entry=False,
prev_potential=prev_potential,
current_dur = 0.4
profit_aim = PARAMS.PROFIT_AIM
prev_potential = _compute_hold_potential(
- current_pnl, profit_aim * PARAMS.RISK_REWARD_RATIO, current_dur, params
+ current_pnl,
+ profit_aim * PARAMS.RISK_REWARD_RATIO,
+ current_dur,
+ PARAMS.RISK_REWARD_RATIO,
+ params,
)
gamma = _get_float_param(
params, "potential_gamma", DEFAULT_MODEL_REWARD_PARAMETERS.get("potential_gamma", 0.95)
current_duration_ratio=current_dur,
next_pnl=0.0,
next_duration_ratio=0.0,
+ risk_reward_ratio=PARAMS.RISK_REWARD_RATIO,
is_exit=True,
is_entry=False,
prev_potential=prev_potential,
current_duration_ratio=0.0,
next_pnl=0.01,
next_duration_ratio=0.0,
+ risk_reward_ratio=PARAMS.RISK_REWARD_RATIO,
is_exit=False,
is_entry=True,
prev_potential=0.42,
current_duration_ratio=0.4,
next_pnl=0.02,
next_duration_ratio=0.41,
+ risk_reward_ratio=PARAMS.RISK_REWARD_RATIO,
is_exit=False,
is_entry=False,
prev_potential=0.5,
current_duration_ratio=current_duration_ratio,
next_pnl=next_pnl,
next_duration_ratio=next_duration_ratio,
+ risk_reward_ratio=PARAMS.RISK_REWARD_RATIO,
is_exit=True,
is_entry=False,
prev_potential=0.789,
current_duration_ratio=0.0,
next_pnl=0.02,
next_duration_ratio=0.0,
+ risk_reward_ratio=PARAMS.RISK_REWARD_RATIO,
is_exit=False,
is_entry=True,
prev_potential=0.0,
self.assertNearZero(entry_additive, atol=TOLERANCE.IDENTITY_STRICT)
self.assertNearZero(exit_additive_entry, atol=TOLERANCE.IDENTITY_STRICT)
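+
+        # Compute Φ(s) from the same inputs fed to the shaping call below, so
+        # the exit-additive assertions track the configured transforms.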
+ current_pnl = 0.02
+ current_dur = 0.5
+ profit_aim = PARAMS.PROFIT_AIM
+ prev_potential = _compute_hold_potential(
+ current_pnl,
+ profit_aim * PARAMS.RISK_REWARD_RATIO,
+ current_dur,
+ PARAMS.RISK_REWARD_RATIO,
+ params,
+ )
+
(
_total_exit,
_shaping_exit,
exit_additive,
) = apply_potential_shaping(
base_reward=0.0,
- current_pnl=0.02,
- pnl_target=PARAMS.PROFIT_AIM * PARAMS.RISK_REWARD_RATIO,
- current_duration_ratio=0.5,
+ current_pnl=current_pnl,
+ pnl_target=profit_aim * PARAMS.RISK_REWARD_RATIO,
+ current_duration_ratio=current_dur,
next_pnl=0.0,
next_duration_ratio=0.0,
+ risk_reward_ratio=PARAMS.RISK_REWARD_RATIO,
is_exit=True,
is_entry=False,
- prev_potential=0.4,
+ prev_potential=prev_potential,
params=params,
)
+
self.assertNearZero(entry_additive_exit, atol=TOLERANCE.IDENTITY_STRICT)
self.assertNearZero(exit_additive, atol=TOLERANCE.IDENTITY_STRICT)
current_duration_ratio=0.0,
next_pnl=0.0,
next_duration_ratio=0.0,
+ risk_reward_ratio=PARAMS.RISK_REWARD_RATIO,
is_exit=True,
prev_potential=prev_potential,
params=params,
current_duration_ratio=0.2,
next_pnl=0.035,
next_duration_ratio=0.25,
+ risk_reward_ratio=PARAMS.RISK_REWARD_RATIO,
is_exit=False,
prev_potential=0.0,
params=params_nan,
current_duration_ratio=0.2,
next_pnl=0.035,
next_duration_ratio=0.25,
+ risk_reward_ratio=PARAMS.RISK_REWARD_RATIO,
is_exit=False,
prev_potential=0.0,
params=params_ref,
pnl=ctx.pnl,
pnl_target=PARAMS.PROFIT_AIM * PARAMS.RISK_REWARD_RATIO,
duration_ratio=(trade_duration / max_trade_duration_candles),
+ risk_reward_ratio=PARAMS.RISK_REWARD_RATIO,
params=params,
)
self.assertAlmostEqualFloat(
ctx_dur_ratio = 0.3
params_can = self.base_params(exit_potential_mode="canonical", **base_common)
prev_phi = _compute_hold_potential(
- ctx_pnl, PARAMS.PROFIT_AIM * PARAMS.RISK_REWARD_RATIO, ctx_dur_ratio, params_can
+ ctx_pnl,
+ PARAMS.PROFIT_AIM * PARAMS.RISK_REWARD_RATIO,
+ ctx_dur_ratio,
+ PARAMS.RISK_REWARD_RATIO,
+ params_can,
)
self.assertFinite(prev_phi, name="prev_phi")
next_phi_can = _compute_exit_potential(prev_phi, params_can)
current_duration_ratio=0.3,
next_pnl=0.0,
next_duration_ratio=0.0,
+ risk_reward_ratio=PARAMS.RISK_REWARD_RATIO,
is_exit=True,
is_entry=False,
prev_potential=prev_potential,
current_duration_ratio = ctx.trade_duration / params["max_trade_duration_candles"]
prev_potential = _compute_hold_potential(
- ctx.pnl, pnl_target, current_duration_ratio, params
+ ctx.pnl,
+ pnl_target,
+ current_duration_ratio,
+ PARAMS.RISK_REWARD_RATIO,
+ params,
)
self.assertNotEqual(prev_potential, 0.0)
current_duration_ratio=0.3,
next_pnl=0.025,
next_duration_ratio=0.35,
+ risk_reward_ratio=PARAMS.RISK_REWARD_RATIO,
is_exit=False,
prev_potential=0.0,
params=params,
current_duration_ratio=current_dur,
next_pnl=next_pnl,
next_duration_ratio=next_dur,
+ risk_reward_ratio=PARAMS.RISK_REWARD_RATIO,
is_exit=is_exit,
prev_potential=prev_potential,
params=params,
current_duration_ratio=float(rng.uniform(0, 1)),
next_pnl=next_pnl,
next_duration_ratio=next_dur,
+ risk_reward_ratio=PARAMS.RISK_REWARD_RATIO,
is_exit=is_exit,
prev_potential=prev_potential,
params=params,
current_duration_ratio=current_dur,
next_pnl=next_pnl,
next_duration_ratio=next_dur,
+ risk_reward_ratio=PARAMS.RISK_REWARD_RATIO,
is_exit=is_exit,
is_entry=False,
prev_potential=prev_potential,
self._model_params_cache: Optional[Dict[str, Any]] = None
self.unset_unsupported()
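+        # Fail fast on misconfiguration: profit_aim and rr must both be
+        # finite and strictly positive before pnl_target can be derived.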
+ model_reward_parameters = self.rl_config.get("model_reward_parameters", {})
+ profit_aim = float(model_reward_parameters.get("profit_aim", np.nan))
+ rr = float(model_reward_parameters.get("rr", np.nan))
+ if (
+ (not np.isfinite(profit_aim))
+ or (profit_aim <= 0.0)
+ or np.isclose(profit_aim, 0.0)
+ ):
+ raise ValueError(
+ f"Invalid profit_aim={profit_aim:.12g}; expected a finite value > 0"
+ )
+ if (not np.isfinite(rr)) or (rr <= 0.0) or np.isclose(rr, 0.0):
+ raise ValueError(f"Invalid rr={rr:.12g}; expected a finite value > 0")
+
+ pnl_target = profit_aim * rr
+ if (
+ (not np.isfinite(pnl_target))
+ or (pnl_target <= 0.0)
+ or np.isclose(pnl_target, 0.0)
+ ):
+ raise ValueError(
+ f"Invalid pnl_target={pnl_target:.12g} computed from profit_aim={profit_aim:.12g} and rr={rr:.12g}"
+ )
+
@staticmethod
def _normalize_position(position: Any) -> Positions:
if isinstance(position, Positions):
self.add_state_info = True
self._set_observation_space()
- # === PNL TARGET VALIDATION ===
- pnl_target = self.profit_aim * self.rr
- if MyRLEnv._is_invalid_pnl_target(pnl_target):
- raise ValueError(
- f"Invalid pnl_target={pnl_target:.12g} computed from profit_aim={self.profit_aim:.12g} and rr={self.rr:.12g}"
- )
- self._pnl_target = pnl_target
+ # === PNL TARGET ===
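+        # profit_aim and rr are validated up front, so the product here is
+        # already known to be finite and positive.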
+ self._pnl_target = float(self.profit_aim * self.rr)
def _get_next_position(self, action: int) -> Positions:
if action == Actions.Long_enter.value and self._position == Positions.Neutral:
return next_position, 0, 0.0
@staticmethod
- def _is_invalid_pnl_target(pnl_target: float) -> bool:
- """Return True when pnl_target is non-finite, <= 0, or effectively zero within tolerance."""
- return (
- (not np.isfinite(pnl_target))
- or (pnl_target <= 0.0)
- or np.isclose(pnl_target, 0.0)
- )
+ def _loss_duration_multiplier(pnl_ratio: float, risk_reward_ratio: float) -> float:
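+        """Return the duration multiplier for loss-side holds (1.0 otherwise)."""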
+ if not np.isfinite(pnl_ratio) or pnl_ratio >= 0.0:
+ return 1.0
+ if not np.isfinite(risk_reward_ratio) or risk_reward_ratio <= 0.0:
+ return 1.0
+ return float(risk_reward_ratio)
def _compute_pnl_duration_signal(
self,
gain: float,
transform_pnl: TransformFunction,
transform_duration: TransformFunction,
+ risk_reward_ratio: Optional[float] = None,
) -> float:
- """Generic bounded bi-component signal combining PnL and duration.
-
- Shared logic for:
- - Hold potential Φ(s)
- - Entry additive
- - Exit additive
-
- Parameters
- ----------
- enabled : bool
- Whether this signal is active
- require_position : bool
- If True, only compute when position in (Long, Short)
- position : Positions
- Current position
- pnl : float
- Current position PnL
- pnl_target : float
- Target PnL for normalization
- duration_ratio : float
- Raw duration ratio
- scale : float
- Output scaling factor
- gain : float
- Gain multiplier before transform
- transform_pnl : TransformFunction
- Transform name for PnL component
- transform_duration : TransformFunction
- Transform name for duration component
-
- Returns
- -------
- float
- Bounded signal in [-scale, scale]
- """
+ """Generic bounded bi-component signal combining PnL and duration."""
if not enabled:
return 0.0
if require_position and position not in (Positions.Long, Positions.Short):
except Exception:
return 0.0
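+        # Loss-side duration scaling; degrade to the symmetric 1.0 on a
+        # missing or invalid risk_reward_ratio.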
+ duration_multiplier = 1.0
+ if risk_reward_ratio is not None:
+ duration_multiplier = self._loss_duration_multiplier(
+ pnl_ratio,
+ risk_reward_ratio,
+ )
+ if not np.isfinite(duration_multiplier) or duration_multiplier < 0.0:
+ duration_multiplier = 1.0
+
pnl_term = self._potential_transform(transform_pnl, gain * pnl_ratio)
dur_term = self._potential_transform(transform_duration, gain * duration_ratio)
- value = scale * 0.5 * (pnl_term + np.sign(pnl_ratio) * dur_term)
+ value = (
+ scale
+ * 0.5
+ * (pnl_term + np.sign(pnl_ratio) * duration_multiplier * dur_term)
+ )
return float(value) if np.isfinite(value) else 0.0
def _compute_hold_potential(
gain=self._hold_potential_gain,
transform_pnl=self._hold_potential_transform_pnl,
transform_duration=self._hold_potential_transform_duration,
+ risk_reward_ratio=self.rr,
)
def _compute_exit_additive(
Canonical PBRS Formula
----------------------
- R'(s,a,s') = R(s,a,s') + γ·Φ(s') - Φ(s)
+    R'(s,a,s') = R(s,a,s') + Δ(s,a,s')
where:
Δ(s,a,s') = γ·Φ(s') - Φ(s) (PBRS shaping term)
g : gain parameter
T_x : transform function (tanh, softsign, etc.)
- **Potential Formula:**
- Φ(s) = scale · 0.5 · [T_pnl(g·r_pnl) + sgn(r_pnl)·T_dur(g·r_dur)]
+ **Hold Potential Formula:**
+ m_dur = 1.0 if r_pnl >= 0 else loss_duration_multiplier(r_pnl, rr)
+ Φ(s) = scale · 0.5 · [T_pnl(g·r_pnl) + sgn(r_pnl)·m_dur·T_dur(g·r_dur)]
PBRS Theory & Compliance
------------------------
- Use canonical mode for policy-invariant shaping
- Monitor Σ_t γ^t·Δ_t ≈ 0 per episode in canonical mode
- Disable additives to preserve theoretical PBRS guarantees
-
- See Also
- --------
- reward_space_analysis.compute_pbrs_components : Stateless version for analysis
"""
prev_potential = float(self._last_potential)
model_reward_parameters: Mapping[str, Any],
) -> float:
"""
- Compute exit factor: base_factor × time_attenuation_coefficient × pnl_coefficient.
+    Compute exit factor: base_factor × pnl_target_coefficient × efficiency_coefficient × time_attenuation_coefficient.
"""
if not (
np.isfinite(base_factor)