#!/usr/bin/env python3
"""Synthetic reward space analysis utilities for ReforceXY.
-Features:
-* Sample generation + reward computation (incl. PBRS).
-* Statistical tests, bootstrap CIs, distribution & shift metrics.
-* Feature importance + optional partial dependence.
-* CLI producing report + manifest (hashed parameters for reproducibility).
+Implements sample generation, reward computation (including PBRS),
+statistical analysis, and feature importance, plus a CLI that emits a
+report and a manifest of hashed parameters for reproducibility.
"""
from __future__ import annotations
# Default discount factor γ for potential-based reward shaping
POTENTIAL_GAMMA_DEFAULT: float = 0.95
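+# Illustrative sketch: how the default discount factor enters potential-based
+# reward shaping. The shaping term is F(s, s') = γ·Φ(s') − Φ(s); `phi_current`
+# and `phi_next` stand in for this module's actual potential function and are
+# assumptions for illustration only.
+def _pbrs_shaping_example(phi_current: float, phi_next: float,
+                          gamma: float = POTENTIAL_GAMMA_DEFAULT) -> float:
+    # γΦ(s') − Φ(s) telescopes over an episode, so shaping leaves the
+    # optimal policy unchanged (Ng et al., 1999).
+    return gamma * phi_next - phi_current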
-# Attenuation mode sets (centralized for tests and validation)
+# Supported attenuation modes
ATTENUATION_MODES: Tuple[str, ...] = ("sqrt", "linear", "power", "half_life")
ATTENUATION_MODES_WITH_LEGACY: Tuple[str, ...] = ATTENUATION_MODES + ("legacy",)
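+# Sketch of a kernel dispatch keyed on attenuation mode. The formulas below
+# are illustrative assumptions, not this module's actual kernels; "legacy"
+# is retained only for backward compatibility.
+def _attenuation_example(mode: str, duration_ratio: float) -> float:
+    if mode not in ATTENUATION_MODES_WITH_LEGACY:
+        raise ValueError(f"Unsupported attenuation mode: {mode}")
+    if mode == "sqrt":
+        return 1.0 / (1.0 + duration_ratio) ** 0.5
+    if mode == "linear":
+        return 1.0 / (1.0 + duration_ratio)
+    if mode == "power":
+        return (1.0 + duration_ratio) ** -1.5  # exponent is a placeholder
+    if mode == "half_life":
+        return 2.0 ** (-duration_ratio)  # halves each unit of duration
+    return 1.0  # "legacy": no attenuation in this sketch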
-# Centralized internal numeric guards & behavior toggles
+# Internal numeric guards and behavior toggles
INTERNAL_GUARDS: dict[str, float] = {
"degenerate_ci_epsilon": 1e-9,
"distribution_constant_fallback_moment": 0.0,
raise ValueError("Unsupported trading mode. Expected one of: spot, margin, futures")
-# Internal safe fallback helper for numeric failures (centralizes semantics)
def _fail_safely(reason: str) -> float:
- """Return 0.0 on recoverable numeric failure (reason available for future debug hooks)."""
- # Silent fallback; hook logging if diagnostic visibility required.
+ """Return 0.0 on recoverable numeric failure."""
_ = reason
return 0.0
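+# Example call pattern for the fallback helper, mirroring the finiteness
+# guards used elsewhere in this module. `_safe_ratio` is a hypothetical
+# helper shown for illustration only.
+def _safe_ratio(numerator: float, denominator: float) -> float:
+    # Guard the division first: in Python, 1.0 / 0.0 raises
+    # ZeroDivisionError rather than returning inf.
+    if denominator == 0.0:
+        return _fail_safely("zero_denominator")
+    value = numerator / denominator
+    if not np.isfinite(value):
+        return _fail_safely("non_finite_ratio")
+    return value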
params: RewardParams,
) -> float:
"""Exit factor (kernel + optional plateau) * pnl_factor with invariants."""
- # Basic finiteness checks
if not np.isfinite(base_factor) or not np.isfinite(pnl) or not np.isfinite(duration_ratio):
return _fail_safely("non_finite_exit_factor_inputs")
- # Guard: duration ratio should never be negative
if duration_ratio < 0.0:
duration_ratio = 0.0
rf_n_jobs: int = 1,
perm_n_jobs: int = 1,
) -> Tuple[pd.DataFrame, Dict[str, Any], Dict[str, pd.DataFrame], Optional[RandomForestRegressor]]:
- """Run RandomForest-based feature analysis defensively.
+ """Compute feature importances using RandomForestRegressor.
- Purpose
- -------
- Provide permutation feature importances and optional partial dependence plots for the
- synthetic reward space while remaining robust to incomplete or degenerate data.
-
- Inputs
- ------
+ Parameters
+ ----------
df : pd.DataFrame
- Sample frame containing canonical reward + feature columns (subset acceptable).
+ Sample data with 'reward' column and feature columns.
seed : int
- Random seed used for train/test split and model reproducibility.
+ Random seed for reproducibility.
skip_partial_dependence : bool, default False
- If True, skip partial dependence computation entirely (faster runs).
+ Skip partial dependence computation.
rf_n_jobs : int, default 1
- Parallel jobs for the RandomForestRegressor.
+ Parallel jobs for RandomForestRegressor.
perm_n_jobs : int, default 1
Parallel jobs for permutation_importance.
- Behavior & Guarantees
- ---------------------
- - Dynamically selects available features from canonical list.
- - Gracefully handles: empty frame, missing reward column, <2 usable features, any NaNs.
- - Casts integer duration columns to float only if present (avoid unintended coercions).
- - Drops wholly NaN or constant columns (reported via ``dropped_features``).
- - All sklearn operations guarded by try/except; failures yield NaN importances.
- - ``skip_partial_dependence`` returns an empty ``partial_deps`` dict without computing PD.
- - Sets ``model_fitted`` flag (False for all stub paths and fitting failures).
- - Raises ImportError early if scikit-learn components are unavailable (fast-fail semantics).
-
Returns
-------
importance_df : pd.DataFrame
- Columns: ``feature``, ``importance_mean``, ``importance_std`` (NaNs on failure paths).
+ Feature importances with columns: feature, importance_mean, importance_std (NaN on failure paths).
analysis_stats : Dict[str, Any]
- Keys:
- ``r2_score`` : float (NaN if model not fitted)
- ``n_features`` : int (usable feature count after drops)
- ``n_samples_train`` : int (0 on stub paths)
- ``n_samples_test`` : int (0 on stub paths)
- ``top_feature`` : Optional[str] (None or first usable feature)
- ``top_importance`` : float (NaN on failure paths)
- ``dropped_features`` : list[str] (NaN/constant removed columns)
- ``model_fitted`` : bool (True only if RF fit succeeded)
+ Diagnostics: r2_score, train/test sample counts, usable feature count,
+ top feature and importance, dropped_features, and the model_fitted flag.
partial_deps : Dict[str, pd.DataFrame]
- Mapping feature -> DataFrame with columns ``<feature>`` and ``partial_dependence``;
- empty when skipped or failures occur.
+ Partial dependence values per feature (empty when skipped or on failure).
model : Optional[RandomForestRegressor]
- Fitted model instance when successful; ``None`` otherwise.
-
- Failure Modes
- -------------
- Returns stub outputs (NaN importances, ``model_fitted`` False, empty ``partial_deps``) for:
- - Missing ``reward`` column
- - No rows
- - Zero available canonical features
- - <2 usable (post-drop) features
- - Any NaNs after preprocessing
- - Train/test split failures
- - Model fitting failures
- - Permutation importance failures (partial dependence may still be attempted)
-
- Notes
- -----
- Optimized for interpretability and robustness rather than raw model performance. The
- n_estimators choice balances stability of importance estimates with runtime.
+ Fitted model or None on failure.
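+
+ Examples
+ --------
+ Illustrative call; ``run_feature_analysis`` is a placeholder name for
+ this function, and ``samples`` must contain a ``reward`` column::
+
+     imp_df, stats, pdeps, model = run_feature_analysis(
+         samples, seed=42, skip_partial_dependence=True
+     )
+     assert stats["model_fitted"] in (True, False)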
"""
if (
RandomForestRegressor is None
def load_real_episodes(path: Path, *, enforce_columns: bool = True) -> pd.DataFrame:
- """Load serialized episodes / transitions into a normalized DataFrame.
-
- Accepted payload topologies (pickled):
- - DataFrame directly.
- - Dict with key ``'transitions'`` whose value is:
- * a DataFrame OR
- * an iterable of transition dicts.
- - List of episode dicts where each dict with key ``'transitions'`` provides:
- * a DataFrame OR
- * an iterable of transition dicts.
- - Fallback: any object convertible to ``pd.DataFrame`` (best‑effort).
-
- Normalization steps:
- 1. Flatten nested episode structures.
- 2. Coerce numeric columns (expected + optional) with safe NaN coercion tracking.
- 3. Ensure required columns exist (raise or fill with NaN depending on ``enforce_columns``).
- 4. Add missing optional numeric columns (set to NaN) to obtain a stable schema.
- 5. (Light) Deduplication: drop exact duplicate transition rows if any.
-
- Security NOTE:
- Pickle loading executes arbitrary code if the file is malicious. Only load
- trusted artifacts. This helper assumes the calling context enforces trust.
+ """Load serialized episodes into normalized DataFrame.
Parameters
----------
path : Path
Pickle file path.
enforce_columns : bool, default True
- If True, missing required columns cause a ValueError; else they are created
- and filled with NaN and a warning is emitted.
+ If True, raise ValueError on missing required columns; if False,
+ create them filled with NaN and emit a warning.
Returns
-------
pd.DataFrame
- Normalized transition frame including (at minimum) required columns.
-
- Raises
- ------
- ValueError
- On unpickle failure, structure mismatch (when ``enforce_columns`` is True),
- or irrecoverable conversion errors.
+ Normalized transitions including all required columns.
+
+ Raises
+ ------
+ ValueError
+ On unpickle failure or, when ``enforce_columns`` is True, on
+ structure/column mismatch.
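+
+ Examples
+ --------
+ Illustrative usage, assuming ``episodes.pkl`` is a trusted artifact::
+
+     df = load_real_episodes(Path("episodes.pkl"))
+     df = load_real_episodes(Path("episodes.pkl"), enforce_columns=False)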
"""
try:
) -> Dict[str, float]:
"""Compute distribution shift metrics between synthetic and real samples.
- Metrics:
- - KL divergence (Kullback-Leibler, direction: synthetic || real) using discretized histograms.
- - JS distance (square root of Jensen-Shannon divergence; bounded in [0,1]).
- - 1D Wasserstein (Earth Mover's distance) per feature.
- - Kolmogorov-Smirnov statistic & p-value.
-
- Notes
- -----
- - Features with fewer than 10 non-NaN observations in either dataset are skipped.
- - Degenerate (constant) distributions short-circuit to zero shift metrics with KS p=1.
- - Histogram bin count is fixed (50) for comparability; adaptive binning could be explored.
+ Returns KL divergence, JS distance, Wasserstein distance, and KS test
+ results per continuous feature (pnl, trade_duration, idle_duration).
+ Features with fewer than 10 non-NaN observations in either dataset
+ are skipped.
"""
    metrics: Dict[str, float] = {}
continuous_features = ["pnl", "trade_duration", "idle_duration"]
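+    # KL divergence is directional (synthetic || real) and is computed on
+    # shared fixed-width histograms (50 bins, for comparability across
+    # features); JS distance is the square root of the Jensen–Shannon
+    # divergence and is bounded in [0, 1]. Constant (degenerate)
+    # distributions short-circuit to zero shift metrics with KS p = 1.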
*,
strict_diagnostics: bool = False,
) -> Dict[str, Tuple[float, float, float]]:
- """Bootstrap mean CIs (percentile) per metric; skips sparse; adjusts degenerate unless strict."""
+ """Compute bootstrap confidence intervals for metric means.
+
+ Returns percentile-based CIs, skipping metrics with <10 samples.
+ """
alpha = 1 - confidence_level
lower_percentile = 100 * alpha / 2
upper_percentile = 100 * (1 - alpha / 2)
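+    # e.g. confidence_level=0.95 -> alpha=0.05 -> percentiles 2.5 and 97.5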