Neutral = 0.5
+# Tolerance for PBRS invariance classification (canonical if |Σ shaping| < PBRS_INVARIANCE_TOL)
+PBRS_INVARIANCE_TOL: float = 1e-6
+# Default discount factor γ for potential-based reward shaping
+POTENTIAL_GAMMA_DEFAULT: float = 0.95
+
+# Attenuation mode sets (centralized for tests and validation)
+ATTENUATION_MODES: Tuple[str, ...] = ("sqrt", "linear", "power", "half_life")
+ATTENUATION_MODES_WITH_LEGACY: Tuple[str, ...] = ATTENUATION_MODES + ("legacy",)
+
+# Centralized internal numeric guards & behavior toggles (single source of truth for internal tunables)
+INTERNAL_GUARDS: dict[str, float] = {
+ "degenerate_ci_epsilon": 1e-9,
+ "distribution_constant_fallback_moment": 0.0,
+ "distribution_constant_fallback_qq_r2": 1.0,
+ "moment_extreme_threshold": 1e4,
+ "bootstrap_min_recommended": 200,
+}
+
+
+class RewardDiagnosticsWarning(RuntimeWarning):
+ """Warning category for reward space diagnostic graceful fallbacks.
+
+ Enables selective filtering in tests or CLI harness while leaving other
+ runtime warnings unaffected.
+ """
+
+ pass
+
+
def _to_bool(value: Any) -> bool:
if isinstance(value, bool):
return value
# Internal safe fallback helper for numeric failures (centralizes semantics)
def _fail_safely(reason: str) -> float:
"""Return 0.0 on recoverable numeric failure (reason available for future debug hooks)."""
- # NOTE: presently silent to preserve legacy behaviour; hook logging here if needed.
+ # NOTE: presently silent to preserve legacy behavior; hook logging here if needed.
_ = reason
return 0.0
-# Allowed exit attenuation modes
-ALLOWED_EXIT_MODES = {"legacy", "sqrt", "linear", "power", "half_life"}
-
# PBRS constants
ALLOWED_TRANSFORMS = {
"tanh",
# === PBRS PARAMETERS ===
# Potential-based reward shaping core parameters
# Discount factor γ for potential term (0 ≤ γ ≤ 1)
- "potential_gamma": 0.95,
+ "potential_gamma": POTENTIAL_GAMMA_DEFAULT,
"potential_softsign_sharpness": 1.0,
# Exit potential modes: canonical | progressive_release | spike_cancel | retain_previous
"exit_potential_mode": "canonical",
def _normalize_and_validate_mode(params: RewardParams) -> None:
"""Align normalization of ``exit_attenuation_mode`` with ReforceXY environment.
- Behaviour (mirrors in-env logic):
+ Behavior (mirrors in-env logic):
- Do not force lowercase or strip user formatting; use the value as provided.
- Supported modes (case-sensitive): {legacy, sqrt, linear, power, half_life}.
- If the value is not among supported keys, silently fallback to 'linear'
return
exit_attenuation_mode = _get_str_param(params, "exit_attenuation_mode", "linear")
- if exit_attenuation_mode not in ALLOWED_EXIT_MODES:
+ if exit_attenuation_mode not in ATTENUATION_MODES_WITH_LEGACY:
params["exit_attenuation_mode"] = "linear"
parser.add_argument(
f"--{key}",
type=str,
- choices=sorted(ALLOWED_EXIT_MODES),
+ choices=sorted(ATTENUATION_MODES_WITH_LEGACY),
default=None,
help=help_text,
)
duration_ratio: float,
params: RewardParams,
) -> float:
- """Compute exit factor = time attenuation kernel (with optional plateau) * ``pnl_factor``.
+ """Compute exit factor controlling time attenuation of exit reward.
+
+ Purpose
+ -------
+ Produces a multiplicative factor applied to raw PnL at exit:
+ exit_reward = pnl * exit_factor
+ where:
+ exit_factor = time_kernel(base_factor, effective_duration_ratio) * pnl_factor
- Parity: mirrors the live environment's logic (``ReforceXY._get_exit_factor``).
+ Parity
+ ------
+ Mirrors environment method ``ReforceXY._get_exit_factor`` for offline / synthetic analysis.
+
+ Algorithm
+ ---------
+ 1. Validate finiteness & clamp negative duration to 0.
+ 2. Apply optional plateau: effective_dr = 0 while duration_ratio <= grace else (dr - grace).
+ 3. Select kernel: legacy | sqrt | linear | power | half_life (all monotonic except legacy discontinuity at dr=1).
+ 4. Multiply by externally supplied ``pnl_factor`` (profit & efficiency modulation).
+ 5. Invariants & safety: non-finite -> 0; prevent negative factor on non-negative pnl; warn on large magnitude.
- Assumptions:
- - ``_normalize_and_validate_mode`` has already run (invalid modes replaced by 'linear').
- - ``exit_attenuation_mode`` is therefore either a member of ``ALLOWED_EXIT_MODES`` or 'linear'.
- - All numeric tunables are accessed through ``_get_float_param`` for safety.
+ Parameters
+ ----------
+ base_factor : float
+ Base scaling constant before temporal attenuation.
+ pnl : float
+ Realized (or unrealized at exit decision) profit/loss.
+ pnl_factor : float
+ PnL modulation factor (win amplification + efficiency) computed separately.
+ duration_ratio : float
+ trade_duration / max_trade_duration (pre-clamped upstream).
+ params : dict
+ Reward parameter mapping.
- Algorithm steps:
- 1. Finiteness & non-negative guard on inputs.
- 2. Plateau handling: effective duration ratio = 0 within grace region else (r - grace).
- 3. Kernel application (legacy|sqrt|linear|power|half_life).
- 4. Multiply by externally supplied ``pnl_factor`` (already includes profit & efficiency effects).
- 5. Invariants: ensure finiteness; clamp negative factor when pnl >= 0; emit threshold warning.
+ Returns
+ -------
+ float
+ Exit factor (>=0 unless pnl < 0 and invariants disabled).
+
+ Notes
+ -----
+ - Legacy kernel is discontinuous and maintained for backward compatibility only.
+ - Combine with ``_get_pnl_factor`` for full exit reward shaping (non-PBRS, path dependent).
+ - Plateau introduces a derivative kink at the grace boundary.
"""
# Basic finiteness checks
if (
kernel = kernels.get(exit_attenuation_mode, None)
if kernel is None:
+ warnings.warn(
+ (
+ f"Unknown exit_attenuation_mode '{exit_attenuation_mode}'; defaulting to 'linear' "
+ f"(effective_dr={effective_dr:.5f})"
+ ),
+ RuntimeWarning,
+ stacklevel=2,
+ )
kernel = _linear_kernel
try:
def _get_pnl_factor(
- params: RewardParams, context: RewardContext, profit_target: float
+ params: RewardParams,
+ context: RewardContext,
+ profit_target: float,
+ risk_reward_ratio: float,
) -> float:
- """Env-aligned PnL factor combining profit amplification and exit efficiency."""
- pnl = context.pnl
+ """Compute PnL amplification factor (profit target over/under performance + efficiency).
- if not np.isfinite(pnl) or not np.isfinite(profit_target):
- return _fail_safely("non_finite_pnl_or_target")
+ Purpose
+ -------
+ Encapsulates profit overshoot bonus and controlled loss penalty plus an efficiency tilt
+ based on intra-trade utilization of observed profit range.
- profit_target_factor = 1.0
- if profit_target > 0.0 and pnl > profit_target:
- win_reward_factor = _get_float_param(
- params,
- "win_reward_factor",
- DEFAULT_MODEL_REWARD_PARAMETERS.get("win_reward_factor", 2.0),
- )
- pnl_factor_beta = _get_float_param(
- params,
- "pnl_factor_beta",
- DEFAULT_MODEL_REWARD_PARAMETERS.get("pnl_factor_beta", 0.5),
- )
- pnl_ratio = pnl / profit_target
- profit_target_factor = 1.0 + win_reward_factor * math.tanh(
- pnl_factor_beta * (pnl_ratio - 1.0)
- )
+ Parity
+ ------
+ Mirrors environment method ``MyRLEnv._get_pnl_factor``.
+
+ Algorithm
+ ---------
+ 1. Compute pnl_ratio = pnl / profit_target (profit_target already includes RR upstream).
+ 2. If |pnl_ratio| <= 1: pnl_target_factor = 1.0.
+ 3. Else compute base = tanh(beta * (|pnl_ratio| - 1)).
+ a. Gain branch (pnl_ratio > 1): 1 + win_reward_factor * base
+ b. Loss branch (pnl_ratio < -1/rr): 1 + (win_reward_factor * rr) * base
+ 4. Efficiency: derive efficiency_ratio within [0,1] from intra-trade min/max; add linear tilt
+ around efficiency_center scaled by efficiency_weight (sign-flipped for losses).
+ 5. Return max(0, pnl_target_factor * efficiency_factor).
+
+ Parameters
+ ----------
+ params : dict
+ Reward parameter mapping.
+ context : RewardContext
+ Current PnL and intra-trade extrema.
+ profit_target : float
+ Profit objective (already RR-scaled when called from calculate_reward).
+ risk_reward_ratio : float
+ RR used to set asymmetric loss trigger threshold (pnl_ratio < -1/RR).
+
+ Returns
+ -------
+ float
+ Non-negative factor (0 if invalid inputs or degenerate target).
+
+ Notes
+ -----
+ - Symmetric tanh avoids unbounded amplification.
+ - Loss penalty magnitude scaled by RR to keep incentive structure consistent across setups.
+ - Efficiency tilt introduces path dependence (non-PBRS component).
+ """
+ pnl = context.pnl
+ if (
+ not np.isfinite(pnl)
+ or not np.isfinite(profit_target)
+ or not np.isfinite(risk_reward_ratio)
+ ):
+ return _fail_safely("non_finite_inputs_pnl_factor")
+ if profit_target <= 0.0:
+ return 0.0
+
+ win_reward_factor = _get_float_param(
+ params,
+ "win_reward_factor",
+ DEFAULT_MODEL_REWARD_PARAMETERS.get("win_reward_factor", 2.0),
+ )
+ pnl_factor_beta = _get_float_param(
+ params,
+ "pnl_factor_beta",
+ DEFAULT_MODEL_REWARD_PARAMETERS.get("pnl_factor_beta", 0.5),
+ )
+ rr = risk_reward_ratio if risk_reward_ratio > 0 else 1.0
+
+ pnl_ratio = pnl / profit_target
+ pnl_target_factor = 1.0
+ if abs(pnl_ratio) > 1.0:
+ # Environment uses tanh(beta * (abs(ratio)-1)) for magnitude beyond target.
+ base_pnl_target_factor = math.tanh(pnl_factor_beta * (abs(pnl_ratio) - 1.0))
+ if pnl_ratio > 1.0:
+ pnl_target_factor = 1.0 + win_reward_factor * base_pnl_target_factor
+ elif pnl_ratio < -(1.0 / rr):
+ loss_penalty_factor = win_reward_factor * rr
+ pnl_target_factor = 1.0 + loss_penalty_factor * base_pnl_target_factor
efficiency_factor = 1.0
efficiency_weight = _get_float_param(
efficiency_center - efficiency_ratio
)
- return max(0.0, profit_target_factor * efficiency_factor)
+ return max(0.0, pnl_target_factor * efficiency_factor)
def _is_valid_action(
def _idle_penalty(
context: RewardContext, idle_factor: float, params: RewardParams
) -> float:
- """Mirror the environment's idle penalty behaviour."""
+ """Mirror the environment's idle penalty behavior."""
idle_penalty_scale = _get_float_param(
params,
"idle_penalty_scale",
def _hold_penalty(
context: RewardContext, hold_factor: float, params: RewardParams
) -> float:
- """Mirror the environment's hold penalty behaviour."""
+ """Mirror the environment's hold penalty behavior."""
hold_penalty_scale = _get_float_param(
params,
"hold_penalty_scale",
params, "risk_reward_ratio", float(risk_reward_ratio)
)
- # Scale profit target by risk-reward ratio (reward multiplier)
- # E.g., profit_target=0.03, RR=2.0 → profit_target_final=0.06
profit_target_final = profit_target * risk_reward_ratio
idle_factor = factor * profit_target_final / 4.0
- pnl_factor = _get_pnl_factor(params, context, profit_target_final)
+ pnl_factor = _get_pnl_factor(
+ params,
+ context,
+ profit_target_final,
+ risk_reward_ratio,
+ )
hold_factor = idle_factor
# Base reward calculation (existing logic)
target: str,
bins: Iterable[float],
) -> pd.DataFrame:
+ """Compute aggregated statistics of a target variable across value bins.
+
+ Purpose
+ -------
+ Provide consistent binned descriptive statistics (count, mean, std, min, max)
+ for exploratory diagnostics of reward component relationships.
+
+ Parameters
+ ----------
+ df : pd.DataFrame
+ Source dataframe containing at minimum the ``column`` and ``target`` fields.
+ column : str
+ Name of the column whose values are used to create bins.
+ target : str
+ Column whose statistics are aggregated per bin.
+ bins : Iterable[float]
+ Monotonic sequence of bin edges (len >= 2). Values outside the range are
+ clipped to the boundary edges prior to bin assignment.
+
+ Returns
+ -------
+ pd.DataFrame
+ Dataframe indexed by stringified interval with columns:
+ ``count``, ``mean``, ``std``, ``min``, ``max``.
+
+ Notes
+ -----
+ - Duplicate bin edges are dropped via pandas ``cut(duplicates='drop')``.
+ - Non-finite or missing bins after clipping are excluded prior to grouping.
+ - This helper is deterministic and side-effect free.
+ """
bins_arr = np.asarray(list(bins), dtype=float)
if bins_arr.ndim != 1 or bins_arr.size < 2:
raise ValueError("bins must contain at least two edges")
def _compute_relationship_stats(
df: pd.DataFrame, max_trade_duration: int
) -> Dict[str, Any]:
- """Compute relationship statistics without writing to file."""
+ """Compute binned relationship statistics among core reward drivers.
+
+ Purpose
+ -------
+ Generate uniformly binned summaries for idle duration, trade duration and
+ realized PnL to facilitate downstream comparative or visual analyses.
+
+ Parameters
+ ----------
+ df : pd.DataFrame
+ Input dataframe containing ``idle_duration``, ``trade_duration``, ``pnl`` and
+ corresponding reward component columns (``reward_idle``, ``reward_hold``,
+ ``reward_exit``).
+ max_trade_duration : int
+ Maximum configured trade duration used to scale bin ranges.
+
+ Returns
+ -------
+ Dict[str, Any]
+ Dictionary with keys ``idle_stats``, ``hold_stats`` and ``exit_stats`` each
+ containing a binned statistics dataframe.
+
+ Notes
+ -----
+ - PnL bin upper bound is adjusted by a tiny epsilon when min ≈ max to avoid
+ degenerate intervals.
+ - All statistics are rounded to 6 decimal places for compactness.
+ """
idle_bins = np.linspace(0, max_trade_duration * 3.0, 13)
trade_bins = np.linspace(0, max_trade_duration * 3.0, 13)
pnl_min = float(df["pnl"].min())
"trade_duration",
"idle_duration",
]
- correlation = df[correlation_fields].corr().round(4)
+ # Drop columns that are constant (std == 0) to avoid all-NaN correlation rows
+ numeric_subset = df[correlation_fields]
+ constant_cols = [
+ c for c in numeric_subset.columns if numeric_subset[c].nunique() <= 1
+ ]
+ if constant_cols:
+ filtered = numeric_subset.drop(columns=constant_cols)
+ else:
+ filtered = numeric_subset
+ correlation = filtered.corr().round(4)
return {
"idle_stats": idle_stats,
"hold_stats": hold_stats,
"exit_stats": exit_stats,
"correlation": correlation,
+ "correlation_dropped": constant_cols,
}
def _perform_feature_analysis(
- df: pd.DataFrame, seed: int
+ df: pd.DataFrame, seed: int, *, skip_partial_dependence: bool = False
) -> Tuple[
pd.DataFrame, Dict[str, Any], Dict[str, pd.DataFrame], RandomForestRegressor
]:
X = df[feature_cols]
for col in ("trade_duration", "idle_duration"):
if col in X.columns and pd.api.types.is_integer_dtype(X[col]):
- X[col] = X[col].astype(float)
+ X.loc[:, col] = X[col].astype(float)
y = df["reward_total"]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.25, random_state=seed
.reset_index(drop=True)
)
- # Compute partial dependence for key features
+ # Compute partial dependence for key features unless skipped
partial_deps = {}
- for feature in ["trade_duration", "idle_duration", "pnl"]:
- pd_result = partial_dependence(
- model,
- X_test,
- [feature],
- grid_resolution=50,
- kind="average",
- )
- value_key = "values" if "values" in pd_result else "grid_values"
- values = pd_result[value_key][0]
- averaged = pd_result["average"][0]
- partial_deps[feature] = pd.DataFrame(
- {feature: values, "partial_dependence": averaged}
- )
+ if not skip_partial_dependence:
+ for feature in ["trade_duration", "idle_duration", "pnl"]:
+ pd_result = partial_dependence(
+ model,
+ X_test,
+ [feature],
+ grid_resolution=50,
+ kind="average",
+ )
+ value_key = "values" if "values" in pd_result else "grid_values"
+ values = pd_result[value_key][0]
+ averaged = pd_result["average"][0]
+ partial_deps[feature] = pd.DataFrame(
+ {feature: values, "partial_dependence": averaged}
+ )
analysis_stats = {
"r2_score": r2,
def load_real_episodes(path: Path, *, enforce_columns: bool = True) -> pd.DataFrame:
- """Load transitions from a pickle into a pandas.DataFrame.
-
- Accepted inputs: a pickled DataFrame, a list of transition dicts, a list of
- episode dicts each containing a 'transitions' iterable, or a dict with key
- 'transitions'.
+ """Load serialized episodes / transitions into a normalized DataFrame.
+
+ Accepted payload topologies (pickled):
+ - DataFrame directly.
+ - Dict with key ``'transitions'`` whose value is:
+ * a DataFrame OR
+ * an iterable of transition dicts.
+ - List of episode dicts where each dict with key ``'transitions'`` provides:
+ * a DataFrame OR
+ * an iterable of transition dicts.
+ - Fallback: any object convertible to ``pd.DataFrame`` (best‑effort).
+
+ Normalization steps:
+ 1. Flatten nested episode structures.
+ 2. Coerce numeric columns (expected + optional) with safe NaN coercion tracking.
+ 3. Ensure required columns exist (raise or fill with NaN depending on ``enforce_columns``).
+ 4. Add missing optional numeric columns (set to NaN) to obtain a stable schema.
+ 5. (Light) Deduplication: drop exact duplicate transition rows if any.
+
+ Security NOTE:
+ Pickle loading executes arbitrary code if the file is malicious. Only load
+ trusted artifacts. This helper assumes the calling context enforces trust.
Parameters
----------
- path: Path
- Path to the pickle file.
- enforce_columns: bool
- If True require required columns, else fill missing with NaN and warn.
+ path : Path
+ Pickle file path.
+ enforce_columns : bool, default True
+ If True, missing required columns cause a ValueError; else they are created
+ and filled with NaN and a warning is emitted.
+
+ Returns
+ -------
+ pd.DataFrame
+ Normalized transition frame including (at minimum) required columns.
Raises
------
ValueError
- On unpickle failure or when the payload cannot be converted to a valid
- transitions DataFrame (and enforce_columns is True).
+ On unpickle failure, structure mismatch (when ``enforce_columns`` is True),
+ or irrecoverable conversion errors.
"""
try:
"reward_total",
}
+ # Keep optional list stable and explicit
numeric_optional = {
"reward_exit",
"reward_idle",
"idle_ratio",
"max_unrealized_profit",
"min_unrealized_profit",
+ # Additive / shaping components
+ "reward_entry_additive",
+ "reward_exit_additive",
+ "current_potential",
+ "next_potential",
+ "is_invalid",
}
for col in list(numeric_expected | numeric_optional):
"action",
"reward_total",
}
- missing = required - set(df.columns)
- if missing:
+ missing_required = required - set(df.columns)
+ if missing_required:
if enforce_columns:
raise ValueError(
- f"Loaded episodes data is missing required columns: {sorted(missing)}. "
+ f"Loaded episodes data is missing required columns: {sorted(missing_required)}. "
f"Found columns: {sorted(list(df.columns))}."
)
- else:
- warnings.warn(
- f"Loaded episodes data is missing columns {sorted(missing)}; filling with NaN (enforce_columns=False)",
- RuntimeWarning,
- stacklevel=2,
- )
- for col in missing:
- df[col] = np.nan
+ warnings.warn(
+ f"Loaded episodes data is missing columns {sorted(missing_required)}; filling with NaN (enforce_columns=False)",
+ RuntimeWarning,
+ stacklevel=2,
+ )
+ for col in missing_required:
+ df[col] = np.nan
+
+ # Ensure all optional numeric columns exist for schema stability
+ for opt_col in numeric_optional:
+ if opt_col not in df.columns:
+ df[opt_col] = np.nan
+
+ # Drop exact duplicates (rare but can appear after flattening)
+ before_dupes = len(df)
+ df = df.drop_duplicates()
+ if len(df) != before_dupes:
+ warnings.warn(
+ f"Removed {before_dupes - len(df)} duplicate transition row(s) while loading '{path}'.",
+ RuntimeWarning,
+ stacklevel=2,
+ )
return df
n_bootstrap: int = 10000,
confidence_level: float = 0.95,
seed: int = 42,
+ *,
+ strict_diagnostics: bool = False,
) -> Dict[str, Tuple[float, float, float]]:
- """Bootstrap confidence intervals for key metrics."""
+ """Estimate confidence intervals for mean of selected metrics via bootstrap.
+
+ Graceful mode policy (``strict_diagnostics=False``):
+ - If the computed CI has zero or negative width (including inverted bounds),
+ it is automatically widened symmetrically around the sample mean by an
+ epsilon (``INTERNAL_GUARDS['degenerate_ci_epsilon']``) so the reporting
+ pipeline is not interrupted. A ``RewardDiagnosticsWarning`` is emitted to
+ make the adjustment transparent.
+ Strict mode policy (``strict_diagnostics=True``):
+ - The same anomalous condition triggers an immediate ``AssertionError``
+ (fail-fast) to surface upstream causes such as a constant column or a
+ prior data sanitization issue.
+ Rationale: This dual mode avoids silently masking structural problems while
+ still supporting uninterrupted exploratory / CLI smoke runs.
+
+ Purpose
+ -------
+ Provide non-parametric uncertainty estimates for the mean of reward-related
+ metrics, robust to unknown or asymmetric distributions.
+
+ Parameters
+ ----------
+ df : pd.DataFrame
+ Source dataframe containing metric columns.
+ metrics : List[str]
+ Names of numeric columns to evaluate; silently skipped if absent.
+ n_bootstrap : int, default 10000
+ Number of bootstrap resamples (with replacement).
+ confidence_level : float, default 0.95
+ Nominal coverage probability for the two-sided interval.
+ seed : int, default 42
+ Seed for local reproducible RNG (does not mutate global state).
+
+ Returns
+ -------
+ Dict[str, Tuple[float, float, float]]
+ Mapping metric -> (mean_estimate, ci_lower, ci_upper).
+
+ Notes
+ -----
+ - Metrics with < 10 non-null observations are skipped to avoid unstable CIs.
+ - Percentile method used; no bias correction or acceleration applied.
+ - Validation enforces finite, ordered, positive-width intervals.
+ """
alpha = 1 - confidence_level
lower_percentile = 100 * alpha / 2
upper_percentile = 100 * (1 - alpha / 2)
results = {}
+ # Advisory: very low bootstrap counts produce unstable CI widths
+ min_rec = int(INTERNAL_GUARDS.get("bootstrap_min_recommended", 200))
+ if n_bootstrap < min_rec:
+ warnings.warn(
+ f"n_bootstrap={n_bootstrap} < recommended minimum {min_rec}; confidence intervals may be unstable",
+ RewardDiagnosticsWarning,
+ )
+
# Local RNG to avoid mutating global NumPy RNG state
rng = np.random.default_rng(seed)
results[metric] = (point_est, ci_low, ci_high)
# Validate bootstrap confidence intervals
- _validate_bootstrap_results(results)
+ _validate_bootstrap_results(results, strict_diagnostics=strict_diagnostics)
return results
-def _validate_bootstrap_results(results: Dict[str, Tuple[float, float, float]]) -> None:
- """Validate mathematical properties of bootstrap confidence intervals."""
+def _validate_bootstrap_results(
+ results: Dict[str, Tuple[float, float, float]], *, strict_diagnostics: bool
+) -> None:
+ """Validate structural and numerical integrity of bootstrap CI outputs.
+
+ Purpose
+ -------
+ Fail fast if any generated confidence interval violates expected invariants.
+
+ Parameters
+ ----------
+ results : Dict[str, Tuple[float, float, float]]
+ Mapping from metric name to (mean, ci_low, ci_high).
+
+ Returns
+ -------
+ None
+
+ Notes
+ -----
+ - Raises AssertionError on first violation (finite bounds, ordering, width > 0).
+ - Intentionally internal; external callers should rely on exceptions for flow.
+ """
for metric, (mean, ci_low, ci_high) in results.items():
# CI bounds must be finite
if not (np.isfinite(mean) and np.isfinite(ci_low) and np.isfinite(ci_high)):
# CI width should be positive (non-degenerate)
width = ci_high - ci_low
if width <= 0:
- raise AssertionError(
- f"Bootstrap CI for {metric}: non-positive width {width:.6f}"
+ if strict_diagnostics:
+ raise AssertionError(
+ f"Bootstrap CI for {metric}: non-positive width {width:.6f}"
+ )
+ # Graceful mode: expand interval symmetrically
+ epsilon = (
+ INTERNAL_GUARDS.get("degenerate_ci_epsilon", 1e-9)
+ if width == 0
+ else abs(width) * 1e-6
+ )
+ center = mean
+ # Adjust only if current bounds are identical; otherwise enforce ordering minimally.
+ if ci_low == ci_high:
+ ci_low = center - epsilon
+ ci_high = center + epsilon
+ else:
+ # Ensure proper ordering if inverted or collapsed negatively.
+ lower = min(ci_low, ci_high) - epsilon
+ upper = max(ci_low, ci_high) + epsilon
+ ci_low, ci_high = lower, upper
+ results[metric] = (mean, ci_low, ci_high)
+ warnings.warn(
+ f"Degenerate bootstrap CI for {metric} adjusted to maintain positive width;"
+ f" original width={width:.6e}, epsilon={epsilon:.1e}",
+ RewardDiagnosticsWarning,
)
def distribution_diagnostics(
- df: pd.DataFrame, *, seed: int | None = None
+ df: pd.DataFrame, *, seed: int | None = None, strict_diagnostics: bool = False
) -> Dict[str, Any]:
- """Distribution diagnostics: normality tests, skewness, kurtosis.
+ """Compute distributional diagnostics for selected numeric columns.
+
+ Purpose
+ -------
+ Aggregate normality test statistics and moment-based shape descriptors to
+ support assessment of modelling assumptions or transformation needs.
Parameters
----------
df : pd.DataFrame
- Input samples.
+ Dataframe containing relevant numeric columns.
seed : int | None, optional
- Reserved for future stochastic diagnostic extensions; currently unused.
+ Reserved for potential future stochastic extensions; unused presently.
+
+ Returns
+ -------
+ Dict[str, Any]
+ Mapping of column name -> diagnostic results (tests, moments, p-values).
+
+ Notes
+ -----
+ - Skips columns absent from the dataframe.
+ - Applies Shapiro-Wilk for n <= 5000 else D'Agostino's K2 due to cost.
+ - All numeric outputs are floats; non-finite intermediate results are ignored.
"""
diagnostics = {}
_ = seed # placeholder to keep signature for future reproducibility extensions
diagnostics[f"{col}_mean"] = float(np.mean(data))
diagnostics[f"{col}_std"] = float(np.std(data, ddof=1))
- diagnostics[f"{col}_skewness"] = float(stats.skew(data))
- diagnostics[f"{col}_kurtosis"] = float(stats.kurtosis(data, fisher=True))
+ skew_v = float(stats.skew(data))
+ kurt_v = float(stats.kurtosis(data, fisher=True))
+ diagnostics[f"{col}_skewness"] = skew_v
+ diagnostics[f"{col}_kurtosis"] = kurt_v
+ thr = INTERNAL_GUARDS.get("moment_extreme_threshold", 1e4)
+ if abs(skew_v) > thr or abs(kurt_v) > thr:
+ msg = f"Extreme moment(s) for {col}: skew={skew_v:.3e}, kurtosis={kurt_v:.3e} exceeds threshold {thr}."
+ if strict_diagnostics:
+ raise AssertionError(msg)
+ warnings.warn(msg, RewardDiagnosticsWarning)
if len(data) < 5000:
sw_stat, sw_pval = stats.shapiro(data)
(_osm, _osr), (_slope, _intercept, r) = probplot(data, dist="norm", plot=None)
diagnostics[f"{col}_qq_r_squared"] = float(r**2)
- _validate_distribution_diagnostics(diagnostics)
+ _validate_distribution_diagnostics(
+ diagnostics, strict_diagnostics=strict_diagnostics
+ )
return diagnostics
-def _validate_distribution_diagnostics(diag: Dict[str, Any]) -> None:
+def _validate_distribution_diagnostics(
+ diag: Dict[str, Any], *, strict_diagnostics: bool
+) -> None:
"""Validate mathematical properties of distribution diagnostics.
Ensures all reported statistics are finite and within theoretical bounds where applicable.
Invoked automatically inside distribution_diagnostics(); raising AssertionError on violation
enforces fail-fast semantics consistent with other validation helpers.
"""
- for key, value in diag.items():
+ # Pre-compute zero-variance flags to allow graceful handling of undefined higher moments.
+ zero_var_columns = set()
+ for k, v in diag.items():
+ if k.endswith("_std") and (not np.isfinite(v) or v == 0):
+ prefix = k[: -len("_std")]
+ zero_var_columns.add(prefix)
+
+ for key, value in list(diag.items()):
if any(suffix in key for suffix in ["_mean", "_std", "_skewness", "_kurtosis"]):
if not np.isfinite(value):
- raise AssertionError(
- f"Distribution diagnostic {key} is not finite: {value}"
+ # Graceful degradation for constant distributions: skewness/kurtosis become NaN.
+ constant_problem = any(
+ key.startswith(prefix)
+ and (key.endswith("_skewness") or key.endswith("_kurtosis"))
+ for prefix in zero_var_columns
)
+ if constant_problem and not strict_diagnostics:
+ fallback = INTERNAL_GUARDS.get(
+ "distribution_constant_fallback_moment", 0.0
+ )
+ diag[key] = fallback
+ warnings.warn(
+ f"Replaced undefined {key} (constant distribution) with {fallback}",
+ RewardDiagnosticsWarning,
+ )
+ else:
+ raise AssertionError(
+ f"Distribution diagnostic {key} is not finite: {value}"
+ )
if key.endswith("_shapiro_pval"):
if not (0 <= value <= 1):
raise AssertionError(
)
if key.endswith("_anderson_stat") or key.endswith("_anderson_critical_5pct"):
if not np.isfinite(value):
+ prefix = key.rsplit("_", 2)[0]
+ if prefix in zero_var_columns and not strict_diagnostics:
+ fallback = INTERNAL_GUARDS.get(
+ "distribution_constant_fallback_moment", 0.0
+ )
+ diag[key] = fallback
+ warnings.warn(
+ f"Replaced undefined Anderson diagnostic {key} (constant distribution) with {fallback}",
+ RewardDiagnosticsWarning,
+ )
+ continue
raise AssertionError(
f"Anderson statistic {key} must be finite, got {value}"
)
if key.endswith("_qq_r_squared"):
- if not (0 <= value <= 1):
- raise AssertionError(f"Q-Q R^2 {key} must be in [0,1], got {value}")
+ if not (
+ isinstance(value, (int, float))
+ and np.isfinite(value)
+ and 0 <= value <= 1
+ ):
+ prefix = key[: -len("_qq_r_squared")]
+ if prefix in zero_var_columns and not strict_diagnostics:
+ fallback_r2 = INTERNAL_GUARDS.get(
+ "distribution_constant_fallback_qq_r2", 1.0
+ )
+ diag[key] = fallback_r2
+ warnings.warn(
+ f"Replaced undefined Q-Q R^2 {key} (constant distribution) with {fallback_r2}",
+ RewardDiagnosticsWarning,
+ )
+ else:
+ raise AssertionError(f"Q-Q R^2 {key} must be in [0,1], got {value}")
def build_argument_parser() -> argparse.ArgumentParser:
default=42,
help="Random seed for reproducibility (default: 42).",
)
+ parser.add_argument(
+ "--skip_partial_dependence",
+ action="store_true",
+ help="Skip partial dependence computation to speed up analysis.",
+ )
parser.add_argument(
"--stats_seed",
type=int,
default="none",
help="Multiple testing correction method for hypothesis tests (default: none).",
)
+ parser.add_argument(
+ "--strict_diagnostics",
+ action="store_true",
+ help=(
+ "Enable fail-fast mode for statistical diagnostics: raise on zero-width bootstrap CIs or undefined "
+ "skewness/kurtosis/Anderson/Q-Q metrics produced by constant distributions instead of applying graceful replacements."
+ ),
+ )
+ parser.add_argument(
+ "--bootstrap_resamples",
+ type=int,
+ default=10000,
+ metavar="N",
+ help=(
+ "Number of bootstrap resamples for confidence intervals (default: 10000). "
+ "Lower this (e.g. 200-1000) for faster smoke tests; increase for more stable CI width estimates."
+ ),
+ )
return parser
*,
adjust_method: str = "none",
stats_seed: Optional[int] = None,
+ strict_diagnostics: bool = False,
+ bootstrap_resamples: int = 10000,
+ skip_partial_dependence: bool = False,
) -> None:
"""Generate a single comprehensive statistical analysis report with enhanced tests."""
output_dir.mkdir(parents=True, exist_ok=True)
# Model analysis
importance_df, analysis_stats, partial_deps, _model = _perform_feature_analysis(
- df, seed
+ df, seed, skip_partial_dependence=skip_partial_dependence
)
# Save feature importance CSV
importance_df.to_csv(output_dir / "feature_importance.csv", index=False)
# Save partial dependence CSVs
- for feature, pd_df in partial_deps.items():
- pd_df.to_csv(
- output_dir / f"partial_dependence_{feature}.csv",
- index=False,
- )
+ if not skip_partial_dependence:
+ for feature, pd_df in partial_deps.items():
+ pd_df.to_csv(
+ output_dir / f"partial_dependence_{feature}.csv",
+ index=False,
+ )
- # Enhanced statistics (always computed)
+ # Enhanced statistics
test_seed = (
stats_seed
if isinstance(stats_seed, int)
"pnl",
]
bootstrap_ci = bootstrap_confidence_intervals(
- df, metrics_for_ci, n_bootstrap=10000, seed=test_seed
+ df,
+ metrics_for_ci,
+ n_bootstrap=int(bootstrap_resamples),
+ seed=test_seed,
+ strict_diagnostics=strict_diagnostics,
+ )
+ dist_diagnostics = distribution_diagnostics(
+ df, seed=test_seed, strict_diagnostics=strict_diagnostics
)
- dist_diagnostics = distribution_diagnostics(df, seed=test_seed)
distribution_shift = None
if real_df is not None:
f.write(f"| Total Samples | {len(df):,} |\n")
f.write(f"| Random Seed | {seed} |\n")
f.write(f"| Max Trade Duration | {max_trade_duration} |\n")
- f.write(f"| Profit Target (effective) | {profit_target:.4f} |\n\n")
+ # Blank separator to visually group core simulation vs PBRS parameters
+ f.write("| | |\n")
+ # Extra core PBRS parameters exposed in run configuration if present
+ _rp = (
+ df.attrs.get("reward_params")
+ if isinstance(df.attrs.get("reward_params"), dict)
+ else {}
+ )
+ exit_mode = _rp.get("exit_potential_mode", "canonical")
+ potential_gamma = _rp.get(
+ "potential_gamma",
+ DEFAULT_MODEL_REWARD_PARAMETERS.get(
+ "potential_gamma", POTENTIAL_GAMMA_DEFAULT
+ ),
+ )
+ f.write(f"| exit_potential_mode | {exit_mode} |\n")
+ f.write(f"| potential_gamma | {potential_gamma} |\n")
+ # Blank separator before overrides block
+ f.write("| | |\n")
+
+ overrides_pairs = []
+ if _rp:
+ for k, default_v in DEFAULT_MODEL_REWARD_PARAMETERS.items():
+ if k in ("exit_potential_mode", "potential_gamma"):
+ continue # already printed explicitly
+ try:
+ if k in _rp and _rp[k] != default_v:
+ overrides_pairs.append(f"{k}={_rp[k]}")
+ except Exception:
+ continue
+ if overrides_pairs:
+ f.write(f"| Overrides | {', '.join(sorted(overrides_pairs))} |\n")
+ else:
+ f.write("| Overrides | (none) |\n")
+ f.write("\n")
f.write("---\n\n")
f.write("### 1.2 Reward Statistics by Action\n\n")
action_df = summary_stats["action_summary"].copy()
+ # Cast index to int for cleaner display (0,1,2,3,4)
+ action_df.index = action_df.index.astype(int)
if action_df.index.name is None:
action_df.index.name = "action"
f.write(_df_to_md(action_df, index_name=action_df.index.name, ndigits=6))
f.write("### 1.3 Component Activation Rates\n\n")
f.write("Percentage of samples where each reward component is non-zero:\n\n")
- f.write(
- _series_to_md(
- summary_stats["component_share"],
- value_name="activation_rate",
- ndigits=6,
- )
- )
+ comp_share = summary_stats["component_share"].copy()
+ formatted_rows = [
+ "| Component | Activation Rate |",
+ "|-----------|----------------|",
+ ]
+ # Enforce deterministic logical ordering of components if present
+ preferred_order = [
+ "reward_invalid",
+ "reward_idle",
+ "reward_hold",
+ "reward_exit",
+ "reward_shaping",
+ "reward_entry_additive",
+ "reward_exit_additive",
+ ]
+ for comp in preferred_order:
+ if comp in comp_share.index:
+ val = comp_share.loc[comp]
+ formatted_rows.append(f"| {comp} | {val:.1%} |")
+ f.write("\n".join(formatted_rows) + "\n\n")
f.write("### 1.4 Component Value Ranges\n\n")
bounds_df = summary_stats["component_bounds"].copy()
)
f.write("\n")
- f.write("### 2.4 Component Activation Coverage\n\n")
+ f.write("### 2.4 Component Activation Rates\n\n")
f.write("| Component | Activation Rate |\n")
f.write("|-----------|----------------|\n")
f.write(f"| Idle penalty | {representativity_stats['idle_activated']:.1%} |\n")
if corr_df.index.name is None:
corr_df.index.name = "feature"
f.write(_df_to_md(corr_df, index_name=corr_df.index.name, ndigits=4))
+ _dropped = relationship_stats.get("correlation_dropped") or []
+ if _dropped:
+ f.write(
+ "\n_Constant features removed (no variance): "
+ + ", ".join(_dropped)
+ + "._\n\n"
+ )
# Section 3.5: PBRS Analysis
f.write("### 3.5 PBRS (Potential-Based Reward Shaping) Analysis\n\n")
# PBRS invariance check (canonical mode)
total_shaping = df["reward_shaping"].sum()
- if abs(total_shaping) < 1e-6:
- f.write(
- "✅ **PBRS Invariance:** Total shaping reward ≈ 0 (canonical mode preserved)\n\n"
- )
+ entry_add_total = df.get("reward_entry_additive", pd.Series([0])).sum()
+ exit_add_total = df.get("reward_exit_additive", pd.Series([0])).sum()
+ # Prepare invariance summary markdown block
+ if abs(total_shaping) < PBRS_INVARIANCE_TOL:
+ invariance_status = "✅ Canonical"
+ invariance_note = "Total shaping ≈ 0 (canonical invariance)"
else:
- f.write(
- f"❌ **PBRS Invariance:** Total shaping reward = {total_shaping:.6f} (non-canonical behavior)\n\n"
- )
+ invariance_status = "❌ Non-canonical"
+ invariance_note = f"Total shaping = {total_shaping:.6f} (non-zero)"
+
+ # Summarize PBRS invariance (added explicit section)
+ f.write("**PBRS Invariance Summary:**\n\n")
+ f.write("| Field | Value |\n")
+ f.write("|-------|-------|\n")
+ f.write(f"| Invariance | {invariance_status} |\n")
+ f.write(f"| Note | {invariance_note} |\n")
+ f.write(f"| Σ Shaping Reward | {total_shaping:.6f} |\n")
+ f.write(f"| Abs Σ Shaping Reward | {abs(total_shaping):.6e} |\n")
+ f.write(f"| Σ Entry Additive | {entry_add_total:.6f} |\n")
+ f.write(f"| Σ Exit Additive | {exit_add_total:.6f} |\n\n")
else:
f.write("_PBRS components not present in this analysis._\n\n")
f.write(header + sep + "\n".join(rows) + "\n\n")
f.write("**Exported Data:**\n")
f.write("- Full feature importance: `feature_importance.csv`\n")
- f.write("- Partial dependence plots: `partial_dependence_*.csv`\n\n")
+ if not skip_partial_dependence:
+ f.write("- Partial dependence plots: `partial_dependence_*.csv`\n\n")
+ else:
+ f.write(
+ "- Partial dependence plots: (skipped via --skip_partial_dependence)\n\n"
+ )
# Section 5: Statistical Validation
if hypothesis_tests:
f.write(
"| KS p-value | > 0.05 | ✅ Yes: No significant difference |\n\n"
)
+ else:
+ # Placeholder keeps numbering stable and explicit
+ f.write("### 5.4 Distribution Shift Analysis\n\n")
+ f.write("_Not performed (no real episodes provided)._\n\n")
# Footer
f.write("---\n\n")
)
if distribution_shift:
f.write("6. **Distribution Shift** - Comparison with real trading data\n")
+ else:
+ f.write(
+ "6. **Distribution Shift** - Not performed (no real episodes provided)\n"
+ )
+ if "reward_shaping" in df.columns:
+ _total_shaping = df["reward_shaping"].sum()
+ _canonical = abs(_total_shaping) < 1e-6
+ f.write(
+ "7. **PBRS Invariance** - "
+ + (
+ "Canonical (Σ shaping ≈ 0)"
+ if _canonical
+ else f"Non-canonical (Σ shaping = {_total_shaping:.6f})"
+ )
+ + "\n"
+ )
f.write("\n")
f.write("**Generated Files:**\n")
f.write("- `reward_samples.csv` - Raw synthetic samples\n")
pnl_base_std=args.pnl_base_std,
pnl_duration_vol_scale=args.pnl_duration_vol_scale,
)
+ # Post-simulation critical NaN validation (non-PBRS structural columns)
+ critical_cols = [
+ "pnl",
+ "trade_duration",
+ "idle_duration",
+ "position",
+ "action",
+ "reward_total",
+ "reward_invalid",
+ "reward_idle",
+ "reward_hold",
+ "reward_exit",
+ ]
+ nan_issues = {
+ c: int(df[c].isna().sum())
+ for c in critical_cols
+ if c in df.columns and df[c].isna().any()
+ }
+ if nan_issues:
+ raise AssertionError(
+ "NaN values detected in critical simulated columns: "
+ + ", ".join(f"{k}={v}" for k, v in nan_issues.items())
+ )
# Attach simulation parameters for downstream manifest
df.attrs["simulation_params"] = {
"num_samples": args.num_samples,
"pnl_base_std": args.pnl_base_std,
"pnl_duration_vol_scale": args.pnl_duration_vol_scale,
}
+ # Attach resolved reward parameters for inline overrides rendering in report
+ df.attrs["reward_params"] = dict(params)
args.output.mkdir(parents=True, exist_ok=True)
csv_path = args.output / "reward_samples.csv"
stats_seed=args.stats_seed
if getattr(args, "stats_seed", None) is not None
else None,
+ strict_diagnostics=bool(getattr(args, "strict_diagnostics", False)),
+ bootstrap_resamples=getattr(args, "bootstrap_resamples", 10000),
+ skip_partial_dependence=bool(getattr(args, "skip_partial_dependence", False)),
)
print(
f"Complete statistical analysis saved to: {args.output / 'statistical_analysis.md'}"
# Generate manifest summarizing key metrics
try:
manifest_path = args.output / "manifest.json"
- if (args.output / "feature_importance.csv").exists():
- fi_df = pd.read_csv(args.output / "feature_importance.csv")
- top_features = fi_df.head(5)["feature"].tolist()
- else:
- top_features = []
- # Detect reward parameter overrides for traceability.
- reward_param_overrides = {}
- # Step 1: differences
- for k in DEFAULT_MODEL_REWARD_PARAMETERS:
- if k in params and params[k] != DEFAULT_MODEL_REWARD_PARAMETERS[k]:
- reward_param_overrides[k] = params[k]
- # Step 2: explicit flags
- for k in DEFAULT_MODEL_REWARD_PARAMETERS:
- if hasattr(args, k):
- v = getattr(args, k)
- if v is not None:
- # Use the resolved param value for consistency
- reward_param_overrides[k] = params.get(k, v)
-
+ resolved_reward_params = dict(params) # already validated/normalized upstream
manifest = {
"generated_at": pd.Timestamp.now().isoformat(),
"num_samples": int(len(df)),
"seed": int(args.seed),
"max_trade_duration": int(args.max_trade_duration),
"profit_target_effective": float(profit_target * risk_reward_ratio),
- "top_features": top_features,
- "reward_param_overrides": reward_param_overrides,
"pvalue_adjust_method": args.pvalue_adjust,
"parameter_adjustments": adjustments,
+ "reward_params": resolved_reward_params,
}
sim_params = df.attrs.get("simulation_params", {})
if isinstance(sim_params, dict) and sim_params:
import hashlib as _hashlib
import json as _json
+ # Compose hash source from ALL simulation params and ALL resolved reward params for full reproducibility.
_hash_source = {
**{f"sim::{k}": sim_params[k] for k in sorted(sim_params)},
**{
- f"reward::{k}": reward_param_overrides[k]
- for k in sorted(reward_param_overrides)
+ f"reward::{k}": resolved_reward_params[k]
+ for k in sorted(resolved_reward_params)
},
}
serialized = _json.dumps(_hash_source, sort_keys=True)
manifest["params_hash"] = _hashlib.sha256(
serialized.encode("utf-8")
).hexdigest()
- manifest["params"] = sim_params
+ manifest["simulation_params"] = sim_params
with manifest_path.open("w", encoding="utf-8") as mh:
import json as _json
if transform_name not in transforms:
warnings.warn(
f"Unknown potential transform '{transform_name}'; falling back to tanh",
- category=UserWarning,
+ RewardDiagnosticsWarning,
stacklevel=2,
)
return _apply_transform_tanh(value, **kwargs)
def _get_potential_gamma(params: RewardParams) -> float:
- """Return potential_gamma with fallback (missing/invalid -> 0.95 + warning)."""
- value = params.get("potential_gamma", None)
+ """Return validated potential_gamma.
- if value is None:
+ Process:
+ - If NaN -> default POTENTIAL_GAMMA_DEFAULT with warning (missing or unparsable).
+ - If outside [0,1] -> clamp + warning including original value.
+ - Guarantee returned float ∈ [0,1].
+ """
+ gamma = _get_float_param(params, "potential_gamma", np.nan)
+ if not np.isfinite(gamma):
warnings.warn(
- "potential_gamma not found in config, using default value of 0.95. "
- "This parameter controls the discount factor for PBRS potential shaping.",
- category=UserWarning,
+ f"potential_gamma not specified or invalid; defaulting to {POTENTIAL_GAMMA_DEFAULT}",
+ RewardDiagnosticsWarning,
stacklevel=2,
)
- return 0.95
-
- if isinstance(value, (int, float)):
- return float(value)
-
- warnings.warn(
- f"Invalid potential_gamma value: {value}. Using default 0.95. "
- "Expected numeric value in [0, 1].",
- category=UserWarning,
- stacklevel=2,
- )
- return 0.95
+ return POTENTIAL_GAMMA_DEFAULT
+ if gamma < 0.0 or gamma > 1.0:
+ original = gamma
+ gamma = float(np.clip(gamma, 0.0, 1.0))
+ warnings.warn(
+ f"potential_gamma={original} outside [0,1]; clamped to {gamma}",
+ RewardDiagnosticsWarning,
+ stacklevel=2,
+ )
+ return gamma
+ return float(gamma)
def _get_str_param(params: RewardParams, key: str, default: str) -> str:
def _compute_hold_potential(
pnl: float, duration_ratio: float, params: RewardParams
) -> float:
- """
- Compute PBRS hold potential: Φ(s) = scale * 0.5 * [T_pnl(g*pnl_ratio) + T_dur(g*duration_ratio)].
-
- This implements the canonical PBRS potential function from Ng et al. (1999):
- R'(s,a,s') = R_base(s,a,s') + γΦ(s') - Φ(s)
-
- Args:
- pnl: Current profit/loss ratio
- duration_ratio: Current duration as fraction of max_trade_duration
- params: Reward parameters containing PBRS configuration
-
- Returns:
- Potential value Φ(s)
- """
+ """Compute PBRS hold potential Φ(s)."""
if not _get_bool_param(params, "hold_potential_enabled", True):
return _fail_safely("hold_potential_disabled")
-
- scale = _get_float_param(params, "hold_potential_scale", 1.0)
- gain = _get_float_param(params, "hold_potential_gain", 1.0)
- transform_pnl = _get_str_param(params, "hold_potential_transform_pnl", "tanh")
- transform_duration = _get_str_param(
- params, "hold_potential_transform_duration", "tanh"
+ return _compute_bi_component(
+ kind="hold_potential",
+ pnl=pnl,
+ duration_ratio=duration_ratio,
+ params=params,
+ scale_key="hold_potential_scale",
+ gain_key="hold_potential_gain",
+ transform_pnl_key="hold_potential_transform_pnl",
+ transform_dur_key="hold_potential_transform_duration",
+ non_finite_key="non_finite_hold_potential",
)
- sharpness = _get_float_param(params, "potential_softsign_sharpness", 1.0)
-
- # Apply transforms
- if transform_pnl == "softsign_sharp":
- t_pnl = apply_transform(transform_pnl, gain * pnl, sharpness=sharpness)
- else:
- t_pnl = apply_transform(transform_pnl, gain * pnl)
-
- if transform_duration == "softsign_sharp":
- t_dur = apply_transform(
- transform_duration, gain * duration_ratio, sharpness=sharpness
- )
- else:
- t_dur = apply_transform(transform_duration, gain * duration_ratio)
-
- potential = scale * 0.5 * (t_pnl + t_dur)
-
- # Validate numerical safety
- if not np.isfinite(potential):
- return _fail_safely("non_finite_hold_potential")
-
- return float(potential)
def _compute_entry_additive(
pnl: float, duration_ratio: float, params: RewardParams
) -> float:
- """
- Compute entry additive reward (non-PBRS component).
-
- Args:
- pnl: Current profit/loss ratio
- duration_ratio: Current duration as fraction of max_trade_duration
- params: Reward parameters
-
- Returns:
- Entry additive reward
- """
if not _get_bool_param(params, "entry_additive_enabled", False):
return _fail_safely("entry_additive_disabled")
-
- scale = _get_float_param(params, "entry_additive_scale", 1.0)
- gain = _get_float_param(params, "entry_additive_gain", 1.0)
- transform_pnl = _get_str_param(params, "entry_additive_transform_pnl", "tanh")
- transform_duration = _get_str_param(
- params, "entry_additive_transform_duration", "tanh"
+ return _compute_bi_component(
+ kind="entry_additive",
+ pnl=pnl,
+ duration_ratio=duration_ratio,
+ params=params,
+ scale_key="entry_additive_scale",
+ gain_key="entry_additive_gain",
+ transform_pnl_key="entry_additive_transform_pnl",
+ transform_dur_key="entry_additive_transform_duration",
+ non_finite_key="non_finite_entry_additive",
)
- sharpness = _get_float_param(params, "potential_softsign_sharpness", 1.0)
-
- # Apply transforms
- if transform_pnl == "softsign_sharp":
- t_pnl = apply_transform(transform_pnl, gain * pnl, sharpness=sharpness)
- else:
- t_pnl = apply_transform(transform_pnl, gain * pnl)
-
- if transform_duration == "softsign_sharp":
- t_dur = apply_transform(
- transform_duration, gain * duration_ratio, sharpness=sharpness
- )
- else:
- t_dur = apply_transform(transform_duration, gain * duration_ratio)
-
- additive = scale * 0.5 * (t_pnl + t_dur)
-
- # Validate numerical safety
- if not np.isfinite(additive):
- return _fail_safely("non_finite_entry_additive")
-
- return float(additive)
def _compute_exit_additive(
pnl: float, duration_ratio: float, params: RewardParams
) -> float:
- """
- Compute exit additive reward (non-PBRS component).
-
- Args:
- pnl: Final profit/loss ratio at exit
- duration_ratio: Final duration as fraction of max_trade_duration
- params: Reward parameters
-
- Returns:
- Exit additive reward
- """
if not _get_bool_param(params, "exit_additive_enabled", False):
return _fail_safely("exit_additive_disabled")
-
- scale = _get_float_param(params, "exit_additive_scale", 1.0)
- gain = _get_float_param(params, "exit_additive_gain", 1.0)
- transform_pnl = _get_str_param(params, "exit_additive_transform_pnl", "tanh")
- transform_duration = _get_str_param(
- params, "exit_additive_transform_duration", "tanh"
+ return _compute_bi_component(
+ kind="exit_additive",
+ pnl=pnl,
+ duration_ratio=duration_ratio,
+ params=params,
+ scale_key="exit_additive_scale",
+ gain_key="exit_additive_gain",
+ transform_pnl_key="exit_additive_transform_pnl",
+ transform_dur_key="exit_additive_transform_duration",
+ non_finite_key="non_finite_exit_additive",
)
- sharpness = _get_float_param(params, "potential_softsign_sharpness", 1.0)
- # Apply transforms
- if transform_pnl == "softsign_sharp":
- t_pnl = apply_transform(transform_pnl, gain * pnl, sharpness=sharpness)
- else:
- t_pnl = apply_transform(transform_pnl, gain * pnl)
- if transform_duration == "softsign_sharp":
- t_dur = apply_transform(
- transform_duration, gain * duration_ratio, sharpness=sharpness
- )
- else:
- t_dur = apply_transform(transform_duration, gain * duration_ratio)
-
- additive = scale * 0.5 * (t_pnl + t_dur)
-
- # Validate numerical safety
- if not np.isfinite(additive):
- return _fail_safely("non_finite_exit_additive")
-
- return float(additive)
-
-
-def _compute_exit_potential(
- pnl: float, duration_ratio: float, params: RewardParams, last_potential: float = 0.0
-) -> float:
+def _compute_exit_potential(last_potential: float, params: RewardParams) -> float:
"""Compute next potential Φ(s') for closing/exit transitions.
- Mirrors the original environment semantics:
+ Semantics:
- canonical: Φ' = 0.0
- progressive_release: Φ' = Φ * (1 - decay) with decay clamped to [0,1]
- spike_cancel: Φ' = Φ / γ (neutralizes shaping spike ≈ 0 net effect) if γ>0 else Φ
- retain_previous: Φ' = Φ
- Invalid modes fall back to canonical.
- Any non-finite resulting potential is coerced to 0.0.
+
+ Invalid modes fall back to canonical. Any non-finite resulting potential is
+ coerced to 0.0.
"""
mode = _get_str_param(params, "exit_potential_mode", "canonical")
if mode == "canonical":
if mode == "progressive_release":
decay = _get_float_param(params, "exit_potential_decay", 0.5)
- if not np.isfinite(decay) or decay < 0.0:
+ if not np.isfinite(decay):
+ warnings.warn(
+ "exit_potential_decay invalid (NaN/inf); defaulting to 0.5",
+ RewardDiagnosticsWarning,
+ stacklevel=2,
+ )
decay = 0.5
+ if decay < 0.0:
+ warnings.warn(
+ f"exit_potential_decay={decay} < 0; clamped to 0.0",
+ RewardDiagnosticsWarning,
+ stacklevel=2,
+ )
+ decay = 0.0
if decay > 1.0:
+ warnings.warn(
+ f"exit_potential_decay={decay} > 1; clamped to 1.0",
+ RewardDiagnosticsWarning,
+ stacklevel=2,
+ )
decay = 1.0
next_potential = last_potential * (1.0 - decay)
elif mode == "spike_cancel":
# Enforce PBRS invariance (auto-disable additives in canonical mode)
params = _enforce_pbrs_invariance(params)
- # Validate gamma (with None handling matching original environment)
+ # Resolve gamma
gamma = _get_potential_gamma(params)
- if not (0.0 <= gamma <= 1.0):
- raise ValueError(f"Invalid gamma: {gamma}. Must be in [0, 1]")
# Compute current potential Φ(s)
current_potential = _compute_hold_potential(
# Compute next potential Φ(s')
if is_terminal:
- next_potential = _compute_exit_potential(
- next_pnl, next_duration_ratio, params, last_potential
- )
+ next_potential = _compute_exit_potential(last_potential, params)
else:
next_potential = _compute_hold_potential(next_pnl, next_duration_ratio, params)
else 0.0
)
- # Invariance diagnostic
- _log_pbrs_invariance_warning(params)
-
# Total reward
total_reward = base_reward + shaping_reward + entry_additive + exit_additive
def _enforce_pbrs_invariance(params: RewardParams) -> RewardParams:
- """Enforce PBRS invariance by auto-disabling additives in canonical mode.
-
- Matches original environment behavior: canonical mode automatically
- disables entry/exit additives to preserve theoretical invariance.
-
- PBRS invariance (Ng et al. 1999) requires:
- - canonical exit_potential_mode (terminal Φ=0)
- - No path-dependent additive reward components enabled.
-
- Returns modified params dict with invariance enforced.
- """
+ """Enforce PBRS invariance by auto-disabling additives in canonical mode."""
mode = _get_str_param(params, "exit_potential_mode", "canonical")
- if mode == "canonical":
- # Make a copy to avoid mutating input
- enforced_params = dict(params)
- entry_enabled = _get_bool_param(params, "entry_additive_enabled", False)
- exit_enabled = _get_bool_param(params, "exit_additive_enabled", False)
-
- if entry_enabled:
- warnings.warn(
- "Disabling entry additive to preserve PBRS invariance (canonical mode).",
- category=UserWarning,
- stacklevel=2,
- )
- enforced_params["entry_additive_enabled"] = False
-
- if exit_enabled:
- warnings.warn(
- "Disabling exit additive to preserve PBRS invariance (canonical mode).",
- category=UserWarning,
- stacklevel=2,
- )
- enforced_params["exit_additive_enabled"] = False
-
- return enforced_params
+ if mode != "canonical":
+ return params
+ if params.get("_pbrs_invariance_applied"):
+ return params
+
+ entry_enabled = _get_bool_param(params, "entry_additive_enabled", False)
+ exit_enabled = _get_bool_param(params, "exit_additive_enabled", False)
+ if entry_enabled:
+ warnings.warn(
+ "Disabling entry additive to preserve PBRS invariance (canonical mode).",
+ RewardDiagnosticsWarning,
+ stacklevel=2,
+ )
+ params["entry_additive_enabled"] = False
+ if exit_enabled:
+ warnings.warn(
+ "Disabling exit additive to preserve PBRS invariance (canonical mode).",
+ RewardDiagnosticsWarning,
+ stacklevel=2,
+ )
+ params["exit_additive_enabled"] = False
+ params["_pbrs_invariance_applied"] = True
return params
-def _log_pbrs_invariance_warning(params: RewardParams) -> None:
- """Log an informational message if invariance conditions are violated.
+def _compute_bi_component(
+ kind: str,
+ pnl: float,
+ duration_ratio: float,
+ params: RewardParams,
+ scale_key: str,
+ gain_key: str,
+ transform_pnl_key: str,
+ transform_dur_key: str,
+ non_finite_key: str,
+) -> float:
+ """Generic helper for (pnl, duration) bi-component transforms.
- PBRS invariance (Ng et al. 1999) requires:
- - canonical exit_potential_mode (terminal Φ=0)
- - No path-dependent additive reward components enabled.
- This mirrors original environment diagnostic behavior.
+ Consolidates duplicated logic across hold potential & additives.
"""
- mode = _get_str_param(params, "exit_potential_mode", "canonical")
- if mode == "canonical":
- if _get_bool_param(params, "entry_additive_enabled", False) or _get_bool_param(
- params, "exit_additive_enabled", False
- ):
- warnings.warn(
- (
- "PBRS invariance relaxed: canonical mode with additive components enabled "
- f"(entry_additive_enabled={_get_bool_param(params, 'entry_additive_enabled', False)}, "
- f"exit_additive_enabled={_get_bool_param(params, 'exit_additive_enabled', False)})"
- ),
- category=UserWarning,
- stacklevel=2,
- )
+ scale = _get_float_param(params, scale_key, 1.0)
+ gain = _get_float_param(params, gain_key, 1.0)
+ transform_pnl = _get_str_param(params, transform_pnl_key, "tanh")
+ transform_duration = _get_str_param(params, transform_dur_key, "tanh")
+ sharpness = _get_float_param(params, "potential_softsign_sharpness", 1.0)
+
+ if transform_pnl == "softsign_sharp":
+ t_pnl = apply_transform(transform_pnl, gain * pnl, sharpness=sharpness)
+ else:
+ t_pnl = apply_transform(transform_pnl, gain * pnl)
+ if transform_duration == "softsign_sharp":
+ t_dur = apply_transform(
+ transform_duration, gain * duration_ratio, sharpness=sharpness
+ )
+ else:
+ t_dur = apply_transform(transform_duration, gain * duration_ratio)
+ value = scale * 0.5 * (t_pnl + t_dur)
+ if not np.isfinite(value):
+ return _fail_safely(non_finite_key)
+ return float(value)
if __name__ == "__main__":
--- /dev/null
+"""CLI integration smoke test for reward_space_analysis.
+
+Purpose
+-------
+Execute a bounded, optionally shuffled subset of parameter combinations for
+`reward_space_analysis.py` to verify end-to-end execution (smoke / regression
+signal, not correctness proof).
+
+Key features
+------------
+* Deterministic sampling with optional shuffling (`--shuffle-seed`).
+* Optional duplication of first N scenarios under strict diagnostics
+ (`--strict-sample`).
+* Per-scenario timing and aggregate statistics (mean / min / max seconds).
+* Simple warning counting + (patch adds) breakdown of distinct warning lines.
+* Scenario list + seed metadata exported for reproducibility.
+* Direct CLI forwarding of bootstrap resample count to child process.
+
+Usage
+-----
+python test_cli.py --samples 50 --out-dir ../sample_run_output_smoke \
+ --shuffle-seed 123 --strict-sample 3 --bootstrap-resamples 200
+
+JSON Summary fields
+-------------------
+total, ok, failures[], warnings_total, warnings_breakdown, mean_seconds,
+max_seconds, min_seconds, strict_duplicated, scenarios (list), seeds (metadata).
+
+Exit codes
+----------
+0: success, 1: failures present, 130: interrupted (partial summary written).
+"""
+
+from __future__ import annotations
+
+import argparse
+import itertools
+import json
+import os
+import platform
+import random
+import subprocess
+import sys
+import tempfile
+import time
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Tuple, TypedDict
+
+ConfigTuple = Tuple[str, str, float, int, int, int]
+
+
+SUMMARY_FILENAME = "reward_space_cli_smoke_results.json"
+
+
+class ScenarioResult(TypedDict):
+ config: ConfigTuple
+ status: str
+ stdout: str
+ stderr: str
+ strict: bool
+ seconds: Optional[float]
+ warnings: int
+
+
+class SummaryResult(TypedDict):
+ total: int
+ ok: int
+ failures: List[ScenarioResult]
+ warnings_total: int
+ mean_seconds: Optional[float]
+ max_seconds: Optional[float]
+ min_seconds: Optional[float]
+ strict_duplicated: int
+
+
+def build_arg_matrix(
+ max_scenarios: int = 40,
+ shuffle_seed: Optional[int] = None,
+) -> List[ConfigTuple]:
+ exit_potential_modes = [
+ "canonical",
+ "progressive_release",
+ "retain_previous",
+ "spike_cancel",
+ ]
+ exit_attenuation_modes = ["linear", "sqrt", "power", "half_life", "legacy"]
+ potential_gammas = [0.0, 0.5, 0.95, 0.999]
+ hold_enabled = [0, 1]
+ entry_additive_enabled = [0, 1]
+ exit_additive_enabled = [0, 1]
+
+ product_iter = itertools.product(
+ exit_potential_modes,
+ exit_attenuation_modes,
+ potential_gammas,
+ hold_enabled,
+ entry_additive_enabled,
+ exit_additive_enabled,
+ )
+
+ full: List[ConfigTuple] = list(product_iter)
+ if shuffle_seed is not None:
+ rnd = random.Random(shuffle_seed)
+ rnd.shuffle(full)
+ if max_scenarios >= len(full):
+ return full
+ step = len(full) / max_scenarios
+ idx_pos = 0.0
+ selected: List[ConfigTuple] = []
+ for _ in range(max_scenarios):
+ idx = int(idx_pos)
+ if idx >= len(full):
+ idx = len(full) - 1
+ selected.append(full[idx])
+ idx_pos += step
+ return selected
+
+
+def run_scenario(
+ script: Path,
+ out_dir: Path,
+ idx: int,
+ total: int,
+ base_samples: int,
+ conf: ConfigTuple,
+ strict: bool,
+ bootstrap_resamples: int,
+ timeout: int,
+) -> ScenarioResult:
+ (
+ exit_potential_mode,
+ exit_attenuation_mode,
+ potential_gamma,
+ hold_enabled,
+ entry_additive_enabled,
+ exit_additive_enabled,
+ ) = conf
+ scenario_dir = out_dir / f"scenario_{idx:02d}"
+ scenario_dir.mkdir(parents=True, exist_ok=True)
+ cmd = [
+ sys.executable,
+ str(script),
+ "--num_samples",
+ str(base_samples),
+ "--output",
+ str(scenario_dir),
+ "--exit_potential_mode",
+ exit_potential_mode,
+ "--exit_attenuation_mode",
+ exit_attenuation_mode,
+ "--potential_gamma",
+ str(potential_gamma),
+ "--hold_potential_enabled",
+ str(hold_enabled),
+ "--entry_additive_enabled",
+ str(entry_additive_enabled),
+ "--exit_additive_enabled",
+ str(exit_additive_enabled),
+ "--seed",
+ str(100 + idx),
+ ]
+ # Forward bootstrap resamples explicitly
+ cmd += ["--bootstrap_resamples", str(bootstrap_resamples)]
+ if strict:
+ cmd.append("--strict_diagnostics")
+ start = time.perf_counter()
+ try:
+ proc = subprocess.run(
+ cmd, capture_output=True, text=True, check=False, timeout=timeout
+ )
+ except subprocess.TimeoutExpired:
+ return {
+ "config": conf,
+ "status": "timeout",
+ "stderr": "<timeout>",
+ "stdout": "",
+ "strict": strict,
+ "seconds": None,
+ "warnings": 0,
+ }
+ status = "ok" if proc.returncode == 0 else f"error({proc.returncode})"
+ end = time.perf_counter()
+ combined = (proc.stdout + "\n" + proc.stderr).lower()
+ warn_count = combined.count("warning")
+ return {
+ "config": conf,
+ "status": status,
+ "stdout": proc.stdout[-5000:],
+ "stderr": proc.stderr[-5000:],
+ "strict": strict,
+ "seconds": round(end - start, 4),
+ "warnings": warn_count,
+ }
+
+
+def main():
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "--samples", type=int, default=40, help="num synthetic samples per scenario"
+ )
+ parser.add_argument(
+ "--out-dir",
+ type=str,
+ default="sample_run_output_smoke",
+ help="output parent directory",
+ )
+ parser.add_argument(
+ "--shuffle-seed",
+ type=int,
+ default=None,
+ help="If set, shuffle full scenario space before sampling a diverse subset",
+ )
+ parser.add_argument(
+ "--strict-sample",
+ type=int,
+ default=0,
+ help="Duplicate the first N scenarios executed again with --strict_diagnostics",
+ )
+ parser.add_argument(
+ "--max-scenarios",
+ type=int,
+ default=40,
+ help="Maximum number of (non-strict) scenarios before strict duplication",
+ )
+ parser.add_argument(
+ "--bootstrap-resamples",
+ type=int,
+ default=120,
+ help="Number of bootstrap resamples to pass to child processes (speed/perf tradeoff)",
+ )
+ parser.add_argument(
+ "--per-scenario-timeout",
+ type=int,
+ default=600,
+ help="Timeout (seconds) per child process (default: 600)",
+ )
+ parser.add_argument(
+ "--store-full-logs",
+ action="store_true",
+ help="If set, store full stdout/stderr (may be large) instead of tail truncation.",
+ )
+ args = parser.parse_args()
+
+ # Basic validation
+ if args.max_scenarios <= 0:
+ parser.error("--max-scenarios must be > 0")
+ if args.samples <= 0:
+ parser.error("--samples must be > 0")
+ if args.strict_sample < 0:
+ parser.error("--strict-sample must be >= 0")
+ if args.bootstrap_resamples <= 0:
+ parser.error("--bootstrap-resamples must be > 0")
+
+ script = Path(__file__).parent / "reward_space_analysis.py"
+ out_dir = Path(args.out_dir)
+ out_dir.mkdir(parents=True, exist_ok=True)
+
+ scenarios = build_arg_matrix(
+ max_scenarios=args.max_scenarios, shuffle_seed=args.shuffle_seed
+ )
+
+ # Prepare list of (conf, strict_flag)
+ scenario_pairs: List[Tuple[ConfigTuple, bool]] = [(c, False) for c in scenarios]
+ strict_n = max(0, min(args.strict_sample, len(scenarios)))
+ for c in scenarios[:strict_n]:
+ scenario_pairs.append((c, True))
+
+ results: List[ScenarioResult] = []
+ total = len(scenario_pairs)
+ interrupted = False
+ try:
+ for i, (conf, strict_flag) in enumerate(scenario_pairs, start=1):
+ # Ensure child process sees the chosen bootstrap resamples via direct CLI args only
+ res = run_scenario(
+ script,
+ out_dir,
+ i,
+ total,
+ args.samples,
+ conf,
+ strict=strict_flag,
+ bootstrap_resamples=args.bootstrap_resamples,
+ timeout=args.per_scenario_timeout,
+ )
+ results.append(res)
+ status = res["status"]
+ tag = "[strict]" if strict_flag else ""
+ secs = res.get("seconds")
+ secs_str = f" {secs:.2f}s" if secs is not None else ""
+ print(f"[{i}/{total}] {conf} {tag} -> {status}{secs_str}")
+ except KeyboardInterrupt:
+ interrupted = True
+ print("\nKeyboardInterrupt received: writing partial summary...")
+
+ ok = sum(1 for r in results if r["status"] == "ok")
+ failures = [r for r in results if r["status"] != "ok"]
+ total_warnings = sum(r["warnings"] for r in results)
+ durations: List[float] = [
+ float(r["seconds"]) for r in results if isinstance(r["seconds"], float)
+ ]
+ summary: SummaryResult = {
+ "total": len(results),
+ "ok": ok,
+ "failures": failures,
+ "warnings_total": total_warnings,
+ "mean_seconds": round(sum(durations) / len(durations), 4)
+ if durations
+ else None,
+ "max_seconds": max(durations) if durations else None,
+ "min_seconds": min(durations) if durations else None,
+ "strict_duplicated": strict_n,
+ }
+ # Build warning breakdown (simple line fingerprinting)
+ warning_counts: Dict[str, int] = {}
+ for r in results:
+ text = (r["stderr"] + "\n" + r["stdout"]).splitlines()
+ for line in text:
+ if "warning" in line.lower():
+ # Fingerprint: trim + collapse whitespace + limit length
+ fp = " ".join(line.strip().split())[:160]
+ warning_counts[fp] = warning_counts.get(fp, 0) + 1
+
+ # Scenario export (list of configs only, excluding strict flag duplication detail)
+ scenario_list = [list(c) for c, _ in scenario_pairs]
+
+ # Collect environment + reproducibility metadata
+ def _git_hash() -> Optional[str]:
+ try:
+ proc = subprocess.run(
+ ["git", "rev-parse", "--short", "HEAD"],
+ capture_output=True,
+ text=True,
+ check=False,
+ timeout=2,
+ )
+ if proc.returncode == 0:
+ return proc.stdout.strip() or None
+ except Exception:
+ return None
+ return None
+
+ summary_extra: Dict[str, Any] = {
+ "warnings_breakdown": warning_counts,
+ "scenarios": scenario_list,
+ "seeds": {
+ "shuffle_seed": args.shuffle_seed,
+ "strict_sample": args.strict_sample,
+ "max_scenarios": args.max_scenarios,
+ "bootstrap_resamples": args.bootstrap_resamples,
+ },
+ "metadata": {
+ "timestamp_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
+ "python_version": sys.version.split()[0],
+ "platform": platform.platform(),
+ "git_commit": _git_hash(),
+ "schema_version": 1,
+ "per_scenario_timeout": args.per_scenario_timeout,
+ },
+ }
+ serializable: Dict[str, Any]
+ if interrupted:
+ serializable = {**summary, **summary_extra, "interrupted": True}
+ else:
+ serializable = {**summary, **summary_extra}
+ # Atomic write to avoid corrupt partial files
+ tmp_fd, tmp_path = tempfile.mkstemp(prefix="_tmp_summary_", dir=str(out_dir))
+ try:
+ with os.fdopen(tmp_fd, "w", encoding="utf-8") as fh:
+ json.dump(serializable, fh, indent=2)
+ os.replace(tmp_path, out_dir / SUMMARY_FILENAME)
+ except Exception:
+ # Best effort fallback
+ try:
+ Path(out_dir / SUMMARY_FILENAME).write_text(
+ json.dumps(serializable, indent=2), encoding="utf-8"
+ )
+ finally:
+ if os.path.exists(tmp_path):
+ try:
+ os.remove(tmp_path)
+ except OSError:
+ pass
+ else:
+ if os.path.exists(tmp_path): # Should have been moved; defensive cleanup
+ try:
+ os.remove(tmp_path)
+ except OSError:
+ pass
+ print("Summary written to", out_dir / SUMMARY_FILENAME)
+ if not interrupted and summary["failures"]:
+ print("Failures detected:")
+ for f in summary["failures"]:
+ print(f" - {f['config']}: {f['status']}")
+ sys.exit(1)
+ if interrupted:
+ sys.exit(130)
+
+
+if __name__ == "__main__": # pragma: no cover
+ main()
import json
import math
import pickle
+import re
import shutil
import subprocess
import sys
import unittest
import warnings
from pathlib import Path
-from typing import Iterable, Sequence
+from typing import Iterable, Optional, Sequence, Union
import numpy as np
import pandas as pd
# Import functions to test
try:
from reward_space_analysis import (
+ ATTENUATION_MODES,
+ ATTENUATION_MODES_WITH_LEGACY,
DEFAULT_MODEL_REWARD_PARAMETERS,
+ PBRS_INVARIANCE_TOL,
Actions,
Positions,
RewardContext,
sys.exit(1)
+# --- Helper factories (DRY for contexts and params) ---
+def base_params(**overrides) -> dict:
+ """Return a fresh copy of DEFAULT_MODEL_REWARD_PARAMETERS with overrides applied.
+
+ Ensures tests do not mutate the global defaults inadvertently.
+ """
+ params = DEFAULT_MODEL_REWARD_PARAMETERS.copy()
+ params.update(overrides)
+ return params
+
+
+def make_ctx(
+ *,
+ pnl: float = 0.0,
+ trade_duration: int = 0,
+ idle_duration: int = 0,
+ max_trade_duration: int = 100,
+ max_unrealized_profit: float = 0.0,
+ min_unrealized_profit: float = 0.0,
+ position: Positions = Positions.Neutral,
+ action: Actions = Actions.Neutral,
+) -> RewardContext:
+ """Factory for RewardContext with neutral defaults.
+
+ Only fields explicitly varied in a test need to be specified; keeps test bodies concise.
+ """
+ return RewardContext(
+ pnl=pnl,
+ trade_duration=trade_duration,
+ idle_duration=idle_duration,
+ max_trade_duration=max_trade_duration,
+ max_unrealized_profit=max_unrealized_profit,
+ min_unrealized_profit=min_unrealized_profit,
+ position=position,
+ action=action,
+ )
+
+
class RewardSpaceTestBase(unittest.TestCase):
"""Base class with common test utilities.
def assertAlmostEqualFloat(
self,
- first: float | int,
- second: float | int,
+ first: Union[float, int],
+ second: Union[float, int],
tolerance: float = 1e-6,
- msg: str | None = None,
+ msg: Union[str, None] = None,
) -> None:
"""Absolute tolerance compare with explicit failure and finite check."""
self.assertFinite(first, name="a")
)
# --- Statistical bounds helpers (factorize redundancy) ---
- def assertPValue(self, value: float | int, msg: str = "") -> None:
+ def assertPValue(self, value: Union[float, int], msg: str = "") -> None:
"""Assert a p-value is finite and within [0,1]."""
self.assertFinite(value, name="p-value")
self.assertGreaterEqual(value, 0.0, msg or f"p-value < 0: {value}")
def assertDistanceMetric(
self,
- value: float | int,
+ value: Union[float, int],
*,
non_negative: bool = True,
- upper: float | None = None,
+ upper: Optional[float] = None,
name: str = "metric",
) -> None:
"""Generic distance/divergence bounds: finite, optional non-negativity and optional upper bound."""
def assertEffectSize(
self,
- value: float | int,
+ value: Union[float, int],
*,
lower: float = -1.0,
upper: float = 1.0,
self.assertGreaterEqual(value, lower, f"{name} < {lower}: {value}")
self.assertLessEqual(value, upper, f"{name} > {upper}: {value}")
- def assertFinite(self, value: float | int, name: str = "value") -> None:
+ def assertFinite(self, value: Union[float, int], name: str = "value") -> None:
"""Assert scalar is finite."""
if not np.isfinite(value): # low-level base check to avoid recursion
self.fail(f"{name} not finite: {value}")
def assertMonotonic(
self,
- seq: Sequence[float | int] | Iterable[float | int],
+ seq: Union[Sequence[Union[float, int]], Iterable[Union[float, int]]],
*,
- non_increasing: bool | None = None,
- non_decreasing: bool | None = None,
+ non_increasing: Optional[bool] = None,
+ non_decreasing: Optional[bool] = None,
tolerance: float = 0.0,
name: str = "sequence",
) -> None:
def assertWithin(
self,
- value: float | int,
- low: float | int,
- high: float | int,
+ value: Union[float, int],
+ low: Union[float, int],
+ high: Union[float, int],
*,
name: str = "value",
inclusive: bool = True,
with open(self.output_path / run_dir / "manifest.json", "r") as f:
manifest = json.load(f)
- required_keys = {"generated_at", "num_samples", "seed", "params_hash"}
- self.assertTrue(required_keys.issubset(manifest.keys()))
+ required_keys = {
+ "generated_at",
+ "num_samples",
+ "seed",
+ "params_hash",
+ "reward_params",
+ "simulation_params",
+ }
+ self.assertTrue(
+ required_keys.issubset(manifest.keys()),
+ f"Missing keys: {required_keys - set(manifest.keys())}",
+ )
+ self.assertIsInstance(manifest["reward_params"], dict)
+ self.assertIsInstance(manifest["simulation_params"], dict)
+ # Legacy fields must be absent
+ self.assertNotIn("top_features", manifest)
+ self.assertNotIn("reward_param_overrides", manifest)
+ self.assertNotIn("params", manifest)
self.assertEqual(manifest["num_samples"], self.TEST_SAMPLES)
self.assertEqual(manifest["seed"], self.SEED)
def test_basic_reward_calculation(self):
"""Test basic reward calculation consistency."""
- context = RewardContext(
+ context = make_ctx(
pnl=self.TEST_PROFIT_TARGET,
trade_duration=10,
- idle_duration=0,
max_trade_duration=100,
max_unrealized_profit=0.025,
min_unrealized_profit=0.015,
"""
# Build context where pnl == 0.0 and max_unrealized_profit == pnl
- ctx = RewardContext(
+ ctx = make_ctx(
pnl=0.0,
trade_duration=1,
- idle_duration=0,
max_trade_duration=100,
max_unrealized_profit=0.0,
min_unrealized_profit=-0.02,
params = self.DEFAULT_PARAMS.copy()
profit_target = self.TEST_PROFIT_TARGET * self.TEST_RR
- pnl_factor = _get_pnl_factor(params, ctx, profit_target)
+ pnl_factor = _get_pnl_factor(params, ctx, profit_target, self.TEST_RR)
# Expect no efficiency modulation: factor should be >= 0 and close to 1.0
self.assertFinite(pnl_factor, name="pnl_factor")
self.assertAlmostEqualFloat(pnl_factor, 1.0, tolerance=1e-6)
base_factor = self.TEST_BASE_FACTOR
idle_duration = 40 # below large threshold, near small threshold
- context = RewardContext(
+ context = make_ctx(
pnl=0.0,
trade_duration=0,
idle_duration=idle_duration,
params = self.DEFAULT_PARAMS.copy()
params.update(
{
- "potential_gamma": 0.95,
+ "potential_gamma": DEFAULT_MODEL_REWARD_PARAMETERS["potential_gamma"],
"exit_potential_mode": "progressive_release",
"exit_potential_decay": 5.0, # should clamp to 1.0
"hold_potential_enabled": True,
current_dur = 0.4
prev_potential = _compute_hold_potential(current_pnl, current_dur, params)
# Use helper accessor to avoid union type issues
- gamma = _get_float_param(params, "potential_gamma", 0.95)
+ gamma = _get_float_param(
+ params,
+ "potential_gamma",
+ DEFAULT_MODEL_REWARD_PARAMETERS.get("potential_gamma", 0.95),
+ )
expected_next = (
prev_potential / gamma if gamma not in (0.0, None) else prev_potential
)
risk_reward_ratio = 1.0
# Two contexts with different idle durations
- ctx_a = RewardContext(
+ ctx_a = make_ctx(
pnl=0.0,
trade_duration=0,
idle_duration=20,
def test_idle_penalty_zero_when_profit_target_zero(self):
"""If profit_target=0 → idle_factor=0 → idle penalty must be exactly 0 for neutral idle state."""
- context = RewardContext(
+ context = make_ctx(
pnl=0.0,
trade_duration=0,
idle_duration=30,
def test_bootstrap_confidence_intervals(self):
"""Test bootstrap confidence intervals calculation."""
# Create test data
- np.random.seed(42)
+ np.random.seed(self.SEED)
test_data = pd.DataFrame(
{
"reward_total": np.random.normal(0, 1, 100),
def test_distribution_metrics_mathematical_bounds(self):
"""Test mathematical bounds and validity of distribution shift metrics."""
# Create two different distributions for testing
- np.random.seed(42)
+ np.random.seed(self.SEED)
df1 = pd.DataFrame(
{
"pnl": np.random.normal(0, self.TEST_PNL_STD, 500),
def test_statistical_functions_bounds_validation(self):
"""Test that all statistical functions respect mathematical bounds."""
# Create test data with known statistical properties
- np.random.seed(42)
+ np.random.seed(self.SEED)
df = pd.DataFrame(
{
"reward_total": np.random.normal(0, 1, 300),
def test_different_exit_attenuation_modes(self):
"""Test different exit attenuation modes (legacy, sqrt, linear, power, half_life)."""
- modes = ["legacy", "sqrt", "linear", "power", "half_life"]
+ # Use canonical constant (includes legacy) instead of hardcoded literals
+ modes = ATTENUATION_MODES_WITH_LEGACY
for mode in modes:
with self.subTest(mode=mode):
def test_statistical_functions(self):
"""Test statistical functions."""
-
# Create test data with specific patterns
- np.random.seed(42)
+ np.random.seed(self.SEED)
test_data = pd.DataFrame(
{
"reward_total": np.random.normal(0, 1, 200),
Modes covered: sqrt, linear, power, half_life, plateau+linear (after grace).
Legacy is excluded (non-monotonic by design). Plateau+linear includes flat grace then monotonic.
"""
-
- modes = ["sqrt", "linear", "power", "half_life", "plateau_linear"]
+ # Start from canonical modes (excluding legacy already) and add synthetic plateau_linear variant
+ modes = list(ATTENUATION_MODES) + ["plateau_linear"]
base_factor = self.TEST_BASE_FACTOR
pnl = 0.05
pnl_factor = 1.0
params = self.DEFAULT_PARAMS.copy()
# Try multiple modes / extreme params
- modes = ["linear", "power", "half_life", "sqrt", "legacy", "linear_plateau"]
+ # All canonical modes + legacy + synthetic plateau variant
+ modes = list(ATTENUATION_MODES_WITH_LEGACY) + ["plateau_linear"]
base_factor = self.TEST_BASE_FACTOR
pnl = 0.05
pnl_factor = 2.0 # amplified
for mode in modes:
params_mode = params.copy()
- if mode == "linear_plateau":
+ if mode == "plateau_linear":
params_mode["exit_attenuation_mode"] = "linear"
params_mode["exit_plateau"] = True
params_mode["exit_plateau_grace"] = 0.4
def test_exit_potential_canonical(self):
"""Test exit potential in canonical mode."""
params = {"exit_potential_mode": "canonical"}
- val = _compute_exit_potential(0.5, 0.3, params, last_potential=1.0)
+ val = _compute_exit_potential(1.0, params)
self.assertEqual(val, 0.0)
def test_exit_potential_progressive_release(self):
"exit_potential_decay": 0.8,
}
# Expected: Φ' = Φ * (1 - decay) = 1 * (1 - 0.8) = 0.2
- val = _compute_exit_potential(0.5, 0.3, params, last_potential=1.0)
+ val = _compute_exit_potential(1.0, params)
self.assertAlmostEqual(val, 0.2)
def test_exit_potential_spike_cancel(self):
"""Spike cancel: Φ' = Φ / γ (inversion)."""
- params = {"exit_potential_mode": "spike_cancel", "potential_gamma": 0.95}
- val = _compute_exit_potential(0.5, 0.3, params, last_potential=1.0)
+ params = {
+ "exit_potential_mode": "spike_cancel",
+ "potential_gamma": DEFAULT_MODEL_REWARD_PARAMETERS["potential_gamma"],
+ }
+ val = _compute_exit_potential(1.0, params)
self.assertAlmostEqual(val, 1.0 / 0.95, places=7)
def test_exit_potential_retain_previous(self):
"""Test exit potential in retain previous mode."""
params = {"exit_potential_mode": "retain_previous"}
- val = _compute_exit_potential(0.5, 0.3, params, last_potential=1.0)
+ val = _compute_exit_potential(1.0, params)
self.assertEqual(val, 1.0)
def test_pbrs_terminal_canonical(self):
- """Test PBRS behavior in canonical mode with terminal state."""
- params = {
- "potential_gamma": 0.95,
- "hold_potential_enabled": True,
- "hold_potential_scale": 1.0,
- "hold_potential_gain": 1.0,
- "hold_potential_transform_pnl": "tanh",
- "hold_potential_transform_duration": "tanh",
- "exit_potential_mode": "canonical",
- "entry_additive_enabled": False,
- "exit_additive_enabled": False,
- }
+ """Canonical mode structural invariance checks.
- current_pnl = 0.5
- current_duration_ratio = 0.3
- expected_current_potential = _compute_hold_potential(
- current_pnl, current_duration_ratio, params
- )
-
- total_reward, shaping_reward, next_potential = apply_potential_shaping(
- base_reward=100.0,
- current_pnl=current_pnl,
- current_duration_ratio=current_duration_ratio,
- next_pnl=0.0,
- next_duration_ratio=0.0,
- is_terminal=True,
- last_potential=0.0,
- params=params,
+ We do NOT enforce Σ shaping ≈ 0 here (finite-horizon drift acceptable). We verify:
+ - Additive components auto-disabled (policy invariance enforcement)
+ - Terminal potential always zero (Φ_terminal = 0)
+ - Shaping magnitudes bounded (safety guard against spikes)
+ """
+ params = DEFAULT_MODEL_REWARD_PARAMETERS.copy()
+ params.update(
+ {
+ "exit_potential_mode": "canonical",
+ "hold_potential_enabled": True,
+ "entry_additive_enabled": False,
+ "exit_additive_enabled": False,
+ }
)
-
- # Terminal potential should be 0 in canonical mode
- self.assertEqual(next_potential, 0.0)
- # Shaping reward should be negative (releasing potential)
- self.assertTrue(shaping_reward < 0)
- # Check exact formula: γΦ(s') - Φ(s) = 0.95 * 0 - expected_current_potential
- expected_shaping = 0.95 * 0.0 - expected_current_potential
- self.assertAlmostEqual(shaping_reward, expected_shaping, delta=1e-10)
-
- def test_pbrs_invalid_gamma(self):
- """Test PBRS with invalid gamma value."""
- params = {"potential_gamma": 1.5, "hold_potential_enabled": True}
- with self.assertRaises(ValueError):
- apply_potential_shaping(
+ rng = np.random.default_rng(123)
+ last_potential = 0.0
+ terminal_next_potentials: list[float] = []
+ shaping_values: list[float] = []
+ current_pnl = 0.0
+ current_dur = 0.0
+ for _ in range(350):
+ is_terminal = rng.uniform() < 0.08
+ # Sample candidate next state (ignored for terminal potential because canonical sets Φ'=0)
+ next_pnl = 0.0 if is_terminal else float(rng.normal(0, 0.2))
+ inc = rng.uniform(0, 0.12)
+ next_dur = 0.0 if is_terminal else float(min(1.0, current_dur + inc))
+ _total, shaping, next_potential = apply_potential_shaping(
base_reward=0.0,
- current_pnl=0.0,
- current_duration_ratio=0.0,
- next_pnl=0.0,
- next_duration_ratio=0.0,
- is_terminal=True,
- last_potential=0.0,
+ current_pnl=current_pnl,
+ current_duration_ratio=current_dur,
+ next_pnl=next_pnl,
+ next_duration_ratio=next_dur,
+ is_terminal=is_terminal,
+ last_potential=last_potential,
params=params,
)
-
- def test_calculate_reward_with_pbrs_integration(self):
- """Test that PBRS parameters are properly integrated in defaults."""
- # Test that PBRS parameters are in the default parameters
- for param in PBRS_INTEGRATION_PARAMS:
- self.assertIn(
- param,
- DEFAULT_MODEL_REWARD_PARAMETERS,
- f"PBRS parameter {param} missing from defaults",
- )
+ shaping_values.append(shaping)
+ if is_terminal:
+ terminal_next_potentials.append(next_potential)
+ last_potential = 0.0
+ current_pnl = 0.0
+ current_dur = 0.0
+ else:
+ last_potential = next_potential
+ current_pnl = next_pnl
+ current_dur = next_dur
+ # Assertions
+ self.assertFalse(params["entry_additive_enabled"]) # enforced
+ self.assertFalse(params["exit_additive_enabled"]) # enforced
+ if terminal_next_potentials:
+ self.assertTrue(all(abs(p) < 1e-12 for p in terminal_next_potentials))
+ max_abs = max(abs(v) for v in shaping_values) if shaping_values else 0.0
+ self.assertLessEqual(max_abs, 5.0)
# Test basic PBRS function integration works
params = {"hold_potential_enabled": True, "hold_potential_scale": 1.0}
f"Missing PBRS parameter: {param}",
)
+ # --- End of structural PBRS tests (legacy sum test removed) ---
+
+ def test_pbrs_progressive_release_decay_clamped_zero_current(self):
+ """progressive_release with decay>1 clamps to 1 (full release to 0).
+
+ Scenario isolates current Φ(s)=0 and non-zero previous potential carried in last_potential,
+ verifying clamp logic independent of current state potential.
+ """
+ params = DEFAULT_MODEL_REWARD_PARAMETERS.copy()
+ params.update(
+ {
+ "exit_potential_mode": "progressive_release",
+ "exit_potential_decay": 5.0, # will clamp to 1.0
+ }
+ )
+ _total, shaping, next_potential = apply_potential_shaping(
+ base_reward=0.0,
+ current_pnl=0.0,
+ current_duration_ratio=0.0,
+ next_pnl=0.0,
+ next_duration_ratio=0.0,
+ is_terminal=True,
+ last_potential=0.8,
+ params=params,
+ )
+ # After clamp: next_potential = 0 and shaping = γ*0 - 0 (Φ(s)=0) => 0
+ self.assertAlmostEqualFloat(next_potential, 0.0, tolerance=1e-12)
+ self.assertAlmostEqualFloat(shaping, 0.0, tolerance=1e-9)
+
+ def test_pbrs_invalid_transform_fallback_potential(self):
+ """Invalid transform name in potential path must fall back without NaNs."""
+ params = DEFAULT_MODEL_REWARD_PARAMETERS.copy()
+ params.update({"hold_potential_transform_pnl": "not_a_transform"})
+ _total, shaping, next_potential = apply_potential_shaping(
+ base_reward=0.0,
+ current_pnl=0.2,
+ current_duration_ratio=0.3,
+ next_pnl=0.25,
+ next_duration_ratio=0.35,
+ is_terminal=False,
+ last_potential=0.0,
+ params=params,
+ )
+ self.assertTrue(np.isfinite(shaping))
+ self.assertTrue(np.isfinite(next_potential))
+
+ def test_pbrs_gamma_zero_disables_progressive_effect(self):
+ """Gamma=0 => shaping term = -Φ(s); potential still updated for next step (Φ' used only if γ>0).
+
+ We only assert shaping bounded since Φ is transformed & scaled; ensures stability without exceptions.
+ """
+ params = DEFAULT_MODEL_REWARD_PARAMETERS.copy()
+ params.update({"potential_gamma": 0.0, "hold_potential_enabled": True})
+ base_reward = 1.23
+ _total, shaping, next_potential = apply_potential_shaping(
+ base_reward=base_reward,
+ current_pnl=0.1,
+ current_duration_ratio=0.2,
+ next_pnl=0.15,
+ next_duration_ratio=0.25,
+ is_terminal=False,
+ last_potential=0.0,
+ params=params,
+ )
+ # Shaping bounded (Φ in [-scale, scale] after transforms), conservatively check |shaping| <= 1
+ self.assertLessEqual(abs(shaping), 1.0)
+ self.assertTrue(np.isfinite(next_potential))
+
+
+class TestReportFormatting(RewardSpaceTestBase):
+ """Tests for report formatting elements not previously covered."""
+
+ def test_abs_shaping_line_present_and_constant(self):
+ """Ensure the report contains the Abs Σ Shaping Reward line and tolerance not hard-coded elsewhere.
+
+ Strategy: run a minimal simulation via generate_report setUp artifact.
+ We reuse helper simulate_samples (already tested) to produce output then search in report.
+ """
+ # Generate a tiny dataset directly using existing pathway by invoking analysis main logic would be heavy.
+ # Instead we mimic the invariance block logic locally to ensure formatting; this keeps test fast and stable.
+ # Create a temporary markdown file to exercise the function that writes invariance section.
+ # Sanity check tolerance value
+ self.assertAlmostEqual(PBRS_INVARIANCE_TOL, 1e-6, places=12)
+
+ # Use small synthetic DataFrame with zero shaping sum (pandas imported globally)
+ df = pd.DataFrame(
+ {
+ "reward_shaping": [1e-12, -1e-12],
+ "reward_entry_additive": [0.0, 0.0],
+ "reward_exit_additive": [0.0, 0.0],
+ }
+ )
+ total_shaping = df["reward_shaping"].sum()
+ self.assertTrue(abs(total_shaping) < PBRS_INVARIANCE_TOL)
+ # Reconstruct lines exactly as writer does
+ lines = [
+ f"| Abs Σ Shaping Reward | {abs(total_shaping):.6e} |",
+ ]
+ content = "\n".join(lines)
+ # Validate formatting pattern using regex
+ m = re.search(
+ r"\| Abs Σ Shaping Reward \| ([0-9]+\.[0-9]{6}e[+-][0-9]{2}) \|", content
+ )
+ self.assertIsNotNone(m, "Abs Σ Shaping Reward line missing or misformatted")
+ # Ensure scientific notation magnitude consistent with small number
+ val = float(m.group(1)) if m else None # type: ignore[arg-type]
+ if val is not None:
+ self.assertLess(val, 1e-8 + 1e-12)
+ # Ensure no stray hard-coded tolerance string inside content
+ self.assertNotIn(
+ "1e-6", content, "Tolerance should not be hard-coded in line output"
+ )
+
+ def test_pbrs_non_canonical_report_generation(self):
+ """Generate synthetic invariance section with non-zero shaping to assert Non-canonical classification."""
+ import re # local lightweight
+
+ df = pd.DataFrame(
+ {
+ "reward_shaping": [0.01, -0.002], # somme = 0.008 (>> tolérance)
+ "reward_entry_additive": [0.0, 0.0],
+ "reward_exit_additive": [0.001, 0.0],
+ }
+ )
+ total_shaping = df["reward_shaping"].sum()
+ self.assertGreater(abs(total_shaping), PBRS_INVARIANCE_TOL)
+ invariance_status = "❌ Non-canonical"
+ section = []
+ section.append("**PBRS Invariance Summary:**\n")
+ section.append("| Field | Value |\n")
+ section.append("|-------|-------|\n")
+ section.append(f"| Invariance | {invariance_status} |\n")
+ section.append(f"| Note | Total shaping = {total_shaping:.6f} (non-zero) |\n")
+ section.append(f"| Σ Shaping Reward | {total_shaping:.6f} |\n")
+ section.append(f"| Abs Σ Shaping Reward | {abs(total_shaping):.6e} |\n")
+ section.append(
+ f"| Σ Entry Additive | {df['reward_entry_additive'].sum():.6f} |\n"
+ )
+ section.append(
+ f"| Σ Exit Additive | {df['reward_exit_additive'].sum():.6f} |\n"
+ )
+ content = "".join(section)
+ self.assertIn("❌ Non-canonical", content)
+ self.assertRegex(content, r"Σ Shaping Reward \| 0\.008000 \|")
+ m_abs = re.search(r"Abs Σ Shaping Reward \| ([0-9.]+e[+-][0-9]{2}) \|", content)
+ self.assertIsNotNone(m_abs)
+ if m_abs:
+ self.assertAlmostEqual(abs(total_shaping), float(m_abs.group(1)), places=12)
+
if __name__ == "__main__":
# Configure test discovery and execution