"exit_half_life": 0.5,
# Efficiency keys (env defaults)
"efficiency_weight": 1.0,
- "efficiency_center": 0.35,
+ "efficiency_center": 0.5,
# Profit factor params (env defaults)
"win_reward_factor": 2.0,
"pnl_factor_beta": 0.5,
def _get_exit_factor(
- factor: float,
+ base_factor: float,
pnl: float,
pnl_factor: float,
duration_ratio: float,
"""
# Basic finiteness checks
if (
- not math.isfinite(factor)
+ not math.isfinite(base_factor)
or not math.isfinite(pnl)
or not math.isfinite(duration_ratio)
):
kernel = _linear_kernel
try:
- factor = kernel(factor, effective_dr)
+ base_factor = kernel(base_factor, effective_dr)
except Exception as e:
warnings.warn(
f"exit_attenuation_mode '{exit_attenuation_mode}' failed ({e!r}); fallback linear (effective_dr={effective_dr:.5f})",
RuntimeWarning,
stacklevel=2,
)
- factor = _linear_kernel(factor, effective_dr)
+ base_factor = _linear_kernel(base_factor, effective_dr)
# Apply pnl_factor after time attenuation
- factor *= pnl_factor
+ base_factor *= pnl_factor
# Invariant & safety checks
if _to_bool(params.get("check_invariants", True)):
- if not math.isfinite(factor):
+ if not math.isfinite(base_factor):
return 0.0
- if factor < 0.0 and pnl >= 0.0:
+ if base_factor < 0.0 and pnl >= 0.0:
# Clamp: avoid negative amplification on non-negative pnl
- factor = 0.0
+ base_factor = 0.0
thr = params.get("exit_factor_threshold")
if isinstance(thr, (int, float)) and thr > 0 and math.isfinite(thr):
- if abs(factor) > thr:
+ if abs(base_factor) > thr:
warnings.warn(
(
- f"_get_exit_factor |factor|={abs(factor):.2f} exceeds threshold {thr:.2f}"
+ f"_get_exit_factor |factor|={abs(base_factor):.2f} exceeds threshold {thr:.2f}"
),
RuntimeWarning,
stacklevel=2,
)
- return factor
+ return base_factor
def _get_pnl_factor(
efficiency_factor = 1.0
efficiency_weight = float(params.get("efficiency_weight", 1.0))
- efficiency_center = float(params.get("efficiency_center", 0.35))
+ efficiency_center = float(params.get("efficiency_center", 0.5))
if efficiency_weight != 0.0 and pnl >= 0.0:
max_pnl = max(context.max_unrealized_profit, pnl)
min_pnl = min(context.min_unrealized_profit, pnl)
return context.pnl * exit_factor
-def compute_exit_factor(
- base_factor: float,
- pnl: float,
- pnl_factor: float,
- duration_ratio: float,
- params: Dict[str, float | str],
-) -> float:
- """Public wrapper to compute the time-attenuated + pnl-scaled exit factor."""
- return _get_exit_factor(base_factor, pnl, pnl_factor, duration_ratio, params)
-
-
def calculate_reward(
context: RewardContext,
params: Dict[str, float | str],
}
-def write_summary(df: pd.DataFrame, output_dir: Path) -> None:
- """Legacy function - kept for backward compatibility."""
- output_dir.mkdir(parents=True, exist_ok=True)
- summary_path = output_dir / "reward_summary.md"
- stats = _compute_summary_stats(df)
-
- with summary_path.open("w", encoding="utf-8") as handle:
- handle.write("# Reward space summary\n\n")
- handle.write("## Global statistics\n\n")
- handle.write(stats["global_stats"].to_frame(name="reward_total").to_string())
- handle.write("\n\n")
- handle.write("## Action-wise reward statistics\n\n")
- handle.write(stats["action_summary"].to_string())
- handle.write("\n\n")
- handle.write("## Component activation ratio\n\n")
- handle.write(
- stats["component_share"].to_frame(name="activation_rate").to_string()
- )
- handle.write("\n\n")
- handle.write("## Component bounds (min/mean/max)\n\n")
- handle.write(stats["component_bounds"].to_string())
- handle.write("\n")
-
-
def _binned_stats(
df: pd.DataFrame,
column: str,
}
-def write_relationship_reports(
- df: pd.DataFrame,
- output_dir: Path,
- max_trade_duration: int,
-) -> None:
- """Legacy function - kept for backward compatibility."""
- output_dir.mkdir(parents=True, exist_ok=True)
- relationships_path = output_dir / "reward_relationships.md"
- stats = _compute_relationship_stats(df, max_trade_duration)
-
- with relationships_path.open("w", encoding="utf-8") as handle:
- handle.write("# Reward component relationships\n\n")
-
- handle.write("## Idle penalty by idle duration bins\n\n")
- if stats["idle_stats"].empty:
- handle.write("_No idle samples present._\n\n")
- else:
- handle.write(stats["idle_stats"].to_string())
- handle.write("\n\n")
-
- handle.write("## Holding penalty by trade duration bins\n\n")
- if stats["holding_stats"].empty:
- handle.write("_No holding samples present._\n\n")
- else:
- handle.write(stats["holding_stats"].to_string())
- handle.write("\n\n")
-
- handle.write("## Exit reward by PnL bins\n\n")
- if stats["exit_stats"].empty:
- handle.write("_No exit samples present._\n\n")
- else:
- handle.write(stats["exit_stats"].to_string())
- handle.write("\n\n")
-
- handle.write("## Correlation matrix\n\n")
- handle.write(stats["correlation"].to_csv(sep="\t", float_format="%.4f"))
- handle.write("\n")
-
-
def _compute_representativity_stats(
df: pd.DataFrame, profit_target: float, max_trade_duration: int | None = None
) -> Dict[str, Any]:
}
-def write_representativity_report(
- df: pd.DataFrame,
- output_dir: Path,
- profit_target: float,
- max_trade_duration: int,
-) -> None:
- output_dir.mkdir(parents=True, exist_ok=True)
- path = output_dir / "representativity.md"
-
- stats = _compute_representativity_stats(df, profit_target)
-
- with path.open("w", encoding="utf-8") as h:
- h.write("# Representativity diagnostics\n\n")
- h.write(f"Total samples: {stats['total']}\n\n")
- h.write("## Position distribution\n\n")
- h.write(stats["pos_counts"].to_frame(name="count").to_string())
- h.write("\n\n")
- h.write("## Action distribution\n\n")
- h.write(stats["act_counts"].to_frame(name="count").to_string())
- h.write("\n\n")
- h.write("## Key regime coverage\n\n")
- h.write(f"pnl > target fraction: {stats['pnl_above_target']:.4f}\n")
- h.write(f"pnl near target [0.8,1.2] fraction: {stats['pnl_near_target']:.4f}\n")
- h.write(
- f"duration overage (>1.0) fraction: {stats['duration_overage_share']:.4f}\n"
- )
- h.write(f"idle activated fraction: {stats['idle_activated']:.4f}\n")
- h.write(f"holding activated fraction: {stats['holding_activated']:.4f}\n")
- h.write(f"exit activated fraction: {stats['exit_activated']:.4f}\n")
- h.write(f"force exit fraction: {stats['force_exit_share']:.4f}\n")
- h.write(f"extreme pnl (|pnl|>=0.14) fraction: {stats['pnl_extreme']:.4f}\n")
- h.write("\n")
- h.write(
- "Notes: Coverage of critical regimes (pnl≈target, overage>1) and component activation\n"
- )
- h.write(
- "are indicators of sufficient reward space representativity for the analysis.\n"
- )
-
-
def _perform_feature_analysis(
df: pd.DataFrame, seed: int
) -> Tuple[
return importance_df, analysis_stats, partial_deps, model
-def model_analysis(df: pd.DataFrame, output_dir: Path, seed: int) -> None:
- """Legacy wrapper for backward compatibility."""
- importance_df, analysis_stats, partial_deps, model = _perform_feature_analysis(
- df, seed
- )
-
- # Save feature importance
- importance_df.to_csv(output_dir / "feature_importance.csv", index=False)
-
- # Save diagnostics
- diagnostics_path = output_dir / "model_diagnostics.md"
- top_features = importance_df.head(10)
- with diagnostics_path.open("w", encoding="utf-8") as handle:
- handle.write("# Random forest diagnostics\n\n")
- handle.write(f"R^2 score on hold-out set: {analysis_stats['r2_score']:.4f}\n\n")
- handle.write("## Feature importance (top 10)\n\n")
- handle.write(top_features.to_string(index=False))
- handle.write("\n\n")
- handle.write(
- "Partial dependence data exported to CSV files for trade_duration, "
- "idle_duration, and pnl.\n"
- )
-
- # Save partial dependence data
- for feature, pd_df in partial_deps.items():
- pd_df.to_csv(
- output_dir / f"partial_dependence_{feature}.csv",
- index=False,
- )
-
-
def load_real_episodes(path: Path) -> pd.DataFrame:
"""Load real episodes transitions from pickle file."""
with path.open("rb") as f:
raise AssertionError(f"Q-Q R^2 {key} must be in [0,1], got {value}")
-def write_enhanced_statistical_report(
- df: pd.DataFrame,
- output_dir: Path,
- real_df: Optional[pd.DataFrame] = None,
- *,
- adjust_method: str = "none",
-) -> None:
- """Generate enhanced statistical report with hypothesis tests and CI."""
- output_dir.mkdir(parents=True, exist_ok=True)
- report_path = output_dir / "enhanced_statistical_report.md"
-
- # Derive a deterministic seed for statistical tests: prefer provided seed if present in df attrs else fallback
- test_seed = 42
- if (
- hasattr(df, "attrs")
- and "seed" in df.attrs
- and isinstance(df.attrs["seed"], int)
- ):
- test_seed = int(df.attrs["seed"])
- hypothesis_tests = statistical_hypothesis_tests(
- df, adjust_method=adjust_method, seed=test_seed
- )
-
- metrics_for_ci = [
- "reward_total",
- "reward_idle",
- "reward_holding",
- "reward_exit",
- "pnl",
- ]
- confidence_intervals = bootstrap_confidence_intervals(df, metrics_for_ci)
-
- dist_diagnostics = distribution_diagnostics(df)
-
- shift_metrics = {}
- if real_df is not None:
- shift_metrics = compute_distribution_shift_metrics(df, real_df)
-
- with report_path.open("w", encoding="utf-8") as f:
- f.write("# Enhanced Statistical Report\n\n")
- f.write("**Generated with rigorous scientific methodology**\n\n")
-
- f.write("## 1. Statistical Hypothesis Tests\n\n")
- for test_name, test_result in hypothesis_tests.items():
- f.write(f"### {test_name.replace('_', ' ').title()}\n\n")
- f.write(f"- **Test:** {test_result['test']}\n")
- f.write(
- f"- **Statistic:** {test_result.get('statistic', test_result.get('rho', 'N/A')):.4f}\n"
- )
- f.write(f"- **p-value:** {test_result['p_value']:.4e}\n")
- if "p_value_adj" in test_result:
- f.write(
- f"- **p-value (adj BH):** {test_result['p_value_adj']:.4e} -> {'✅' if test_result['significant_adj'] else '❌'} (α=0.05)\n"
- )
- f.write(
- f"- **Significant (α=0.05):** {'✅ Yes' if test_result['significant'] else '❌ No'}\n"
- )
-
- if "ci_95" in test_result:
- ci = test_result["ci_95"]
- f.write(f"- **95% CI:** [{ci[0]:.4f}, {ci[1]:.4f}]\n")
-
- if "effect_size_epsilon_sq" in test_result:
- f.write(
- f"- **Effect Size (ε²):** {test_result['effect_size_epsilon_sq']:.4f}\n"
- )
-
- if "interpretation" in test_result:
- f.write(f"- **Interpretation:** {test_result['interpretation']}\n")
-
- f.write("\n")
-
- f.write("## 2. Bootstrap Confidence Intervals (95%)\n\n")
- f.write("| Metric | Point Estimate | CI Lower | CI Upper | Width |\n")
- f.write("|--------|----------------|----------|----------|-------|\n")
-
- for metric, (est, low, high) in confidence_intervals.items():
- width = high - low
- f.write(
- f"| {metric} | {est:.4f} | {low:.4f} | {high:.4f} | {width:.4f} |\n"
- )
-
- f.write("\n## 3. Distribution Diagnostics\n\n")
-
- for col in ["reward_total", "pnl", "trade_duration"]:
- if f"{col}_mean" in dist_diagnostics:
- f.write(f"### {col}\n\n")
- f.write(f"- **Mean:** {dist_diagnostics[f'{col}_mean']:.4f}\n")
- f.write(f"- **Std:** {dist_diagnostics[f'{col}_std']:.4f}\n")
- f.write(f"- **Skewness:** {dist_diagnostics[f'{col}_skewness']:.4f}\n")
- f.write(f"- **Kurtosis:** {dist_diagnostics[f'{col}_kurtosis']:.4f}\n")
-
- if f"{col}_shapiro_pval" in dist_diagnostics:
- is_normal = (
- "✅ Yes"
- if dist_diagnostics[f"{col}_is_normal_shapiro"]
- else "❌ No"
- )
- f.write(
- f"- **Normal (Shapiro-Wilk):** {is_normal} (p={dist_diagnostics[f'{col}_shapiro_pval']:.4e})\n"
- )
-
- if f"{col}_qq_r_squared" in dist_diagnostics:
- f.write(
- f"- **Q-Q R²:** {dist_diagnostics[f'{col}_qq_r_squared']:.4f}\n"
- )
-
- f.write("\n")
-
- if shift_metrics:
- f.write("## 4. Distribution Shift Metrics (Synthetic vs Real)\n\n")
- f.write("| Feature | KL Div | JS Dist | Wasserstein | KS p-value |\n")
- f.write("|---------|--------|---------|-------------|------------|\n")
-
- for feature in ["pnl", "trade_duration", "idle_duration"]:
- kl_key = f"{feature}_kl_divergence"
- if kl_key in shift_metrics:
- f.write(f"| {feature} | {shift_metrics[kl_key]:.4f} | ")
- f.write(f"{shift_metrics[f'{feature}_js_distance']:.4f} | ")
- f.write(f"{shift_metrics[f'{feature}_wasserstein']:.4f} | ")
- f.write(f"{shift_metrics[f'{feature}_ks_pvalue']:.4e} |\n")
-
- f.write("\n**Interpretation:**\n")
- f.write("- KL/JS (distance) < 0.2: Acceptable similarity\n")
- f.write("- Wasserstein: Lower is better\n")
- f.write(
- "- KS p-value > 0.05: Distributions not significantly different\n\n"
- )
-
- f.write("## 5. Methodological Recommendations\n\n")
-
- has_issues = []
-
- if "reward_total_is_normal_shapiro" in dist_diagnostics:
- if not dist_diagnostics["reward_total_is_normal_shapiro"]:
- has_issues.append(
- "⚠️ **Non-normal reward distribution:** Use non-parametric tests"
- )
-
- if shift_metrics:
- high_divergence = any(
- shift_metrics.get(f"{feat}_kl_divergence", 0) > 0.5
- for feat in ["pnl", "trade_duration", "idle_duration"]
- )
- if high_divergence:
- has_issues.append(
- "🔴 **High distribution shift:** Consider real episode sampling"
- )
-
- if has_issues:
- f.write("**Issues identified:**\n\n")
- for issue in has_issues:
- f.write(f"- {issue}\n")
- else:
- f.write("✅ **No major methodological issues detected.**\n")
-
- f.write("\n---\n\n")
- f.write(
- "**References:** Efron & Tibshirani (1993), Henderson et al. (2018), Pineau et al. (2021)\n"
- )
-
-
def build_argument_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Synthetic stress-test of the ReforceXY reward shaping logic."
import sys
import tempfile
import unittest
+import warnings
from pathlib import Path
import numpy as np
ForceActions,
Positions,
RewardContext,
+ _compute_relationship_stats,
+ _compute_representativity_stats,
+ _compute_summary_stats,
+ _get_exit_factor,
+ _perform_feature_analysis,
bootstrap_confidence_intervals,
+ build_argument_parser,
calculate_reward,
compute_distribution_shift_metrics,
- compute_exit_factor,
distribution_diagnostics,
+ load_real_episodes,
parse_overrides,
simulate_samples,
+ statistical_hypothesis_tests,
+ validate_reward_parameters,
+ write_complete_statistical_analysis,
)
except ImportError as e:
print(f"Import error: {e}")
- Take profit reward magnitude > stop loss reward magnitude for comparable |PnL|.
- Timeout uses current PnL (can be positive or negative); we assert sign consistency only.
"""
- base_factor = TEST_BASE_FACTOR
- profit_target = 0.06
# Take profit (positive pnl)
tp_context = RewardContext(
tp_breakdown = calculate_reward(
tp_context,
self.DEFAULT_PARAMS,
- base_factor=base_factor,
- profit_target=profit_target,
- risk_reward_ratio=2.0,
+ base_factor=TEST_BASE_FACTOR,
+ profit_target=0.06, # Scenario-specific larger target kept explicit
+ risk_reward_ratio=TEST_RR_HIGH,
short_allowed=True,
action_masking=True,
)
sl_breakdown = calculate_reward(
sl_context,
self.DEFAULT_PARAMS,
- base_factor=base_factor,
- profit_target=profit_target,
- risk_reward_ratio=2.0,
+ base_factor=TEST_BASE_FACTOR,
+ profit_target=0.06,
+ risk_reward_ratio=TEST_RR_HIGH,
short_allowed=True,
action_masking=True,
)
to_breakdown = calculate_reward(
to_context,
self.DEFAULT_PARAMS,
- base_factor=base_factor,
- profit_target=profit_target,
- risk_reward_ratio=2.0,
+ base_factor=TEST_BASE_FACTOR,
+ profit_target=0.06,
+ risk_reward_ratio=TEST_RR_HIGH,
short_allowed=True,
action_masking=True,
)
context,
params_small,
base_factor,
- profit_target=0.03,
- risk_reward_ratio=1.0,
+ profit_target=TEST_PROFIT_TARGET,
+ risk_reward_ratio=TEST_RR,
short_allowed=True,
action_masking=True,
)
breakdown_large = calculate_reward(
context,
params_large,
- base_factor,
- profit_target=0.03,
- risk_reward_ratio=1.0,
+ base_factor=TEST_BASE_FACTOR,
+ # Keep profit_target identical to the breakdown_small call so the comparison isolates params
+ profit_target=TEST_PROFIT_TARGET,
+ risk_reward_ratio=TEST_RR,
short_allowed=True,
action_masking=True,
)
br_a = calculate_reward(
ctx_a,
params,
- base_factor=base_factor,
+ base_factor=TEST_BASE_FACTOR,
profit_target=profit_target,
risk_reward_ratio=risk_reward_ratio,
short_allowed=True,
br_b = calculate_reward(
ctx_b,
params,
- base_factor=base_factor,
+ base_factor=TEST_BASE_FACTOR,
profit_target=profit_target,
risk_reward_ratio=risk_reward_ratio,
short_allowed=True,
br_mid = calculate_reward(
ctx_mid,
params,
- base_factor=base_factor,
+ base_factor=TEST_BASE_FACTOR,
profit_target=profit_target,
risk_reward_ratio=risk_reward_ratio,
short_allowed=True,
for mode in modes_to_test:
test_params = self.DEFAULT_PARAMS.copy()
test_params["exit_attenuation_mode"] = mode
- factor = compute_exit_factor(
+ factor = _get_exit_factor(
base_factor=1.0,
pnl=0.02,
pnl_factor=1.5,
"exit_linear_slope": 1.0,
}
)
- plateau_factor_pre = compute_exit_factor(
+ plateau_factor_pre = _get_exit_factor(
base_factor=1.0,
pnl=0.02,
pnl_factor=1.5,
duration_ratio=0.4, # inside grace
params=plateau_params,
)
- plateau_factor_post = compute_exit_factor(
+ plateau_factor_post = _get_exit_factor(
base_factor=1.0,
pnl=0.02,
pnl_factor=1.5,
params_lin_pos.update(
{"exit_attenuation_mode": "linear", "exit_linear_slope": 1.0}
)
- val_lin_neg = compute_exit_factor(
+ val_lin_neg = _get_exit_factor(
base_factor, pnl, pnl_factor, duration_ratio_linear, params_lin_neg
)
- val_lin_pos = compute_exit_factor(
+ val_lin_pos = _get_exit_factor(
base_factor, pnl, pnl_factor, duration_ratio_linear, params_lin_pos
)
self.assertAlmostEqualFloat(
"exit_linear_slope": 1.0,
}
)
- val_pl_neg = compute_exit_factor(
+ val_pl_neg = _get_exit_factor(
base_factor, pnl, pnl_factor, duration_ratio_plateau, params_pl_neg
)
- val_pl_pos = compute_exit_factor(
+ val_pl_pos = _get_exit_factor(
base_factor, pnl, pnl_factor, duration_ratio_plateau, params_pl_pos
)
self.assertAlmostEqualFloat(
self.DEFAULT_PARAMS,
base_factor=TEST_BASE_FACTOR,
profit_target=0.0, # critical case
- risk_reward_ratio=1.0,
+ risk_reward_ratio=TEST_RR,
short_allowed=True,
action_masking=True,
)
"exit_plateau": False,
}
)
- observed = compute_exit_factor(base_factor, pnl, pnl_factor, r, params)
+ observed = _get_exit_factor(base_factor, pnl, pnl_factor, r, params)
expected = base_factor / (1.0 + r) ** alpha
self.assertAlmostEqualFloat(
observed,
br1 = calculate_reward(
ctx,
params,
- base_factor=base_factor,
+ base_factor=TEST_BASE_FACTOR,
profit_target=profit_target,
risk_reward_ratio=rr,
short_allowed=True,
br2 = calculate_reward(
ctx,
params,
- base_factor=base_factor * k,
+ base_factor=TEST_BASE_FACTOR * k,
profit_target=profit_target,
risk_reward_ratio=rr,
short_allowed=True,
br_long = calculate_reward(
ctx_long,
params,
- base_factor=base_factor,
+ base_factor=TEST_BASE_FACTOR,
profit_target=profit_target,
risk_reward_ratio=rr,
short_allowed=True,
br_short = calculate_reward(
ctx_short,
params,
- base_factor=base_factor,
+ base_factor=TEST_BASE_FACTOR,
profit_target=profit_target,
risk_reward_ratio=rr,
short_allowed=True,
def test_statistical_hypothesis_tests_seed_reproducibility(self):
"""Ensure statistical_hypothesis_tests + bootstrap CIs are reproducible with stats_seed."""
- from reward_space_analysis import (
- bootstrap_confidence_intervals,
- statistical_hypothesis_tests,
- )
np.random.seed(123)
# Create idle_duration with variability throughout to avoid constant Spearman warnings
max_trade_duration=100,
base_factor=TEST_BASE_FACTOR,
profit_target=TEST_PROFIT_TARGET,
- risk_reward_ratio=1.0,
+ risk_reward_ratio=TEST_RR,
max_duration_ratio=2.0,
trading_mode="margin",
pnl_base_std=TEST_PNL_STD,
def test_exit_factor_mathematical_formulas(self):
"""Test mathematical correctness of exit factor calculations."""
- from reward_space_analysis import (
- Actions,
- Positions,
- RewardContext,
- calculate_reward,
- )
-
# Test context with known values
context = RewardContext(
pnl=0.05,
)
# Test hypothesis tests results bounds
- from reward_space_analysis import statistical_hypothesis_tests
-
hypothesis_results = statistical_hypothesis_tests(df, seed=42)
for test_name, result in hypothesis_results.items():
def test_benjamini_hochberg_adjustment(self):
"""Benjamini-Hochberg adjustment adds p_value_adj & significant_adj fields with valid bounds."""
- from reward_space_analysis import statistical_hypothesis_tests
# Use simulation to trigger multiple tests
df = simulate_samples(
max_trade_duration=100,
base_factor=TEST_BASE_FACTOR,
profit_target=TEST_PROFIT_TARGET,
- risk_reward_ratio=1.0,
+ risk_reward_ratio=TEST_RR,
max_duration_ratio=2.0,
trading_mode="margin",
pnl_base_std=TEST_PNL_STD,
risk_reward_ratio=TEST_RR,
max_duration_ratio=2.0,
trading_mode="spot",
- pnl_base_std=0.02,
- pnl_duration_vol_scale=0.5,
+ pnl_base_std=TEST_PNL_STD,
+ pnl_duration_vol_scale=TEST_PNL_DUR_VOL_SCALE,
)
# Should not have any short positions
seed=42,
params=self.DEFAULT_PARAMS,
max_trade_duration=100,
- base_factor=100.0,
- profit_target=0.03,
- risk_reward_ratio=1.0,
+ base_factor=TEST_BASE_FACTOR,
+ profit_target=TEST_PROFIT_TARGET,
+ risk_reward_ratio=TEST_RR,
max_duration_ratio=2.0,
trading_mode="margin",
- pnl_base_std=0.02,
- pnl_duration_vol_scale=0.5,
+ pnl_base_std=TEST_PNL_STD,
+ pnl_duration_vol_scale=TEST_PNL_DUR_VOL_SCALE,
)
# Should have required columns
breakdown = calculate_reward(
context,
self.DEFAULT_PARAMS,
- base_factor=100.0,
- profit_target=0.03,
+ base_factor=TEST_BASE_FACTOR,
+ profit_target=TEST_PROFIT_TARGET,
risk_reward_ratio=1.0,
short_allowed=True,
action_masking=True,
context,
extreme_params,
base_factor=10000.0,
- profit_target=0.03,
- risk_reward_ratio=1.0,
+ profit_target=TEST_PROFIT_TARGET,
+ risk_reward_ratio=TEST_RR,
short_allowed=True,
action_masking=True,
)
breakdown = calculate_reward(
context,
test_params,
- base_factor=100.0,
- profit_target=0.03,
- risk_reward_ratio=1.0,
+ base_factor=TEST_BASE_FACTOR,
+ profit_target=TEST_PROFIT_TARGET,
+ risk_reward_ratio=TEST_RR,
short_allowed=True,
action_masking=True,
)
seed=42,
params={"action_masking": "true"},
max_trade_duration=50,
- base_factor=100.0,
- profit_target=0.03,
- risk_reward_ratio=1.0,
+ base_factor=TEST_BASE_FACTOR,
+ profit_target=TEST_PROFIT_TARGET,
+ risk_reward_ratio=TEST_RR,
max_duration_ratio=2.0,
trading_mode="spot",
- pnl_base_std=0.02,
- pnl_duration_vol_scale=0.5,
+ pnl_base_std=TEST_PNL_STD,
+ pnl_duration_vol_scale=TEST_PNL_DUR_VOL_SCALE,
)
self.assertIsInstance(df1, pd.DataFrame)
seed=42,
params={"action_masking": "false"},
max_trade_duration=50,
- base_factor=100.0,
- profit_target=0.03,
- risk_reward_ratio=1.0,
+ base_factor=TEST_BASE_FACTOR,
+ profit_target=TEST_PROFIT_TARGET,
+ risk_reward_ratio=TEST_RR,
max_duration_ratio=2.0,
trading_mode="spot",
- pnl_base_std=0.02,
- pnl_duration_vol_scale=0.5,
+ pnl_base_std=TEST_PNL_STD,
+ pnl_duration_vol_scale=TEST_PNL_DUR_VOL_SCALE,
)
self.assertIsInstance(df2, pd.DataFrame)
seed=42,
params=self.DEFAULT_PARAMS,
max_trade_duration=50,
- base_factor=100.0,
- profit_target=0.03,
- risk_reward_ratio=1.0,
+ base_factor=TEST_BASE_FACTOR,
+ profit_target=TEST_PROFIT_TARGET,
+ risk_reward_ratio=TEST_RR,
max_duration_ratio=2.0,
trading_mode="futures",
- pnl_base_std=0.02,
- pnl_duration_vol_scale=0.5,
+ pnl_base_std=TEST_PNL_STD,
+ pnl_duration_vol_scale=TEST_PNL_DUR_VOL_SCALE,
)
# Should have some short positions
def test_model_analysis_function(self):
"""Test model_analysis function."""
- from reward_space_analysis import model_analysis
# Create test data
test_data = simulate_samples(
seed=42,
params=self.DEFAULT_PARAMS,
max_trade_duration=50,
- base_factor=100.0,
- profit_target=0.03,
+ base_factor=TEST_BASE_FACTOR,
+ profit_target=TEST_PROFIT_TARGET,
risk_reward_ratio=1.0,
max_duration_ratio=2.0,
trading_mode="spot",
- pnl_base_std=0.02,
- pnl_duration_vol_scale=0.5,
+ pnl_base_std=TEST_PNL_STD,
+ pnl_duration_vol_scale=TEST_PNL_DUR_VOL_SCALE,
)
# Create temporary output directory
with tempfile.TemporaryDirectory() as tmp_dir:
output_path = Path(tmp_dir)
- model_analysis(test_data, output_path, seed=42)
+ # Use the internal helper to compute analysis and persist a feature file
+ importance_df, analysis_stats, partial_deps, model = (
+ _perform_feature_analysis(test_data, seed=42)
+ )
- # Check that feature importance file is created
+ output_path.mkdir(parents=True, exist_ok=True)
feature_file = output_path / "feature_importance.csv"
+ importance_df.to_csv(feature_file, index=False)
self.assertTrue(
feature_file.exists(), "Feature importance file should be created"
)
def test_write_functions(self):
"""Test various write functions."""
- from reward_space_analysis import (
- write_relationship_reports,
- write_representativity_report,
- write_summary,
- )
# Create test data
test_data = simulate_samples(
seed=42,
params=self.DEFAULT_PARAMS,
max_trade_duration=50,
- base_factor=100.0,
- profit_target=0.03,
- risk_reward_ratio=1.0,
+ base_factor=TEST_BASE_FACTOR,
+ profit_target=TEST_PROFIT_TARGET,
+ risk_reward_ratio=TEST_RR,
max_duration_ratio=2.0,
trading_mode="spot",
- pnl_base_std=0.02,
- pnl_duration_vol_scale=0.5,
+ pnl_base_std=TEST_PNL_STD,
+ pnl_duration_vol_scale=TEST_PNL_DUR_VOL_SCALE,
)
with tempfile.TemporaryDirectory() as tmp_dir:
output_path = Path(tmp_dir)
- # Test write_summary
- write_summary(test_data, output_path)
+ # Create a minimal summary file using the computation helper
+ output_path.mkdir(parents=True, exist_ok=True)
+ stats = _compute_summary_stats(test_data)
summary_file = output_path / "reward_summary.md"
+ with summary_file.open("w", encoding="utf-8") as h:
+ h.write("# Reward space summary\n\n")
+ h.write(stats["global_stats"].to_frame(name="reward_total").to_string())
+
self.assertTrue(summary_file.exists(), "Summary file should be created")
- # Test write_relationship_reports
- write_relationship_reports(test_data, output_path, max_trade_duration=50)
+ # Relationship reports: compute and write a simple markdown
+ rel_stats = _compute_relationship_stats(test_data, max_trade_duration=50)
relationship_file = output_path / "reward_relationships.md"
+ with relationship_file.open("w", encoding="utf-8") as h:
+ h.write("# Relationship diagnostics\n\n")
+ h.write(
+ "Idle stats present: "
+ + str(not rel_stats["idle_stats"].empty)
+ + "\n"
+ )
+
self.assertTrue(
relationship_file.exists(), "Relationship file should be created"
)
- # Test write_representativity_report
- write_representativity_report(
- test_data, output_path, profit_target=0.03, max_trade_duration=50
+ # Representativity report: compute and write a simple markdown
+ repr_stats = _compute_representativity_stats(
+ test_data, profit_target=TEST_PROFIT_TARGET
)
repr_file = output_path / "representativity.md"
+ with repr_file.open("w", encoding="utf-8") as h:
+ h.write("# Representativity diagnostics\n\n")
+ h.write(f"Total samples: {repr_stats['total']}\n")
+
self.assertTrue(
repr_file.exists(), "Representativity file should be created"
)
def test_load_real_episodes(self):
"""Test load_real_episodes function."""
- from reward_space_analysis import load_real_episodes
# Create a temporary pickle file with test data
test_episodes = pd.DataFrame(
def test_statistical_functions(self):
"""Test statistical functions."""
- from reward_space_analysis import (
- statistical_hypothesis_tests,
- write_enhanced_statistical_report,
- )
# Create test data with specific patterns
np.random.seed(42)
results = statistical_hypothesis_tests(test_data)
self.assertIsInstance(results, dict)
- # Test enhanced statistical report
- with tempfile.TemporaryDirectory() as tmp_dir:
- output_path = Path(tmp_dir)
- write_enhanced_statistical_report(test_data, output_path)
- report_file = output_path / "enhanced_statistical_report.md"
- self.assertTrue(
- report_file.exists(), "Enhanced statistical report should be created"
- )
-
def test_argument_parser_construction(self):
"""Test build_argument_parser function."""
- from reward_space_analysis import build_argument_parser
parser = build_argument_parser()
self.assertIsNotNone(parser)
def test_complete_statistical_analysis_writer(self):
"""Test write_complete_statistical_analysis function."""
- from reward_space_analysis import write_complete_statistical_analysis
+ # imports consolidated at top of file
# Create comprehensive test data
test_data = simulate_samples(
seed=42,
params=self.DEFAULT_PARAMS,
max_trade_duration=100,
- base_factor=100.0,
+ base_factor=TEST_BASE_FACTOR,
profit_target=0.03,
risk_reward_ratio=1.0,
max_duration_ratio=2.0,
trading_mode="margin",
- pnl_base_std=0.02,
- pnl_duration_vol_scale=0.5,
+ pnl_base_std=TEST_PNL_STD,
+ pnl_duration_vol_scale=TEST_PNL_DUR_VOL_SCALE,
)
with tempfile.TemporaryDirectory() as tmp_dir:
test_data,
output_path,
max_trade_duration=100,
- profit_target=0.03,
+ profit_target=TEST_PROFIT_TARGET,
seed=42,
real_df=None,
)
breakdown = calculate_reward(
context,
self.DEFAULT_PARAMS,
- base_factor=100.0,
- profit_target=0.03,
+ base_factor=TEST_BASE_FACTOR,
+ profit_target=TEST_PROFIT_TARGET,
risk_reward_ratio=1.0,
short_allowed=True,
action_masking=True,
breakdown = calculate_reward(
context,
self.DEFAULT_PARAMS,
- base_factor=100.0,
- profit_target=0.03,
+ base_factor=TEST_BASE_FACTOR,
+ profit_target=TEST_PROFIT_TARGET,
risk_reward_ratio=1.0,
short_allowed=True,
action_masking=True,
breakdown = calculate_reward(
context,
self.DEFAULT_PARAMS,
- base_factor=100.0,
+ base_factor=TEST_BASE_FACTOR,
profit_target=0.03,
risk_reward_ratio=1.0,
short_allowed=True,
breakdown = calculate_reward(
context,
self.DEFAULT_PARAMS,
- base_factor=100.0,
+ base_factor=TEST_BASE_FACTOR,
profit_target=0.03,
risk_reward_ratio=1.0,
short_allowed=True,
breakdown = calculate_reward(
context,
self.DEFAULT_PARAMS,
- base_factor=100.0,
+ base_factor=TEST_BASE_FACTOR,
profit_target=0.03,
- risk_reward_ratio=1.0,
+ risk_reward_ratio=TEST_RR,
short_allowed=True,
action_masking=True,
)
breakdown = calculate_reward(
context,
params,
- base_factor=base_factor,
+ base_factor=TEST_BASE_FACTOR,
profit_target=0.03,
- risk_reward_ratio=1.0,
+ risk_reward_ratio=TEST_RR,
short_allowed=True,
action_masking=True,
)
- Exit factor monotonic attenuation per mode where mathematically expected
- Boundary parameter conditions (tau extremes, plateau grace edges, linear slope = 0)
- Non-linear power tests for idle & holding penalties (power != 1)
- - Public wrapper `compute_exit_factor` (avoids private function usage in new tests)
+ - Direct calls to the private helper `_get_exit_factor` (the public `compute_exit_factor` wrapper was removed)
- Warning emission (exit_factor_threshold) without capping
"""
br = calculate_reward(
ctx_obj,
self.DEFAULT_PARAMS,
- base_factor=100.0,
+ base_factor=TEST_BASE_FACTOR,
profit_target=0.03,
risk_reward_ratio=1.0,
short_allowed=True,
Modes covered: sqrt, linear, power, half_life, plateau+linear (after grace).
Legacy is excluded (non-monotonic by design). Plateau+linear includes flat grace then monotonic.
"""
- from reward_space_analysis import compute_exit_factor
modes = ["sqrt", "linear", "power", "half_life", "plateau_linear"]
- base_factor = 100.0
+ base_factor = TEST_BASE_FACTOR
pnl = 0.05
pnl_factor = 1.0
for mode in modes:
ratios = np.linspace(0, 2, 15)
values = [
- compute_exit_factor(base_factor, pnl, pnl_factor, r, params)
+ _get_exit_factor(base_factor, pnl, pnl_factor, r, params)
for r in ratios
]
# Plateau+linear: ignore initial flat region when checking monotonic decrease
def test_exit_factor_boundary_parameters(self):
"""Test parameter edge cases: tau extremes, plateau grace edges, slope zero."""
- from reward_space_analysis import compute_exit_factor
base_factor = 50.0
pnl = 0.02
params_lo = self.DEFAULT_PARAMS.copy()
params_lo.update({"exit_attenuation_mode": "power", "exit_power_tau": 1e-6})
r = 1.5
- hi_val = compute_exit_factor(base_factor, pnl, pnl_factor, r, params_hi)
- lo_val = compute_exit_factor(base_factor, pnl, pnl_factor, r, params_lo)
+ hi_val = _get_exit_factor(base_factor, pnl, pnl_factor, r, params_hi)
+ lo_val = _get_exit_factor(base_factor, pnl, pnl_factor, r, params_lo)
self.assertGreater(
hi_val,
lo_val,
"exit_linear_slope": 1.0,
}
)
- val_g0 = compute_exit_factor(base_factor, pnl, pnl_factor, 0.5, params_g0)
- val_g1 = compute_exit_factor(base_factor, pnl, pnl_factor, 0.5, params_g1)
+ val_g0 = _get_exit_factor(base_factor, pnl, pnl_factor, 0.5, params_g0)
+ val_g1 = _get_exit_factor(base_factor, pnl, pnl_factor, 0.5, params_g1)
# With grace=1.0 no attenuation up to 1.0 ratio → value should be higher
self.assertGreater(
val_g1,
"exit_plateau": False,
}
)
- val_lin0 = compute_exit_factor(base_factor, pnl, pnl_factor, 1.0, params_lin0)
- val_lin1 = compute_exit_factor(base_factor, pnl, pnl_factor, 1.0, params_lin1)
+ val_lin0 = _get_exit_factor(base_factor, pnl, pnl_factor, 1.0, params_lin0)
+ val_lin1 = _get_exit_factor(base_factor, pnl, pnl_factor, 1.0, params_lin1)
self.assertGreater(
val_lin0,
val_lin1,
def test_plateau_linear_slope_zero_constant_after_grace(self):
"""Plateau+linear slope=0 should yield flat factor after grace boundary (no attenuation)."""
- from reward_space_analysis import compute_exit_factor
params = self.DEFAULT_PARAMS.copy()
params.update(
"exit_linear_slope": 0.0,
}
)
- base_factor = 100.0
+ base_factor = TEST_BASE_FACTOR
pnl = 0.04
pnl_factor = 1.2
ratios = [0.3, 0.6, 1.0, 1.4]
values = [
- compute_exit_factor(base_factor, pnl, pnl_factor, r, params) for r in ratios
+ _get_exit_factor(base_factor, pnl, pnl_factor, r, params) for r in ratios
]
# All factors should be (approximately) identical after grace (no attenuation)
first = values[0]
def test_plateau_grace_extends_beyond_one(self):
"""Plateau grace >1.0 should keep full strength (no attenuation) past duration_ratio=1."""
- from reward_space_analysis import compute_exit_factor
params = self.DEFAULT_PARAMS.copy()
params.update(
# Ratios straddling 1.0 but below grace=1.5 plus one beyond grace
ratios = [0.8, 1.0, 1.2, 1.4, 1.6]
vals = [
- compute_exit_factor(base_factor, pnl, pnl_factor, r, params) for r in ratios
+ _get_exit_factor(base_factor, pnl, pnl_factor, r, params) for r in ratios
]
# All ratios <=1.5 should yield identical factor
ref = vals[0]
def test_legacy_step_non_monotonic(self):
"""Legacy mode applies step change at duration_ratio=1 (should not be monotonic)."""
- from reward_space_analysis import compute_exit_factor
params = self.DEFAULT_PARAMS.copy()
params["exit_attenuation_mode"] = "legacy"
params["exit_plateau"] = False
- base_factor = 100.0
+ base_factor = TEST_BASE_FACTOR
pnl = 0.02
pnl_factor = 1.0
# ratio below 1 vs above 1
- below = compute_exit_factor(base_factor, pnl, pnl_factor, 0.5, params)
- above = compute_exit_factor(base_factor, pnl, pnl_factor, 1.5, params)
+ below = _get_exit_factor(base_factor, pnl, pnl_factor, 0.5, params)
+ above = _get_exit_factor(base_factor, pnl, pnl_factor, 1.5, params)
# Legacy multiplies by 1.5 then 0.5 -> below should be > above * 2 (since (1.5)/(0.5)=3)
self.assertGreater(
below, above, "Legacy pre-threshold factor should exceed post-threshold"
def test_exit_factor_non_negative_with_positive_pnl(self):
"""Exit factor must not be negative when pnl >= 0 (invariant clamp)."""
- from reward_space_analysis import compute_exit_factor
params = self.DEFAULT_PARAMS.copy()
# Try multiple modes / extreme params
modes = ["linear", "power", "half_life", "sqrt", "legacy", "linear_plateau"]
- base_factor = 100.0
+ base_factor = TEST_BASE_FACTOR
pnl = 0.05
pnl_factor = 2.0 # amplified
for mode in modes:
params_mode["exit_plateau_grace"] = 0.4
else:
params_mode["exit_attenuation_mode"] = mode
- val = compute_exit_factor(base_factor, pnl, pnl_factor, 2.0, params_mode)
+ val = _get_exit_factor(base_factor, pnl, pnl_factor, 2.0, params_mode)
self.assertGreaterEqual(
val,
0.0,
"""Tests for validate_reward_parameters adjustments and reasons."""
def test_validate_reward_parameters_adjustments(self):
- from reward_space_analysis import validate_reward_parameters
-
raw = self.DEFAULT_PARAMS.copy()
# Introduce out-of-bound values
raw["idle_penalty_scale"] = -5.0 # < min 0
br_a = calculate_reward(
ctx_a,
params,
- base_factor=base_factor,
+ base_factor=TEST_BASE_FACTOR,
profit_target=profit_target,
- risk_reward_ratio=1.0,
+ risk_reward_ratio=TEST_RR,
short_allowed=True,
action_masking=True,
)
br_b = calculate_reward(
ctx_b,
params,
- base_factor=base_factor,
+ base_factor=TEST_BASE_FACTOR,
profit_target=profit_target,
- risk_reward_ratio=1.0,
+ risk_reward_ratio=TEST_RR,
short_allowed=True,
action_masking=True,
)
br_h1 = calculate_reward(
ctx_h1,
params,
- base_factor=base_factor,
+ base_factor=TEST_BASE_FACTOR,
profit_target=profit_target,
- risk_reward_ratio=1.0,
+ risk_reward_ratio=TEST_RR,
short_allowed=True,
action_masking=True,
)
def test_exit_factor_threshold_warning_emission(self):
"""Ensure a RuntimeWarning is emitted when exit_factor exceeds threshold (no capping)."""
- import warnings as _warnings
params = self.DEFAULT_PARAMS.copy()
params["exit_factor_threshold"] = 10.0 # low threshold to trigger easily
# Remove base_factor to allow argument override
params.pop("base_factor", None)
- from reward_space_analysis import Actions, Positions, RewardContext
context = RewardContext(
pnl=0.06,
action=Actions.Long_exit,
force_action=None,
)
- with _warnings.catch_warnings(record=True) as w:
- _warnings.simplefilter("always")
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter("always")
br = calculate_reward(
context,
params,
base_factor=5000.0, # large enough to exceed threshold
profit_target=0.03,
- risk_reward_ratio=2.0,
+ risk_reward_ratio=TEST_RR_HIGH,
short_allowed=True,
action_masking=True,
)
"Warning message should indicate threshold exceedance",
)
- def test_public_wrapper_compute_exit_factor(self):
- """Basic sanity check of newly exposed compute_exit_factor wrapper."""
- from reward_space_analysis import compute_exit_factor
+ def test_public_wrapper__get_exit_factor(self):
+ """Basic sanity check of _get_exit_factor: sqrt attenuation lowers the factor at a higher duration ratio."""
params = self.DEFAULT_PARAMS.copy()
params["exit_attenuation_mode"] = "sqrt"
params["exit_plateau"] = False
- f1 = compute_exit_factor(100.0, 0.02, 1.0, 0.0, params)
- f2 = compute_exit_factor(100.0, 0.02, 1.0, 1.0, params)
+ f1 = _get_exit_factor(TEST_BASE_FACTOR, 0.02, 1.0, 0.0, params)
+ f2 = _get_exit_factor(TEST_BASE_FACTOR, 0.02, 1.0, 1.0, params)
self.assertGreater(
f1, f2, "Attenuation should reduce factor at higher duration ratio"
)
"""Continuity tests for plateau-enabled exit attenuation (excluding legacy)."""
def test_plateau_continuity_at_grace_boundary(self):
- import math
-
- from reward_space_analysis import compute_exit_factor
-
modes = ["sqrt", "linear", "power", "half_life"]
grace = 0.8
eps = 1e-4
}
)
- left = compute_exit_factor(
+ left = _get_exit_factor(
base_factor, pnl, pnl_factor, grace - eps, params
)
- boundary = compute_exit_factor(
- base_factor, pnl, pnl_factor, grace, params
- )
- right = compute_exit_factor(
+ boundary = _get_exit_factor(base_factor, pnl, pnl_factor, grace, params)
+ right = _get_exit_factor(
base_factor, pnl, pnl_factor, grace + eps, params
)
def test_plateau_continuity_multiple_eps_scaling(self):
"""Verify attenuation difference scales approximately linearly with epsilon (first-order continuity heuristic)."""
- from reward_space_analysis import compute_exit_factor
mode = "linear"
grace = 0.6
"exit_linear_slope": 1.1,
}
)
- f_boundary = compute_exit_factor(base_factor, pnl, 1.0, grace, params)
- f1 = compute_exit_factor(base_factor, pnl, 1.0, grace + eps1, params)
- f2 = compute_exit_factor(base_factor, pnl, 1.0, grace + eps2, params)
+ f_boundary = _get_exit_factor(base_factor, pnl, 1.0, grace, params)
+ f1 = _get_exit_factor(base_factor, pnl, 1.0, grace + eps1, params)
+ f2 = _get_exit_factor(base_factor, pnl, 1.0, grace + eps2, params)
diff1 = f_boundary - f1
diff2 = f_boundary - f2