_Exit attenuation configuration:_
-- `exit_attenuation_mode` (default: linear) - Selects attenuation kernel (see table below: legacy|sqrt|linear|power|half_life).
+- `exit_attenuation_mode` (default: linear) - Selects attenuation kernel (see table below: legacy|sqrt|linear|power|half_life). Invalid values silently fall back to linear.
- `exit_plateau` (default: true) - Enables plateau (no attenuation until `exit_plateau_grace`).
- `exit_plateau_grace` (default: 1.0) - Duration ratio boundary of full‑strength region (may exceed 1.0).
- `exit_linear_slope` (default: 1.0) - Slope parameter used only when mode = linear.
| Component | Controlled By | Notes |
|-----------|---------------|-------|
-| Sample simulation | `--seed` | Drives action sampling, PnL noise, force actions. |
+| Sample simulation | `--seed` | Drives action sampling, PnL noise generation. |
| Statistical tests / bootstrap | `--stats_seed` (fallback `--seed`) | Local RNG; isolation prevents side‑effects in user code. |
| RandomForest & permutation importance | `--seed` | Ensures identical splits and tree construction. |
| Partial dependence grids | Deterministic | Depends only on fitted model & data. |
- Percentile bootstrap confidence intervals (BCa not yet implemented).
- Distribution diagnostics (Shapiro, Anderson, skewness, kurtosis, Q-Q R²).
- Distribution shift metrics (KL divergence, JS distance, Wasserstein, KS test) with
- degenerate (constant) distribution safe‑guards.
+ degenerate (constant) distribution safeguards.
- Unified RandomForest feature importance + partial dependence.
- Heteroscedastic PnL simulation (variance scales with duration).
+Exit attenuation mode normalization:
+- The user-supplied ``exit_attenuation_mode`` is taken as-is (case-sensitive) and validated
+  against the allowed set. Any invalid value (including a casing mismatch) silently falls
+  back to ``'linear'`` (parity with the live environment); no warning is emitted.
+
Architecture principles:
-- Single source of truth: `DEFAULT_MODEL_REWARD_PARAMETERS` for tunables + dynamic CLI.
+- Single source of truth: ``DEFAULT_MODEL_REWARD_PARAMETERS`` (dynamic CLI generation).
- Determinism: explicit seeding, parameter hashing for manifest traceability.
- Extensibility: modular helpers (sampling, reward calculation, statistics, reporting).
"""
import warnings
from enum import Enum, IntEnum
from pathlib import Path
-from typing import Any, Dict, Iterable, List, Optional, Tuple
+from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, Mapping
import numpy as np
import pandas as pd
Neutral = 0.5
-class ForceActions(IntEnum):
- Take_profit = 0
- Stop_loss = 1
- Timeout = 2
-
-
def _to_bool(value: Any) -> bool:
if isinstance(value, bool):
return value
return bool(text)
-def _get_param_float(params: Dict[str, float | str], key: str, default: float) -> float:
+def _get_param_float(
+ params: Mapping[str, RewardParamValue], key: str, default: RewardParamValue
+) -> float:
"""Extract float parameter with type safety and default fallback."""
value = params.get(key, default)
- # None -> default
+ # None -> NaN
if value is None:
- return default
+ return np.nan
# Bool: treat explicitly (avoid surprising True->1.0 unless intentional)
if isinstance(value, bool):
return float(int(value))
try:
fval = float(value)
except (ValueError, TypeError):
- return default
- return fval if np.isfinite(fval) else default
+ return np.nan
+ return fval if np.isfinite(fval) else np.nan
# String parsing
if isinstance(value, str):
stripped = value.strip()
if stripped == "":
- return default
+ return np.nan
try:
fval = float(stripped)
except ValueError:
- return default
- return fval if np.isfinite(fval) else default
+ return np.nan
+ return fval if np.isfinite(fval) else np.nan
# Unsupported type
- return default
+ return np.nan
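# Illustrative sketch of the new _get_param_float semantics: missing keys still resolve
# through the supplied default, while None, unparsable strings and non-finite values now
# yield NaN instead of the default. The import path assumes the module layout in this diff.
import math

from reward_space_analysis import _get_param_float

assert _get_param_float({}, "exit_plateau_grace", 1.0) == 1.0  # missing key -> default
assert math.isnan(_get_param_float({"exit_plateau_grace": None}, "exit_plateau_grace", 1.0))  # None -> NaN
assert math.isnan(_get_param_float({"exit_plateau_grace": "abc"}, "exit_plateau_grace", 1.0))  # unparsable -> NaN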
def _compute_duration_ratio(trade_duration: int, max_trade_duration: int) -> float:
# Mathematical constants pre-computed for performance
_LOG_2 = math.log(2.0)
-DEFAULT_MODEL_REWARD_PARAMETERS: Dict[str, float | str] = {
+RewardParamValue = Union[float, str, bool, None]
+RewardParams = Dict[str, RewardParamValue]
+
+
+# Allowed exit attenuation modes
+ALLOWED_EXIT_MODES = {"legacy", "sqrt", "linear", "power", "half_life"}
+
+DEFAULT_MODEL_REWARD_PARAMETERS: RewardParams = {
"invalid_action": -2.0,
"base_factor": 100.0,
# Idle penalty (env defaults)
"idle_penalty_scale": 0.5,
"idle_penalty_power": 1.025,
- # Fallback semantics: if <=0 or unset → 2 * max_trade_duration_candles (grace window before full idle penalty)
- "max_idle_duration_candles": 0,
+    # Fallback semantics: when unset (None) → 2 * max_trade_duration_candles
+ "max_idle_duration_candles": None,
# Holding keys (env defaults)
"holding_penalty_scale": 0.25,
"holding_penalty_power": 1.025,
"base_factor": "Base reward factor used inside the environment.",
"idle_penalty_power": "Power applied to idle penalty scaling.",
"idle_penalty_scale": "Scale of idle penalty.",
- "max_idle_duration_candles": "Maximum idle duration candles before full idle penalty scaling; 0 = use 2 * max_trade_duration_candles.",
+ "max_idle_duration_candles": "Maximum idle duration candles before full idle penalty scaling.",
"holding_penalty_scale": "Scale of holding penalty.",
"holding_penalty_power": "Power applied to holding penalty scaling.",
"exit_attenuation_mode": "Attenuation kernel (legacy|sqrt|linear|power|half_life).",
def validate_reward_parameters(
- params: Dict[str, float | str],
-) -> Tuple[Dict[str, float | str], Dict[str, Dict[str, Any]]]:
+ params: RewardParams,
+) -> Tuple[RewardParams, Dict[str, Dict[str, Any]]]:
"""Validate and clamp reward parameter values.
- Returns
- -------
- sanitized_params : dict
- Potentially adjusted copy of input params.
- adjustments : dict
- Mapping param -> {original, adjusted, reason} for every modification.
+ This function enforces numeric bounds declared in ``_PARAMETER_BOUNDS``. Values
+ outside their allowed range are clamped and an entry is recorded in the
+ ``adjustments`` mapping describing the original value, the adjusted value and the
+ reason (which bound triggered the change). Non‑finite values are reset to the
+ minimum bound (or 0.0 if no explicit minimum is defined).
- Validation
- ----------
- After loading and (if applicable) flattening, the function will validate the
- presence of a set of required columns and raise a ValueError if any are missing.
- This provides an early, clear error message instead of letting downstream code fail
- with a less informative exception.
+    It does not perform any DataFrame schema validation.
- Required columns (validator):
- - "pnl", "trade_duration", "idle_duration", "position", "action", "reward_total"
+ Parameters
+ ----------
+ params : dict
+        Raw user-supplied reward parameter overrides (already merged with defaults
+ upstream). The dict is not mutated in‑place; a sanitized copy is returned.
Returns
-------
- pd.DataFrame
- DataFrame containing the transitions (one transition per row).
-
- Raises
- ------
- ValueError
- If the pickled payload cannot be converted to a DataFrame with the required columns.
-
+ sanitized_params : dict
+ Possibly adjusted copy of the provided parameters.
+ adjustments : dict[str, dict]
+ Mapping: param -> {original, adjusted, reason} for every modified entry.
"""
sanitized = dict(params)
adjustments: Dict[str, Dict[str, Any]] = {}
return sanitized, adjustments
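# Sketch of the validation contract only; the concrete bounds live in _PARAMETER_BOUNDS
# (not shown here), so the limit assumed below is purely illustrative.
from reward_space_analysis import validate_reward_parameters

sanitized, adjustments = validate_reward_parameters({"idle_penalty_scale": -1.0})
# Assuming idle_penalty_scale has a lower bound of 0.0, the clamp would be recorded as
# adjustments["idle_penalty_scale"] == {"original": -1.0, "adjusted": 0.0, "reason": <bound description>}
# and sanitized["idle_penalty_scale"] == 0.0; in-range parameters pass through untouched.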
+def _normalize_and_validate_mode(params: RewardParams) -> None:
+ """Align normalization of ``exit_attenuation_mode`` with ReforceXY environment.
+
+ Behaviour (mirrors in-env logic):
+ - Do not force lowercase or strip user formatting; use the value as provided.
+ - Supported modes (case-sensitive): {legacy, sqrt, linear, power, half_life}.
+ - If the value is not among supported keys, silently fall back to 'linear'
+ without emitting a warning (environment side performs a silent fallback).
+ - If the key is absent or value is ``None``: leave untouched (upstream defaults
+ will inject 'linear').
+ """
+ exit_attenuation_mode = params.get("exit_attenuation_mode")
+ if exit_attenuation_mode is None:
+ return
+ exit_attenuation_mode = str(exit_attenuation_mode)
+ if exit_attenuation_mode not in ALLOWED_EXIT_MODES:
+ params["exit_attenuation_mode"] = "linear"
+
+
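# Minimal sketch of the silent-fallback behaviour documented above (import path assumed).
from reward_space_analysis import _normalize_and_validate_mode

params = {"exit_attenuation_mode": "Linear"}  # casing mismatch, not in ALLOWED_EXIT_MODES
_normalize_and_validate_mode(params)
assert params["exit_attenuation_mode"] == "linear"  # silently replaced, no warning emitted

params = {"exit_attenuation_mode": "half_life"}
_normalize_and_validate_mode(params)
assert params["exit_attenuation_mode"] == "half_life"  # valid value kept as-is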
def add_tunable_cli_args(parser: argparse.ArgumentParser) -> None:
"""Dynamically add CLI options for each tunable in DEFAULT_MODEL_REWARD_PARAMETERS.
Rules:
- Use the same underscored names as option flags (e.g., --idle_penalty_scale).
- Defaults are None so only user-provided values override params.
- - For exit_attenuation_mode, enforce allowed choices and lowercase conversion.
+    - For exit_attenuation_mode, enforce allowed choices (case-sensitive); invalid values will later silently fall back to 'linear'.
- Skip keys already managed as top-level options (e.g., base_factor) to avoid duplicates.
"""
skip_keys = {"base_factor"} # already defined as top-level
if key == "exit_attenuation_mode":
parser.add_argument(
f"--{key}",
- type=str.lower,
- choices=["legacy", "sqrt", "linear", "power", "half_life"],
+ type=str, # case preserved; validation + silent fallback occurs before factor computation
+ choices=sorted(ALLOWED_EXIT_MODES),
default=None,
help=help_text,
)
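# Sketch of the resulting CLI behaviour (assumes add_tunable_cli_args is importable from
# reward_space_analysis): the case-sensitive choices make argparse reject miscased values
# directly, so the silent 'linear' fallback only matters for values arriving from files
# or overrides rather than from this flag.
import argparse

from reward_space_analysis import add_tunable_cli_args

parser = argparse.ArgumentParser()
add_tunable_cli_args(parser)
args = parser.parse_args(["--exit_attenuation_mode", "half_life"])
assert args.exit_attenuation_mode == "half_life"
# parser.parse_args(["--exit_attenuation_mode", "LINEAR"]) would exit with an argparse error.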
min_unrealized_profit: float
position: Positions
action: Actions
- force_action: Optional[ForceActions]
@dataclasses.dataclass
pnl: float,
pnl_factor: float,
duration_ratio: float,
- params: Dict[str, float | str],
+ params: RewardParams,
) -> float:
- """Compute exit factor = time attenuation kernel (with optional plateau) * pnl_factor.
+ """Compute exit factor = time attenuation kernel (with optional plateau) * ``pnl_factor``.
+
+ Parity: mirrors the live environment's logic (``ReforceXY._get_exit_factor``).
- Parity: mirrors `ReforceXY._get_exit_factor`.
+ Assumptions:
+ - ``_normalize_and_validate_mode`` has already run (invalid modes replaced by 'linear').
+ - ``exit_attenuation_mode`` is therefore either a member of ``ALLOWED_EXIT_MODES`` or 'linear'.
+ - All numeric tunables are accessed through ``_get_param_float`` for safety.
- Steps:
- 1. Sanitize inputs (finite, non-negative duration_ratio).
- 2. Derive effective duration ratio: if plateau enabled and r <= grace ⇒ 0 else r' = r - grace.
- 3. Apply kernel (legacy|sqrt|linear|power|half_life). Unknown ⇒ linear.
- 4. Multiply by externally supplied pnl_factor (includes profit amplification & efficiency).
- 5. Enforce invariants (finite, non-negative when pnl ≥ 0, warn if |factor| exceeds threshold).
+ Algorithm steps:
+ 1. Finiteness & non-negative guard on inputs.
+ 2. Plateau handling: effective duration ratio = 0 within grace region else (r - grace).
+ 3. Kernel application (legacy|sqrt|linear|power|half_life).
+ 4. Multiply by externally supplied ``pnl_factor`` (already includes profit & efficiency effects).
+    5. Invariants: ensure finiteness; clamp negative factor when pnl >= 0; warn if |factor| exceeds the configured threshold.
"""
# Basic finiteness checks
if (
if duration_ratio < 0.0:
duration_ratio = 0.0
- exit_attenuation_mode = str(params.get("exit_attenuation_mode", "linear")).lower()
+ exit_attenuation_mode = str(params.get("exit_attenuation_mode", "linear"))
exit_plateau = _to_bool(params.get("exit_plateau", True))
exit_plateau_grace = _get_param_float(params, "exit_plateau_grace", 1.0)
def _get_pnl_factor(
- params: Dict[str, float | str], context: RewardContext, profit_target: float
+ params: RewardParams, context: RewardContext, profit_target: float
) -> float:
"""Env-aligned PnL factor combining profit amplification and exit efficiency."""
pnl = context.pnl
action: Actions,
*,
short_allowed: bool,
- force_action: Optional[ForceActions],
) -> bool:
- if force_action is not None and position in (Positions.Long, Positions.Short):
- if position == Positions.Long:
- return action == Actions.Long_exit
- return action == Actions.Short_exit
-
if action == Actions.Neutral:
return True
if action == Actions.Long_enter:
def _idle_penalty(
- context: RewardContext, idle_factor: float, params: Dict[str, float | str]
+ context: RewardContext, idle_factor: float, params: RewardParams
) -> float:
"""Mirror the environment's idle penalty behaviour."""
idle_penalty_scale = _get_param_float(
max_idle_duration = int(max_idle_duration_candles)
except (TypeError, ValueError):
max_idle_duration = 2 * max_trade_duration_candles
- if max_idle_duration <= 0:
- max_idle_duration = 2 * max_trade_duration_candles
idle_duration_ratio = context.idle_duration / max(1, max_idle_duration)
return -idle_factor * idle_penalty_scale * idle_duration_ratio**idle_penalty_power
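# Worked example (sketch) of the fallback denominator using the defaults above
# (idle_penalty_scale=0.5, idle_penalty_power=1.025); idle_factor is illustrative only.
idle_duration, max_trade_duration = 50, 100
max_idle_duration = 2 * max_trade_duration  # fallback when max_idle_duration_candles is unset
idle_duration_ratio = idle_duration / max(1, max_idle_duration)  # 0.25
idle_factor, idle_penalty_scale, idle_penalty_power = 1.0, 0.5, 1.025
penalty = -idle_factor * idle_penalty_scale * idle_duration_ratio**idle_penalty_power
# penalty ~= -0.121 for these inputs; doubling idle_duration roughly doubles the magnitude.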
def _holding_penalty(
- context: RewardContext, holding_factor: float, params: Dict[str, float | str]
+ context: RewardContext, holding_factor: float, params: RewardParams
) -> float:
"""Mirror the environment's holding penalty behaviour."""
holding_penalty_scale = _get_param_float(
base_factor: float,
pnl_factor: float,
context: RewardContext,
- params: Dict[str, float | str],
+ params: RewardParams,
) -> float:
"""Compose the exit reward: pnl * exit_factor."""
duration_ratio = _compute_duration_ratio(
def calculate_reward(
context: RewardContext,
- params: Dict[str, float | str],
+ params: RewardParams,
base_factor: float,
profit_target: float,
risk_reward_ratio: float,
context.position,
context.action,
short_allowed=short_allowed,
- force_action=context.force_action,
)
if not is_valid and not action_masking:
breakdown.invalid_penalty = _get_param_float(params, "invalid_action", -2.0)
pnl_factor = _get_pnl_factor(params, context, profit_target_final)
holding_factor = idle_factor
- if context.force_action in (
- ForceActions.Take_profit,
- ForceActions.Stop_loss,
- ForceActions.Timeout,
- ):
- exit_reward = _compute_exit_reward(
- factor,
- pnl_factor,
- context,
- params,
- )
- breakdown.exit_component = exit_reward
- breakdown.total = exit_reward
- return breakdown
-
if context.action == Actions.Neutral and context.position == Positions.Neutral:
breakdown.idle_penalty = _idle_penalty(context, idle_factor, params)
breakdown.total = breakdown.idle_penalty
return rng.choices(choices, weights=weights, k=1)[0]
-def parse_overrides(overrides: Iterable[str]) -> Dict[str, float | str]:
- parsed: Dict[str, float | str] = {}
+def parse_overrides(overrides: Iterable[str]) -> RewardParams:
+ parsed: RewardParams = {}
for override in overrides:
if "=" not in override:
raise ValueError(f"Invalid override format: '{override}'")
- key, raw_value = override.split("=", 1)
+ key, value = override.split("=", 1)
try:
- parsed[key] = float(raw_value)
+ parsed[key] = float(value)
except ValueError:
- parsed[key] = raw_value
+ parsed[key] = value
return parsed
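# Example of the override parsing above: numeric strings become floats, everything else
# stays a string, so attenuation mode names pass through unchanged (import path assumed).
from reward_space_analysis import parse_overrides

overrides = parse_overrides(["exit_plateau_grace=1.5", "exit_attenuation_mode=sqrt"])
assert overrides == {"exit_plateau_grace": 1.5, "exit_attenuation_mode": "sqrt"}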
def simulate_samples(
num_samples: int,
seed: int,
- params: Dict[str, float | str],
+ params: RewardParams,
max_trade_duration: int,
base_factor: float,
profit_target: float,
position_weights = [0.6, 0.4]
position = rng.choices(position_choices, weights=position_weights, k=1)[0]
- force_action: Optional[ForceActions]
- if position != Positions.Neutral and rng.random() < 0.08:
- force_action = rng.choice(
- [ForceActions.Take_profit, ForceActions.Stop_loss, ForceActions.Timeout]
- )
- else:
- force_action = None
-
- if (
- action_masking
- and force_action is not None
- and position != Positions.Neutral
- ):
- action = (
- Actions.Long_exit if position == Positions.Long else Actions.Short_exit
- )
- else:
- action = _sample_action(position, rng, short_allowed=short_allowed)
+ action = _sample_action(position, rng, short_allowed=short_allowed)
if position == Positions.Neutral:
trade_duration = 0
except (TypeError, ValueError):
max_idle_duration_candles = int(max_trade_duration * max_duration_ratio)
- if max_idle_duration_candles <= 0:
- max_idle_duration_candles = int(max_trade_duration * max_duration_ratio)
-
idle_duration = int(rng.uniform(0, max_idle_duration_candles))
else:
trade_duration = int(
elif position == Positions.Short:
pnl -= 0.005 * duration_factor
- # Force actions should correlate with PnL sign
- if force_action == ForceActions.Take_profit:
- # Take profit exits should have positive PnL
- pnl = abs(pnl) + rng.uniform(0.01, 0.05)
- elif force_action == ForceActions.Stop_loss:
- # Stop loss exits should have negative PnL
- pnl = -abs(pnl) - rng.uniform(0.01, 0.05)
-
# Clip PnL to realistic range
pnl = max(min(pnl, 0.15), -0.15)
min_unrealized_profit=min_unrealized_profit,
position=position,
action=action,
- force_action=force_action,
)
breakdown = calculate_reward(
"idle_ratio": context.idle_duration / max(1, max_trade_duration),
"position": float(context.position.value),
"action": float(context.action.value),
- "force_action": float(
- -1 if context.force_action is None else context.force_action.value
- ),
"reward_total": breakdown.total,
"reward_invalid": breakdown.invalid_penalty,
"reward_idle": breakdown.idle_penalty,
"reward_holding": breakdown.holding_penalty,
"reward_exit": breakdown.exit_component,
- "is_force_exit": float(context.force_action is not None),
"is_invalid": float(breakdown.invalid_penalty != 0.0),
}
)
idle_activated = float((df["reward_idle"] != 0).mean())
holding_activated = float((df["reward_holding"] != 0).mean())
exit_activated = float((df["reward_exit"] != 0).mean())
- force_exit_share = float(df["is_force_exit"].mean())
return {
"total": total,
"idle_activated": idle_activated,
"holding_activated": holding_activated,
"exit_activated": exit_activated,
- "force_exit_share": force_exit_share,
}
"idle_ratio",
"position",
"action",
- "force_action",
- "is_force_exit",
"is_invalid",
]
X = df[feature_cols]
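+    # Cast integer duration columns to float before model fitting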
+ for col in ("trade_duration", "idle_duration"):
+ if col in X.columns and pd.api.types.is_integer_dtype(X[col]):
+ X[col] = X[col].astype(float)
y = df["reward_total"]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.25, random_state=seed
"idle_ratio",
"max_unrealized_profit",
"min_unrealized_profit",
- "is_force_exit",
- "force_action",
}
for col in list(numeric_expected | numeric_optional):
"n_groups": len(position_groups),
}
- # Test 3: Force vs regular exits
- force_exits = df[df["is_force_exit"] == 1]["reward_exit"].dropna()
- regular_exits = df[(df["is_force_exit"] == 0) & (df["reward_exit"] != 0)][
- "reward_exit"
- ].dropna()
-
- if len(force_exits) >= 30 and len(regular_exits) >= 30:
- u_stat, p_val = stats.mannwhitneyu(
- force_exits, regular_exits, alternative="two-sided"
- )
- n1, n2 = len(force_exits), len(regular_exits)
- # Rank-biserial correlation (directional): r = 2*U1/(n1*n2) - 1
- # Compute U1 from sum of ranks of the first group for a robust sign.
- combined = np.concatenate([force_exits.values, regular_exits.values])
- ranks = stats.rankdata(combined, method="average")
- R1 = float(ranks[:n1].sum())
- U1 = R1 - n1 * (n1 + 1) / 2.0
- r_rb = (2.0 * U1) / (n1 * n2) - 1.0
-
- results["force_vs_regular_exits"] = {
- "test": "Mann-Whitney U",
- "statistic": float(u_stat),
- "p_value": float(p_val),
- "significant": bool(p_val < alpha),
- "effect_size_rank_biserial": float(r_rb),
- "median_force": float(force_exits.median()),
- "median_regular": float(regular_exits.median()),
- "n_force": len(force_exits),
- "n_regular": len(regular_exits),
- }
-
- # Test 4: PnL sign differences
+ # Test 3: PnL sign differences
pnl_positive = df[df["pnl"] > 0]["reward_total"].dropna()
pnl_negative = df[df["pnl"] < 0]["reward_total"].dropna()
f"| Holding penalty | {representativity_stats['holding_activated']:.1%} |\n"
)
f.write(f"| Exit reward | {representativity_stats['exit_activated']:.1%} |\n")
- f.write(f"| Force exit | {representativity_stats['force_exit_share']:.1%} |\n")
f.write("\n")
# Section 3: Reward Component Relationships
)
f.write(f"- **Interpretation:** {h['interpretation']} effect\n\n")
- if "force_vs_regular_exits" in hypothesis_tests:
- h = hypothesis_tests["force_vs_regular_exits"]
- f.write("#### 5.1.3 Force vs Regular Exits Comparison\n\n")
- f.write(f"**Test Method:** {h['test']}\n\n")
- f.write(f"- U-statistic: **{h['statistic']:.4f}**\n")
- f.write(f"- p-value: {h['p_value']:.4g}\n")
- if "p_value_adj" in h:
- f.write(
- f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅' if h['significant_adj'] else '❌'} (α=0.05)\n"
- )
- f.write(
- f"- Effect size (rank-biserial): {h['effect_size_rank_biserial']:.4f}\n"
- )
- f.write(f"- Median (force): {h['median_force']:.4f}\n")
- f.write(f"- Median (regular): {h['median_regular']:.4f}\n")
- f.write(
- f"- Significant (α=0.05): {'✅ Yes' if h['significant'] else '❌ No'}\n\n"
- )
-
if "pnl_sign_reward_difference" in hypothesis_tests:
h = hypothesis_tests["pnl_sign_reward_difference"]
f.write("#### 5.1.4 Positive vs Negative PnL Comparison\n\n")
# Early parameter validation (moved before simulation for alignment with docs)
params_validated, adjustments = validate_reward_parameters(params)
params = params_validated
+    # Normalize exit attenuation mode (invalid values silently fall back to 'linear')
+ _normalize_and_validate_mode(params)
base_factor = _get_param_float(params, "base_factor", float(args.base_factor))
profit_target = _get_param_float(params, "profit_target", float(args.profit_target))
from reward_space_analysis import (
DEFAULT_MODEL_REWARD_PARAMETERS,
Actions,
- ForceActions,
Positions,
RewardContext,
_get_exit_factor,
min_unrealized_profit=0.015,
position=Positions.Long,
action=Actions.Long_exit,
- force_action=None,
)
breakdown = calculate_reward(
breakdown.exit_component, 0, "Profitable exit should have positive reward"
)
- def test_force_action_logic(self):
- """Validate forced exits (take profit, stop loss, timeout) produce consistent exit rewards.
-
- Algorithmic expectations:
- - ForceActions override the provided action and trigger exit reward path.
- - Exit reward sign should match PnL sign (exit_factor is positive under invariants).
- - Take profit reward magnitude > stop loss reward magnitude for comparable |PnL|.
- - Timeout uses current PnL (can be positive or negative); we assert sign consistency only.
- """
- profit_target = 0.06
-
- # Take profit (positive pnl)
- tp_context = RewardContext(
- pnl=0.05,
- trade_duration=50,
- idle_duration=0,
- max_trade_duration=100,
- max_unrealized_profit=0.07,
- min_unrealized_profit=0.01,
- position=Positions.Long,
- action=Actions.Neutral, # action ignored due to force_action
- force_action=ForceActions.Take_profit,
- )
- tp_breakdown = calculate_reward(
- tp_context,
- self.DEFAULT_PARAMS,
- base_factor=self.TEST_BASE_FACTOR,
- profit_target=profit_target,
- risk_reward_ratio=self.TEST_RR_HIGH,
- short_allowed=True,
- action_masking=True,
- )
- self.assertGreater(
- tp_breakdown.exit_component,
- 0.0,
- "Take profit should yield positive exit reward",
- )
- # Exit reward should be the only active component
- self.assertEqual(tp_breakdown.invalid_penalty, 0.0)
- self.assertEqual(tp_breakdown.idle_penalty, 0.0)
- self.assertEqual(tp_breakdown.holding_penalty, 0.0)
- self.assertEqual(tp_breakdown.total, tp_breakdown.exit_component)
- self.assertAlmostEqualFloat(
- math.copysign(1, tp_breakdown.exit_component),
- 1.0,
- msg="TP reward sign mismatch",
- )
-
- # Stop loss (negative pnl)
- sl_context = RewardContext(
- pnl=-0.03,
- trade_duration=50,
- idle_duration=0,
- max_trade_duration=100,
- max_unrealized_profit=0.01,
- min_unrealized_profit=-0.05,
- position=Positions.Long,
- action=Actions.Neutral,
- force_action=ForceActions.Stop_loss,
- )
- sl_breakdown = calculate_reward(
- sl_context,
- self.DEFAULT_PARAMS,
- base_factor=self.TEST_BASE_FACTOR,
- profit_target=profit_target,
- risk_reward_ratio=self.TEST_RR_HIGH,
- short_allowed=True,
- action_masking=True,
- )
- self.assertLess(
- sl_breakdown.exit_component,
- 0.0,
- "Stop loss should yield negative exit reward",
- )
- self.assertEqual(sl_breakdown.invalid_penalty, 0.0)
- self.assertEqual(sl_breakdown.idle_penalty, 0.0)
- self.assertEqual(sl_breakdown.holding_penalty, 0.0)
- self.assertEqual(sl_breakdown.total, sl_breakdown.exit_component)
- self.assertAlmostEqualFloat(
- math.copysign(1, sl_breakdown.exit_component),
- -1.0,
- msg="SL reward sign mismatch",
- )
-
- # Timeout (use small positive pnl)
- to_context = RewardContext(
- pnl=0.01,
- trade_duration=120, # beyond default max
- idle_duration=0,
- max_trade_duration=100,
- max_unrealized_profit=0.02,
- min_unrealized_profit=-0.01,
- position=Positions.Long,
- action=Actions.Neutral,
- force_action=ForceActions.Timeout,
- )
- to_breakdown = calculate_reward(
- to_context,
- self.DEFAULT_PARAMS,
- base_factor=self.TEST_BASE_FACTOR,
- profit_target=profit_target,
- risk_reward_ratio=self.TEST_RR_HIGH,
- short_allowed=True,
- action_masking=True,
- )
- self.assertGreaterEqual(
- to_breakdown.exit_component,
- 0.0,
- "Timeout reward should be non-negative with positive PnL",
- )
- self.assertEqual(to_breakdown.invalid_penalty, 0.0)
- self.assertEqual(to_breakdown.idle_penalty, 0.0)
- self.assertEqual(to_breakdown.holding_penalty, 0.0)
- self.assertEqual(to_breakdown.total, to_breakdown.exit_component)
-
- # Magnitude ordering: TP reward magnitude > SL reward magnitude (absolute values, given larger |pnl| for TP)
- self.assertGreater(
- abs(tp_breakdown.exit_component),
- abs(sl_breakdown.exit_component),
- "Take profit reward magnitude should exceed stop loss reward magnitude",
- )
-
def test_efficiency_zero_policy(self):
"""Ensure pnl == 0 with max_unrealized_profit == 0 does not get boosted.
min_unrealized_profit=-0.02,
position=Positions.Long,
action=Actions.Long_exit,
- force_action=None,
)
params = self.DEFAULT_PARAMS.copy()
min_unrealized_profit=0.0,
position=Positions.Neutral,
action=Actions.Neutral,
- force_action=None,
)
breakdown_small = calculate_reward(
"""Fallback & proportionality validation.
Semantics:
- - When max_idle_duration_candles <= 0, fallback must be 2 * max_trade_duration (updated rule).
+ - When max_idle_duration_candles is unset, fallback must be 2 * max_trade_duration.
- Idle penalty scales ~ linearly with idle_duration (power=1), so doubling idle_duration doubles penalty magnitude.
- We also infer the implicit denominator from a mid-range idle duration (>1x and <2x trade duration) to ensure the
2x fallback.
"""
params = self.DEFAULT_PARAMS.copy()
- params["max_idle_duration_candles"] = 0 # force fallback
+ params["max_idle_duration_candles"] = None
base_factor = 90.0
profit_target = self.TEST_PROFIT_TARGET
risk_reward_ratio = 1.0
min_unrealized_profit=0.0,
position=Positions.Neutral,
action=Actions.Neutral,
- force_action=None,
)
ctx_b = dataclasses.replace(ctx_a, idle_duration=40)
min_unrealized_profit=0.0,
position=Positions.Long,
action=Actions.Long_exit,
- force_action=None,
)
# Baseline with moderate base_factor
min_unrealized_profit=0.0,
position=Positions.Neutral,
action=Actions.Neutral,
- force_action=None,
)
br = calculate_reward(
context,
min_unrealized_profit=0.0,
position=Positions.Long,
action=Actions.Long_exit,
- force_action=None,
)
br = calculate_reward(
context,
min_unrealized_profit=0.0,
position=Positions.Long,
action=Actions.Long_exit,
- force_action=None,
),
# Losing exit
RewardContext(
min_unrealized_profit=-0.04,
position=Positions.Long,
action=Actions.Long_exit,
- force_action=None,
),
# Idle penalty
RewardContext(
min_unrealized_profit=0.0,
position=Positions.Neutral,
action=Actions.Neutral,
- force_action=None,
),
# Holding penalty (maintained position)
RewardContext(
min_unrealized_profit=-0.01,
position=Positions.Long,
action=Actions.Neutral,
- force_action=None,
),
]
min_unrealized_profit=pnl if pnl < 0 else -0.01,
position=Positions.Long,
action=Actions.Long_exit,
- force_action=None,
)
ctx_short = RewardContext(
pnl=pnl,
min_unrealized_profit=pnl if pnl < 0 else -0.01,
position=Positions.Short,
action=Actions.Short_exit,
- force_action=None,
)
br_long = calculate_reward(
ctx_long,
min_unrealized_profit=0.04,
position=Positions.Long,
action=Actions.Long_exit,
- force_action=None,
)
params = self.DEFAULT_PARAMS.copy()
"""Test reward calculation scenarios."""
# Test different reward scenarios
test_cases = [
- # (position, action, force_action, expected_reward_type)
- (Positions.Neutral, Actions.Neutral, None, "idle_penalty"),
- (Positions.Long, Actions.Long_exit, None, "exit_component"),
- (Positions.Short, Actions.Short_exit, None, "exit_component"),
- (
- Positions.Long,
- Actions.Neutral,
- ForceActions.Take_profit,
- "exit_component",
- ),
- (
- Positions.Short,
- Actions.Neutral,
- ForceActions.Stop_loss,
- "exit_component",
- ),
+ # (position, action, expected_reward_type)
+ (Positions.Neutral, Actions.Neutral, "idle_penalty"),
+ (Positions.Long, Actions.Long_exit, "exit_component"),
+ (Positions.Short, Actions.Short_exit, "exit_component"),
]
- for position, action, force_action, expected_type in test_cases:
- with self.subTest(
- position=position, action=action, force_action=force_action
- ):
+ for position, action, expected_type in test_cases:
+ with self.subTest(position=position, action=action):
context = RewardContext(
- pnl=0.02 if force_action == ForceActions.Take_profit else -0.02,
+ pnl=0.02 if expected_type == "exit_component" else 0.0,
trade_duration=50 if position != Positions.Neutral else 0,
idle_duration=10 if position == Positions.Neutral else 0,
max_trade_duration=100,
min_unrealized_profit=-0.01,
position=position,
action=action,
- force_action=force_action,
)
breakdown = calculate_reward(
min_unrealized_profit=0.02,
position=Positions.Long,
action=Actions.Long_exit,
- force_action=None,
)
breakdown = calculate_reward(
min_unrealized_profit=0.01,
position=Positions.Long,
action=Actions.Long_exit,
- force_action=None,
)
breakdown = calculate_reward(
min_unrealized_profit=0.0,
position=Positions.Neutral,
action=Actions.Neutral,
- force_action=None,
)
breakdown = calculate_reward(
min_unrealized_profit=0.0,
position=Positions.Long,
action=Actions.Neutral,
- force_action=None,
)
breakdown = calculate_reward(
min_unrealized_profit=min(pnl - 0.01, -0.01),
position=position,
action=action,
- force_action=None,
)
breakdown = calculate_reward(
min_unrealized_profit=0.01,
position=Positions.Short,
action=Actions.Long_exit,
- force_action=None, # Invalid: can't long_exit from short
)
breakdown = calculate_reward(
min_unrealized_profit=0.0,
position=Positions.Long,
action=Actions.Neutral,
- force_action=None,
)
breakdown = calculate_reward(
min_unrealized_profit=0.0,
position=Positions.Long,
action=Actions.Neutral,
- force_action=None,
)
breakdown = calculate_reward(
min_unrealized_profit=0.0,
position=Positions.Long,
action=Actions.Long_exit,
- force_action=None,
)
breakdown = calculate_reward(
context,
min_unrealized_profit: float = 0.01,
position: Positions = Positions.Long,
action: Actions = Actions.Long_exit,
- force_action: ForceActions | None = None,
) -> RewardContext:
return RewardContext(
pnl=pnl,
min_unrealized_profit=min_unrealized_profit,
position=position,
action=action,
- force_action=force_action,
)
def test_decomposition_integrity(self):
min_unrealized_profit=0.0,
position=Positions.Neutral,
action=Actions.Neutral,
- force_action=None,
),
active="idle_penalty",
),
min_unrealized_profit=0.0,
position=Positions.Long,
action=Actions.Neutral,
- force_action=None,
),
active="holding_penalty",
),
min_unrealized_profit=0.0,
position=Positions.Short,
action=Actions.Long_exit, # invalid
- force_action=None,
),
active="invalid_penalty",
),
min_unrealized_profit=0.0,
position=Positions.Neutral,
action=Actions.Neutral,
- force_action=None,
)
ctx_b = dataclasses.replace(ctx_a, idle_duration=40)
br_a = calculate_reward(
min_unrealized_profit=0.0,
position=Positions.Long,
action=Actions.Neutral,
- force_action=None,
)
ctx_h2 = dataclasses.replace(ctx_h1, trade_duration=140)
# Compute baseline and comparison holding penalties
min_unrealized_profit=0.0,
position=Positions.Long,
action=Actions.Long_exit,
- force_action=None,
)
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
// "trading_mode": "futures",
// "margin_mode": "isolated",
"trading_mode": "spot",
- // "minimal_roi": {
- // "0": 0.03
- // }, // Take_profit exit value used with force_actions
- // "stoploss": -0.02, // Stop_loss exit value used with force_actions
"stoploss": -0.99,
"unfilledtimeout": {
"entry": 10,
"add_state_info": true,
"cpu_count": 4,
"max_training_drawdown_pct": 0.02,
- "max_trade_duration_candles": 96, // Timeout exit value used with force_actions
- "force_actions": false, // Utilize minimal_roi, stoploss, and max_trade_duration_candles as TP/SL/Timeout in the environment
+ "max_trade_duration_candles": 96, // Maximum trade duration in candles
"n_envs": 8, // Number of DummyVecEnv or SubProcVecEnv training environments
"multiprocessing": true, // Use SubprocVecEnv if n_envs>1 (otherwise DummyVecEnv)
"frame_stacking": 2, // Number of VecFrameStack stacks (set > 1 to use)
import warnings
from collections import defaultdict
from collections.abc import Mapping
-from enum import IntEnum
+
from pathlib import Path
from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Type, Union
logger = logging.getLogger(__name__)
-class ForceActions(IntEnum):
- Take_profit = 0
- Stop_loss = 1
- Timeout = 2
-
-
class ReforceXY(BaseReinforcementLearningModel):
"""
Custom Freqtrade Freqai reinforcement learning prediction model.
{
"freqaimodel": "ReforceXY",
"strategy": "RLAgentStrategy",
- "minimal_roi": {"0": 0.03}, // Take_profit exit value used with force_actions
- "stoploss": -0.02, // Stop_loss exit value used with force_actions
...
"freqai": {
...
"rl_config": {
...
- "max_trade_duration_candles": 96, // Timeout exit value used with force_actions
- "force_actions": false, // Utilize minimal_roi, stoploss, and max_trade_duration_candles as TP/SL/Timeout in the environment
+ "max_trade_duration_candles": 96, // Maximum trade duration in candles
"n_envs": 1, // Number of DummyVecEnv or SubProcVecEnv training environments
"n_eval_envs": 1, // Number of DummyVecEnv or SubProcVecEnv evaluation environments
"multiprocessing": false, // Use SubprocVecEnv if n_envs>1 (otherwise DummyVecEnv)
"""
_LOG_2 = math.log(2.0)
- _action_masks_cache: Dict[Tuple[str, int, Optional[int]], NDArray[np.bool_]] = {}
+ _action_masks_cache: Dict[Tuple[bool, int], NDArray[np.bool_]] = {}
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def get_action_masks(
can_short: bool,
position: Positions,
- force_action: Optional[ForceActions] = None,
) -> NDArray[np.bool_]:
position = ReforceXY._normalize_position(position)
cache_key = (
can_short,
position.value,
- force_action.value if force_action else None,
)
if cache_key in ReforceXY._action_masks_cache:
return ReforceXY._action_masks_cache[cache_key]
action_masks = np.zeros(len(Actions), dtype=np.bool_)
- if force_action is not None and position in (Positions.Long, Positions.Short):
- if position == Positions.Long:
- action_masks[Actions.Long_exit.value] = True
- else:
- action_masks[Actions.Short_exit.value] = True
- ReforceXY._action_masks_cache[cache_key] = action_masks
- return ReforceXY._action_masks_cache[cache_key]
-
action_masks[Actions.Neutral.value] = True
if position == Positions.Neutral:
action_masks[Actions.Long_enter.value] = True
super().__init__(*args, **kwargs)
self._set_observation_space()
self.action_masking: bool = self.rl_config.get("action_masking", False)
- self.force_actions: bool = self.rl_config.get("force_actions", False)
- self._force_action: Optional[ForceActions] = None
- self.take_profit: float = self.config.get("minimal_roi", {}).get("0", 0.03)
- self.stop_loss: float = self.config.get("stoploss", -0.02)
- self.timeout: int = self.rl_config.get("max_trade_duration_candles", 128)
+ self.max_trade_duration_candles: int = self.rl_config.get(
+ "max_trade_duration_candles", 128
+ )
self._last_closed_position: Optional[Positions] = None
self._last_closed_trade_tick: int = 0
self._max_unrealized_profit: float = -np.inf
self._min_unrealized_profit: float = np.inf
- if self.force_actions:
- logger.info(
- "%s - take_profit: %s, stop_loss: %s, timeout: %s candles (%s days), observation_space: %s",
- self.id,
- self.take_profit,
- self.stop_loss,
- self.timeout,
- steps_to_days(self.timeout, self.config.get("timeframe")),
- self.observation_space,
- )
def _set_observation_space(self) -> None:
"""
)
def _is_valid(self, action: int) -> bool:
- return ReforceXY.get_action_masks(
- self.can_short, self._position, self._force_action
- )[action]
+ return ReforceXY.get_action_masks(self.can_short, self._position)[action]
def reset_env(
self,
Reset is called at the beginning of every episode
"""
observation, history = super().reset(seed, **kwargs)
- self._force_action: Optional[ForceActions] = None
self._last_closed_position: Optional[Positions] = None
self._last_closed_trade_tick: int = 0
self._max_unrealized_profit = -np.inf
# mrr = self.get_most_recent_return()
# mrp = self.get_most_recent_profit()
- max_trade_duration = max(1, self.timeout)
+ max_trade_duration = max(self.max_trade_duration_candles, 1)
trade_duration = self.get_trade_duration()
duration_ratio = trade_duration / max_trade_duration
idle_factor = base_factor * pnl_target / 3.0
holding_factor = idle_factor
- # Force exits
- if self._force_action in (
- ForceActions.Take_profit,
- ForceActions.Stop_loss,
- ForceActions.Timeout,
- ):
- return pnl * self._get_exit_factor(base_factor, pnl, duration_ratio)
-
# # you can use feature values from dataframe
# rsi_now = self.get_feature_value(
# name="%-rsi",
"max_idle_duration_candles", 2 * max_trade_duration
)
)
- if max_idle_duration <= 0:
- max_idle_duration = 2 * max_trade_duration
idle_penalty_scale = float(
model_reward_parameters.get("idle_penalty_scale", 0.5)
)
return np.ascontiguousarray(observations)
- def _get_force_action(self) -> Optional[ForceActions]:
- if not self.force_actions or self._position == Positions.Neutral:
- return None
-
- trade_duration = self.get_trade_duration()
- if trade_duration <= 1:
- return None
- if trade_duration >= self.timeout:
- return ForceActions.Timeout
-
- pnl = self.get_unrealized_profit()
- if pnl >= self.take_profit:
- return ForceActions.Take_profit
- if pnl <= self.stop_loss:
- return ForceActions.Stop_loss
- return None
-
def _get_position(self, action: int) -> Positions:
return {
Actions.Long_enter.value: Positions.Long,
"""
Execute trade based on the given action
"""
- # Force exit trade
- if self._force_action is not None:
- self._exit_trade()
- self.tensorboard_log(f"{self._force_action.name}", category="actions/force")
- return f"{self._force_action.name}"
if not self.is_tradesignal(action):
return None
self._update_unrealized_total_profit()
pre_pnl = self.get_unrealized_profit()
self._update_portfolio_log_returns()
- self._force_action = self._get_force_action()
reward = self.calculate_reward(action)
self.total_reward += reward
self.tensorboard_log(Actions._member_names_[action], category="actions")
"tick": self._current_tick,
"position": self._position.value,
"action": action,
- "force_action": (self._force_action.name if self._force_action else None),
"pre_pnl": round(pre_pnl, 5),
"pnl": round(pnl, 5),
"delta_pnl": round(delta_pnl, 5),
)
def action_masks(self) -> NDArray[np.bool_]:
- return ReforceXY.get_action_masks(
- self.can_short, self._position, self._force_action
- )
+ return ReforceXY.get_action_masks(self.can_short, self._position)
def get_feature_value(
self,
self._safe_logger_record(f"info/{metric}", value, exclude=logger_exclude)
if isinstance(infos_list, list) and infos_list:
- cat_keys = ("force_action", "action", "position")
+ cat_keys = ("action", "position")
cat_counts: Dict[str, Dict[Any, int]] = {
k: defaultdict(int) for k in cat_keys
}