sys.exit(1)
-def base_params(**overrides) -> dict:
- """Return a fresh copy of DEFAULT_MODEL_REWARD_PARAMETERS with overrides applied.
-
- Ensures tests do not mutate the global defaults inadvertently.
- """
- params = DEFAULT_MODEL_REWARD_PARAMETERS.copy()
- params.update(overrides)
- return params
-
-
class RewardSpaceTestBase(unittest.TestCase):
"""Base class with common test utilities."""
def setUp(self):
"""Set up test fixtures with reproducible random seed."""
- np.random.seed(self.SEED)
+ # Unified seeding for numpy + random
+ self.seed_all(self.SEED)
self.temp_dir = tempfile.mkdtemp()
self.output_path = Path(self.temp_dir)
+ def tearDown(self):
+ """Clean up temporary files."""
+ shutil.rmtree(self.temp_dir, ignore_errors=True)
+
# PBRS structural test constants
PBRS_TERMINAL_TOL = 1e-12
PBRS_MAX_ABS_SHAPING = 5.0
action=action,
)
+ def base_params(self, **overrides) -> dict:
+ """Return fresh copy of default reward params with overrides."""
+ params = DEFAULT_MODEL_REWARD_PARAMETERS.copy()
+ params.update(overrides)
+ return params
+
def _canonical_sweep(
self,
params: dict,
pnl, trade_duration, idle_duration, position. Guarantees: no NaN; reward_idle==0 where idle_duration==0.
"""
if seed is not None:
- np.random.seed(seed)
+ self.seed_all(seed)
pnl_std_eff = self.TEST_PNL_STD if pnl_std is None else pnl_std
reward_total = np.random.normal(reward_total_mean, reward_total_std, n)
pnl = np.random.normal(pnl_mean, pnl_std_eff, n)
idle_duration = np.zeros(n)
else: # all_nonzero
idle_duration = np.random.uniform(5, 60, n)
- # Component rewards (basic synthetic shaping consistent with sign expectations)
+ # Component rewards
reward_idle = np.where(idle_duration > 0, np.random.normal(-1, 0.3, n), 0.0)
reward_hold = np.random.normal(-0.5, 0.2, n)
reward_exit = np.random.normal(0.8, 0.6, n)
}
)
- def tearDown(self):
- """Clean up temporary files."""
- shutil.rmtree(self.temp_dir, ignore_errors=True)
-
def assertAlmostEqualFloat(
self,
first: Union[float, int],
def _make_idle_variance_df(self, n: int = 100) -> pd.DataFrame:
"""Synthetic dataframe focusing on idle_duration ↔ reward_idle correlation."""
- np.random.seed(self.SEED)
+ self.seed_all(self.SEED)
idle_duration = np.random.exponential(10, n)
reward_idle = -0.01 * idle_duration + np.random.normal(0, 0.001, n)
return pd.DataFrame(
def test_stats_distribution_metrics_mathematical_bounds(self):
"""Mathematical bounds and validity of distribution shift metrics."""
- np.random.seed(self.SEED)
+ self.seed_all(self.SEED)
df1 = pd.DataFrame(
{
"pnl": np.random.normal(0, self.TEST_PNL_STD, 500),
position=Positions.Long,
action=Actions.Long_exit,
)
- params = self.DEFAULT_PARAMS.copy()
+ params = self.base_params()
profit_target = self.TEST_PROFIT_TARGET * self.TEST_RR
pnl_factor = _get_pnl_factor(params, ctx, profit_target, self.TEST_RR)
self.assertFinite(pnl_factor, name="pnl_factor")
self.assertAlmostEqualFloat(pnl_factor, 1.0, tolerance=self.TOL_GENERIC_EQ)
def test_max_idle_duration_candles_logic(self):
- params_small = self.DEFAULT_PARAMS.copy()
- params_large = self.DEFAULT_PARAMS.copy()
- params_small["max_idle_duration_candles"] = 50
- params_large["max_idle_duration_candles"] = 200
+ params_small = self.base_params(max_idle_duration_candles=50)
+ params_large = self.base_params(max_idle_duration_candles=200)
base_factor = self.TEST_BASE_FACTOR
context = self.make_ctx(
pnl=0.0,
self.assertGreater(large.idle_penalty, small.idle_penalty)
def test_exit_factor_calculation(self):
- """Test exit factor calculation consistency across core modes + plateau variant.
-
- Plateau behavior expressed via exit_plateau=True with a base kernel (e.g. linear).
- """
+ """Exit factor calculation across core modes + plateau variant (plateau via exit_plateau=True)."""
# Core attenuation kernels (excluding legacy which is step-based)
modes_to_test = ["linear", "power"]
for mode in modes_to_test:
- test_params = self.DEFAULT_PARAMS.copy()
- test_params["exit_attenuation_mode"] = mode
+ test_params = self.base_params(exit_attenuation_mode=mode)
factor = _get_exit_factor(
base_factor=1.0,
pnl=0.02,
self.assertFinite(factor, name=f"exit_factor[{mode}]")
self.assertGreater(factor, 0, f"Exit factor for {mode} should be positive")
- # Plateau+linear variant sanity check (grace region at 0.5)
- plateau_params = self.DEFAULT_PARAMS.copy()
- plateau_params.update(
- {
- "exit_attenuation_mode": "linear",
- "exit_plateau": True,
- "exit_plateau_grace": 0.5,
- "exit_linear_slope": 1.0,
- }
+ # Plateau+linear variant (grace region 0.5)
+ plateau_params = self.base_params(
+ exit_attenuation_mode="linear",
+ exit_plateau=True,
+ exit_plateau_grace=0.5,
+ exit_linear_slope=1.0,
)
plateau_factor_pre = _get_exit_factor(
base_factor=1.0,
win_reward_factor = 3.0 # asymptote = 4.0
beta = 0.5
profit_target = self.TEST_PROFIT_TARGET
- params = self.DEFAULT_PARAMS.copy()
- params.update(
- {
- "win_reward_factor": win_reward_factor,
- "pnl_factor_beta": beta,
- "efficiency_weight": 0.0, # disable efficiency modulation
- "exit_attenuation_mode": "linear",
- "exit_plateau": False,
- "exit_linear_slope": 0.0, # keep attenuation = 1
- }
+ params = self.base_params(
+ win_reward_factor=win_reward_factor,
+ pnl_factor_beta=beta,
+ efficiency_weight=0.0, # disable efficiency modulation
+ exit_attenuation_mode="linear",
+ exit_plateau=False,
+ exit_linear_slope=0.0, # keep attenuation = 1
)
# Ensure provided base_factor=1.0 is actually used (remove default 100)
params.pop("base_factor", None)
def test_scale_invariance_and_decomposition(self):
"""Components scale ~ linearly with base_factor; total equals sum(core + shaping + additives)."""
- params = self.DEFAULT_PARAMS.copy()
- # Remove internal base_factor so the explicit argument is used
- params.pop("base_factor", None)
+ params = self.base_params()
+ params.pop("base_factor", None) # explicit base_factor argument below
base_factor = 80.0
k = 7.5
profit_target = self.TEST_PROFIT_TARGET
def test_long_short_symmetry(self):
"""Long vs Short exit reward magnitudes should match in absolute value for identical PnL (no directional bias)."""
- params = self.DEFAULT_PARAMS.copy()
+ params = self.base_params()
params.pop("base_factor", None)
base_factor = 120.0
profit_target = 0.04
Uses a very large base_factor to trigger potential warning condition without capping.
"""
- params = self.DEFAULT_PARAMS.copy()
+ params = self.base_params()
self.assertIn("check_invariants", params)
self.assertIn("exit_factor_threshold", params)
active_label: str = sc["active"] # type: ignore[index]
with self.subTest(active=active_label):
# Build parameters disabling shaping and additives to enforce strict decomposition
- params_local = self.DEFAULT_PARAMS.copy()
- params_local.update(
- {
- "entry_additive_enabled": False,
- "exit_additive_enabled": False,
- "hold_potential_enabled": False,
- "potential_gamma": 0.0,
- # Ensure any invariance flags do not add shaping
- "check_invariants": False,
- }
+ params_local = self.base_params(
+ entry_additive_enabled=False,
+ exit_additive_enabled=False,
+ hold_potential_enabled=False,
+ potential_gamma=0.0,
+ check_invariants=False,
)
br = calculate_reward(
ctx_obj,
def test_idle_penalty_fallback_and_proportionality(self):
"""Idle penalty fallback denominator & proportional scaling (robustness)."""
- params = self.DEFAULT_PARAMS.copy()
- params["max_idle_duration_candles"] = None
+ params = self.base_params(max_idle_duration_candles=None)
base_factor = 90.0
profit_target = self.TEST_PROFIT_TARGET
risk_reward_ratio = 1.0
def test_exit_factor_threshold_warning_and_non_capping(self):
"""Warning emission without capping when exit_factor_threshold exceeded."""
- params = base_params(exit_factor_threshold=10.0)
+ params = self.base_params(exit_factor_threshold=10.0)
params.pop("base_factor", None)
context = self.make_ctx(
pnl=0.08,
pnl = 0.03
pnl_factor = 1.0
duration_ratios = [0.0, 0.2, 0.5, 1.0, 1.5]
- params_bad = base_params(
+ params_bad = self.base_params(
exit_attenuation_mode="linear", exit_linear_slope=-5.0, exit_plateau=False
)
- params_ref = base_params(
+ params_ref = self.base_params(
exit_attenuation_mode="linear", exit_linear_slope=1.0, exit_plateau=False
)
for dr in duration_ratios:
1.0,
] # include boundary 1.0 => alpha=0 per formula? actually -> -log(1)/log2 = 0
for tau in taus:
- params = base_params(
+ params = self.base_params(
exit_attenuation_mode="power", exit_power_tau=tau, exit_plateau=False
)
f0 = _get_exit_factor(base_factor, pnl, pnl_factor, 0.0, params)
# Boundary condition tests (extremes / continuity / monotonicity)
def test_extreme_parameter_values(self):
- extreme_params = self.DEFAULT_PARAMS.copy()
- extreme_params["win_reward_factor"] = 1000.0
- extreme_params["base_factor"] = 10000.0
+ extreme_params = self.base_params(
+ win_reward_factor=1000.0,
+ base_factor=10000.0,
+ )
context = RewardContext(
pnl=0.05,
trade_duration=50,
modes = ATTENUATION_MODES_WITH_LEGACY
for mode in modes:
with self.subTest(mode=mode):
- test_params = self.DEFAULT_PARAMS.copy()
- test_params["exit_attenuation_mode"] = mode
+ test_params = self.base_params(exit_attenuation_mode=mode)
ctx = RewardContext(
pnl=0.02,
trade_duration=50,
pnl = 0.05
pnl_factor = 1.0
for mode in modes:
- params = self.DEFAULT_PARAMS.copy()
- if mode in ("sqrt", "linear", "power", "half_life"):
- params["exit_attenuation_mode"] = mode
- if mode == "linear":
- params["exit_linear_slope"] = 1.2
if mode == "plateau_linear":
- params["exit_attenuation_mode"] = "linear"
- params["exit_plateau"] = True
- params["exit_plateau_grace"] = 0.2
- params["exit_linear_slope"] = 1.0
- if mode == "power":
- params["exit_power_tau"] = 0.5
- if mode == "half_life":
- params["exit_half_life"] = 0.7
+ params = self.base_params(
+ exit_attenuation_mode="linear",
+ exit_plateau=True,
+ exit_plateau_grace=0.2,
+ exit_linear_slope=1.0,
+ )
+ elif mode == "linear":
+ params = self.base_params(
+ exit_attenuation_mode="linear",
+ exit_linear_slope=1.2,
+ )
+ elif mode == "power":
+ params = self.base_params(
+ exit_attenuation_mode="power",
+ exit_power_tau=0.5,
+ )
+ elif mode == "half_life":
+ params = self.base_params(
+ exit_attenuation_mode="half_life",
+ exit_half_life=0.7,
+ )
+ else: # sqrt
+ params = self.base_params(exit_attenuation_mode="sqrt")
ratios = np.linspace(0, 2, 15)
values = [
pnl = 0.02
pnl_factor = 1.0
# Tau near 1 (minimal attenuation) vs tau near 0 (strong attenuation)
- params_hi = self.DEFAULT_PARAMS.copy()
- params_hi.update({"exit_attenuation_mode": "power", "exit_power_tau": 0.999999})
- params_lo = self.DEFAULT_PARAMS.copy()
- params_lo.update(
- {
- "exit_attenuation_mode": "power",
- "exit_power_tau": self.MIN_EXIT_POWER_TAU,
- }
+ params_hi = self.base_params(
+ exit_attenuation_mode="power", exit_power_tau=0.999999
+ )
+ params_lo = self.base_params(
+ exit_attenuation_mode="power",
+ exit_power_tau=self.MIN_EXIT_POWER_TAU,
)
r = 1.5
hi_val = _get_exit_factor(base_factor, pnl, pnl_factor, r, params_hi)
"Power mode: higher tau (≈1) should attenuate less than tiny tau",
)
# Plateau grace 0 vs 1
- params_g0 = self.DEFAULT_PARAMS.copy()
- params_g0.update(
- {
- "exit_attenuation_mode": "linear",
- "exit_plateau": True,
- "exit_plateau_grace": 0.0,
- "exit_linear_slope": 1.0,
- }
- )
- params_g1 = self.DEFAULT_PARAMS.copy()
- params_g1.update(
- {
- "exit_attenuation_mode": "linear",
- "exit_plateau": True,
- "exit_plateau_grace": 1.0,
- "exit_linear_slope": 1.0,
- }
+ params_g0 = self.base_params(
+ exit_attenuation_mode="linear",
+ exit_plateau=True,
+ exit_plateau_grace=0.0,
+ exit_linear_slope=1.0,
+ )
+ params_g1 = self.base_params(
+ exit_attenuation_mode="linear",
+ exit_plateau=True,
+ exit_plateau_grace=1.0,
+ exit_linear_slope=1.0,
)
val_g0 = _get_exit_factor(base_factor, pnl, pnl_factor, 0.5, params_g0)
val_g1 = _get_exit_factor(base_factor, pnl, pnl_factor, 0.5, params_g1)
"Plateau grace=1.0 should delay attenuation vs grace=0.0",
)
# Linear slope zero vs positive
- params_lin0 = self.DEFAULT_PARAMS.copy()
- params_lin0.update(
- {
- "exit_attenuation_mode": "linear",
- "exit_linear_slope": 0.0,
- "exit_plateau": False,
- }
+ params_lin0 = self.base_params(
+ exit_attenuation_mode="linear",
+ exit_linear_slope=0.0,
+ exit_plateau=False,
)
- params_lin1 = self.DEFAULT_PARAMS.copy()
- params_lin1.update(
- {
- "exit_attenuation_mode": "linear",
- "exit_linear_slope": 2.0,
- "exit_plateau": False,
- }
+ params_lin1 = self.base_params(
+ exit_attenuation_mode="linear",
+ exit_linear_slope=2.0,
+ exit_plateau=False,
)
val_lin0 = _get_exit_factor(base_factor, pnl, pnl_factor, 1.0, params_lin0)
val_lin1 = _get_exit_factor(base_factor, pnl, pnl_factor, 1.0, params_lin1)
def test_plateau_linear_slope_zero_constant_after_grace(self):
"""Plateau+linear slope=0 should yield flat factor after grace boundary (no attenuation)."""
- params = self.DEFAULT_PARAMS.copy()
- params.update(
- {
- "exit_attenuation_mode": "linear",
- "exit_plateau": True,
- "exit_plateau_grace": 0.3,
- "exit_linear_slope": 0.0,
- }
+ params = self.base_params(
+ exit_attenuation_mode="linear",
+ exit_plateau=True,
+ exit_plateau_grace=0.3,
+ exit_linear_slope=0.0,
)
base_factor = self.TEST_BASE_FACTOR
pnl = 0.04
self.assertEqual(val, 0.0)
def test_exit_potential_canonical(self):
- params = DEFAULT_MODEL_REWARD_PARAMETERS.copy()
- params.update(
- {
- "exit_potential_mode": "canonical",
- "hold_potential_enabled": True,
- "entry_additive_enabled": True, # expected to be auto-disabled by canonical invariance
- "exit_additive_enabled": True, # expected to be auto-disabled by canonical invariance
- }
+ params = self.base_params(
+ exit_potential_mode="canonical",
+ hold_potential_enabled=True,
+ entry_additive_enabled=True, # expected to be auto-disabled by canonical invariance
+ exit_additive_enabled=True, # expected to be auto-disabled by canonical invariance
)
base_reward = 0.25
current_pnl = 0.05
def test_pbrs_invariance_internal_flag_set(self):
"""Canonical path sets _pbrs_invariance_applied once; second call idempotent."""
- params = DEFAULT_MODEL_REWARD_PARAMETERS.copy()
- params.update(
- {
- "exit_potential_mode": "canonical",
- "hold_potential_enabled": True,
- "entry_additive_enabled": True, # will be auto-disabled
- "exit_additive_enabled": True,
- }
+ params = self.base_params(
+ exit_potential_mode="canonical",
+ hold_potential_enabled=True,
+ entry_additive_enabled=True, # will be auto-disabled
+ exit_additive_enabled=True,
)
# Structural sweep (ensures terminal Φ'==0 and shaping bounded)
terminal_next_potentials, shaping_values = self._canonical_sweep(params)
def test_progressive_release_negative_decay_clamped(self):
"""Negative decay must clamp to 0 => next potential equals last potential (no release)."""
- params = DEFAULT_MODEL_REWARD_PARAMETERS.copy()
- params.update(
- {
- "exit_potential_mode": "progressive_release",
- "exit_potential_decay": -0.75, # clamped to 0
- "hold_potential_enabled": True,
- }
+ params = self.base_params(
+ exit_potential_mode="progressive_release",
+ exit_potential_decay=-0.75, # clamped to 0
+ hold_potential_enabled=True,
)
last_potential = 0.42
# Use neutral current state so Φ(s) ≈ 0 (approx) if transforms remain small.
def test_potential_gamma_nan_fallback(self):
"""potential_gamma=NaN should fall back to default value (indirect comparison)."""
- base_params = DEFAULT_MODEL_REWARD_PARAMETERS.copy()
- default_gamma = base_params.get("potential_gamma", 0.95)
- params_nan = base_params.copy()
- params_nan.update(
- {"potential_gamma": float("nan"), "hold_potential_enabled": True}
+ base_params_dict = self.base_params()
+ default_gamma = base_params_dict.get("potential_gamma", 0.95)
+ params_nan = self.base_params(
+ potential_gamma=float("nan"), hold_potential_enabled=True
)
# Non-terminal transition so Φ(s') is computed and depends on gamma
res_nan = apply_potential_shaping(
last_potential=0.0,
params=params_nan,
)
- params_ref = base_params.copy()
- params_ref.update(
- {"potential_gamma": default_gamma, "hold_potential_enabled": True}
+ params_ref = self.base_params(
+ potential_gamma=default_gamma, hold_potential_enabled=True
)
res_ref = apply_potential_shaping(
base_reward=0.1,
def test_pbrs_retain_previous_cumulative_drift(self):
"""retain_previous mode accumulates negative shaping drift (non-invariant)."""
- params = DEFAULT_MODEL_REWARD_PARAMETERS.copy()
- params.update(
- {
- "exit_potential_mode": "retain_previous",
- "hold_potential_enabled": True,
- "entry_additive_enabled": False,
- "exit_additive_enabled": False,
- "potential_gamma": 0.9,
- }
+ params = self.base_params(
+ exit_potential_mode="retain_previous",
+ hold_potential_enabled=True,
+ entry_additive_enabled=False,
+ exit_additive_enabled=False,
+ potential_gamma=0.9,
)
gamma = _get_float_param(
params,
"""Additives enabled increase total reward; shaping impact limited."""
# Use a non-canonical exit mode to avoid automatic invariance enforcement
# disabling the additive components on first call (canonical path auto-disables).
- base = base_params(
+ base = self.base_params(
hold_potential_enabled=True,
entry_additive_enabled=False,
exit_additive_enabled=False,
def test_report_cumulative_invariance_aggregation(self):
"""Canonical telescoping term: small per-step mean drift, bounded increments."""
- params = base_params(
+ params = self.base_params(
hold_potential_enabled=True,
entry_additive_enabled=False,
exit_additive_enabled=False,
def test_report_explicit_non_invariance_progressive_release(self):
"""progressive_release should generally yield non-zero cumulative shaping (release leak)."""
- params = base_params(
+ params = self.base_params(
hold_potential_enabled=True,
entry_additive_enabled=False,
exit_additive_enabled=False,
def test_gamma_extremes(self):
"""Gamma=0 and gamma≈1 boundary behaviours produce bounded shaping and finite potentials."""
for gamma in [0.0, 0.999999]:
- params = base_params(
+ params = self.base_params(
hold_potential_enabled=True,
entry_additive_enabled=False,
exit_additive_enabled=False,