where:
- `r_pnl = pnl / pnl_target`
-- `r_dur = clamp(duration_ratio, 0, 1)`
+- `r_dur = max(duration_ratio, 0)`
- `scale = base_factor · hold_potential_ratio`
- `g = hold_potential_gain`
- `T_pnl`, `T_dur` = configured transforms
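A quick sketch of what this `clamp` → `max` change does to `r_dur`: the two definitions agree on `[0, 1]`, but overlong holds (`duration_ratio > 1`) are no longer capped. Names mirror the definitions above.

```python
def r_dur_old(duration_ratio: float) -> float:
    return min(max(duration_ratio, 0.0), 1.0)  # clamp(duration_ratio, 0, 1)

def r_dur_new(duration_ratio: float) -> float:
    return max(duration_ratio, 0.0)

assert r_dur_old(0.4) == r_dur_new(0.4) == 0.4          # identical inside [0, 1]
assert (r_dur_old(1.5), r_dur_new(1.5)) == (1.0, 1.5)   # overlong holds now pass through
```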
"pandas",
"scikit-learn",
"scipy>=1.11",
- "pytest",
]
-[dependency-groups]
+[project.optional-dependencies]
dev = [
- "pytest>=6.0",
- "ruff",
- "pytest-cov>=7.0.0",
+ "pytest>=8.0",
+ "pytest-cov>=7.0",
+ "ruff>=0.8",
]
[build-system]
log_cli_format = "%(asctime)s [%(levelname)8s] %(name)s: %(message)s"
log_cli_date_format = "%Y-%m-%d %H:%M:%S"
-# Coverage configuration
addopts = [
"--verbose",
"--tb=short",
"--strict-markers",
"--color=yes",
- "--cov=reward_space_analysis",
- "--cov-config=pyproject.toml",
- "--cov-fail-under=85"
+ "--cov",
]
[tool.coverage.run]
source = ["reward_space_analysis"]
+branch = true
+parallel = true
+relative_files = true
omit = [
- "tests/*",
- "test_*.py",
+ "*/tests/*",
+ "**/test_*.py",
+ "**/__pycache__/*",
+]
+
+[tool.coverage.report]
+show_missing = true
+skip_empty = true
+fail_under = 85
+exclude_lines = [
+ "pragma: no cover",
+ "def __repr__",
+ "raise AssertionError",
+ "raise NotImplementedError",
+ "if TYPE_CHECKING:",
+ "if __name__ == .__main__.:",
+ "@abstractmethod",
]
[tool.ruff]
target-version = "py311"
[tool.ruff.lint]
-select = ["E", "F", "W", "I"]
-ignore = ["E501"]
+select = [
+ "E", # pycodestyle errors
+ "W", # pycodestyle warnings
+ "F", # pyflakes
+ "I", # isort
+ "B", # flake8-bugbear
+ "C4", # flake8-comprehensions
+ "UP", # pyupgrade
+ "SIM", # flake8-simplify
+ "TCH", # flake8-type-checking
+ "PTH", # flake8-use-pathlib
+ "RUF", # ruff-specific rules
+]
+ignore = [
+ "E501", # line too long
+]
+
+[tool.ruff.lint.isort]
+known-first-party = ["reward_space_analysis"]
+
+[tool.ruff.format]
+quote-style = "double"
+indent-style = "space"
import warnings
from enum import Enum, IntEnum
from pathlib import Path
-from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Any, Literal
import numpy as np
import pandas as pd
from scipy.spatial.distance import jensenshannon
from scipy.stats import entropy, probplot
+if TYPE_CHECKING:
+ from collections.abc import Iterable
+
try:
from sklearn.ensemble import RandomForestRegressor
from sklearn.inspection import partial_dependence, permutation_importance
# When that diagnostic column is not available (e.g., reporting from partial datasets),
# we fall back to the weaker heuristic |Σ shaping| < PBRS_INVARIANCE_TOL.
PBRS_INVARIANCE_TOL: float = 1e-6
-# Default discount factor γ for potential-based reward shaping
+# Default discount factor γ for potential-based reward shaping # noqa: RUF003
POTENTIAL_GAMMA_DEFAULT: float = 0.95
# Default risk/reward ratio (RR)
RISK_REWARD_RATIO_DEFAULT: float = 2.0
# Supported attenuation modes
-ATTENUATION_MODES: Tuple[str, ...] = ("sqrt", "linear", "power", "half_life")
-ATTENUATION_MODES_WITH_LEGACY: Tuple[str, ...] = ("legacy",) + ATTENUATION_MODES
+ATTENUATION_MODES: tuple[str, ...] = ("sqrt", "linear", "power", "half_life")
+ATTENUATION_MODES_WITH_LEGACY: tuple[str, ...] = ("legacy", *ATTENUATION_MODES)
# Internal numeric guards and behavior toggles
INTERNAL_GUARDS: dict[str, float] = {
}
# Supported trading modes
-TRADING_MODES: Tuple[str, ...] = ("spot", "margin", "futures")
+TRADING_MODES: tuple[str, ...] = ("spot", "margin", "futures")
# Supported p-value adjustment methods
-ADJUST_METHODS: Tuple[str, ...] = ("none", "benjamini_hochberg")
+ADJUST_METHODS: tuple[str, ...] = ("none", "benjamini_hochberg")
# Alias without underscore for convenience
_ADJUST_METHODS_ALIASES: frozenset[str] = frozenset({"benjaminihochberg"})
"exit_factor_threshold": 1000.0,
# === PBRS PARAMETERS ===
# Potential-based reward shaping core parameters
- # Discount factor γ for potential term (0 ≤ γ ≤ 1)
+ # Discount factor γ for potential term (0 ≤ γ ≤ 1) # noqa: RUF003
"potential_gamma": POTENTIAL_GAMMA_DEFAULT,
# Exit potential modes: canonical | non_canonical | progressive_release | spike_cancel | retain_previous
"exit_potential_mode": "canonical",
"exit_additive_transform_duration": "tanh",
}
-DEFAULT_MODEL_REWARD_PARAMETERS_HELP: Dict[str, str] = {
+DEFAULT_MODEL_REWARD_PARAMETERS_HELP: dict[str, str] = {
"invalid_action": "Penalty for invalid actions",
"base_factor": "Base reward scale",
"idle_penalty_power": "Idle penalty exponent",
"check_invariants": "Enable runtime invariant checks",
"exit_factor_threshold": "Warn if |exit_factor| exceeds",
# PBRS parameters
- "potential_gamma": "PBRS discount γ (0–1)",
+ "potential_gamma": "PBRS discount γ (0-1)", # noqa: RUF001
"exit_potential_mode": "Exit potential mode (canonical|non_canonical|progressive_release|spike_cancel|retain_previous)",
- "exit_potential_decay": "Decay for progressive_release (0–1)",
+ "exit_potential_decay": "Decay for progressive_release (0-1)",
"hold_potential_enabled": "Enable hold potential Φ",
"hold_potential_ratio": "Hold potential ratio",
"hold_potential_gain": "Hold potential gain",
# Parameter validation utilities
# ---------------------------------------------------------------------------
-_PARAMETER_BOUNDS: Dict[str, Dict[str, float]] = {
+_PARAMETER_BOUNDS: dict[str, dict[str, float]] = {
# key: {min: ..., max: ...} (bounds are inclusive where it makes sense)
"invalid_action": {"max": 0.0}, # penalty should be <= 0
"base_factor": {"min": 0.0},
"exit_additive_gain": {"min": 0.0},
}
-RewardParamValue = Union[float, str, bool, None]
-RewardParams = Dict[str, RewardParamValue]
+RewardParamValue = float | str | bool | None
+RewardParams = dict[str, RewardParamValue]
class RewardDiagnosticsWarning(RuntimeWarning):
raise ValueError(f"Param: unrecognized boolean literal {value!r}")
-def _get_bool_param(params: RewardParams, key: str, default: Optional[bool] = None) -> bool:
+def _get_bool_param(params: RewardParams, key: str, default: bool | None = None) -> bool:
"""Extract boolean parameter with type safety.
Args:
def _get_float_param(
- params: RewardParams, key: str, default: Optional[RewardParamValue] = None
+ params: RewardParams, key: str, default: RewardParamValue | None = None
) -> float:
"""Extract float parameter with type safety and default fallback.
key: str,
value: float,
*,
- bounds: Optional[Dict[str, float]] = None,
+ bounds: dict[str, float] | None = None,
strict: bool,
) -> tuple[float, list[str]]:
"""Clamp numeric `value` to bounds for `key`.
return adjusted, reason_parts
-def _get_int_param(
- params: RewardParams, key: str, default: Optional[RewardParamValue] = None
-) -> int:
+def _get_int_param(params: RewardParams, key: str, default: RewardParamValue | None = None) -> int:
"""Extract integer parameter with robust coercion.
Args:
return int(default) if isinstance(default, (int, float)) else 0
-def _get_str_param(params: RewardParams, key: str, default: Optional[str] = None) -> str:
+def _get_str_param(params: RewardParams, key: str, default: str | None = None) -> str:
"""Extract string parameter with type safety and default fallback.
Args:
def get_max_idle_duration_candles(
params: RewardParams,
*,
- max_trade_duration_candles: Optional[int] = None,
+ max_trade_duration_candles: int | None = None,
) -> int:
mtd = (
int(max_trade_duration_candles)
def validate_reward_parameters(
params: RewardParams,
strict: bool = True,
-) -> Tuple[RewardParams, Dict[str, Dict[str, Any]]]:
+) -> tuple[RewardParams, dict[str, dict[str, Any]]]:
"""Clamp parameters to bounds and coerce booleans and numeric overrides.
Returns a sanitized copy plus adjustments mapping (param -> original/adjusted/reason).
- Numeric-bounded keys are coerced to float when provided as str/bool/None.
* In strict mode: raise on non-numeric or out-of-bounds.
* In relaxed mode: fallback to min bound or 0.0 with adjustment reason.
- - Non‑finite numerics fall back to min bound or 0.0 (relaxed) or raise (strict).
+ - Non-finite numerics fall back to min bound or 0.0 (relaxed) or raise (strict).
"""
sanitized = dict(params)
- adjustments: Dict[str, Dict[str, Any]] = {}
+ adjustments: dict[str, dict[str, Any]] = {}
# Boolean parameter coercion
_bool_keys = [
if not np.isclose(adjusted, original_numeric):
sanitized[key] = adjusted
prev_reason = adjustments.get(key, {}).get("reason")
- reason: List[str] = []
+ reason: list[str] = []
if prev_reason:
reason.append(prev_reason)
reason.extend(reason_parts)
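A usage sketch of `validate_reward_parameters` in relaxed mode; the bounds come from `_PARAMETER_BOUNDS` above, while the exact reason strings are illustrative.

```python
params = {"base_factor": -5.0, "invalid_action": 1.0}
sanitized, adjustments = validate_reward_parameters(params, strict=False)
assert sanitized["base_factor"] == 0.0     # clamped to its min bound
assert sanitized["invalid_action"] == 0.0  # clamped to its max bound (penalty <= 0)
for key, info in adjustments.items():
    print(key, info["original"], "->", info["adjusted"], info["reason"])
# strict=True would instead raise on the same out-of-bounds inputs.
```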
next_potential: float = 0.0
# PBRS helpers
base_reward: float = 0.0
- pbrs_delta: float = 0.0 # Δ(s,a,s') = γ·Φ(s') − Φ(s)
+ pbrs_delta: float = 0.0 # Δ(s,a,s') = γ·Φ(s') − Φ(s) # noqa: RUF003
invariance_correction: float = 0.0
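For intuition, a one-off worked instance of the shaping term in the comment above (values illustrative):

```python
gamma = 0.95                     # POTENTIAL_GAMMA_DEFAULT
phi_prev, phi_next = 0.20, 0.25  # Φ(s), Φ(s')
pbrs_delta = gamma * phi_next - phi_prev  # 0.95 * 0.25 - 0.20 = 0.0375
```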
else:
effective_dr = duration_ratio
- kernel = kernels.get(exit_attenuation_mode, None)
+ kernel = kernels.get(exit_attenuation_mode)
if kernel is None:
_warn_unknown_mode(
"exit_attenuation_mode",
"""
Compute exit reward factor by applying multiplicative coefficients to base_factor.
- Formula: exit_factor = base_factor × time_attenuation_coefficient × pnl_target_coefficient × efficiency_coefficient
+ Formula: exit_factor = base_factor * time_attenuation_coefficient * pnl_target_coefficient * efficiency_coefficient
Args:
base_factor: Base reward value before coefficient adjustments
pnl: Realized profit/loss
- pnl_target: Target profit threshold (pnl_target = profit_aim × risk_reward_ratio)
+ pnl_target: Target profit threshold (pnl_target = profit_aim * risk_reward_ratio)
duration_ratio: Trade duration relative to target duration
context: Trade context with unrealized profit/loss extremes
params: Reward configuration parameters
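A minimal numeric sketch of the multiplicative composition in the formula above; the coefficient values are illustrative, not outputs of the library's kernels.

```python
base_factor = 100.0
time_attenuation_coefficient = 0.8
pnl_target_coefficient = 1.2
efficiency_coefficient = 1.05
exit_factor = (base_factor
               * time_attenuation_coefficient
               * pnl_target_coefficient
               * efficiency_coefficient)  # = 100.8
```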
if exit_factor < 0.0 and pnl >= 0.0:
exit_factor = 0.0
exit_factor_threshold = _get_float_param(params, "exit_factor_threshold")
- if exit_factor_threshold > 0 and np.isfinite(exit_factor_threshold):
+ if exit_factor_threshold > 0 and np.isfinite(exit_factor_threshold): # noqa: SIM102
if abs(exit_factor) > exit_factor_threshold:
warnings.warn(
f"|exit_factor|={abs(exit_factor):.2f} > threshold={exit_factor_threshold:.2f}",
Args:
params: Reward configuration parameters
pnl: Realized profit/loss
- pnl_target: Target profit threshold (pnl_target = profit_aim × risk_reward_ratio)
+ pnl_target: Target profit threshold (pnl_target = profit_aim * risk_reward_ratio)
risk_reward_ratio: Risk/reward ratio for loss penalty calculation
Returns:
Args:
base_factor: Base reward value before coefficient adjustments
- pnl_target: Target profit threshold (pnl_target = profit_aim × risk_reward_ratio)
+ pnl_target: Target profit threshold (pnl_target = profit_aim * risk_reward_ratio)
duration_ratio: Trade duration relative to target duration
context: Trade context with PnL and unrealized profit/loss extremes
params: Reward configuration parameters
risk_reward_ratio: Risk/reward ratio (must match the value used to calculate pnl_target)
Returns:
- float: Exit reward (pnl × exit_factor)
+ float: Exit reward (pnl * exit_factor)
"""
exit_factor = _get_exit_factor(
base_factor, context.pnl, pnl_target, duration_ratio, context, params, risk_reward_ratio
short_allowed=short_allowed,
)
- base_reward: Optional[float] = None
+ base_reward: float | None = None
if not is_valid and not action_masking:
breakdown.invalid_penalty = _get_float_param(params, "invalid_action")
base_reward = breakdown.invalid_penalty
)
max_trade_duration_cap = int(max_trade_duration_candles * max_duration_ratio)
- samples: list[Dict[str, float]] = []
+ samples: list[dict[str, float]] = []
prev_potential: float = 0.0
# Stateful trajectory variables
)
-def _compute_summary_stats(df: pd.DataFrame) -> Dict[str, Any]:
+def _compute_summary_stats(df: pd.DataFrame) -> dict[str, Any]:
"""Compute summary statistics without writing to file."""
action_summary = df.groupby("action")["reward"].agg(["count", "mean", "std", "min", "max"])
component_share = df[
return aggregated
-def _compute_relationship_stats(df: pd.DataFrame) -> Dict[str, Any]:
+def _compute_relationship_stats(df: pd.DataFrame) -> dict[str, Any]:
"""Return binned stats dict for idle, trade duration and pnl (uniform bins).
Defensive against missing optional columns (e.g., reward_invalid when synthetic
df: pd.DataFrame,
profit_aim: float,
risk_reward_ratio: float,
-) -> Dict[str, Any]:
+) -> dict[str, Any]:
"""Compute representativity statistics for the reward space."""
pnl_target = float(profit_aim * risk_reward_ratio)
total = len(df)
skip_partial_dependence: bool = False,
rf_n_jobs: int = 1,
perm_n_jobs: int = 1,
-) -> Tuple[pd.DataFrame, Dict[str, Any], Dict[str, pd.DataFrame], Optional[RandomForestRegressor]]:
+) -> tuple[pd.DataFrame, dict[str, Any], dict[str, pd.DataFrame], RandomForestRegressor | None]:
"""Compute feature importances using RandomForestRegressor.
Parameters
n_test=0,
)
- model: Optional[RandomForestRegressor] = RandomForestRegressor(
+ model: RandomForestRegressor | None = RandomForestRegressor(
n_estimators=400,
max_depth=None,
random_state=seed,
)
# Partial dependence (optional)
- partial_deps: Dict[str, pd.DataFrame] = {}
+ partial_deps: dict[str, pd.DataFrame] = {}
if model is not None and not skip_partial_dependence:
for feature in [
f for f in ["trade_duration", "idle_duration", "pnl"] if f in X_test.columns
else:
try:
df = pd.DataFrame(list(candidate))
- except TypeError:
+ except TypeError as e:
raise ValueError(
f"Data: 'transitions' in '{path}' is not iterable (type {type(candidate)!r})"
- )
+ ) from e
except Exception as e:
raise ValueError(
f"Data: could not build DataFrame from 'transitions' in '{path}': {e!r}"
else:
try:
all_transitions.extend(list(trans))
- except TypeError:
+ except TypeError as e:
raise ValueError(
f"Data: episode 'transitions' is not iterable in '{path}' (type {type(trans)!r})"
- )
+ ) from e
else:
skipped += 1
if skipped:
if enforce_columns:
raise ValueError(
f"Data: missing required columns {sorted(missing_required)}. "
- f"Found: {sorted(list(df.columns))}"
+ f"Found: {sorted(df.columns)}"
)
warnings.warn(
f"Missing columns {sorted(missing_required)}; filled with NaN when loading (enforce_columns=False)",
def compute_distribution_shift_metrics(
synthetic_df: pd.DataFrame,
real_df: pd.DataFrame,
-) -> Dict[str, float]:
+) -> dict[str, float]:
"""Compute distribution shift metrics between synthetic and real samples.
Returns KL divergence, JS distance, Wasserstein distance, and KS test
return metrics
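A self-contained sketch of per-column shift metrics in the spirit of `compute_distribution_shift_metrics` (the histogram binning and epsilon guard are assumptions; the real function iterates shared columns):

```python
import numpy as np
from scipy.spatial.distance import jensenshannon
from scipy.stats import ks_2samp, wasserstein_distance

def shift_metrics(synthetic: np.ndarray, real: np.ndarray) -> dict[str, float]:
    # Shared bin edges so the two histograms are comparable
    bins = np.histogram_bin_edges(np.concatenate([synthetic, real]), bins=50)
    p, _ = np.histogram(synthetic, bins=bins, density=True)
    q, _ = np.histogram(real, bins=bins, density=True)
    ks_stat, ks_pvalue = ks_2samp(synthetic, real)
    return {
        "js_distance": float(jensenshannon(p + 1e-12, q + 1e-12, base=2)),  # in [0, 1]
        "wasserstein": float(wasserstein_distance(synthetic, real)),        # >= 0
        "ks_statistic": float(ks_stat),                                     # in [0, 1]
        "ks_pvalue": float(ks_pvalue),                                      # in [0, 1]
    }
```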
-def _validate_distribution_metrics(metrics: Dict[str, float]) -> None:
+def _validate_distribution_metrics(metrics: dict[str, float]) -> None:
"""Validate mathematical bounds of distribution shift metrics."""
for key, value in metrics.items():
if not np.isfinite(value):
raise AssertionError(f"KL divergence {key} must be >= 0, got {value:.6f}")
# JS distance must be in [0, 1]
- if "js_distance" in key:
- if not (0 <= value <= 1):
- raise AssertionError(f"JS distance {key} must be in [0,1], got {value:.6f}")
+ if "js_distance" in key and not (0 <= value <= 1):
+ raise AssertionError(f"JS distance {key} must be in [0,1], got {value:.6f}")
# Wasserstein distance must be >= 0
if "wasserstein" in key and value < 0:
raise AssertionError(f"Wasserstein distance {key} must be >= 0, got {value:.6f}")
# KS statistic must be in [0, 1]
- if "ks_statistic" in key:
- if not (0 <= value <= 1):
- raise AssertionError(f"KS statistic {key} must be in [0,1], got {value:.6f}")
+ if "ks_statistic" in key and not (0 <= value <= 1):
+ raise AssertionError(f"KS statistic {key} must be in [0,1], got {value:.6f}")
# p-values must be in [0, 1]
- if "pvalue" in key:
- if not (0 <= value <= 1):
- raise AssertionError(f"p-value {key} must be in [0,1], got {value:.6f}")
+ if "pvalue" in key and not (0 <= value <= 1):
+ raise AssertionError(f"p-value {key} must be in [0,1], got {value:.6f}")
def statistical_hypothesis_tests(
df: pd.DataFrame, *, adjust_method: str = ADJUST_METHODS[0], seed: int = 42
-) -> Dict[str, Any]:
+) -> dict[str, Any]:
"""Statistical hypothesis tests (Spearman, Kruskal-Wallis, Mann-Whitney).
Parameters
adj_final = np.empty_like(adj_sorted)
adj_final[order] = np.clip(adj_sorted, 0, 1)
# Attach adjusted p-values and recompute significance
- for (name, res), p_adj in zip(items, adj_final):
+ for (name, res), p_adj in zip(items, adj_final, strict=False):
res["p_value_adj"] = float(p_adj)
res["significant_adj"] = bool(p_adj < alpha)
results[name] = res
return results
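The Benjamini-Hochberg step-up used above, extracted as a standalone sketch (variable names follow the snippet; the clip to [0, 1] matches the code):

```python
import numpy as np

def benjamini_hochberg(p_values: np.ndarray) -> np.ndarray:
    n = len(p_values)
    order = np.argsort(p_values)
    ranked = p_values[order] * n / np.arange(1, n + 1)       # p_(i) * n / i
    adj_sorted = np.minimum.accumulate(ranked[::-1])[::-1]   # enforce monotonicity
    adj_final = np.empty_like(adj_sorted)
    adj_final[order] = np.clip(adj_sorted, 0, 1)
    return adj_final

# benjamini_hochberg(np.array([0.002, 0.01, 0.03, 0.04]))
# -> array([0.008, 0.02 , 0.04 , 0.04 ])
```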
-def _validate_hypothesis_test_results(results: Dict[str, Any]) -> None:
+def _validate_hypothesis_test_results(results: dict[str, Any]) -> None:
"""Validate statistical properties of hypothesis test results."""
for test_name, result in results.items():
# All p-values must be in [0, 1] or NaN (for cases like constant input)
def bootstrap_confidence_intervals(
df: pd.DataFrame,
- metrics: List[str],
+ metrics: list[str],
n_bootstrap: int = 10000,
confidence_level: float = 0.95,
seed: int = 42,
*,
strict_diagnostics: bool = False,
-) -> Dict[str, Tuple[float, float, float]]:
+) -> dict[str, tuple[float, float, float]]:
"""Compute bootstrap confidence intervals for metric means.
Returns percentile-based CIs, skipping metrics with <10 samples.
warnings.warn(
f"n_bootstrap={n_bootstrap} < {min_rec}; confidence intervals may be unstable",
RewardDiagnosticsWarning,
+ stacklevel=2,
)
# Local RNG to avoid mutating global NumPy RNG state
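For one metric, the percentile bootstrap described above reduces to roughly this (a sketch; the real function loops over `metrics` and applies the <10-sample guard):

```python
import numpy as np

def bootstrap_ci_mean(values: np.ndarray, n_bootstrap: int = 10000,
                      confidence_level: float = 0.95, seed: int = 42):
    rng = np.random.default_rng(seed)  # local RNG; global NumPy state untouched
    means = np.array([
        rng.choice(values, size=len(values), replace=True).mean()
        for _ in range(n_bootstrap)
    ])
    alpha = 1.0 - confidence_level
    ci_low, ci_high = np.quantile(means, [alpha / 2, 1 - alpha / 2])
    return float(values.mean()), float(ci_low), float(ci_high)
```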
def _validate_bootstrap_results(
- results: Dict[str, Tuple[float, float, float]], *, strict_diagnostics: bool
+ results: dict[str, tuple[float, float, float]], *, strict_diagnostics: bool
) -> None:
"""Validate each bootstrap CI: finite bounds, ordered, positive width (adjust or raise)."""
for metric, (mean, ci_low, ci_high) in results.items():
if strict_diagnostics:
raise AssertionError(f"Bootstrap CI for {metric}: non-positive width {width:.6f}")
# Graceful mode: expand interval symmetrically
- if width == 0:
- epsilon = INTERNAL_GUARDS["degenerate_ci_epsilon"]
- else:
- epsilon = abs(width) * 1e-6
+ epsilon = INTERNAL_GUARDS["degenerate_ci_epsilon"] if width == 0 else abs(width) * 1e-6
center = mean
# Adjust only if current bounds are identical; otherwise enforce ordering minimally.
if ci_low == ci_high:
warnings.warn(
f"bootstrap_ci for '{metric}' degenerate (width={width:.6e}); adjusted with epsilon={epsilon:.1e}",
RewardDiagnosticsWarning,
+ stacklevel=2,
)
*,
seed: int | None = None,
strict_diagnostics: bool = False,
-) -> Dict[str, Any]:
+) -> dict[str, Any]:
"""Return mapping col-> diagnostics (tests, moments, entropy, divergences).
Skips missing columns; selects Shapiro-Wilk when n<=5000 else K2; ignores non-finite intermediates.
msg = f"Extreme moment(s) for {col}: skew={skew_v:.3e}, kurtosis={kurt_v:.3e} exceeds threshold {thr}."
if strict_diagnostics:
raise AssertionError(msg)
- warnings.warn(msg, RewardDiagnosticsWarning)
+ warnings.warn(msg, RewardDiagnosticsWarning, stacklevel=2)
if len(data) < 5000:
sw_stat, sw_pval = stats.shapiro(data)
return diagnostics
-def _validate_distribution_diagnostics(diag: Dict[str, Any], *, strict_diagnostics: bool) -> None:
+def _validate_distribution_diagnostics(diag: dict[str, Any], *, strict_diagnostics: bool) -> None:
"""Validate mathematical properties of distribution diagnostics.
Ensures all reported statistics are finite and within theoretical bounds where applicable.
zero_var_columns.add(prefix)
for key, value in list(diag.items()):
- if any(suffix in key for suffix in ["_mean", "_std", "_skewness", "_kurtosis"]):
+ if any(suffix in key for suffix in ["_mean", "_std", "_skewness", "_kurtosis"]): # noqa: SIM102
if not np.isfinite(value):
# Graceful degradation for constant distributions: skewness/kurtosis become NaN.
constant_problem = any(
warnings.warn(
f"{key} undefined (constant distribution); falling back to {fallback}",
RewardDiagnosticsWarning,
+ stacklevel=2,
)
else:
raise AssertionError(f"Distribution diagnostic {key} is not finite: {value}")
- if key.endswith("_shapiro_pval"):
- if not (0 <= value <= 1):
- raise AssertionError(f"Shapiro p-value {key} must be in [0,1], got {value}")
- if key.endswith("_anderson_stat") or key.endswith("_anderson_critical_5pct"):
+ if key.endswith("_shapiro_pval") and not (0 <= value <= 1):
+ raise AssertionError(f"Shapiro p-value {key} must be in [0,1], got {value}")
+ if key.endswith("_anderson_stat") or key.endswith("_anderson_critical_5pct"): # noqa: SIM102
if not np.isfinite(value):
prefix = key.rsplit("_", 2)[0]
if prefix in zero_var_columns and not strict_diagnostics:
warnings.warn(
f"{key} undefined (constant distribution); falling back to {fallback}",
RewardDiagnosticsWarning,
+ stacklevel=2,
)
continue
raise AssertionError(f"Anderson statistic {key} must be finite, got {value}")
- if key.endswith("_qq_r_squared"):
+ if key.endswith("_qq_r_squared"): # noqa: SIM102
if not (isinstance(value, (int, float)) and np.isfinite(value) and 0 <= value <= 1):
prefix = key[: -len("_qq_r_squared")]
if prefix in zero_var_columns and not strict_diagnostics:
warnings.warn(
f"{key} undefined (constant distribution); falling back to {fallback_r2}",
RewardDiagnosticsWarning,
+ stacklevel=2,
)
else:
raise AssertionError(f"Q-Q R^2 {key} must be in [0,1], got {value}")
def _apply_transform_sigmoid(value: float) -> float:
- """sigmoid: 2σ(x) - 1, σ(x) = 1/(1 + e^(-x)) in (-1, 1)."""
+ """sigmoid: 2σ(x) - 1, σ(x) = 1/(1 + e^(-x)) in (-1, 1).""" # noqa: RUF002
x = value
try:
if x >= 0:
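For reference, the branch-on-sign pattern started above yields a numerically stable bounded sigmoid; a self-contained sketch using the identity 2σ(x) - 1 = (1 - e^(-x)) / (1 + e^(-x)):

```python
import math

def transform_sigmoid(x: float) -> float:
    if x >= 0:
        z = math.exp(-x)             # safe: no overflow for large positive x
        return (1.0 - z) / (1.0 + z)
    z = math.exp(x)                  # safe: no overflow for large negative x
    return (z - 1.0) / (z + 1.0)
```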
R'(s,a,s') = R(s,a,s') + Δ(s,a,s')
where:
- Δ(s,a,s') = γ·Φ(s') - Φ(s) (PBRS shaping term)
+ Δ(s,a,s') = gamma * Phi(s') - Phi(s) (PBRS shaping term)
Hold Potential Formula
----------------------
Let:
r_pnl = pnl / pnl_target
- r_dur = clamp(duration_ratio, 0, 1)
+ r_dur = max(duration_ratio, 0)
scale = base_factor · hold_potential_ratio
g = gain
T_pnl, T_dur = configured bounded transforms
non_finite_key: str,
*,
base_factor: float,
- risk_reward_ratio: Optional[float] = None,
+ risk_reward_ratio: float | None = None,
) -> float:
"""Generic helper for (pnl, duration) bi-component transforms."""
if not (np.isfinite(pnl) and np.isfinite(pnl_target) and np.isfinite(duration_ratio)):
return _fail_safely(f"{kind}_invalid_pnl_target")
pnl_ratio = float(pnl / pnl_target)
- duration_ratio = float(np.clip(duration_ratio, 0.0, 1.0))
+ duration_ratio = float(max(0.0, duration_ratio))
ratio = _get_float_param(params, scale_key)
scale = ratio * base_factor
profit_aim: float,
risk_reward_ratio: float,
seed: int,
- real_df: Optional[pd.DataFrame] = None,
+ real_df: pd.DataFrame | None = None,
*,
adjust_method: str = ADJUST_METHODS[0],
- stats_seed: Optional[int] = None,
+ stats_seed: int | None = None,
strict_diagnostics: bool = False,
bootstrap_resamples: int = 10000,
skip_partial_dependence: bool = False,
sep += "|" + "-" * (len(str(c)) + 2)
sep += "|\n"
# Rows
- rows: List[str] = []
+ rows: list[str] = []
for idx, row in df.iterrows():
vals = [_fmt_val(row[c], ndigits) for c in cols]
rows.append("| " + str(idx) + " | " + " | ".join(vals) + " |")
# Blank separator before overrides block
f.write("| | |\n")
- overrides_pairs: List[str] = []
+ overrides_pairs: list[str] = []
if reward_params:
for k, default_v in DEFAULT_MODEL_REWARD_PARAMETERS.items():
if k in ("exit_potential_mode", "potential_gamma"):
f.write("### 1.3 Component Activation Rates\n\n")
f.write("Percentage of samples where each reward component is non-zero:\n\n")
comp_share = summary_stats["component_share"].copy()
- formatted_rows: List[str] = [
+ formatted_rows: list[str] = [
"| Component | Activation Rate |",
"|-----------|----------------|",
]
f.write(_df_to_md(corr_df, index_name=corr_df.index.name, ndigits=4))
_dropped = relationship_stats.get("correlation_dropped") or []
if _dropped:
- dropped_strs: List[str] = [str(x) for x in _dropped]
+ dropped_strs: list[str] = [str(x) for x in _dropped]
f.write("\n_Constant features removed: " + ", ".join(dropped_strs) + "._\n\n")
# Section 3.5: PBRS Analysis
f.write("|--------|-------|-------------|\n")
f.write(f"| Mean Base Reward | {mean_base:.6f} | Average reward before PBRS |\n")
f.write(f"| Std Base Reward | {std_base:.6f} | Variability of base reward |\n")
- f.write(f"| Mean PBRS Delta | {mean_pbrs:.6f} | Average γ·Φ(s')−Φ(s) |\n")
+ f.write(f"| Mean PBRS Delta | {mean_pbrs:.6f} | Average γ·Φ(s')−Φ(s) |\n") # noqa: RUF001
f.write(f"| Std PBRS Delta | {std_pbrs:.6f} | Variability of PBRS delta |\n")
f.write(
- f"| Mean Invariance Correction | {mean_inv_corr:.6f} | Average reward_shaping − pbrs_delta |\n"
+ f"| Mean Invariance Correction | {mean_inv_corr:.6f} | Average reward_shaping − pbrs_delta |\n" # noqa: RUF001
)
f.write(
f"| Std Invariance Correction | {std_inv_corr:.6f} | Variability of correction |\n"
# Render as markdown without index column
header = "| feature | importance_mean | importance_std |\n"
sep = "|---------|------------------|----------------|\n"
- rows: List[str] = []
+ rows: list[str] = []
for _, r in top_imp.iterrows():
rows.append(
f"| {r['feature']} | {_fmt_val(r['importance_mean'], 6)} | {_fmt_val(r['importance_std'], 6)} |"
h = hypothesis_tests["idle_correlation"]
f.write("#### 5.1.1 Idle Duration → Idle Penalty Correlation\n\n")
f.write(f"**Test Method:** {h['test']}\n\n")
- f.write(f"- Spearman ρ: **{h['rho']:.4f}**\n")
+ f.write(f"- Spearman ρ: **{h['rho']:.4f}**\n") # noqa: RUF001
f.write(f"- p-value: {h['p_value']:.4g}\n")
if "p_value_adj" in h:
f.write(
- f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n"
+ f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n" # noqa: RUF001
)
f.write(f"- 95% CI: [{h['ci_95'][0]:.4f}, {h['ci_95'][1]:.4f}]\n")
f.write(f"- CI width: {(h['ci_95'][1] - h['ci_95'][0]):.4f}\n")
f.write(f"- Sample size: {h['n_samples']:,}\n")
- f.write(f"- Significant (α=0.05): {'✅ Yes' if h['significant'] else '❌ No'}\n")
+ f.write(f"- Significant (α=0.05): {'✅ Yes' if h['significant'] else '❌ No'}\n") # noqa: RUF001
f.write(f"- **Interpretation:** {h['interpretation']}\n\n")
if "position_reward_difference" in hypothesis_tests:
f.write(f"- p-value: {h['p_value']:.4g}\n")
if "p_value_adj" in h:
f.write(
- f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n"
+ f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n" # noqa: RUF001
)
f.write(f"- Effect size (ε²): {h['effect_size_epsilon_sq']:.4f}\n")
f.write(f"- Number of groups: {h['n_groups']}\n")
- f.write(f"- Significant (α=0.05): {'✅ Yes' if h['significant'] else '❌ No'}\n")
+ f.write(f"- Significant (α=0.05): {'✅ Yes' if h['significant'] else '❌ No'}\n") # noqa: RUF001
f.write(f"- **Interpretation:** {h['interpretation']} effect\n\n")
if "pnl_sign_reward_difference" in hypothesis_tests:
f.write(f"- p-value: {h['p_value']:.4g}\n")
if "p_value_adj" in h:
f.write(
- f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n"
+ f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n" # noqa: RUF001
)
f.write(f"- Median (PnL+): {h['median_pnl_positive']:.4f}\n")
f.write(f"- Median (PnL-): {h['median_pnl_negative']:.4f}\n")
- f.write(f"- Significant (α=0.05): {'✅ Yes' if h['significant'] else '❌ No'}\n\n")
+ f.write(f"- Significant (α=0.05): {'✅ Yes' if h['significant'] else '❌ No'}\n\n") # noqa: RUF001
# Bootstrap CI
if bootstrap_ci:
"action_masking",
]
- sim_params: Dict[str, Any] = {}
+ sim_params: dict[str, Any] = {}
for k in candidate_keys:
if k in args_dict:
v = args_dict[k]
# Generate manifest summarizing key metrics
try:
manifest_path = args.out_dir / "manifest.json"
- resolved_reward_params: Dict[str, Any] = dict(
+ resolved_reward_params: dict[str, Any] = dict(
params
) # already validated/normalized upstream
- manifest: Dict[str, Any] = {
+ manifest: dict[str, Any] = {
"generated_at": pd.Timestamp.now().isoformat(),
- "num_samples": int(len(df)),
+ "num_samples": len(df),
"seed": int(args.seed),
"pnl_target": float(profit_aim * risk_reward_ratio),
"pvalue_adjust_method": args.pvalue_adjust,
sim_params_dict = df.attrs.get("simulation_params", {})
if not isinstance(sim_params_dict, dict):
sim_params_dict = {}
- sim_params: Dict[str, Any] = dict(sim_params_dict)
+ sim_params: dict[str, Any] = dict(sim_params_dict)
if sim_params:
excluded_for_hash = {"out_dir", "real_episodes"}
- sim_params_for_hash: Dict[str, Any] = {
+ sim_params_for_hash: dict[str, Any] = {
k: sim_params[k] for k in sim_params if k not in excluded_for_hash
}
- _hash_source: Dict[str, Any] = {
+ _hash_source: dict[str, Any] = {
**{f"sim::{k}": sim_params_for_hash[k] for k in sorted(sim_params_for_hash)},
**{
f"reward::{k}": resolved_reward_params[k]
from __future__ import annotations
import argparse
+import contextlib
import itertools
import json
import math
import tempfile
import time
from pathlib import Path
-from typing import Any, Dict, List, Optional, Tuple, TypedDict
+from typing import Any, TypedDict
-try:
-from typing import NotRequired, Required # Python >=3.11
-except ImportError:
-from typing_extensions import NotRequired, Required # Python <3.11
+from typing import NotRequired, Required # stdlib on Python >=3.11 (matches ruff target py311)
-ConfigTuple = Tuple[str, str, float, int, int, int]
+ConfigTuple = tuple[str, str, float, int, int, int]
SUMMARY_FILENAME = "reward_space_cli.json"
stdout: str
stderr: str
strict: bool
- seconds: Optional[float]
+ seconds: float | None
warnings: int
class SummaryResult(TypedDict, total=False):
# Required keys
total: Required[int]
- successes: Required[List[ScenarioResult]]
- failures: Required[List[ScenarioResult]]
- mean_seconds: Required[Optional[float]]
- max_seconds: Required[Optional[float]]
- min_seconds: Required[Optional[float]]
- median_seconds: Required[Optional[float]]
- p95_seconds: Required[Optional[float]]
+ successes: Required[list[ScenarioResult]]
+ failures: Required[list[ScenarioResult]]
+ mean_seconds: Required[float | None]
+ max_seconds: Required[float | None]
+ min_seconds: Required[float | None]
+ median_seconds: Required[float | None]
+ p95_seconds: Required[float | None]
# Extension keys
- warnings_breakdown: NotRequired[Dict[str, int]]
- seeds: NotRequired[Dict[str, Any]]
- metadata: NotRequired[Dict[str, Any]]
+ warnings_breakdown: NotRequired[dict[str, int]]
+ seeds: NotRequired[dict[str, Any]]
+ metadata: NotRequired[dict[str, Any]]
interrupted: NotRequired[bool]
def build_arg_matrix(
max_scenarios: int = 40,
- shuffle_seed: Optional[int] = None,
-) -> List[ConfigTuple]:
+ shuffle_seed: int | None = None,
+) -> list[ConfigTuple]:
exit_potential_modes = [
"canonical",
"non_canonical",
exit_additive_enabled,
)
- full: List[ConfigTuple] = list(product_iter)
+ full: list[ConfigTuple] = list(product_iter)
full = [c for c in full if not (c[0] == "canonical" and (c[4] == 1 or c[5] == 1))]
if shuffle_seed is not None:
rnd = random.Random(shuffle_seed)
return full
step = len(full) / max_scenarios
idx_pos = step / 2.0 # Centered sampling
- selected: List[ConfigTuple] = []
+ selected: list[ConfigTuple] = []
selected_indices: set[int] = set()
for _ in range(max_scenarios):
- idx = int(round(idx_pos))
+ idx = round(idx_pos)
if idx < 0:
idx = 0
elif idx >= len(full):
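The centered sampling above, condensed into a standalone sketch (the helper name is hypothetical; the real loop also deduplicates via `selected_indices`):

```python
def centered_indices(n_total: int, n_select: int) -> list[int]:
    step = n_total / n_select
    pos = step / 2.0                 # start in the middle of the first bucket
    out: list[int] = []
    for _ in range(n_select):
        idx = min(max(round(pos), 0), n_total - 1)  # clamp to valid range
        if idx not in out:
            out.append(idx)
        pos += step
    return out

# centered_indices(10, 4) -> [1, 4, 6, 9]
```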
skip_partial_dependence: bool = False,
unrealized_pnl: bool = False,
full_logs: bool = False,
- params: Optional[List[str]] = None,
+ params: list[str] | None = None,
tail_chars: int = 5000,
) -> ScenarioResult:
(
if strict:
cmd.append("--strict_diagnostics")
if params:
- cmd += ["--params"] + list(params)
+ cmd += ["--params", *list(params)]
start = time.perf_counter()
try:
proc = subprocess.run(cmd, capture_output=True, text=True, check=False, timeout=timeout)
scenarios = build_arg_matrix(max_scenarios=args.max_scenarios, shuffle_seed=args.shuffle_seed)
# Validate --params basic KEY=VALUE format
- valid_params: List[str] = []
- invalid_params: List[str] = []
+ valid_params: list[str] = []
+ invalid_params: list[str] = []
for p in args.params:
if "=" in p:
valid_params.append(p)
args.params = valid_params
# Prepare list of (conf, strict)
- scenario_pairs: List[Tuple[ConfigTuple, bool]] = [(c, False) for c in scenarios]
+ scenario_pairs: list[tuple[ConfigTuple, bool]] = [(c, False) for c in scenarios]
indices = {conf: idx for idx, conf in enumerate(scenarios, start=1)}
n_duplicated = min(max(0, args.strict_sample), len(scenarios))
if n_duplicated > 0:
for c in scenarios[:n_duplicated]:
scenario_pairs.append((c, True))
- results: List[ScenarioResult] = []
+ results: list[ScenarioResult] = []
total = len(scenario_pairs)
interrupted = False
try:
successes = [r for r in results if r["status"] == "ok"]
failures = [r for r in results if r["status"] != "ok"]
- durations: List[float] = [
+ durations: list[float] = [
float(r["seconds"]) for r in results if isinstance(r["seconds"], float)
]
if durations:
p95_seconds = _sorted[0]
else:
pos = 0.95 * (n - 1)
- i0 = int(math.floor(pos))
- i1 = int(math.ceil(pos))
+ i0 = math.floor(pos)
+ i1 = math.ceil(pos)
if i0 == i1:
p95_seconds = _sorted[i0]
else:
"p95_seconds": p95_seconds,
}
# Build warnings breakdown
- warnings_breakdown: Dict[str, int] = {}
+ warnings_breakdown: dict[str, int] = {}
for r in results:
text = (r["stderr"] + "\n" + r["stdout"]).splitlines()
for line in text:
warnings_breakdown[fp] = warnings_breakdown.get(fp, 0) + 1
# Collect reproducibility metadata
- def _git_hash() -> Optional[str]:
+ def _git_hash() -> str | None:
try:
proc = subprocess.run(
["git", "rev-parse", "--short", "HEAD"],
summary["interrupted"] = True
# Atomic write to avoid corrupt partial files
tmp_fd, tmp_path = tempfile.mkstemp(prefix="_tmp_summary_", dir=str(out_dir))
+ tmp_path_obj = Path(tmp_path)
try:
with os.fdopen(tmp_fd, "w", encoding="utf-8") as fh:
json.dump(summary, fh, indent=2)
- os.replace(tmp_path, out_dir / SUMMARY_FILENAME)
+ tmp_path_obj.replace(out_dir / SUMMARY_FILENAME)
except Exception:
# Best effort fallback
try:
json.dumps(summary, indent=2), encoding="utf-8"
)
finally:
- if os.path.exists(tmp_path):
- try:
- os.remove(tmp_path)
- except OSError:
- pass
+ if tmp_path_obj.exists():
+ with contextlib.suppress(OSError):
+ tmp_path_obj.unlink()
else:
# Defensive cleanup: remove temp file if atomic replace did not clean up
- if os.path.exists(tmp_path):
- try:
- os.remove(tmp_path)
- except OSError:
- pass
+ if tmp_path_obj.exists():
+ with contextlib.suppress(OSError):
+ tmp_path_obj.unlink()
print(f"Summary saved to: {out_dir / SUMMARY_FILENAME}")
if not interrupted and summary["failures"]:
print("Failures detected:")
self.assertTrue(math.isnan(_get_float_param({"k": float("-inf")}, "k", 0.0)))
self.assertTrue(math.isnan(_get_float_param({"k": np.nan}, "k", 0.0)))
self.assertTrue(
- math.isnan(_get_float_param(cast(RewardParams, {"k": cast(Any, [1, 2, 3])}), "k", 0.0))
+ math.isnan(
+ _get_float_param(cast("RewardParams", {"k": cast("Any", [1, 2, 3])}), "k", 0.0)
+ )
)
def test_get_str_param(self):
self.assertEqual(_get_int_param({"k": ""}, "k", 5), 5)
self.assertEqual(_get_int_param({"k": "abc"}, "k", 5), 5)
self.assertEqual(_get_int_param({"k": "NaN"}, "k", 5), 5)
- self.assertEqual(_get_int_param(cast(RewardParams, {"k": cast(Any, [1, 2, 3])}), "k", 3), 3)
+ self.assertEqual(
+ _get_int_param(cast("RewardParams", {"k": cast("Any", [1, 2, 3])}), "k", 3), 3
+ )
self.assertEqual(_get_int_param({}, "missing", "zzz"), 0)
def test_argument_parser_construction(self):
self.assertIn("action", df.columns)
values = df["action"].tolist()
self.assertTrue(
- all((float(v).is_integer() for v in values)),
+ all(float(v).is_integer() for v in values),
"Non-integer values detected in 'action' column",
)
allowed = {int(action.value) for action in Actions}
- self.assertTrue(set((int(v) for v in values)).issubset(allowed))
+ self.assertTrue({int(v) for v in values}.issubset(allowed))
class TestParamsPropagation(RewardSpaceTestBase):
_assert_cli_success(self, result)
manifest_path = out_dir / "manifest.json"
self.assertTrue(manifest_path.exists(), "Missing manifest.json")
- with open(manifest_path, "r") as f:
+ with manifest_path.open() as f:
manifest = json.load(f)
self.assertIn("reward_params", manifest)
self.assertIn("simulation_params", manifest)
_assert_cli_success(self, result)
manifest_path = out_dir / "manifest.json"
self.assertTrue(manifest_path.exists(), "Missing manifest.json")
- with open(manifest_path, "r") as f:
+ with manifest_path.open() as f:
manifest = json.load(f)
self.assertIn("reward_params", manifest)
self.assertIn("simulation_params", manifest)
**Setup:**
- PnL: 0.0 (breakeven)
- - pnl_target: profit_aim × risk_reward_ratio
+ - pnl_target: profit_aim * risk_reward_ratio
- Parameters: default base_params
**Assertions:**
**Setup:**
- PnL: 150% of pnl_target (exceeds target by 50%)
- - pnl_target: 0.045 (profit_aim=0.03 × risk_reward_ratio=1.5)
+ - pnl_target: 0.045 (profit_aim=0.03 * risk_reward_ratio=1.5)
- Parameters: win_reward_factor=2.0, pnl_factor_beta=0.5
**Assertions:**
**Setup:**
- PnL: -0.06 (exceeds pnl_target magnitude)
- - pnl_target: 0.045 (profit_aim=0.03 × risk_reward_ratio=1.5)
+ - pnl_target: 0.045 (profit_aim=0.03 * risk_reward_ratio=1.5)
- Penalty threshold: pnl < -pnl_target = -0.045
- Parameters: win_reward_factor=2.0, pnl_factor_beta=0.5
**Setup:**
- PnL: -0.005 (very close to min_unrealized_profit=-0.006)
- Efficiency ratio: (-0.005 - (-0.006)) / (0.0 - (-0.006)) ≈ 0.167 (low)
- - For losses: coefficient = 1 + weight × (center - ratio) → rewards low ratio
+ - For losses: coefficient = 1 + weight * (center - ratio) → rewards low ratio
- efficiency_weight: 1.0, efficiency_center: 0.5
- Trade context: Long position cutting losses quickly
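Worked arithmetic for the setup above (matching the stated ratio and coefficient):

```python
min_upnl, max_upnl, pnl = -0.006, 0.0, -0.005
ratio = (pnl - min_upnl) / (max_upnl - min_upnl)  # 0.001 / 0.006 ≈ 0.167
coefficient = 1.0 + 1.0 * (0.5 - ratio)           # ≈ 1.333 (weight=1.0, center=0.5)
```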
pnl_ratio = pnl / pnl_target
expected = 1.0 + win_reward_factor * math.tanh(beta * (pnl_ratio - 1.0))
expected_ratios.append(expected)
- for obs, exp in zip(ratios_observed, expected_ratios):
+ for obs, exp in zip(ratios_observed, expected_ratios, strict=False):
self.assertFinite(obs, name="observed_ratio")
self.assertFinite(exp, name="expected_ratio")
self.assertLess(
Verifies:
- max_idle_duration = None → use max_trade_duration as fallback
- - penalty(duration=40) ≈ 2 × penalty(duration=20)
+ - penalty(duration=40) ≈ 2 * penalty(duration=20)
- Proportional scaling with idle duration
"""
base_factor = PARAMS.BASE_FACTOR
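A hypothetical idle-penalty shape consistent with the proportionality check above (the scale/power names follow the parameter help table; the exact formula is an assumption, with power=1 giving linear scaling):

```python
def idle_penalty(duration: float, max_idle: float, scale: float, power: float) -> float:
    return -scale * (duration / max_idle) ** power

assert idle_penalty(40, 100, 1.0, 1.0) == 2 * idle_penalty(20, 100, 1.0, 1.0)
```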
__all__ = [
- "ToleranceConfig",
+ "CONTINUITY",
+ "EXIT_FACTOR",
+ "PARAMS",
+ "PBRS",
+ "SCENARIOS",
+ "SEEDS",
+ "STATISTICAL",
+ "STAT_TOL",
+ "TOLERANCE",
"ContinuityConfig",
"ExitFactorConfig",
"PBRSConfig",
"StatisticalConfig",
- "TestSeeds",
+ "StatisticalTolerances",
"TestParameters",
"TestScenarios",
- "StatisticalTolerances",
- "TOLERANCE",
- "CONTINUITY",
- "EXIT_FACTOR",
- "PBRS",
- "STATISTICAL",
- "SEEDS",
- "PARAMS",
- "SCENARIOS",
- "STAT_TOL",
+ "TestSeeds",
+ "ToleranceConfig",
]
)
__all__ = [
- "assert_monotonic_nonincreasing",
- "assert_monotonic_nonnegative",
- "assert_finite",
+ "DEFAULT_REWARD_CONFIG",
+ "DEFAULT_SIMULATION_CONFIG",
+ "ContextFactory",
+ "ExitFactorConfig",
+ "ProgressiveScalingConfig",
+ "RewardScenarioConfig",
+ "SimulationConfig",
+ "StatisticalTestConfig",
+ "ThresholdTestConfig",
+ "ValidationCallback",
+ "ValidationConfig",
+ "WarningCaptureConfig",
+ "assert_adjustment_reason_contains",
"assert_almost_equal_list",
- "assert_trend",
"assert_component_sum_integrity",
- "assert_progressive_scaling_behavior",
- "assert_single_active_component",
- "assert_single_active_component_with_additives",
- "assert_reward_calculation_scenarios",
- "assert_parameter_sensitivity_behavior",
- "make_idle_penalty_test_contexts",
+ "assert_diagnostic_warning",
"assert_exit_factor_attenuation_modes",
+ "assert_exit_factor_invariant_suite",
+ "assert_exit_factor_kernel_fallback",
"assert_exit_factor_plateau_behavior",
"assert_exit_mode_mathematical_validation",
- "assert_multi_parameter_sensitivity",
+ "assert_finite",
"assert_hold_penalty_threshold_behavior",
- "safe_float",
- "build_validation_case",
- "execute_validation_batch",
- "assert_adjustment_reason_contains",
- "run_strict_validation_failure_cases",
- "run_relaxed_validation_adjustment_cases",
- "assert_exit_factor_invariant_suite",
- "assert_exit_factor_kernel_fallback",
- "assert_relaxed_multi_reason_aggregation",
- "assert_pbrs_invariance_report_classification",
- "assert_pbrs_canonical_sum_within_tolerance",
+ "assert_monotonic_nonincreasing",
+ "assert_monotonic_nonnegative",
+ "assert_multi_parameter_sensitivity",
+ "assert_no_warnings",
"assert_non_canonical_shaping_exceeds",
+ "assert_parameter_sensitivity_behavior",
+ "assert_pbrs_canonical_sum_within_tolerance",
+ "assert_pbrs_invariance_report_classification",
+ "assert_progressive_scaling_behavior",
+ "assert_relaxed_multi_reason_aggregation",
+ "assert_reward_calculation_scenarios",
+ "assert_single_active_component",
+ "assert_single_active_component_with_additives",
+ "assert_trend",
+ "build_validation_case",
"calculate_reward_with_defaults",
+ "capture_warnings",
+ "execute_validation_batch",
"get_exit_factor_with_defaults",
+ "make_idle_penalty_test_contexts",
+ "run_relaxed_validation_adjustment_cases",
+ "run_strict_validation_failure_cases",
+ "safe_float",
"simulate_samples_with_defaults",
- "RewardScenarioConfig",
- "ValidationConfig",
- "ThresholdTestConfig",
- "ProgressiveScalingConfig",
- "ExitFactorConfig",
- "StatisticalTestConfig",
- "SimulationConfig",
- "WarningCaptureConfig",
- "ValidationCallback",
- "ContextFactory",
- "DEFAULT_REWARD_CONFIG",
- "DEFAULT_SIMULATION_CONFIG",
- "capture_warnings",
- "assert_diagnostic_warning",
- "assert_no_warnings",
"validate_warning_content",
]
single invariant ownership and reduce duplication across taxonomy modules.
"""
-from typing import Any, Dict, List, Sequence, Tuple
+import itertools
+from collections.abc import Sequence
+from typing import Any
import numpy as np
def assert_reward_calculation_scenarios(
test_case,
- scenarios: List[Tuple[Any, Dict[str, Any], str]],
+ scenarios: list[tuple[Any, dict[str, Any], str]],
config: RewardScenarioConfig,
validation_fn,
):
def assert_parameter_sensitivity_behavior(
test_case,
- parameter_variations: List[Dict[str, Any]],
+ parameter_variations: list[dict[str, Any]],
base_context,
- base_params: Dict[str, Any],
+ base_params: dict[str, Any],
component_name: str,
expected_trend: str,
config: RewardScenarioConfig,
def make_idle_penalty_test_contexts(
context_factory_fn,
idle_duration_scenarios: Sequence[int],
- base_context_kwargs: Dict[str, Any] | None = None,
+ base_context_kwargs: dict[str, Any] | None = None,
):
"""Generate contexts for idle penalty testing with varying durations.
test_case: Test case instance with assertion methods
base_factor: Base scaling factor
pnl: Realized profit/loss
- pnl_target: Target profit threshold (pnl_target = profit_aim × risk_reward_ratio)
+ pnl_target: Target profit threshold (pnl_target = profit_aim * risk_reward_ratio)
context: RewardContext for efficiency coefficient calculation
attenuation_modes: List of mode names to test
base_params_fn: Factory function for creating parameter dicts
if mode == "plateau_linear":
grace = float(mode_params["exit_plateau_grace"])
filtered = [
- (r, v) for r, v in zip(ratios, values) if r >= grace - tolerance_relaxed
+ (r, v)
+ for r, v in zip(ratios, values, strict=False)
+ if r >= grace - tolerance_relaxed
]
values_to_check = [v for _, v in filtered]
else:
values_to_check = values
- for earlier, later in zip(values_to_check, values_to_check[1:]):
+ for earlier, later in itertools.pairwise(values_to_check):
test_case.assertLessEqual(
later, earlier + tolerance_relaxed, f"Non-monotonic attenuation in mode={mode}"
)
def assert_exit_mode_mathematical_validation(
test_case,
context,
- params: Dict[str, Any],
+ params: dict[str, Any],
base_factor: float,
profit_aim: float,
risk_reward_ratio: float,
reward_half_life.exit_component,
reward_linear.exit_component,
]
- test_case.assertTrue(all((r > 0 for r in rewards)))
- unique_rewards = set((f"{r:.6f}" for r in rewards))
+ test_case.assertTrue(all(r > 0 for r in rewards))
+ unique_rewards = {f"{r:.6f}" for r in rewards}
test_case.assertGreater(len(unique_rewards), 1)
def assert_multi_parameter_sensitivity(
test_case,
- parameter_test_cases: List[Tuple[float, float, str]],
+ parameter_test_cases: list[tuple[float, float, str]],
context_factory_fn,
- base_params: Dict[str, Any],
+ base_params: dict[str, Any],
config: RewardScenarioConfig,
):
"""Validate reward behavior across multiple parameter combinations.
def assert_hold_penalty_threshold_behavior(
test_case,
context_factory_fn,
- params: Dict[str, Any],
+ params: dict[str, Any],
base_factor: float,
profit_aim: float,
risk_reward_ratio: float,
def build_validation_case(
- param_updates: Dict[str, Any],
+ param_updates: dict[str, Any],
strict: bool,
expect_error: bool = False,
expected_reason_substrings: Sequence[str] | None = None,
-) -> Dict[str, Any]:
+) -> dict[str, Any]:
"""Build a structured validation test case descriptor.
Creates a standardized test case dictionary for parameter validation testing,
}
-def execute_validation_batch(test_case, cases: Sequence[Dict[str, Any]], validate_fn):
+def execute_validation_batch(test_case, cases: Sequence[dict[str, Any]], validate_fn):
"""Execute a batch of parameter validation test cases.
Runs multiple validation scenarios in batch, handling both strict (error-raising)
params = case["params"].copy()
strict_flag = case["strict"]
if strict_flag and case["expect_error"]:
- test_case.assertRaises(Exception, validate_fn, params, True)
+ test_case.assertRaises(ValueError, validate_fn, params, True)
continue
result = validate_fn(params, strict=strict_flag)
if isinstance(result, tuple) and len(result) == 2 and isinstance(result[0], dict):
def assert_adjustment_reason_contains(
- test_case, adjustments: Dict[str, Dict[str, Any]], key: str, expected_substrings: Sequence[str]
+ test_case, adjustments: dict[str, dict[str, Any]], key: str, expected_substrings: Sequence[str]
):
"""Assert adjustment reason contains all expected substrings.
def run_strict_validation_failure_cases(
- test_case, failure_params_list: Sequence[Dict[str, Any]], validate_fn
+ test_case, failure_params_list: Sequence[dict[str, Any]], validate_fn
):
"""Batch test strict validation failures.
def run_relaxed_validation_adjustment_cases(
test_case,
- relaxed_cases: Sequence[Tuple[Dict[str, Any], Sequence[str]]],
+ relaxed_cases: Sequence[tuple[dict[str, Any], Sequence[str]]],
validate_fn,
):
"""Batch test relaxed validation adjustments.
def assert_exit_factor_invariant_suite(
- test_case, suite_cases: Sequence[Dict[str, Any]], exit_factor_fn
+ test_case, suite_cases: Sequence[dict[str, Any]], exit_factor_fn
):
"""Validate exit factor invariants across multiple scenarios.
suite_cases: List of scenario dicts with keys:
- base_factor: Base scaling factor
- pnl: Realized profit/loss
- - pnl_target: Target profit threshold (pnl_target = profit_aim × risk_reward_ratio) for coefficient calculation
+ - pnl_target: Target profit threshold (pnl_target = profit_aim * risk_reward_ratio) for coefficient calculation
- context: RewardContext for efficiency coefficient
- duration_ratio: Duration ratio (0-2)
- params: Parameter dictionary
pnl_target: float,
duration_ratio: float,
context,
- bad_params: Dict[str, Any],
- reference_params: Dict[str, Any],
+ bad_params: dict[str, Any],
+ reference_params: dict[str, Any],
risk_reward_ratio: float,
):
"""Validate exit factor fallback behavior on kernel failure.
def assert_relaxed_multi_reason_aggregation(
test_case,
validate_fn,
- params: Dict[str, Any],
- key_expectations: Dict[str, Sequence[str]],
+ params: dict[str, Any],
+ key_expectations: dict[str, Sequence[str]],
):
"""Validate relaxed validation produces expected adjustment reasons.
exit_factor_fn: Exit factor calculation function (_get_exit_factor)
base_factor: Base factor for exit calculation
pnl: PnL value
- pnl_target: Target profit threshold (pnl_target = profit_aim × risk_reward_ratio) for coefficient calculation
+ pnl_target: Target profit threshold (pnl_target = profit_aim * risk_reward_ratio) for coefficient calculation
context: RewardContext for efficiency coefficient
plateau_params: Parameters dict with plateau configuration
grace: Grace period threshold (exit_plateau_grace value)
def calculate_reward_with_defaults(
context,
- params: Dict[str, Any],
+ params: dict[str, Any],
config: RewardScenarioConfig | None = None,
**overrides,
):
pnl: float,
duration_ratio: float,
context,
- params: Dict[str, Any],
+ params: dict[str, Any],
base_factor: float | None = None,
pnl_target: float | None = None,
risk_reward_ratio: float | None = None,
def simulate_samples_with_defaults(
- params: Dict[str, Any],
+ params: dict[str, Any],
config: SimulationConfig | None = None,
base_factor: float | None = None,
profit_aim: float | None = None,
... )
"""
+from collections.abc import Callable
from dataclasses import dataclass
-from typing import Callable, Optional
from ..constants import PARAMS, SEEDS, STATISTICAL, TOLERANCE
tolerance_strict: float = TOLERANCE.IDENTITY_STRICT
tolerance_relaxed: float = TOLERANCE.IDENTITY_RELAXED
- exclude_components: Optional[list[str]] = None
+ exclude_components: list[str] | None = None
component_description: str = "reward components"
decomposition, attenuation mode and plateau behavior.
The exit factor is computed as:
- exit_factor = base_factor × time_attenuation × pnl_target × efficiency
+ exit_factor = base_factor * time_attenuation_coefficient * pnl_target_coefficient * efficiency_coefficient
Attributes:
base_factor: Base scaling factor
n_bootstrap: int = STATISTICAL.BOOTSTRAP_DEFAULT_ITERATIONS
confidence_level: float = 0.95
seed: int = SEEDS.BASE
- adjust_method: Optional[str] = None
+ adjust_method: str | None = None
alpha: float = 0.05
__all__ = [
- "RewardScenarioConfig",
- "ValidationConfig",
- "ThresholdTestConfig",
- "ProgressiveScalingConfig",
+ "DEFAULT_REWARD_CONFIG",
+ "DEFAULT_SIMULATION_CONFIG",
+ "ContextFactory",
"ExitFactorConfig",
- "StatisticalTestConfig",
+ "ProgressiveScalingConfig",
+ "RewardScenarioConfig",
"SimulationConfig",
- "WarningCaptureConfig",
+ "StatisticalTestConfig",
+ "ThresholdTestConfig",
"ValidationCallback",
- "ContextFactory",
- "DEFAULT_REWARD_CONFIG",
- "DEFAULT_SIMULATION_CONFIG",
+ "ValidationConfig",
+ "WarningCaptureConfig",
]
import warnings
from contextlib import contextmanager
-from typing import Any, Optional
+from typing import Any
import reward_space_analysis
@contextmanager
def assert_diagnostic_warning(
expected_substrings: list[str],
- warning_category: Optional[type[Warning]] = None,
+ warning_category: type[Warning] | None = None,
strict_mode: bool = True,
):
"""Context manager that captures warnings and asserts their presence.
__all__ = [
- "capture_warnings",
"assert_diagnostic_warning",
"assert_no_warnings",
+ "capture_warnings",
"validate_warning_content",
]
_assert_cli_success(self, result2)
for run_dir in ["run1", "run2"]:
- with open(self.output_path / run_dir / "manifest.json", "r") as f:
+ with (self.output_path / run_dir / "manifest.json").open() as f:
manifest = json.load(f)
required_keys = {
"generated_at",
self.assertEqual(manifest["num_samples"], SCENARIOS.SAMPLE_SIZE_SMALL)
self.assertEqual(manifest["seed"], SEEDS.BASE)
- with open(self.output_path / "run1" / "manifest.json", "r") as f:
+ with (self.output_path / "run1" / "manifest.json").open() as f:
manifest1 = json.load(f)
- with open(self.output_path / "run2" / "manifest.json", "r") as f:
+ with (self.output_path / "run2" / "manifest.json").open() as f:
manifest2 = json.load(f)
self.assertEqual(
manifest1["params_hash"],
terminal_next_potentials, shaping_values = self._canonical_sweep(params)
self.assertEqual(params, params_before)
if terminal_next_potentials:
- self.assertTrue(all((abs(p) < PBRS.TERMINAL_TOL for p in terminal_next_potentials)))
- max_abs = max((abs(v) for v in shaping_values)) if shaping_values else 0.0
+ self.assertTrue(all(abs(p) < PBRS.TERMINAL_TOL for p in terminal_next_potentials))
+ max_abs = max(abs(v) for v in shaping_values) if shaping_values else 0.0
self.assertLessEqual(max_abs, PBRS.MAX_ABS_SHAPING)
def test_progressive_release_negative_decay_clamped(self):
gamma = float(gamma_fallback)
except Exception:
gamma = 0.95
- # PBRS shaping Δ = γ·Φ(next) − Φ(prev). Here Φ(next)=Φ(prev) since decay clamps to 0.
+ # PBRS shaping Δ = γ·Φ(next) − Φ(prev). Here Φ(next)=Φ(prev) since decay clamps to 0. # noqa: RUF003
self.assertLessEqual(
abs(shaping - ((gamma - 1.0) * prev_potential)),
TOLERANCE.GENERIC_EQ,
)
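Numeric check of the clamped-decay identity asserted above: with Φ(next) = Φ(prev), the shaping collapses to (γ - 1)·Φ(prev).

```python
gamma, prev_potential = 0.95, 0.30
shaping = gamma * prev_potential - prev_potential   # = -0.015
assert abs(shaping - (gamma - 1.0) * prev_potential) < 1e-12
```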
execute_validation_batch(
self,
- [success_case] + strict_failures + [relaxed_case],
+ [success_case, *strict_failures, relaxed_case],
validate_reward_parameters,
)
params_relaxed = DEFAULT_MODEL_REWARD_PARAMETERS.copy()
def test_compute_exit_potential_mode_differences(self):
"""Exit potential modes: canonical vs spike_cancel shaping magnitude differences."""
gamma = 0.93
- base_common = dict(
- hold_potential_enabled=True,
- potential_gamma=gamma,
- entry_additive_enabled=False,
- exit_additive_enabled=False,
- hold_potential_ratio=1.0,
- )
+ base_common = {
+ "hold_potential_enabled": True,
+ "potential_gamma": gamma,
+ "entry_additive_enabled": False,
+ "exit_additive_enabled": False,
+ "hold_potential_ratio": 1.0,
+ }
ctx_pnl = 0.012
ctx_dur_ratio = 0.3
params_can = self.base_params(exit_potential_mode="canonical", **base_common)
self.assertLessEqual(abs(shap), PBRS.MAX_ABS_SHAPING)
# With bounded transforms and hold_potential_ratio=1:
- # |Φ(s)| <= base_factor and |Δ| <= (1+γ)*base_factor
+ # |Φ(s)| <= base_factor and |Δ| <= (1+γ)*base_factor # noqa: RUF003
self.assertLessEqual(abs(float(shap)), (1.0 + gamma) * PARAMS.BASE_FACTOR)
def test_report_cumulative_invariance_aggregation(self):
if abs(inc) > max_abs_step:
max_abs_step = abs(inc)
steps += 1
- if is_exit:
- prev_potential = 0.0
- else:
- prev_potential = next_potential
+ prev_potential = 0.0 if is_exit else next_potential
mean_drift = telescoping_sum / max(1, steps)
self.assertLess(
abs(mean_drift),
def test_decomposition_integrity(self):
"""reward must equal the single active core component under mutually exclusive scenarios (idle/hold/exit/invalid)."""
scenarios = [
- dict(
- ctx=self.make_ctx(
+ {
+ "ctx": self.make_ctx(
pnl=0.0,
trade_duration=0,
idle_duration=25,
position=Positions.Neutral,
action=Actions.Neutral,
),
- active="idle_penalty",
- ),
- dict(
- ctx=self.make_ctx(
+ "active": "idle_penalty",
+ },
+ {
+ "ctx": self.make_ctx(
pnl=0.0,
trade_duration=150,
idle_duration=0,
position=Positions.Long,
action=Actions.Neutral,
),
- active="hold_penalty",
- ),
- dict(
- ctx=self.make_ctx(
+ "active": "hold_penalty",
+ },
+ {
+ "ctx": self.make_ctx(
pnl=PARAMS.PROFIT_AIM,
trade_duration=60,
idle_duration=0,
position=Positions.Long,
action=Actions.Long_exit,
),
- active="exit_component",
- ),
- dict(
- ctx=self.make_ctx(
+ "active": "exit_component",
+ },
+ {
+ "ctx": self.make_ctx(
pnl=0.01,
trade_duration=10,
idle_duration=0,
position=Positions.Short,
action=Actions.Long_exit,
),
- active="invalid_penalty",
- ),
+ "active": "invalid_penalty",
+ },
]
for sc in scenarios:
ctx_obj = sc["ctx"]
)
# Part 2: Monotonic attenuation validation
- modes = list(ATTENUATION_MODES) + ["plateau_linear"]
+ modes = [*ATTENUATION_MODES, "plateau_linear"]
test_pnl = 0.05
test_context = self.make_ctx(
pnl=test_pnl,
self.assertTrue(runtime_warnings)
self.assertTrue(
any(
- (
- ">" in str(w.message)
- and "threshold" in str(w.message)
- or "|exit_factor|=" in str(w.message)
- for w in runtime_warnings
- )
+ (">" in str(w.message) and "threshold" in str(w.message))
+ or "|exit_factor|=" in str(w.message)
+ for w in runtime_warnings
)
)
params,
PARAMS.RISK_REWARD_RATIO,
)
- if 0.0 < tau <= 1.0:
- alpha = -math.log(tau) / math.log(2.0)
- else:
- alpha = 1.0
+ alpha = -math.log(tau) / math.log(2.0) if 0.0 < tau <= 1.0 else 1.0
expected_ratio = 1.0 / (1.0 + duration_ratio) ** alpha
observed_ratio = f1 / f0 if f0 != 0 else np.nan
self.assertFinite(observed_ratio, name="observed_ratio")
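Worked instance of the half-life expectation above: at tau = 0.5 the exponent is exactly 1, so the attenuation halves the factor at duration_ratio = 1.

```python
import math

tau, duration_ratio = 0.5, 1.0
alpha = -math.log(tau) / math.log(2.0) if 0.0 < tau <= 1.0 else 1.0  # = 1.0
expected_ratio = 1.0 / (1.0 + duration_ratio) ** alpha               # = 0.5
```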
f"Scaling ratio too large (ratio={ratio:.2f})",
)
- # === Robustness invariants 102–105 ===
+ # === Robustness invariants 102-105 ===
# Owns invariant: robustness-exit-mode-fallback-102
def test_robustness_102_unknown_exit_mode_fallback_linear(self):
"""Invariant 102: Unknown exit_attenuation_mode gracefully warns and falls back to linear kernel."""
- model is None
"""
df = _minimal_df(0) # empty
- importance_df, stats, partial_deps, model = _perform_feature_analysis(
+ importance_df, stats, _partial_deps, model = _perform_feature_analysis(
df, seed=SEEDS.FEATURE_EMPTY, skip_partial_dependence=True
)
assert importance_df.empty
"""
rng = np.random.default_rng(SEEDS.FEATURE_PRIME_11)
df = pd.DataFrame({"pnl": rng.normal(0, 1, 25), "reward": rng.normal(0, 1, 25)})
- importance_df, stats, partial_deps, model = _perform_feature_analysis(
+ importance_df, stats, _partial_deps, model = _perform_feature_analysis(
df, seed=SEEDS.FEATURE_PRIME_11, skip_partial_dependence=True
)
assert stats["n_features"] == 1
"reward": rng.normal(0, 1, 40),
}
)
- importance_df, stats, partial_deps, model = _perform_feature_analysis(
+ importance_df, stats, _partial_deps, model = _perform_feature_analysis(
df, seed=SEEDS.FEATURE_PRIME_13, skip_partial_dependence=True
)
# Should hit NaN stub path (model_fitted False)
if RandomForestRegressor is None: # type: ignore[comparison-overlap]
pytest.skip("sklearn components unavailable; skipping model fitting failure test")
- def boom(self, *a, **kw): # noqa: D401
+ def boom(self, *a, **kw):
raise RuntimeError("forced fit failure")
monkeypatch.setattr(RandomForestRegressor, "fit", boom)
df = _minimal_df(50)
- importance_df, stats, partial_deps, model = _perform_feature_analysis(
+ importance_df, stats, _partial_deps, model = _perform_feature_analysis(
df, seed=SEEDS.FEATURE_PRIME_21, skip_partial_dependence=True
)
assert stats["model_fitted"] is False
"""
# Monkeypatch permutation_importance to raise while allowing partial dependence
- def perm_boom(*a, **kw): # noqa: D401
+ def perm_boom(*a, **kw):
raise RuntimeError("forced permutation failure")
monkeypatch.setattr("reward_space_analysis.permutation_importance", perm_boom)
def test_module_level_sklearn_import_failure_reload():
- """Force module-level sklearn import failure to execute fallback block (lines 32–42).
+ """Force module-level sklearn import failure to execute fallback block (lines 32-42).
Strategy:
- Temporarily monkeypatch builtins.__import__ to raise on any 'sklearn' import.
orig_mod = sys.modules.get("reward_space_analysis")
orig_import = builtins.__import__
- def fake_import(name, *args, **kwargs): # noqa: D401
+ def fake_import(name, *args, **kwargs):
if name.startswith("sklearn"):
raise RuntimeError("forced sklearn import failure")
return orig_import(name, *args, **kwargs)
reloaded_module = importlib.import_module("reward_space_analysis")
# Fallback assigns sklearn symbols to None
- assert getattr(reloaded_module, "RandomForestRegressor") is None
- assert getattr(reloaded_module, "train_test_split") is None
- assert getattr(reloaded_module, "permutation_importance") is None
- assert getattr(reloaded_module, "r2_score") is None
+ assert reloaded_module.RandomForestRegressor is None
+ assert reloaded_module.train_test_split is None
+ assert reloaded_module.permutation_importance is None
+ assert reloaded_module.r2_score is None
# Perform feature analysis should raise ImportError under missing components
df = _minimal_df(15)
with pytest.raises(ImportError):
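The behavior under test mirrors the module's guarded import: a failing sklearn import downgrades the symbols to None, and consumers raise ImportError lazily rather than at import time. A minimal sketch of that pattern:

    try:
        from sklearn.ensemble import RandomForestRegressor
    except Exception:  # sklearn missing or broken: degrade gracefully
        RandomForestRegressor = None

    def fit_model(X, y):
        if RandomForestRegressor is None:
            raise ImportError("scikit-learn is required for feature analysis")
        return RandomForestRegressor(random_state=0).fit(X, y)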
# Use existing helper to get synthetic stats df (small for speed)
df = self.make_stats_df(n=120, seed=SEEDS.BASE, idle_pattern="mixed")
try:
- importance_df, analysis_stats, partial_deps, model = _perform_feature_analysis(
+ importance_df, analysis_stats, partial_deps, _model = _perform_feature_analysis(
df, seed=SEEDS.BASE, skip_partial_dependence=True, rf_n_jobs=1, perm_n_jobs=1
)
except ImportError:
for metric_name, value in metrics.items():
if "pnl" in metric_name:
if any(
- (
- suffix in metric_name
- for suffix in [
- "js_distance",
- "ks_statistic",
- "wasserstein",
- "kl_divergence",
- ]
- )
+ suffix in metric_name
+ for suffix in [
+ "js_distance",
+ "ks_statistic",
+ "wasserstein",
+ "kl_divergence",
+ ]
):
self.assertDistanceMetric(value, name=metric_name)
else:
"Idle duration and reward arrays should have same length",
)
self.assertTrue(
- all((d >= 0 for d in idle_dur)), "Idle durations should be non-negative"
+ all(d >= 0 for d in idle_dur), "Idle durations should be non-negative"
)
negative_rewards = (idle_rew < 0).sum()
total_rewards = len(idle_rew)
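The suffix filter above routes distributional metrics (JS distance, KS statistic, Wasserstein, KL) to a dedicated bounds check. For orientation, a sketch of how such metrics are commonly computed with scipy (the module's own implementation may differ in binning and normalization):

    import numpy as np
    from scipy.spatial.distance import jensenshannon
    from scipy.stats import ks_2samp, wasserstein_distance

    rng = np.random.default_rng(2)
    a, b = rng.normal(0.0, 1.0, 500), rng.normal(0.3, 1.2, 500)
    hist_a, edges = np.histogram(a, bins=30, density=True)
    hist_b, _ = np.histogram(b, bins=edges, density=True)
    js = jensenshannon(hist_a, hist_b)  # JS distance (sqrt of divergence), >= 0
    ks = ks_2samp(a, b).statistic       # KS statistic in [0, 1]
    w1 = wasserstein_distance(a, b)     # >= 0, unbounded above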
diagnostics = distribution_diagnostics(df)
expected_prefixes = ["reward_", "pnl_"]
for prefix in expected_prefixes:
- matching_keys = [key for key in diagnostics.keys() if key.startswith(prefix)]
+ matching_keys = [key for key in diagnostics if key.startswith(prefix)]
self.assertGreater(len(matching_keys), 0, f"Should have diagnostics for {prefix}")
expected_suffixes = ["mean", "std", "skewness", "kurtosis"]
for suffix in expected_suffixes:
df, adjust_method="benjamini_hochberg", seed=SEEDS.REPRODUCIBILITY
)
self.assertGreater(len(results_adj), 0)
- for name, res in results_adj.items():
+ for res in results_adj.values():
self.assertIn("p_value", res)
self.assertIn("p_value_adj", res)
self.assertIn("significant_adj", res)
large = self._shift_scale_df(SCENARIOS.SAMPLE_SIZE_LARGE)
res_small = bootstrap_confidence_intervals(small, ["reward"], n_bootstrap=400)
res_large = bootstrap_confidence_intervals(large, ["reward"], n_bootstrap=400)
- _, lo_s, hi_s = list(res_small.values())[0]
- _, lo_l, hi_l = list(res_large.values())[0]
+ _, lo_s, hi_s = next(iter(res_small.values()))
+ _, lo_l, hi_l = next(iter(res_large.values()))
hw_small = (hi_s - lo_s) / 2.0
hw_large = (hi_l - lo_l) / 2.0
self.assertFinite(hw_small, name="hw_small")
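The half-width comparison relies on the usual ~1/√n shrinkage of a bootstrap interval. A self-contained percentile-bootstrap sketch (the module's bootstrap_confidence_intervals may use a different estimator or interval type):

    import numpy as np

    def percentile_ci(x, n_bootstrap=400, alpha=0.05, seed=0):
        rng = np.random.default_rng(seed)
        means = np.array([rng.choice(x, size=x.size).mean() for _ in range(n_bootstrap)])
        lo, hi = np.quantile(means, [alpha / 2.0, 1.0 - alpha / 2.0])
        return float(x.mean()), float(lo), float(hi)

    rng = np.random.default_rng(1)
    _, lo_s, hi_s = percentile_ci(rng.normal(size=100))
    _, lo_l, hi_l = percentile_ci(rng.normal(size=10_000))
    assert (hi_l - lo_l) < (hi_s - lo_s)  # larger sample -> tighter interval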
#!/usr/bin/env python3
"""Base class and utilities for reward space analysis tests."""
+import itertools
import math
import random
import shutil
import tempfile
import unittest
+from collections.abc import Iterable, Sequence
from pathlib import Path
-from typing import Any, Dict, Iterable, Optional, Sequence, Union
+from typing import Any
import numpy as np
import pandas as pd
"entry_additive_enabled",
"exit_additive_enabled",
]
-PBRS_REQUIRED_PARAMS = PBRS_INTEGRATION_PARAMS + ["exit_potential_mode"]
+PBRS_REQUIRED_PARAMS = [*PBRS_INTEGRATION_PARAMS, "exit_potential_mode"]
class RewardSpaceTestBase(unittest.TestCase):
action=action,
)
- def base_params(self, **overrides) -> Dict[str, Any]:
+ def base_params(self, **overrides) -> dict[str, Any]:
"""Return fresh copy of default reward params with overrides."""
- params: Dict[str, Any] = DEFAULT_MODEL_REWARD_PARAMETERS.copy()
+ params: dict[str, Any] = DEFAULT_MODEL_REWARD_PARAMETERS.copy()
params.update(overrides)
return params
self,
params: dict,
*,
- iterations: Optional[int] = None,
- terminal_prob: Optional[float] = None,
+ iterations: int | None = None,
+ terminal_prob: float | None = None,
seed: int = SEEDS.CANONICAL_SWEEP,
) -> tuple[list[float], list[float]]:
"""Run a lightweight canonical invariance sweep.
reward_mean: float = 0.0,
reward_std: float = 1.0,
pnl_mean: float = 0.01,
- pnl_std: Optional[float] = None,
+ pnl_std: float | None = None,
trade_duration_dist: str = "uniform",
idle_pattern: str = "mixed",
- seed: Optional[int] = None,
+ seed: int | None = None,
) -> pd.DataFrame:
"""Generate a synthetic statistical DataFrame.
def assertAlmostEqualFloat(
self,
- first: Union[float, int],
- second: Union[float, int],
- tolerance: Optional[float] = None,
- rtol: Optional[float] = None,
- msg: Union[str, None] = None,
+ first: float | int,
+ second: float | int,
+ tolerance: float | None = None,
+ rtol: float | None = None,
+ msg: str | None = None,
) -> None:
"""Compare floats with absolute and optional relative tolerance.
or f"Difference {diff} exceeds tolerance {tolerance} and relative tolerance {rtol} (a={first}, b={second})"
)
- def assertPValue(self, value: Union[float, int], msg: str = "") -> None:
+ def assertPValue(self, value: float | int, msg: str = "") -> None:
"""Assert a p-value is finite and within [0,1]."""
self.assertFinite(value, name="p-value")
self.assertGreaterEqual(value, 0.0, msg or f"p-value < 0: {value}")
self.assertLessEqual(value, 1.0, msg or f"p-value > 1: {value}")
def assertPlacesEqual(
- self, a: Union[float, int], b: Union[float, int], places: int, msg: Optional[str] = None
+ self, a: float | int, b: float | int, places: int, msg: str | None = None
) -> None:
"""Bridge for legacy places-based approximate equality.
def assertDistanceMetric(
self,
- value: Union[float, int],
+ value: float | int,
*,
non_negative: bool = True,
- upper: Optional[float] = None,
+ upper: float | None = None,
name: str = "metric",
) -> None:
"""Generic distance/divergence bounds: finite, optional non-negativity and optional upper bound."""
def assertEffectSize(
self,
- value: Union[float, int],
+ value: float | int,
*,
lower: float = -1.0,
upper: float = 1.0,
self.assertGreaterEqual(value, lower, f"{name} < {lower}: {value}")
self.assertLessEqual(value, upper, f"{name} > {upper}: {value}")
- def assertFinite(self, value: Union[float, int], name: str = "value") -> None:
+ def assertFinite(self, value: float | int, name: str = "value") -> None:
"""Assert scalar is finite."""
if not np.isfinite(value):
self.fail(f"{name} not finite: {value}")
def assertMonotonic(
self,
- seq: Union[Sequence[Union[float, int]], Iterable[Union[float, int]]],
+ seq: Sequence[float | int] | Iterable[float | int],
*,
- non_increasing: Optional[bool] = None,
- non_decreasing: Optional[bool] = None,
+ non_increasing: bool | None = None,
+ non_decreasing: bool | None = None,
tolerance: float = 0.0,
name: str = "sequence",
) -> None:
data = list(seq)
if len(data) < 2:
return
- if non_increasing and non_decreasing or (not non_increasing and (not non_decreasing)):
+ if (non_increasing and non_decreasing) or (not non_increasing and not non_decreasing):
self.fail("Specify exactly one monotonic direction")
- for a, b in zip(data, data[1:]):
+ for a, b in itertools.pairwise(data):
if non_increasing:
if b > a + tolerance:
self.fail(f"{name} not non-increasing at pair ({a}, {b})")
- elif non_decreasing:
- if b + tolerance < a:
- self.fail(f"{name} not non-decreasing at pair ({a}, {b})")
+ elif non_decreasing and b + tolerance < a:
+ self.fail(f"{name} not non-decreasing at pair ({a}, {b})")
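itertools.pairwise (Python ≥ 3.10) yields adjacent pairs lazily, replacing zip(data, data[1:]) without materializing the shifted copy. In isolation the check reads:

    import itertools

    def is_non_increasing(seq, tolerance=0.0):
        return all(b <= a + tolerance for a, b in itertools.pairwise(seq))

    assert is_non_increasing([1.0, 0.5, 0.5, 0.2])
    assert not is_non_increasing([1.0, 0.5, 0.6])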
def assertWithin(
self,
- value: Union[float, int],
- low: Union[float, int],
- high: Union[float, int],
+ value: float | int,
+ low: float | int,
+ high: float | int,
*,
name: str = "value",
inclusive: bool = True,
self.assertLess(value, high, f"{name} >= {high}")
def assertNearZero(
- self, value: Union[float, int], *, atol: Optional[float] = None, msg: Optional[str] = None
+ self, value: float | int, *, atol: float | None = None, msg: str | None = None
) -> None:
"""Assert a scalar is numerically near zero within absolute tolerance.
a,
b,
*,
- atol: Optional[float] = None,
- rtol: Optional[float] = None,
- msg: Optional[str] = None,
+ atol: float | None = None,
+ rtol: float | None = None,
+ msg: str | None = None,
) -> None:
"""Assert function(func, a, b) == function(func, b, a) within tolerance.
"""
import math
+from typing import ClassVar
import pytest
"""Comprehensive transform function tests with parameterized scenarios."""
# Transform function test data
- SMOOTH_TRANSFORMS = [t for t in ALLOWED_TRANSFORMS if t != "clip"]
- ALL_TRANSFORMS = list(ALLOWED_TRANSFORMS)
+ SMOOTH_TRANSFORMS: ClassVar[list[str]] = [t for t in ALLOWED_TRANSFORMS if t != "clip"]
+ ALL_TRANSFORMS: ClassVar[list[str]] = list(ALLOWED_TRANSFORMS)
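The ClassVar annotations satisfy ruff's RUF012: mutable class-level defaults should be declared as shared class state so they are not mistaken for per-instance attributes. A minimal illustration:

    from typing import ClassVar

    class TransformCase:
        KNOWN: ClassVar[list[str]] = ["tanh", "softsign", "clip"]  # shared by all instances

        def __init__(self, name):
            self.name = name  # per-instance state belongs in __init__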
def test_transform_exact_values(self):
"""Test transform functions produce exact expected values for specific inputs."""
("asinh", [0.0], [0.0]), # More complex calculations tested separately
# arctan transform: (2/pi) * arctan(x) in (-1, 1)
("arctan", [0.0, 1.0], [0.0, 2.0 / math.pi * math.atan(1.0)]),
- # sigmoid transform: 2σ(x) - 1, σ(x) = 1/(1 + e^(-x)) in (-1, 1)
+ # sigmoid transform: 2σ(x) - 1, σ(x) = 1/(1 + e^(-x)) in (-1, 1) # noqa: RUF003
("sigmoid", [0.0], [0.0]), # More complex calculations tested separately
# clip transform: clip(x, -1, 1) in [-1, 1]
("clip", [0.0, 0.5, 2.0, -2.0], [0.0, 0.5, 1.0, -1.0]),
]
for transform_name, test_values, expected_values in test_cases:
- for test_val, expected_value in zip(test_values, expected_values):
+ for test_val, expected_value in zip(test_values, expected_values, strict=True):
with self.subTest(
transform=transform_name, input=test_val, expected=expected_value
):
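As a reference point for the sigmoid row above: 2σ(x) − 1 is algebraically identical to tanh(x/2), so the deferred non-zero cases can be cross-checked against math.tanh:

    import math

    x = 1.0
    assert math.isclose(2.0 / (1.0 + math.exp(-x)) - 1.0, math.tanh(x / 2.0))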
dependencies = [
{ name = "numpy" },
{ name = "pandas" },
- { name = "pytest" },
{ name = "scikit-learn" },
{ name = "scipy" },
]
-[package.dev-dependencies]
+[package.optional-dependencies]
dev = [
{ name = "pytest" },
{ name = "pytest-cov" },
requires-dist = [
{ name = "numpy", specifier = ">=1.26" },
{ name = "pandas" },
- { name = "pytest" },
+ { name = "pytest", marker = "extra == 'dev'", specifier = ">=8.0" },
+ { name = "pytest-cov", marker = "extra == 'dev'", specifier = ">=7.0" },
+ { name = "ruff", marker = "extra == 'dev'", specifier = ">=0.8" },
{ name = "scikit-learn" },
{ name = "scipy", specifier = ">=1.11" },
]
-
-[package.metadata.requires-dev]
-dev = [
- { name = "pytest", specifier = ">=6.0" },
- { name = "pytest-cov", specifier = ">=7.0.0" },
- { name = "ruff" },
-]
+provides-extras = ["dev"]
[[package]]
name = "ruff"
if require_position and position not in (Positions.Long, Positions.Short):
return 0.0
- duration_ratio = 0.0 if duration_ratio < 0.0 else duration_ratio
- if duration_ratio > 1.0:
- duration_ratio = 1.0
+ duration_ratio = max(0.0, duration_ratio)
try:
pnl_ratio = pnl / pnl_target
**State Variables:**
r_pnl : pnl / pnl_target (PnL ratio)
- r_dur : duration / max_duration (duration ratio, clamp [0,1])
+ r_dur : duration / max_duration (duration ratio, floored at 0)
scale : scale parameter
g : gain parameter
T_x : transform function (tanh, softsign, etc.)