- [Quick Start](#quick-start)
- [Prerequisites](#prerequisites)
- [Common Use Cases](#common-use-cases)
+ - [1. Validate Reward Logic](#1-validate-reward-logic)
+ - [2. Parameter Sensitivity](#2-parameter-sensitivity)
+ - [3. Debug Anomalies](#3-debug-anomalies)
+ - [4. Real vs Synthetic](#4-real-vs-synthetic)
- [CLI Parameters](#cli-parameters)
- [Simulation & Environment](#simulation--environment)
- [Hybrid Simulation Scalars](#hybrid-simulation-scalars)
- [Overrides vs --params](#overrides-vs--params)
- [Examples](#examples)
- [Outputs](#outputs)
+ - [Main Report (`statistical_analysis.md`)](#main-report-statistical_analysismd)
+ - [Data Exports](#data-exports)
+ - [Manifest (`manifest.json`)](#manifest-manifestjson)
+ - [Distribution Shift Metrics](#distribution-shift-metrics)
- [Advanced Usage](#advanced-usage)
- [Parameter Sweeps](#parameter-sweeps)
- - [PBRS Rationale](#pbrs-rationale)
+ - [PBRS Configuration](#pbrs-configuration)
- [Real Data Comparison](#real-data-comparison)
- [Batch Analysis](#batch-analysis)
- [Testing](#testing)
- [Troubleshooting](#troubleshooting)
+ - [No Output Files](#no-output-files)
+ - [Unexpected Reward Values](#unexpected-reward-values)
+ - [Slow Execution](#slow-execution)
+ - [Memory Errors](#memory-errors)
## Prerequisites
- **`--strict_diagnostics`** (flag, default: false) – Fail-fast on degenerate
statistical diagnostics (zero-width CIs, undefined distribution metrics)
instead of graceful fallbacks.
-- **`--exit_factor_threshold`** (float, default: 10000.0) – Warn if exit factor
- exceeds threshold.
+- **`--exit_factor_threshold`** (float, default: 1000.0) – Emits a warning if the absolute value of the exit factor exceeds the threshold.
- **`--pvalue_adjust`** (none|benjamini_hochberg, default: none) – Multiple
testing p-value adjustment method.
- **`--bootstrap_resamples`** (int, default: 10000) – Bootstrap iterations for
#### Core
-| Parameter | Default | Description |
-| ------------------- | ------- | --------------------------- |
-| `base_factor` | 100.0 | Base reward scale |
-| `invalid_action` | -2.0 | Penalty for invalid actions |
-| `win_reward_factor` | 2.0 | Profit overshoot multiplier |
-| `pnl_factor_beta` | 0.5 | PnL amplification beta |
+| Parameter | Default | Description |
+| ---------------- | ------- | --------------------------- |
+| `base_factor` | 100.0 | Base reward scale |
+| `invalid_action` | -2.0 | Penalty for invalid actions |
-#### Duration Penalties
+#### Exit Factor
-| Parameter | Default | Description |
-| ---------------------------- | ------- | -------------------------- |
-| `max_trade_duration_candles` | 128 | Trade duration cap |
-| `max_idle_duration_candles` | None | Fallback 4× trade duration |
-| `idle_penalty_scale` | 0.5 | Idle penalty scale |
-| `idle_penalty_power` | 1.025 | Idle penalty exponent |
-| `hold_penalty_scale` | 0.25 | Hold penalty scale |
-| `hold_penalty_power` | 1.025 | Hold penalty exponent |
+The exit factor is computed as:
-#### Exit Attenuation
+`exit_factor` = `base_factor` × `time_attenuation_coefficient` × `pnl_coefficient`
+where:
+`pnl_coefficient` = `pnl_target_coefficient` × `efficiency_coefficient`
-| Parameter | Default | Description |
-| ----------------------- | ------- | ------------------------------ |
-| `exit_attenuation_mode` | linear | Kernel mode |
-| `exit_plateau` | true | Flat region before attenuation |
-| `exit_plateau_grace` | 1.0 | Plateau grace ratio |
-| `exit_linear_slope` | 1.0 | Linear slope |
-| `exit_power_tau` | 0.5 | Power kernel tau (0,1] |
-| `exit_half_life` | 0.5 | Half-life for half_life kernel |
+##### PnL Target
+
+| Parameter | Default | Description |
+| ------------------- | ------- | ----------------------------- |
+| `profit_target` | 0.03 | Target profit threshold |
+| `risk_reward_ratio` | 1.0 | Risk/reward multiplier |
+| `win_reward_factor` | 2.0 | Profit overshoot bonus factor |
+| `pnl_factor_beta` | 0.5 | PnL amplification sensitivity |
-#### Efficiency
+**Note:** In ReforceXY, `profit_target` maps to `profit_aim` and `risk_reward_ratio` maps to `rr`.
+
+**Formula:**
+
+Let `pnl_target = profit_target × risk_reward_ratio`, `pnl_ratio = pnl / pnl_target`.
+
+- If `pnl_target ≤ 0`: `pnl_target_coefficient = 1.0`
+- If `pnl_ratio > 1.0`:
+ `pnl_target_coefficient = 1.0 + win_reward_factor × tanh(pnl_factor_beta × (pnl_ratio − 1.0))`
+- If `pnl_ratio < −1.0` and `pnl_ratio < −(1.0 / risk_reward_ratio)`:
+ `pnl_target_coefficient = 1.0 + (win_reward_factor × risk_reward_ratio) × tanh(pnl_factor_beta × (|pnl_ratio| − 1.0))`
+- Else: `pnl_target_coefficient = 1.0`
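
The piecewise rule above can be sketched in a few lines of Python. This is a minimal illustration, not the project's API: `pnl_target_coefficient` is a hypothetical standalone function, and its keyword defaults are copied from the PnL Target table. It includes the `|pnl_ratio| > 1` guard that `_compute_pnl_target_coefficient` applies before the loss branch, and it assumes a non-positive `risk_reward_ratio` falls back to 1.0 as in that function.

```python
import math


def pnl_target_coefficient(
    pnl: float,
    profit_target: float = 0.03,
    risk_reward_ratio: float = 1.0,
    win_reward_factor: float = 2.0,
    pnl_factor_beta: float = 0.5,
) -> float:
    """Hypothetical helper mirroring the documented PnL target formula."""
    pnl_target = profit_target * risk_reward_ratio
    if pnl_target <= 0.0:
        return 1.0  # no usable target: neutral coefficient

    # Assumption: non-positive ratios fall back to 1.0, as in the implementation.
    rr = risk_reward_ratio if risk_reward_ratio > 0.0 else 1.0
    pnl_ratio = pnl / pnl_target

    if pnl_ratio > 1.0:
        # Profit overshoot: bounded bonus via tanh.
        return 1.0 + win_reward_factor * math.tanh(pnl_factor_beta * (pnl_ratio - 1.0))
    if pnl_ratio < -1.0 and pnl_ratio < -(1.0 / rr):
        # Loss beyond the risk/reward threshold: amplified magnitude.
        return 1.0 + (win_reward_factor * rr) * math.tanh(
            pnl_factor_beta * (abs(pnl_ratio) - 1.0)
        )
    return 1.0  # inside the target band


if __name__ == "__main__":
    for pnl in (0.015, 0.06, -0.06):
        print(pnl, pnl_target_coefficient(pnl))
```

With the default `risk_reward_ratio` of 1.0, the win and loss branches are symmetric, so a PnL of twice the target and a loss of twice the target produce the same coefficient.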
+
+##### Efficiency
| Parameter | Default | Description |
| ------------------- | ------- | ------------------------------ |
| `efficiency_weight` | 1.0 | Efficiency contribution weight |
| `efficiency_center` | 0.5 | Efficiency pivot in [0,1] |
-**Formula (unrealized profit normalization):**
+**Formula:**
Let `max_u = max_unrealized_profit`, `min_u = min_unrealized_profit`,
`range = max_u - min_u`, `ratio = (pnl - min_u)/range`. Then:
- If `pnl > 0`:
- `efficiency_factor = 1 + efficiency_weight * (ratio - efficiency_center)`
+ `efficiency_coefficient = 1 + efficiency_weight * (ratio - efficiency_center)`
- If `pnl < 0`:
- `efficiency_factor = 1 + efficiency_weight * (efficiency_center - ratio)`
-- Else: `efficiency_factor = 1`
+ `efficiency_coefficient = 1 + efficiency_weight * (efficiency_center - ratio)`
+- Else: `efficiency_coefficient = 1`
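
As an illustration of the efficiency formula, the sketch below uses a hypothetical standalone function `efficiency_coefficient` with defaults taken from the Efficiency table. Returning 1.0 for a near-zero range mirrors the range check in the implementation; the `abs_tol` value is an assumption chosen for this example. The result is not clamped here; the composite PnL coefficient applies `max(0.0, ...)` afterwards.

```python
import math


def efficiency_coefficient(
    pnl: float,
    max_unrealized_profit: float,
    min_unrealized_profit: float,
    efficiency_weight: float = 1.0,
    efficiency_center: float = 0.5,
) -> float:
    """Hypothetical helper mirroring the documented efficiency formula."""
    range_u = max_unrealized_profit - min_unrealized_profit
    # Assumption: a degenerate (non-finite or ~zero) range yields the neutral value.
    if not math.isfinite(range_u) or math.isclose(range_u, 0.0, abs_tol=1e-12):
        return 1.0

    ratio = (pnl - min_unrealized_profit) / range_u
    if pnl > 0.0:
        return 1.0 + efficiency_weight * (ratio - efficiency_center)
    if pnl < 0.0:
        return 1.0 + efficiency_weight * (efficiency_center - ratio)
    return 1.0


if __name__ == "__main__":
    # Exit at the midpoint of the unrealized range with efficiency_weight=2.0.
    print(efficiency_coefficient(0.05, 0.10, 0.0, 2.0, 0.0))  # center at 0.0
    print(efficiency_coefficient(0.05, 0.10, 0.0, 2.0, 1.0))  # center at 1.0
```

Moving `efficiency_center` from 0.0 to 1.0 lowers the coefficient for the same profitable exit, which is the ordering that `test_efficiency_center_extremes` asserts on the composite coefficient.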
+
+##### Exit Attenuation
-Final exit multiplier path: `exit_reward = pnl * exit_factor`, where
-`exit_factor = kernel(base_factor, duration_ratio_adjusted) * pnl_factor` and
-`pnl_factor` includes the `efficiency_factor` above.
+| Parameter | Default | Description |
+| ----------------------- | ------- | ------------------------------ |
+| `exit_attenuation_mode` | linear | Kernel mode |
+| `exit_plateau` | true | Flat region before attenuation |
+| `exit_plateau_grace` | 1.0 | Plateau grace ratio |
+| `exit_linear_slope` | 1.0 | Linear slope |
+| `exit_power_tau` | 0.5 | Power kernel tau (0,1] |
+| `exit_half_life` | 0.5 | Half-life for half_life kernel |
+
+**Formula:**
+
+`time_attenuation_coefficient = kernel_function(duration_ratio)`
+
+where `kernel_function` depends on `exit_attenuation_mode`. See [Exit Attenuation Kernels](#exit-attenuation-kernels) for detailed formulas.
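
To show how the three coefficients combine, here is a small sketch under stated assumptions: `exit_factor` and `exit_reward` are hypothetical helper names, the coefficients are passed in as plain numbers, and the `max(0.0, ...)` clamp on the composite PnL coefficient follows `_get_pnl_coefficient`. The final reward is `pnl × exit_factor`, as in `_compute_exit_reward`.

```python
def exit_factor(
    base_factor: float,
    time_attenuation_coefficient: float,
    pnl_target_coefficient: float,
    efficiency_coefficient: float,
) -> float:
    """Hypothetical helper: base_factor x time attenuation x PnL coefficient."""
    # The composite PnL coefficient is clamped to be non-negative.
    pnl_coefficient = max(0.0, pnl_target_coefficient * efficiency_coefficient)
    return base_factor * time_attenuation_coefficient * pnl_coefficient


def exit_reward(pnl: float, factor: float) -> float:
    """Exit reward is the realized PnL scaled by the exit factor."""
    return pnl * factor


if __name__ == "__main__":
    factor = exit_factor(
        base_factor=100.0,
        time_attenuation_coefficient=0.5,
        pnl_target_coefficient=1.0,
        efficiency_coefficient=1.2,
    )
    print(factor, exit_reward(0.02, factor))
```

Keeping the coefficients dimensionless means `base_factor` is the only term that sets the reward scale, which makes each coefficient easy to inspect in isolation.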
+
+#### Duration Penalties
+
+| Parameter | Default | Description |
+| ---------------------------- | ------- | -------------------------- |
+| `max_trade_duration_candles` | 128 | Trade duration cap |
+| `max_idle_duration_candles` | None | Fallback 4× trade duration |
+| `idle_penalty_scale` | 0.5 | Idle penalty scale |
+| `idle_penalty_power` | 1.025 | Idle penalty exponent |
+| `hold_penalty_scale` | 0.25 | Hold penalty scale |
+| `hold_penalty_power` | 1.025 | Hold penalty exponent |
#### Validation
| Parameter | Default | Description |
| ----------------------- | ------- | --------------------------------- |
| `check_invariants` | true | Invariant enforcement (see above) |
-| `exit_factor_threshold` | 10000.0 | Warn on excessive factor |
+| `exit_factor_threshold` | 1000.0 | Warn on excessive factor |
#### PBRS (Potential-Based Reward Shaping)
r* = r if not exit_plateau
```
-| Mode | Multiplier applied to base_factor \* pnl \* pnl_factor \* efficiency_factor | Monotonic | Notes | Use Case |
-| --------- | --------------------------------------------------------------------------- | --------- | ------------------------------------------- | ------------------------------------ |
-| legacy | step: ×1.5 if r\* ≤ 1 else ×0.5 | No | Non-monotonic legacy mode (not recommended) | Backward compatibility only |
-| sqrt | 1 / sqrt(1 + r\*) | Yes | Sub-linear decay | Gentle long-trade penalty |
-| linear | 1 / (1 + slope \* r\*) | Yes | slope = `exit_linear_slope` | Balanced duration penalty (default) |
-| power | (1 + r\*)^(-alpha) | Yes | alpha = -ln(tau)/ln(2); tau=1 ⇒ alpha=0 | Tunable decay rate via tau parameter |
-| half_life | 2^(- r\* / hl) | Yes | hl = `exit_half_life`; r\*=hl ⇒ factor ×0.5 | Time-based exponential discount |
+| Mode | Formula | Monotonic | Notes | Use Case |
+| --------- | ------------------------------- | --------- | ------------------------------------------- | ------------------------------------ |
+| legacy | step: ×1.5 if r\* ≤ 1 else ×0.5 | No | Non-monotonic legacy mode (not recommended) | Backward compatibility only |
+| sqrt | 1 / sqrt(1 + r\*) | Yes | Sub-linear decay | Gentle long-trade penalty |
+| linear | 1 / (1 + slope \* r\*) | Yes | slope = `exit_linear_slope` | Balanced duration penalty (default) |
+| power | (1 + r\*)^(-alpha) | Yes | alpha = -ln(tau)/ln(2); tau=1 ⇒ alpha=0 | Tunable decay rate via tau parameter |
+| half_life | 2^(- r\* / hl) | Yes | hl = `exit_half_life`; r\*=hl ⇒ factor ×0.5 | Time-based exponential discount |
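
The kernel formulas in the table can be sketched as a single dispatch function. This is an illustrative sketch, not the project's implementation: `time_attenuation` is a hypothetical name, it takes the already plateau-adjusted ratio `r_star` as input, and its keyword defaults are copied from the Exit Attenuation parameter table. Parameter sanitization and the fallback to the linear kernel on failure are omitted.

```python
import math


def time_attenuation(
    mode: str,
    r_star: float,
    *,
    slope: float = 1.0,      # exit_linear_slope
    tau: float = 0.5,        # exit_power_tau, expected in (0, 1]
    half_life: float = 0.5,  # exit_half_life
) -> float:
    """Hypothetical helper returning the dimensionless attenuation coefficient."""
    r_star = max(0.0, r_star)  # negative duration ratios are clamped to zero
    if mode == "legacy":
        return 1.5 if r_star <= 1.0 else 0.5
    if mode == "sqrt":
        return 1.0 / math.sqrt(1.0 + r_star)
    if mode == "linear":
        return 1.0 / (1.0 + slope * r_star)
    if mode == "power":
        alpha = -math.log(tau) / math.log(2.0)  # tau = 1 gives alpha = 0
        return (1.0 + r_star) ** (-alpha)
    if mode == "half_life":
        return 2.0 ** (-r_star / half_life)
    raise ValueError(f"unknown exit_attenuation_mode: {mode!r}")


if __name__ == "__main__":
    for mode in ("legacy", "sqrt", "linear", "power", "half_life"):
        print(mode, [round(time_attenuation(mode, r), 4) for r in (0.0, 0.5, 1.0, 2.0)])
```

Every kernel except `legacy` returns 1.0 at `r_star = 0` and decreases monotonically from there, which matches the Monotonic column.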
### Transform Functions
"pnl_factor_beta": 0.5,
# Invariant / safety (env defaults)
"check_invariants": True,
- "exit_factor_threshold": 10000.0,
+ "exit_factor_threshold": 1000.0,
# === PBRS PARAMETERS ===
# Potential-based reward shaping core parameters
# Discount factor γ for potential term (0 ≤ γ ≤ 1)
invariance_correction: float = 0.0
-def _get_exit_factor(
- base_factor: float,
- pnl: float,
- pnl_factor: float,
+def _compute_time_attenuation_coefficient(
duration_ratio: float,
params: RewardParams,
) -> float:
- """Exit factor (kernel + optional plateau) * pnl_factor with invariants."""
- if not np.isfinite(base_factor) or not np.isfinite(pnl) or not np.isfinite(duration_ratio):
- return _fail_safely("non_finite_exit_factor_inputs")
+ """
+ Calculate time-based attenuation coefficient using configurable strategy.
+ Returns a coefficient in (0, 1.5] (values above 1.0 occur only in legacy mode) to multiply with base_factor.
+ """
if duration_ratio < 0.0:
duration_ratio = 0.0
)
exit_linear_slope = 1.0
- def _legacy_kernel(f: float, dr: float) -> float:
- return f * (1.5 if dr <= 1.0 else 0.5)
+ def _legacy_kernel(dr: float) -> float:
+ return 1.5 if dr <= 1.0 else 0.5
- def _sqrt_kernel(f: float, dr: float) -> float:
- return f / math.sqrt(1.0 + dr)
+ def _sqrt_kernel(dr: float) -> float:
+ return 1.0 / math.sqrt(1.0 + dr)
- def _linear_kernel(f: float, dr: float) -> float:
- return f / (1.0 + exit_linear_slope * dr)
+ def _linear_kernel(dr: float) -> float:
+ return 1.0 / (1.0 + exit_linear_slope * dr)
- def _power_kernel(f: float, dr: float) -> float:
+ def _power_kernel(dr: float) -> float:
tau = _get_float_param(
params,
"exit_power_tau",
stacklevel=2,
)
alpha = 1.0
- return f / math.pow(1.0 + dr, alpha)
+ return 1.0 / math.pow(1.0 + dr, alpha)
- def _half_life_kernel(f: float, dr: float) -> float:
+ def _half_life_kernel(dr: float) -> float:
hl = _get_float_param(
params,
"exit_half_life",
stacklevel=2,
)
return 1.0
- return f * math.pow(2.0, -dr / hl)
+ return math.pow(2.0, -dr / hl)
kernels = {
"legacy": _legacy_kernel,
kernel = _linear_kernel
try:
- attenuation_factor = kernel(base_factor, effective_dr)
+ time_attenuation_coefficient = kernel(effective_dr)
except Exception as e:
warnings.warn(
f"exit_attenuation_mode '{exit_attenuation_mode}' failed ({e!r}); fallback linear (effective_dr={effective_dr:.5f})",
RewardDiagnosticsWarning,
stacklevel=2,
)
- attenuation_factor = _linear_kernel(base_factor, effective_dr)
+ time_attenuation_coefficient = _linear_kernel(effective_dr)
+
+ return time_attenuation_coefficient
+
+
+def _get_exit_factor(
+ base_factor: float,
+ pnl: float,
+ pnl_coefficient: float,
+ duration_ratio: float,
+ params: RewardParams,
+) -> float:
+ """
+ Compute exit reward factor by applying multiplicative coefficients to base_factor.
+
+ Formula: exit_factor = base_factor × time_attenuation_coefficient × pnl_coefficient
+
+ The time_attenuation_coefficient reduces rewards for longer trades, and the
+ pnl_coefficient adjusts rewards based on profit/target ratio and exit timing efficiency.
+
+ Args:
+ base_factor: Base reward value before coefficient adjustments
+ pnl: Realized profit/loss
+ pnl_coefficient: PnL scaling coefficient (already calculated)
+ duration_ratio: Trade duration relative to target duration
+ params: Reward configuration parameters
+
+ Returns:
+ float: Final exit factor (can be negative for losses)
+ """
+ if not np.isfinite(base_factor) or not np.isfinite(pnl) or not np.isfinite(duration_ratio):
+ return _fail_safely("non_finite_exit_factor_inputs")
+
+ time_attenuation_coefficient = _compute_time_attenuation_coefficient(duration_ratio, params)
- exit_factor = attenuation_factor * pnl_factor
+ exit_factor = base_factor * time_attenuation_coefficient * pnl_coefficient
if _get_bool_param(
params,
exit_factor_threshold = _get_float_param(
params,
"exit_factor_threshold",
- DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_factor_threshold", 10000.0),
+ DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_factor_threshold", 1000.0),
)
if exit_factor_threshold > 0 and np.isfinite(exit_factor_threshold):
if abs(exit_factor) > exit_factor_threshold:
return exit_factor
-def _get_pnl_factor(
+def _compute_pnl_target_coefficient(
params: RewardParams,
- context: RewardContext,
+ pnl: float,
profit_target: float,
risk_reward_ratio: float,
) -> float:
- """PnL factor: tanh overshoot/loss modulation + efficiency tilt (non-negative)."""
- pnl = context.pnl
- if not np.isfinite(pnl) or not np.isfinite(profit_target) or not np.isfinite(risk_reward_ratio):
- return _fail_safely("non_finite_inputs_pnl_factor")
- if profit_target <= 0.0:
- return 0.0
+ """
+ Compute PnL target coefficient based on PnL/target ratio using tanh.
- win_reward_factor = _get_float_param(
- params,
- "win_reward_factor",
- DEFAULT_MODEL_REWARD_PARAMETERS.get("win_reward_factor", 2.0),
- )
- pnl_factor_beta = _get_float_param(
- params,
- "pnl_factor_beta",
- DEFAULT_MODEL_REWARD_PARAMETERS.get("pnl_factor_beta", 0.5),
- )
- rr = risk_reward_ratio if risk_reward_ratio > 0 else 1.0
-
- pnl_ratio = pnl / profit_target
- pnl_target_factor = 1.0
- if abs(pnl_ratio) > 1.0:
- base_pnl_target_factor = math.tanh(pnl_factor_beta * (abs(pnl_ratio) - 1.0))
- if pnl_ratio > 1.0:
- pnl_target_factor = 1.0 + win_reward_factor * base_pnl_target_factor
- elif pnl_ratio < -(1.0 / rr):
- loss_penalty_factor = win_reward_factor * rr
- pnl_target_factor = 1.0 + loss_penalty_factor * base_pnl_target_factor
-
- efficiency_factor = 1.0
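+ Returns a coefficient to be multiplied with base_factor. It equals 1.0 while the PnL
+ stays within the target band and grows via tanh once the PnL exceeds the profit target
+ (bounded by 1 + win_reward_factor) or the loss falls beyond the risk/reward threshold
+ (bounded by 1 + win_reward_factor × rr).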
+
+ Args:
+ params: Reward configuration parameters
+ pnl: Realized profit/loss
+ profit_target: Target profit threshold
+ risk_reward_ratio: Risk/reward ratio for loss penalty calculation
+
+ Returns:
+ float: Coefficient ≥ 1.0 when win_reward_factor and pnl_factor_beta are non-negative
+ """
+ pnl_target_coefficient = 1.0
+
+ if profit_target > 0.0:
+ win_reward_factor = _get_float_param(
+ params,
+ "win_reward_factor",
+ DEFAULT_MODEL_REWARD_PARAMETERS.get("win_reward_factor", 2.0),
+ )
+ pnl_factor_beta = _get_float_param(
+ params,
+ "pnl_factor_beta",
+ DEFAULT_MODEL_REWARD_PARAMETERS.get("pnl_factor_beta", 0.5),
+ )
+ rr = risk_reward_ratio if risk_reward_ratio > 0 else 1.0
+
+ pnl_ratio = pnl / profit_target
+ if abs(pnl_ratio) > 1.0:
+ base_pnl_target_coefficient = math.tanh(pnl_factor_beta * (abs(pnl_ratio) - 1.0))
+ if pnl_ratio > 1.0:
+ pnl_target_coefficient = 1.0 + win_reward_factor * base_pnl_target_coefficient
+ elif pnl_ratio < -(1.0 / rr):
+ loss_penalty_factor = win_reward_factor * rr
+ pnl_target_coefficient = 1.0 + loss_penalty_factor * base_pnl_target_coefficient
+
+ return pnl_target_coefficient
+
+
+def _compute_efficiency_coefficient(
+ params: RewardParams,
+ context: RewardContext,
+ pnl: float,
+) -> float:
+ """
+ Compute exit efficiency coefficient based on PnL position relative to unrealized extremes.
+
+ Returns a coefficient (typically 0.5-1.5) that rewards exits closer to optimal timing.
+ For profitable trades, higher coefficient when exiting near max unrealized profit.
+ For losing trades, higher coefficient when exiting near min unrealized loss.
+
+ Args:
+ params: Reward configuration parameters containing:
+ - efficiency_weight: Amplification factor for efficiency adjustment
+ - efficiency_center: Target efficiency ratio (0.0-1.0)
+ context: Trade context with unrealized profit/loss extremes
+ pnl: Realized profit/loss
+
+ Returns:
+ float: Coefficient (0.5-1.5 with default parameters); not clamped here, the caller applies max(0.0, ...)
+ """
+ efficiency_coefficient = 1.0
efficiency_weight = _get_float_param(
params,
"efficiency_weight",
if np.isfinite(range_pnl) and not np.isclose(range_pnl, 0.0):
efficiency_ratio = (pnl - min_pnl) / range_pnl
if pnl > 0.0:
- efficiency_factor = 1.0 + efficiency_weight * (efficiency_ratio - efficiency_center)
+ efficiency_coefficient = 1.0 + efficiency_weight * (
+ efficiency_ratio - efficiency_center
+ )
elif pnl < 0.0:
- efficiency_factor = 1.0 + efficiency_weight * (efficiency_center - efficiency_ratio)
+ efficiency_coefficient = 1.0 + efficiency_weight * (
+ efficiency_center - efficiency_ratio
+ )
+
+ return efficiency_coefficient
- return max(0.0, pnl_target_factor * efficiency_factor)
+
+def _get_pnl_coefficient(
+ params: RewardParams,
+ context: RewardContext,
+ profit_target: float,
+ risk_reward_ratio: float,
+) -> float:
+ """
+ Compute combined PnL coefficient from target and efficiency components.
+
+ Multiplies the PnL target coefficient (based on profit/target ratio) with
+ the efficiency coefficient (based on exit timing quality) to produce a
+ single composite coefficient applied to the base reward factor.
+
+ Args:
+ params: Reward configuration parameters
+ context: Trade context with PnL and unrealized extremes
+ profit_target: Target profit threshold
+ risk_reward_ratio: Risk/reward ratio for loss penalty calculation
+
+ Returns:
+ float: Composite coefficient ≥ 0.0 (typically 0.25-4.0 range)
+ """
+ pnl = context.pnl
+ if not np.isfinite(pnl) or not np.isfinite(profit_target) or not np.isfinite(risk_reward_ratio):
+ return _fail_safely("non_finite_inputs_pnl_coefficient")
+ if profit_target <= 0.0:
+ return 0.0
+
+ pnl_target_coefficient = _compute_pnl_target_coefficient(
+ params, pnl, profit_target, risk_reward_ratio
+ )
+ efficiency_coefficient = _compute_efficiency_coefficient(params, context, pnl)
+
+ return max(0.0, pnl_target_coefficient * efficiency_coefficient)
def _is_valid_action(
def _compute_exit_reward(
base_factor: float,
- pnl_factor: float,
+ pnl_coefficient: float,
context: RewardContext,
params: RewardParams,
) -> float:
DEFAULT_MODEL_REWARD_PARAMETERS.get("max_trade_duration_candles", 128),
)
duration_ratio = _compute_duration_ratio(context.trade_duration, max_trade_duration_candles)
- exit_factor = _get_exit_factor(base_factor, context.pnl, pnl_factor, duration_ratio, params)
+ exit_factor = _get_exit_factor(
+ base_factor, context.pnl, pnl_coefficient, duration_ratio, params
+ )
return context.pnl * exit_factor
pnl_target = float(profit_target * risk_reward_ratio)
idle_factor = factor * pnl_target / 4.0
- pnl_factor = _get_pnl_factor(
+ pnl_coefficient = _get_pnl_coefficient(
params,
context,
pnl_target,
base_reward = _hold_penalty(context, hold_factor, params)
breakdown.hold_penalty = base_reward
elif context.action == Actions.Long_exit and context.position == Positions.Long:
- base_reward = _compute_exit_reward(factor, pnl_factor, context, params)
+ base_reward = _compute_exit_reward(factor, pnl_coefficient, context, params)
breakdown.exit_component = base_reward
elif context.action == Actions.Short_exit and context.position == Positions.Short:
- base_reward = _compute_exit_reward(factor, pnl_factor, context, params)
+ base_reward = _compute_exit_reward(factor, pnl_coefficient, context, params)
breakdown.exit_component = base_reward
else:
base_reward = 0.0
_compute_hold_potential,
_get_exit_factor,
_get_float_param,
- _get_pnl_factor,
+ _get_pnl_coefficient,
calculate_reward,
)
)
def test_efficiency_zero_policy(self):
- """Test efficiency zero policy produces expected PnL factor.
+ """Test efficiency zero policy produces expected PnL coefficient.
Verifies:
- - efficiency_weight = 0 → pnl_factor ≈ 1.0
- - Factor is finite and positive
+ - efficiency_weight = 0 → pnl_coefficient ≈ 1.0
+ - Coefficient is finite and positive
"""
ctx = self.make_ctx(
pnl=0.0,
)
params = self.base_params()
profit_target = self.TEST_PROFIT_TARGET * self.TEST_RR
- pnl_factor = _get_pnl_factor(params, ctx, profit_target, self.TEST_RR)
- self.assertFinite(pnl_factor, name="pnl_factor")
- self.assertAlmostEqualFloat(pnl_factor, 1.0, tolerance=self.TOL_GENERIC_EQ)
+ pnl_coefficient = _get_pnl_coefficient(params, ctx, profit_target, self.TEST_RR)
+ self.assertFinite(pnl_coefficient, name="pnl_coefficient")
+ self.assertAlmostEqualFloat(pnl_coefficient, 1.0, tolerance=self.TOL_GENERIC_EQ)
def test_max_idle_duration_candles_logic(self):
"""Test max idle duration candles parameter affects penalty magnitude.
for mode in modes_to_test:
test_params = self.base_params(exit_attenuation_mode=mode)
factor = _get_exit_factor(
- base_factor=1.0, pnl=0.02, pnl_factor=1.5, duration_ratio=0.3, params=test_params
+ base_factor=1.0,
+ pnl=0.02,
+ pnl_coefficient=1.5,
+ duration_ratio=0.3,
+ params=test_params,
)
self.assertFinite(factor, name=f"exit_factor[{mode}]")
self.assertGreater(factor, 0, f"Exit factor for {mode} should be positive")
_get_exit_factor,
base_factor=1.0,
pnl=0.02,
- pnl_factor=1.5,
+ pnl_coefficient=1.5,
plateau_params=plateau_params,
grace=0.5,
tolerance_strict=self.TOL_IDENTITY_STRICT,
msg="invariance_correction should be ~0 in canonical mode",
)
+ def test_efficiency_center_extremes(self):
+ """Efficiency center extremes affect pnl_coefficient as expected when pnl_target_coefficient=1."""
+ context = self.make_ctx(
+ pnl=0.05,
+ trade_duration=10,
+ idle_duration=0,
+ max_unrealized_profit=0.10,
+ min_unrealized_profit=0.00,
+ position=Positions.Long,
+ action=Actions.Long_exit,
+ )
+ profit_target = 0.20
+ base_params = self.base_params(efficiency_weight=2.0)
+ params_center0 = dict(base_params, efficiency_center=0.0)
+ params_center1 = dict(base_params, efficiency_center=1.0)
+ coef_c0 = _get_pnl_coefficient(params_center0, context, profit_target, self.TEST_RR)
+ coef_c1 = _get_pnl_coefficient(params_center1, context, profit_target, self.TEST_RR)
+ self.assertFinite(coef_c0, name="coef_center0")
+ self.assertFinite(coef_c1, name="coef_center1")
+ self.assertGreater(coef_c0, coef_c1)
+
+ def test_efficiency_weight_zero_vs_two(self):
+ """Efficiency weight 0 yields ~1; weight 2 amplifies pnl_coefficient when center < ratio."""
+ context = self.make_ctx(
+ pnl=0.05,
+ trade_duration=10,
+ idle_duration=0,
+ max_unrealized_profit=0.10,
+ min_unrealized_profit=0.00,
+ position=Positions.Long,
+ action=Actions.Long_exit,
+ )
+ profit_target = 0.20
+ params_w0 = self.base_params(efficiency_weight=0.0, efficiency_center=0.2)
+ params_w2 = self.base_params(efficiency_weight=2.0, efficiency_center=0.2)
+ c0 = _get_pnl_coefficient(params_w0, context, profit_target, self.TEST_RR)
+ c2 = _get_pnl_coefficient(params_w2, context, profit_target, self.TEST_RR)
+ self.assertFinite(c0, name="coef_w0")
+ self.assertFinite(c2, name="coef_w2")
+ self.assertAlmostEqualFloat(c0, 1.0, tolerance=self.TOL_GENERIC_EQ)
+ self.assertGreater(c2, c0)
+
if __name__ == "__main__":
unittest.main()
from reward_space_analysis import (
_get_exit_factor,
- _get_pnl_factor,
+ _get_pnl_coefficient,
calculate_reward,
)
test_case,
base_factor: float,
pnl: float,
- pnl_factor: float,
+ pnl_coefficient: float,
attenuation_modes: Sequence[str],
base_params_fn,
tolerance_relaxed: float,
test_case: Test case instance with assertion methods
base_factor: Base scaling factor
pnl: Profit/loss value
- pnl_factor: PnL amplification factor
+ pnl_coefficient: PnL amplification coefficient
attenuation_modes: List of mode names to test
base_params_fn: Factory function for creating parameter dicts
tolerance_relaxed: Numerical tolerance for monotonicity checks
mode_params = base_params_fn(exit_attenuation_mode="sqrt")
ratios = np.linspace(0, 2, 15)
values = [
- _get_exit_factor(base_factor, pnl, pnl_factor, r, mode_params) for r in ratios
+ _get_exit_factor(base_factor, pnl, pnl_coefficient, r, mode_params) for r in ratios
]
if mode == "plateau_linear":
grace = float(mode_params["exit_plateau_grace"])
short_allowed=True,
action_masking=True,
)
- pnl_factor_hl = _get_pnl_factor(params, context, profit_target, risk_reward_ratio)
+ pnl_coefficient_hl = _get_pnl_coefficient(params, context, profit_target, risk_reward_ratio)
observed_exit_factor = _get_exit_factor(
- base_factor, context.pnl, pnl_factor_hl, duration_ratio, params
+ base_factor, context.pnl, pnl_coefficient_hl, duration_ratio, params
)
observed_half_life_factor = observed_exit_factor / (
- base_factor * max(pnl_factor_hl, np.finfo(float).eps)
+ base_factor * max(pnl_coefficient_hl, np.finfo(float).eps)
)
expected_half_life_factor = 2 ** (-duration_ratio / params["exit_half_life"])
test_case.assertAlmostEqual(
suite_cases: List of scenario dicts with keys:
- base_factor: Base scaling factor
- pnl: Profit/loss value
- - pnl_factor: PnL amplification factor
+ - pnl_coefficient: PnL amplification coefficient
- duration_ratio: Duration ratio (0-2)
- params: Parameter dictionary
- expectation: Expected invariant ("non_negative", "safe_zero", "clamped")
Example:
cases = [
{
- "base_factor": 90.0, "pnl": 0.08, "pnl_factor": 1.5,
+ "base_factor": 90.0, "pnl": 0.08, "pnl_coefficient": 1.5,
"duration_ratio": 0.5, "params": {...},
"expectation": "non_negative", "tolerance": 1e-09
},
{
- "base_factor": 90.0, "pnl": 0.0, "pnl_factor": 0.0,
+ "base_factor": 90.0, "pnl": 0.0, "pnl_coefficient": 0.0,
"duration_ratio": 0.5, "params": {...},
"expectation": "safe_zero"
},
f_val = exit_factor_fn(
case["base_factor"],
case["pnl"],
- case["pnl_factor"],
+ case["pnl_coefficient"],
case["duration_ratio"],
case["params"],
)
exit_factor_fn,
base_factor: float,
pnl: float,
- pnl_factor: float,
+ pnl_coefficient: float,
duration_ratio: float,
bad_params: Dict[str, Any],
reference_params: Dict[str, Any],
exit_factor_fn: Exit factor calculation function
base_factor: Base scaling factor
pnl: Profit/loss value
- pnl_factor: PnL amplification factor
+ pnl_coefficient: PnL amplification coefficient
duration_ratio: Duration ratio
bad_params: Parameters that trigger kernel failure
reference_params: Reference linear mode parameters for comparison
)
"""
- f_bad = exit_factor_fn(base_factor, pnl, pnl_factor, duration_ratio, bad_params)
- f_ref = exit_factor_fn(base_factor, pnl, pnl_factor, duration_ratio, reference_params)
+ f_bad = exit_factor_fn(base_factor, pnl, pnl_coefficient, duration_ratio, bad_params)
+ f_ref = exit_factor_fn(base_factor, pnl, pnl_coefficient, duration_ratio, reference_params)
test_case.assertAlmostEqual(f_bad, f_ref, delta=TOLERANCE.IDENTITY_STRICT)
test_case.assertGreaterEqual(f_bad, 0.0)
exit_factor_fn,
base_factor: float,
pnl: float,
- pnl_factor: float,
+ pnl_coefficient: float,
plateau_params: dict,
grace: float,
tolerance_strict: float,
exit_factor_fn: Exit factor calculation function (_get_exit_factor)
base_factor: Base factor for exit calculation
pnl: PnL value
- pnl_factor: PnL factor multiplier
+ pnl_coefficient: PnL coefficient multiplier
plateau_params: Parameters dict with plateau configuration
grace: Grace period threshold (exit_plateau_grace value)
tolerance_strict: Tolerance for numerical comparisons
plateau_factor_pre = exit_factor_fn(
base_factor=base_factor,
pnl=pnl,
- pnl_factor=pnl_factor,
+ pnl_coefficient=pnl_coefficient,
duration_ratio=duration_ratio_pre,
params=plateau_params,
)
plateau_factor_post = exit_factor_fn(
base_factor=base_factor,
pnl=pnl,
- pnl_factor=pnl_factor,
+ pnl_coefficient=pnl_coefficient,
duration_ratio=duration_ratio_post,
params=plateau_params,
)
Attributes:
base_factor: Base scaling factor
pnl: Profit/loss value
- pnl_factor: PnL amplification factor
+ pnl_coefficient: PnL amplification coefficient
duration_ratio: Ratio of current to maximum duration
attenuation_mode: Mode of attenuation ("linear", "power", etc.)
plateau_enabled: Whether plateau behavior is active
base_factor: float
pnl: float
- pnl_factor: float
+ pnl_coefficient: float
duration_ratio: float
attenuation_mode: str
plateau_enabled: bool = False
factor = _get_exit_factor(
base_factor=10.0,
pnl=0.01,
- pnl_factor=1.0,
+ pnl_coefficient=1.0,
duration_ratio=0.5,
params=params,
)
factor = _get_exit_factor(
base_factor=10.0,
pnl=0.01,
- pnl_factor=1.0,
+ pnl_coefficient=1.0,
duration_ratio=2.0,
params=params,
)
factor = _get_exit_factor(
base_factor=5.0,
pnl=0.02,
- pnl_factor=1.0,
+ pnl_coefficient=1.0,
duration_ratio=1.5,
params=params,
)
factor = _get_exit_factor(
base_factor=5.0,
pnl=0.02,
- pnl_factor=1.0,
+ pnl_coefficient=1.0,
duration_ratio=2.0,
params=params,
)
{
"base_factor": 15.0,
"pnl": 0.02,
- "pnl_factor": 1.0,
+ "pnl_coefficient": 1.0,
"duration_ratio": -5.0,
"params": {
"exit_attenuation_mode": "linear",
{
"base_factor": 15.0,
"pnl": 0.02,
- "pnl_factor": 1.0,
+ "pnl_coefficient": 1.0,
"duration_ratio": 0.0,
"params": {
"exit_attenuation_mode": "linear",
{
"base_factor": float("nan"),
"pnl": 0.01,
- "pnl_factor": 1.0,
+ "pnl_coefficient": 1.0,
"duration_ratio": 0.2,
"params": {"exit_attenuation_mode": "linear", "exit_linear_slope": 0.5},
"expectation": "safe_zero",
{
"base_factor": 10.0,
"pnl": float("nan"),
- "pnl_factor": 1.0,
+ "pnl_coefficient": 1.0,
"duration_ratio": 0.2,
"params": {"exit_attenuation_mode": "linear", "exit_linear_slope": 0.5},
"expectation": "safe_zero",
{
"base_factor": 10.0,
"pnl": 0.01,
- "pnl_factor": 1.0,
+ "pnl_coefficient": 1.0,
"duration_ratio": float("nan"),
"params": {"exit_attenuation_mode": "linear", "exit_linear_slope": 0.5},
"expectation": "safe_zero",
{
"base_factor": 10.0,
"pnl": 0.02,
- "pnl_factor": float("inf"),
+ "pnl_coefficient": float("inf"),
"duration_ratio": 0.5,
"params": {
"exit_attenuation_mode": "linear",
{
"base_factor": 10.0,
"pnl": 0.015,
- "pnl_factor": -2.5,
+ "pnl_coefficient": -2.5,
"duration_ratio": 2.0,
"params": {
"exit_attenuation_mode": "legacy",
self,
base_factor=self.TEST_BASE_FACTOR,
pnl=0.05,
- pnl_factor=1.0,
+ pnl_coefficient=1.0,
attenuation_modes=modes,
base_params_fn=self.base_params,
tolerance_relaxed=self.TOL_IDENTITY_RELAXED,
"""Negative exit_linear_slope is sanitized to 1.0; resulting exit factors must match slope=1.0 within tolerance."""
base_factor = 100.0
pnl = 0.03
- pnl_factor = 1.0
+ pnl_coefficient = 1.0
duration_ratios = [0.0, 0.2, 0.5, 1.0, 1.5]
params_bad = self.base_params(
exit_attenuation_mode="linear", exit_linear_slope=-5.0, exit_plateau=False
exit_attenuation_mode="linear", exit_linear_slope=1.0, exit_plateau=False
)
for dr in duration_ratios:
- f_bad = _get_exit_factor(base_factor, pnl, pnl_factor, dr, params_bad)
- f_ref = _get_exit_factor(base_factor, pnl, pnl_factor, dr, params_ref)
+ f_bad = _get_exit_factor(base_factor, pnl, pnl_coefficient, dr, params_bad)
+ f_ref = _get_exit_factor(base_factor, pnl, pnl_coefficient, dr, params_ref)
self.assertAlmostEqualFloat(
f_bad,
f_ref,
"""Power mode attenuation: ratio f(dr=1)/f(dr=0) must equal 1/(1+1)^alpha with alpha=-log(tau)/log(2)."""
base_factor = 200.0
pnl = 0.04
- pnl_factor = 1.0
+ pnl_coefficient = 1.0
duration_ratio = 1.0
taus = [0.9, 0.5, 0.25, 1.0]
for tau in taus:
params = self.base_params(
exit_attenuation_mode="power", exit_power_tau=tau, exit_plateau=False
)
- f0 = _get_exit_factor(base_factor, pnl, pnl_factor, 0.0, params)
- f1 = _get_exit_factor(base_factor, pnl, pnl_factor, duration_ratio, params)
+ f0 = _get_exit_factor(base_factor, pnl, pnl_coefficient, 0.0, params)
+ f1 = _get_exit_factor(base_factor, pnl, pnl_coefficient, duration_ratio, params)
if 0.0 < tau <= 1.0:
alpha = -math.log(tau) / math.log(2.0)
else:
"""Test parameter edge cases: tau extrema, plateau grace edges, slope zero."""
base_factor = 50.0
pnl = 0.02
- pnl_factor = 1.0
+ pnl_coefficient = 1.0
params_hi = self.base_params(exit_attenuation_mode="power", exit_power_tau=0.999999)
params_lo = self.base_params(
exit_attenuation_mode="power", exit_power_tau=self.MIN_EXIT_POWER_TAU
)
r = 1.5
- hi_val = _get_exit_factor(base_factor, pnl, pnl_factor, r, params_hi)
- lo_val = _get_exit_factor(base_factor, pnl, pnl_factor, r, params_lo)
+ hi_val = _get_exit_factor(base_factor, pnl, pnl_coefficient, r, params_hi)
+ lo_val = _get_exit_factor(base_factor, pnl, pnl_coefficient, r, params_lo)
self.assertGreater(
hi_val, lo_val, "Power mode: higher tau (≈1) should attenuate less than tiny tau"
)
exit_plateau_grace=1.0,
exit_linear_slope=1.0,
)
- val_g0 = _get_exit_factor(base_factor, pnl, pnl_factor, 0.5, params_g0)
- val_g1 = _get_exit_factor(base_factor, pnl, pnl_factor, 0.5, params_g1)
+ val_g0 = _get_exit_factor(base_factor, pnl, pnl_coefficient, 0.5, params_g0)
+ val_g1 = _get_exit_factor(base_factor, pnl, pnl_coefficient, 0.5, params_g1)
self.assertGreater(
val_g1, val_g0, "Plateau grace=1.0 should delay attenuation vs grace=0.0"
)
params_lin1 = self.base_params(
exit_attenuation_mode="linear", exit_linear_slope=2.0, exit_plateau=False
)
- val_lin0 = _get_exit_factor(base_factor, pnl, pnl_factor, 1.0, params_lin0)
- val_lin1 = _get_exit_factor(base_factor, pnl, pnl_factor, 1.0, params_lin1)
+ val_lin0 = _get_exit_factor(base_factor, pnl, pnl_coefficient, 1.0, params_lin0)
+ val_lin1 = _get_exit_factor(base_factor, pnl, pnl_coefficient, 1.0, params_lin1)
self.assertGreater(
val_lin0, val_lin1, "Linear slope=0 should yield no attenuation vs slope>0"
)
)
base_factor = self.TEST_BASE_FACTOR
pnl = 0.04
- pnl_factor = 1.2
+ pnl_coefficient = 1.2
ratios = [0.3, 0.6, 1.0, 1.4]
- values = [_get_exit_factor(base_factor, pnl, pnl_factor, r, params) for r in ratios]
+ values = [_get_exit_factor(base_factor, pnl, pnl_coefficient, r, params) for r in ratios]
first = values[0]
for v in values[1:]:
self.assertAlmostEqualFloat(
)
base_factor = 80.0
pnl = self.TEST_PROFIT_TARGET
- pnl_factor = 1.1
+ pnl_coefficient = 1.1
ratios = [0.8, 1.0, 1.2, 1.4, 1.6]
- vals = [_get_exit_factor(base_factor, pnl, pnl_factor, r, params) for r in ratios]
+ vals = [_get_exit_factor(base_factor, pnl, pnl_coefficient, r, params) for r in ratios]
ref = vals[0]
for i, r in enumerate(ratios[:-1]):
self.assertAlmostEqualFloat(
eps = self.CONTINUITY_EPS_SMALL
base_factor = self.TEST_BASE_FACTOR
pnl = 0.01
- pnl_factor = 1.0
+ pnl_coefficient = 1.0
tau = 0.5
half_life = 0.5
slope = 1.3
"exit_half_life": half_life,
}
)
- left = _get_exit_factor(base_factor, pnl, pnl_factor, grace - eps, params)
- boundary = _get_exit_factor(base_factor, pnl, pnl_factor, grace, params)
- right = _get_exit_factor(base_factor, pnl, pnl_factor, grace + eps, params)
+ left = _get_exit_factor(base_factor, pnl, pnl_coefficient, grace - eps, params)
+ boundary = _get_exit_factor(base_factor, pnl, pnl_coefficient, grace, params)
+ right = _get_exit_factor(base_factor, pnl, pnl_coefficient, grace + eps, params)
self.assertAlmostEqualFloat(
left,
boundary,
)
base_factor = 75.0
pnl = 0.05
- pnl_factor = 1.0
+ pnl_coefficient = 1.0
duration_ratio = 0.8
with assert_diagnostic_warning(["Unknown exit_attenuation_mode"]):
- f_unknown = _get_exit_factor(base_factor, pnl, pnl_factor, duration_ratio, params)
+ f_unknown = _get_exit_factor(base_factor, pnl, pnl_coefficient, duration_ratio, params)
linear_params = self.base_params(exit_attenuation_mode="linear", exit_plateau=False)
- f_linear = _get_exit_factor(base_factor, pnl, pnl_factor, duration_ratio, linear_params)
+ f_linear = _get_exit_factor(
+ base_factor, pnl, pnl_coefficient, duration_ratio, linear_params
+ )
self.assertAlmostEqualFloat(
f_unknown,
f_linear,
)
base_factor = PARAMS.BASE_FACTOR
pnl = 0.03
- pnl_factor = 1.0
+ pnl_coefficient = 1.0
duration_ratio = 0.5
with assert_diagnostic_warning(["exit_plateau_grace < 0"]):
- f_neg = _get_exit_factor(base_factor, pnl, pnl_factor, duration_ratio, params)
+ f_neg = _get_exit_factor(base_factor, pnl, pnl_coefficient, duration_ratio, params)
# Reference with grace=0.0 (since negative should clamp)
ref_params = self.base_params(
exit_attenuation_mode="linear",
exit_plateau_grace=0.0,
exit_linear_slope=1.2,
)
- f_ref = _get_exit_factor(base_factor, pnl, pnl_factor, duration_ratio, ref_params)
+ f_ref = _get_exit_factor(base_factor, pnl, pnl_coefficient, duration_ratio, ref_params)
self.assertAlmostEqualFloat(
f_neg,
f_ref,
invalid_taus = [0.0, -0.5, 2.0, float("nan")]
base_factor = 120.0
pnl = 0.04
- pnl_factor = 1.0
+ pnl_coefficient = 1.0
duration_ratio = 1.0
# Explicit alpha=1 expected ratio: f(dr)/f(0)=1/(1+dr)^1 with plateau disabled to observe attenuation.
expected_ratio_alpha1 = 1.0 / (1.0 + duration_ratio)
exit_attenuation_mode="power", exit_power_tau=tau, exit_plateau=False
)
with assert_diagnostic_warning(["exit_power_tau"]):
- f0 = _get_exit_factor(base_factor, pnl, pnl_factor, 0.0, params)
- f1 = _get_exit_factor(base_factor, pnl, pnl_factor, duration_ratio, params)
+ f0 = _get_exit_factor(base_factor, pnl, pnl_coefficient, 0.0, params)
+ f1 = _get_exit_factor(base_factor, pnl, pnl_coefficient, duration_ratio, params)
ratio = f1 / max(f0, self.TOL_NUMERIC_GUARD)
self.assertAlmostEqual(
ratio,
"""Invariant 105: Near-zero exit_half_life warns and returns factor≈base_factor (no attenuation)."""
base_factor = 60.0
pnl = 0.02
- pnl_factor = 1.0
+ pnl_coefficient = 1.0
duration_ratio = 0.7
near_zero_values = [1e-15, 1e-12, 5e-14]
for hl in near_zero_values:
params = self.base_params(exit_attenuation_mode="half_life", exit_half_life=hl)
with assert_diagnostic_warning(["exit_half_life", "close to 0"]):
- _ = _get_exit_factor(base_factor, pnl, pnl_factor, 0.0, params)
- fdr = _get_exit_factor(base_factor, pnl, pnl_factor, duration_ratio, params)
+ _ = _get_exit_factor(base_factor, pnl, pnl_coefficient, 0.0, params)
+ fdr = _get_exit_factor(base_factor, pnl, pnl_coefficient, duration_ratio, params)
self.assertAlmostEqualFloat(
fdr,
- 1.0 * pnl_factor, # Kernel returns 1.0 then * pnl_factor
+ base_factor
+ * 1.0
+ * pnl_coefficient, # base_factor * time_coefficient (1.0) * pnl_coefficient
tolerance=self.TOL_IDENTITY_RELAXED,
msg=f"Near-zero half-life attenuation mismatch hl={hl} fdr={fdr}",
)
DEFAULT_HOLD_PENALTY_POWER: Final[float] = 1.025
DEFAULT_CHECK_INVARIANTS: Final[bool] = True
- DEFAULT_EXIT_FACTOR_THRESHOLD: Final[float] = 10_000.0
+ DEFAULT_EXIT_FACTOR_THRESHOLD: Final[float] = 1_000.0
_MODEL_TYPES: Final[Tuple[ModelType, ...]] = (
"PPO",
self._last_exit_reward = 0.0
return observation, history
- def _compute_time_attenuation_factor(
+ def _compute_time_attenuation_coefficient(
self,
- factor: float,
duration_ratio: float,
model_reward_parameters: Mapping[str, Any],
) -> float:
"""
- Apply time-based decay to reward factor using configurable strategy
+ Calculate time-based attenuation coefficient using configurable strategy
(legacy/sqrt/linear/power/half_life). Optionally apply plateau grace period.
"""
if duration_ratio < 0.0:
)
)
if exit_plateau_grace < 0.0:
+ logger.warning("exit_plateau_grace < 0; falling back to 0.0")
exit_plateau_grace = 0.0
- def _legacy(f: float, dr: float, p: Mapping[str, Any]) -> float:
- return f * (1.5 if dr <= 1.0 else 0.5)
+ def _legacy(dr: float, p: Mapping[str, Any]) -> float:
+ return 1.5 if dr <= 1.0 else 0.5
- def _sqrt(f: float, dr: float, p: Mapping[str, Any]) -> float:
- return f / math.sqrt(1.0 + dr)
+ def _sqrt(dr: float, p: Mapping[str, Any]) -> float:
+ return 1.0 / math.sqrt(1.0 + dr)
- def _linear(f: float, dr: float, p: Mapping[str, Any]) -> float:
+ def _linear(dr: float, p: Mapping[str, Any]) -> float:
slope = float(
p.get("exit_linear_slope", ReforceXY.DEFAULT_EXIT_LINEAR_SLOPE)
)
if slope < 0.0:
+ logger.warning("exit_linear_slope < 0; falling back to 1.0")
slope = 1.0
- return f / (1.0 + slope * dr)
+ return 1.0 / (1.0 + slope * dr)
- def _power(f: float, dr: float, p: Mapping[str, Any]) -> float:
+ def _power(dr: float, p: Mapping[str, Any]) -> float:
tau = p.get("exit_power_tau")
if isinstance(tau, (int, float)):
tau = float(tau)
alpha = 1.0
else:
alpha = 1.0
- return f / math.pow(1.0 + dr, alpha)
+ return 1.0 / math.pow(1.0 + dr, alpha)
- def _half_life(f: float, dr: float, p: Mapping[str, Any]) -> float:
+ def _half_life(dr: float, p: Mapping[str, Any]) -> float:
hl = float(p.get("exit_half_life", ReforceXY.DEFAULT_EXIT_HALF_LIFE))
if np.isclose(hl, 0.0) or hl < 0.0:
return 1.0
- return f * math.pow(2.0, -dr / hl)
+ return math.pow(2.0, -dr / hl)
- strategies: Dict[str, Callable[[float, float, Mapping[str, Any]], float]] = {
+ strategies: Dict[str, Callable[[float, Mapping[str, Any]], float]] = {
ReforceXY._EXIT_ATTENUATION_MODES[0]: _legacy,
ReforceXY._EXIT_ATTENUATION_MODES[1]: _sqrt,
ReforceXY._EXIT_ATTENUATION_MODES[2]: _linear,
strategy_fn = _linear
try:
- factor = strategy_fn(factor, effective_dr, model_reward_parameters)
+ time_attenuation_coefficient = strategy_fn(
+ effective_dr, model_reward_parameters
+ )
except Exception as e:
logger.warning(
"exit_attenuation_mode '%s' failed (%r); fallback to %s (effective_dr=%.5f)",
ReforceXY._EXIT_ATTENUATION_MODES[2], # "linear"
effective_dr,
)
- factor = _linear(factor, effective_dr, model_reward_parameters)
+ time_attenuation_coefficient = _linear(
+ effective_dr, model_reward_parameters
+ )
- return factor
+ return time_attenuation_coefficient
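
The refactor above turns each attenuation strategy into a pure multiplier that no longer receives `base_factor`. A minimal standalone sketch of those kernels follows. The function name, keyword arguments, and defaults are hypothetical; plateau grace is omitted; and clamping negative `duration_ratio` to 0 is an assumption, since that branch is not visible in the hunk.

```python
import math


def time_attenuation_coefficient(
    mode: str,
    duration_ratio: float,
    *,
    slope: float = 1.0,      # hypothetical default
    tau: float = 0.5,        # hypothetical default
    half_life: float = 0.5,  # hypothetical default
) -> float:
    """Return a multiplier that is independent of base_factor (no plateau handling)."""
    dr = max(0.0, duration_ratio)  # assumption: negative ratios are clamped to 0

    if mode == "legacy":
        return 1.5 if dr <= 1.0 else 0.5
    if mode == "sqrt":
        return 1.0 / math.sqrt(1.0 + dr)
    if mode == "power":
        # alpha = -log(tau) / log(2) for tau in (0, 1]; any other tau uses alpha = 1
        alpha = -math.log(tau) / math.log(2.0) if 0.0 < tau <= 1.0 else 1.0
        return 1.0 / math.pow(1.0 + dr, alpha)
    if mode == "half_life":
        if half_life < 0.0 or math.isclose(half_life, 0.0, abs_tol=1e-8):
            return 1.0  # degenerate half-life: no attenuation
        return math.pow(2.0, -dr / half_life)

    # "linear", which also serves as the fallback for unknown modes
    if slope < 0.0:
        slope = 1.0  # negative slope is sanitized to 1.0
    return 1.0 / (1.0 + slope * dr)
```

Because the kernels return a coefficient rather than a scaled factor, the caller can multiply by `base_factor` once, which is what the updated near-zero half-life test expects.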
def _get_exit_factor(
self,
- factor: float,
+ base_factor: float,
pnl: float,
duration_ratio: float,
model_reward_parameters: Mapping[str, Any],
) -> float:
"""
- Compute exit reward factor combining time attenuation and PnL factors
+ Compute exit factor: base_factor × time_attenuation_coefficient × pnl_coefficient.
"""
if not (
- np.isfinite(factor) and np.isfinite(pnl) and np.isfinite(duration_ratio)
+ np.isfinite(base_factor)
+ and np.isfinite(pnl)
+ and np.isfinite(duration_ratio)
):
return 0.0
- time_attenuation_factor = self._compute_time_attenuation_factor(
- factor,
+
+ time_attenuation_coefficient = self._compute_time_attenuation_coefficient(
duration_ratio,
model_reward_parameters,
)
-
- factor *= time_attenuation_factor * self._get_pnl_factor(
+ pnl_coefficient = self._get_pnl_coefficient(
pnl, self._pnl_target, model_reward_parameters
)
+ exit_factor = base_factor * time_attenuation_coefficient * pnl_coefficient
+
check_invariants = model_reward_parameters.get(
"check_invariants", ReforceXY.DEFAULT_CHECK_INVARIANTS
)
check_invariants if isinstance(check_invariants, bool) else True
)
if check_invariants:
- if not np.isfinite(factor):
+ if not np.isfinite(exit_factor):
logger.debug(
"_get_exit_factor produced non-finite factor; resetting to 0.0"
)
return 0.0
- if factor < 0.0 and pnl >= 0.0:
+ if exit_factor < 0.0 and pnl >= 0.0:
logger.debug(
- "_get_exit_factor negative with positive pnl (factor=%.5f, pnl=%.5f); clamping to 0.0",
- factor,
+ "_get_exit_factor negative with non-negative pnl (exit_factor=%.5f, pnl=%.5f); clamping to 0.0",
+ exit_factor,
pnl,
)
- factor = 0.0
+ exit_factor = 0.0
exit_factor_threshold = float(
model_reward_parameters.get(
"exit_factor_threshold", ReforceXY.DEFAULT_EXIT_FACTOR_THRESHOLD
)
)
- if exit_factor_threshold > 0 and abs(factor) > exit_factor_threshold:
+ if exit_factor_threshold > 0 and abs(exit_factor) > exit_factor_threshold:
logger.warning(
- "_get_exit_factor |factor|=%.2f exceeds threshold %.2f",
- factor,
+ "_get_exit_factor |exit_factor|=%.2f exceeds threshold %.2f",
+ exit_factor,
exit_factor_threshold,
)
- return factor
+ return exit_factor
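
The composition and invariant guards in `_get_exit_factor` can be illustrated in isolation. The sketch below assumes the two coefficients are already computed. The helper name and the `(value, exceeds_threshold)` return shape are hypothetical, and a boolean stands in for the logged warning.

```python
import math
from typing import Tuple


def compose_exit_factor(
    base_factor: float,
    time_coefficient: float,
    pnl_coefficient: float,
    pnl: float,
    threshold: float = 1000.0,
) -> Tuple[float, bool]:
    """Return (exit_factor, exceeds_threshold) for precomputed coefficients."""
    exit_factor = base_factor * time_coefficient * pnl_coefficient

    # Invariant: a non-finite product is reset to 0.0
    if not math.isfinite(exit_factor):
        return 0.0, False

    # Invariant: a negative factor with non-negative pnl is clamped to 0.0
    if exit_factor < 0.0 and pnl >= 0.0:
        exit_factor = 0.0

    # The threshold only triggers a warning; it never alters the value
    exceeds = threshold > 0.0 and abs(exit_factor) > threshold
    return exit_factor, exceeds
```

For example, `compose_exit_factor(100.0, 0.5, 1.2, pnl=0.03)` yields a factor of about 60 with no threshold flag.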
- def _compute_pnl_target_factor(
+ def _compute_pnl_target_coefficient(
self, pnl: float, pnl_target: float, model_reward_parameters: Mapping[str, Any]
) -> float:
"""
- Scale reward based on PnL/target ratio using tanh (≥ 1.0 for good trades).
+ Compute PnL target coefficient using tanh on the PnL/target ratio (1.0 unless |pnl / pnl_target| > 1.0).
"""
- pnl_target_factor = 1.0
+ pnl_target_coefficient = 1.0
if pnl_target > 0.0:
pnl_factor_beta = float(
pnl_ratio = pnl / pnl_target
if abs(pnl_ratio) > 1.0:
- base_pnl_target_factor = math.tanh(
+ base_pnl_target_coefficient = math.tanh(
pnl_factor_beta * (abs(pnl_ratio) - 1.0)
)
win_reward_factor = float(
)
if pnl_ratio > 1.0:
- pnl_target_factor = 1.0 + win_reward_factor * base_pnl_target_factor
+ pnl_target_coefficient = (
+ 1.0 + win_reward_factor * base_pnl_target_coefficient
+ )
elif pnl_ratio < -(1.0 / self.rr):
loss_penalty_factor = win_reward_factor * self.rr
- pnl_target_factor = (
- 1.0 + loss_penalty_factor * base_pnl_target_factor
+ pnl_target_coefficient = (
+ 1.0 + loss_penalty_factor * base_pnl_target_coefficient
)
- return pnl_target_factor
+ return pnl_target_coefficient
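
The branch structure of `_compute_pnl_target_coefficient` can be traced with a standalone sketch. The parameter values are passed explicitly because their defaults are not visible in the hunk, and `rr` is assumed to be the risk/reward ratio held on the environment.

```python
import math


def pnl_target_coefficient(
    pnl: float,
    pnl_target: float,
    *,
    pnl_factor_beta: float,
    win_reward_factor: float,
    rr: float,
) -> float:
    """Mirror the tanh scaling from the diff with all parameters made explicit."""
    coefficient = 1.0
    if pnl_target > 0.0:
        pnl_ratio = pnl / pnl_target
        if abs(pnl_ratio) > 1.0:
            base = math.tanh(pnl_factor_beta * (abs(pnl_ratio) - 1.0))
            if pnl_ratio > 1.0:
                # Win beyond target: amplified by win_reward_factor
                coefficient = 1.0 + win_reward_factor * base
            elif pnl_ratio < -(1.0 / rr):
                # Loss beyond the rr-scaled bound: amplified by win_reward_factor * rr
                coefficient = 1.0 + win_reward_factor * rr * base
    return coefficient
```

With non-negative `pnl_factor_beta` and `win_reward_factor`, every branch in this sketch returns a value of at least 1.0.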
- def _compute_efficiency_factor(
+ def _compute_efficiency_coefficient(
self, pnl: float, model_reward_parameters: Mapping[str, Any]
) -> float:
"""
- Scale reward based on exit efficiency (distance from max unrealized PnL).
+ Compute exit efficiency coefficient (typically 0.5-1.5) based on exit timing quality.
"""
efficiency_weight = float(
model_reward_parameters.get(
)
)
- efficiency_factor = 1.0
+ efficiency_coefficient = 1.0
if efficiency_weight != 0.0 and not np.isclose(pnl, 0.0):
max_pnl = max(self.get_max_unrealized_profit(), pnl)
min_pnl = min(self.get_min_unrealized_profit(), pnl)
if np.isfinite(range_pnl) and not np.isclose(range_pnl, 0.0):
efficiency_ratio = (pnl - min_pnl) / range_pnl
if pnl > 0.0:
- efficiency_factor = 1.0 + efficiency_weight * (
+ efficiency_coefficient = 1.0 + efficiency_weight * (
efficiency_ratio - efficiency_center
)
elif pnl < 0.0:
- efficiency_factor = 1.0 + efficiency_weight * (
+ efficiency_coefficient = 1.0 + efficiency_weight * (
efficiency_center - efficiency_ratio
)
- return efficiency_factor
+ return efficiency_coefficient
- def _get_pnl_factor(
+ def _get_pnl_coefficient(
self, pnl: float, pnl_target: float, model_reward_parameters: Mapping[str, Any]
) -> float:
"""
- Combine PnL target and efficiency factors (>= 0.0)
+ Combine PnL target and efficiency coefficients (typically 0.25-4.0).
"""
- pnl_target_factor = self._compute_pnl_target_factor(
+ pnl_target_coefficient = self._compute_pnl_target_coefficient(
pnl, pnl_target, model_reward_parameters
)
- efficiency_factor = self._compute_efficiency_factor(
+ efficiency_coefficient = self._compute_efficiency_coefficient(
pnl, model_reward_parameters
)
- return max(0.0, pnl_target_factor * efficiency_factor)
+ return max(0.0, pnl_target_coefficient * efficiency_coefficient)
def calculate_reward(self, action: int) -> float:
"""Compute per-step reward and apply potential-based reward shaping (PBRS).