import warnings
from enum import Enum, IntEnum
from pathlib import Path
-from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
+from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple, Union
import numpy as np
import pandas as pd
train_test_split = None
+AttenuationMode = Literal["sqrt", "linear", "power", "half_life"]
+TransformFunction = Literal["tanh", "softsign", "arctan", "sigmoid", "clip", "asinh"]
+ExitPotentialMode = Literal[
+ "canonical", "non_canonical", "progressive_release", "spike_cancel", "retain_previous"
+]
+
+
class Actions(IntEnum):
Neutral = 0
Long_enter = 1
"sim_zero_reward_epsilon": 1e-12,
"sim_extreme_pnl_threshold": 0.2,
"histogram_epsilon": 1e-10,
+ "distribution_identity_epsilon": 1e-12,
}
# PBRS constants
"retain_previous",
}
+
DEFAULT_MODEL_REWARD_PARAMETERS: RewardParams = {
"invalid_action": -2.0,
"base_factor": 100.0,
pass
+def _warn_unknown_mode(
+ mode_type: str,
+ provided_value: str,
+ valid_values: Iterable[str],
+ fallback_value: str,
+ stacklevel: int = 2,
+) -> None:
+ """Emit standardized warning for unknown mode values.
+
+ Args:
+ mode_type: Type of mode (e.g., "exit_attenuation_mode")
+ provided_value: The invalid value that was provided
+ valid_values: Iterable of valid values
+ fallback_value: The value being used as fallback
+ stacklevel: Stack level for warnings.warn
+ """
+ valid_sorted = sorted(valid_values)
+ warnings.warn(
+ f"Unknown {mode_type} '{provided_value}'. "
+ f"Expected one of: {valid_sorted}. Falling back to '{fallback_value}'.",
+ RewardDiagnosticsWarning,
+ stacklevel=stacklevel,
+ )
+
+
def _to_bool(value: Any) -> bool:
if isinstance(value, bool):
return value
return sanitized, adjustments
-def _normalize_and_validate_mode(params: RewardParams) -> None:
- """Validate exit_attenuation_mode; silently fallback to 'linear' if invalid."""
- if "exit_attenuation_mode" not in params:
- return
-
- exit_attenuation_mode = _get_str_param(
- params,
- "exit_attenuation_mode",
- str(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_attenuation_mode", "linear")),
- )
- if exit_attenuation_mode not in ATTENUATION_MODES_WITH_LEGACY:
- params["exit_attenuation_mode"] = "linear"
-
-
def add_tunable_cli_args(parser: argparse.ArgumentParser) -> None:
"""Dynamically add CLI options for each tunable in DEFAULT_MODEL_REWARD_PARAMETERS.
kernel = kernels.get(exit_attenuation_mode, None)
if kernel is None:
- warnings.warn(
- (
- f"Unknown exit_attenuation_mode '{exit_attenuation_mode}'; defaulting to 'linear' "
- f"(effective_dr={effective_dr:.5f})"
- ),
- RewardDiagnosticsWarning,
+ _warn_unknown_mode(
+ "exit_attenuation_mode",
+ exit_attenuation_mode,
+ ATTENUATION_MODES_WITH_LEGACY,
+ "linear",
stacklevel=2,
)
kernel = _linear_kernel
# Guard against degenerate distributions (all values identical)
if not np.isfinite(min_val) or not np.isfinite(max_val):
continue
- if np.isclose(max_val, min_val, rtol=0, atol=1e-12):
+ if np.isclose(
+ max_val,
+ min_val,
+ rtol=0,
+ atol=float(INTERNAL_GUARDS.get("distribution_identity_epsilon", 1e-12)),
+ ):
# All mass at a single point -> shift metrics are all zero by definition
metrics[f"{feature}_kl_divergence"] = 0.0
metrics[f"{feature}_js_distance"] = 0.0
return float(np.clip(value, -1.0, 1.0))
-def apply_transform(transform_name: str, value: float, **kwargs: Any) -> float:
+def apply_transform(transform_name: TransformFunction | str, value: float, **kwargs: Any) -> float:
"""Apply named transform; unknown names fallback to tanh with warning."""
transforms = {
"tanh": _apply_transform_tanh,
}
if transform_name not in transforms:
- warnings.warn(
- f"Unknown potential transform '{transform_name}'; falling back to tanh",
- RewardDiagnosticsWarning,
+ _warn_unknown_mode(
+ "potential_transform",
+ transform_name,
+ transforms.keys(),
+ "tanh",
stacklevel=2,
)
return _apply_transform_tanh(value)
elif mode == "retain_previous":
next_potential = last_potential
else:
+ _warn_unknown_mode(
+ "exit_potential_mode",
+ mode,
+ sorted(ALLOWED_EXIT_POTENTIAL_MODES),
+ "canonical (via _fail_safely)",
+ stacklevel=2,
+ )
next_potential = _fail_safely("invalid_exit_potential_mode")
if not np.isfinite(next_potential):
for k, v in adjustments.items()
]
print("Parameter adjustments applied:\n" + "\n".join(adj_lines))
- # Normalize attenuation mode
- _normalize_and_validate_mode(params)
base_factor = _get_float_param(params, "base_factor", float(args.base_factor))
profit_target = _get_float_param(params, "profit_target", float(args.profit_target))
max_scenarios: int = 40,
shuffle_seed: Optional[int] = None,
) -> List[ConfigTuple]:
- # Constants from reward_space_analysis.py
- # ALLOWED_EXIT_POTENTIAL_MODES and ATTENUATION_MODES_WITH_LEGACY
exit_potential_modes = [
"canonical",
"non_canonical",
calculate_reward,
)
+from ..constants import PARAMS
from ..helpers import (
RewardScenarioConfig,
ThresholdTestConfig,
context,
params_large,
base_factor=self.TEST_BASE_FACTOR,
- profit_target=0.06,
+ profit_target=PARAMS.PROFIT_TARGET,
risk_reward_ratio=self.TEST_RR,
short_allowed=True,
action_masking=True,
- Proportional scaling with idle duration
"""
params = self.base_params(max_idle_duration_candles=None, max_trade_duration_candles=100)
- base_factor = 90.0
+ base_factor = PARAMS.BASE_FACTOR
profit_target = self.TEST_PROFIT_TARGET
risk_reward_ratio = 1.0
RewardDiagnosticsWarning,
_get_exit_factor,
_hold_penalty,
- _normalize_and_validate_mode,
validate_reward_parameters,
)
)
-@pytest.mark.robustness
-def test_normalize_and_validate_mode_fallback():
- params = {"exit_attenuation_mode": "invalid_mode"}
- _normalize_and_validate_mode(params)
- assert params["exit_attenuation_mode"] == "linear"
-
-
@pytest.mark.robustness
def test_get_exit_factor_negative_plateau_grace_warning():
params = {"exit_attenuation_mode": "linear", "exit_plateau": True, "exit_plateau_grace": -1.0}
simulate_samples,
)
+from ..constants import PARAMS
from ..helpers import (
assert_diagnostic_warning,
assert_exit_factor_attenuation_modes,
exit_plateau_grace=-2.0,
exit_linear_slope=1.2,
)
- base_factor = 90.0
+ base_factor = PARAMS.BASE_FACTOR
pnl = 0.03
pnl_factor = 1.0
duration_ratio = 0.5
ModelType = Literal["PPO", "RecurrentPPO", "MaskablePPO", "DQN", "QRDQN"]
ScheduleType = Literal["linear", "constant", "unknown"]
+ScheduleTypeKnown = Literal["linear", "constant"] # Subset for get_schedule() function
ExitPotentialMode = Literal[
"canonical",
"non_canonical",
if isinstance(lr, (int, float)):
lr = float(lr)
model_params["learning_rate"] = get_schedule(
- self._SCHEDULE_TYPES[0], lr
+ cast(ScheduleTypeKnown, self._SCHEDULE_TYPES[0]), lr
)
logger.info(
"Learning rate linear schedule enabled, initial value: %s", lr
cr = model_params.get("clip_range", 0.2)
if isinstance(cr, (int, float)):
cr = float(cr)
- model_params["clip_range"] = get_schedule(self._SCHEDULE_TYPES[0], cr)
+ model_params["clip_range"] = get_schedule(
+ cast(ScheduleTypeKnown, self._SCHEDULE_TYPES[0]), cr
+ )
logger.info("Clip range linear schedule enabled, initial value: %s", cr)
# "DQN"
net_arch: Union[
List[int],
Dict[str, List[int]],
- Literal["small", "medium", "large", "extra_large"],
+ NetArchSize,
] = model_params.get("policy_kwargs", {}).get("net_arch", default_net_arch)
# "PPO"
def get_schedule_type(
schedule: Any,
-) -> Tuple[Literal["constant", "linear", "unknown"], float, float]:
+) -> Tuple[ScheduleType, float, float]:
if isinstance(schedule, (int, float)):
try:
schedule = float(schedule)
def get_schedule(
- schedule_type: Literal["linear", "constant"],
+ schedule_type: ScheduleTypeKnown,
initial_value: float,
) -> Callable[[float], float]:
- if schedule_type == ReforceXY._SCHEDULE_TYPES[0]:
+ if schedule_type == ReforceXY._SCHEDULE_TYPES[0]: # "linear"
return SimpleLinearSchedule(initial_value)
- elif schedule_type == ReforceXY._SCHEDULE_TYPES[1]:
+ elif schedule_type == ReforceXY._SCHEDULE_TYPES[1]: # "constant"
return ConstantSchedule(initial_value)
else:
return ConstantSchedule(initial_value)
ReforceXY._NET_ARCH_SIZES[0]: {
"pi": [128, 128],
"vf": [128, 128],
- }, # ReforceXY._NET_ARCH_SIZES[0]
+ }, # "small"
ReforceXY._NET_ARCH_SIZES[1]: {
"pi": [256, 256],
"vf": [256, 256],
- }, # ReforceXY._NET_ARCH_SIZES[1]
+ }, # "medium"
ReforceXY._NET_ARCH_SIZES[2]: {
"pi": [512, 512],
"vf": [512, 512],
zigzag,
)
-ExtremaSelectionMethod = Literal["peak_values", "extrema_rank"]
+ExtremaSelectionMethod = Literal["peak_values", "extrema_rank", "partition"]
OptunaNamespace = Literal["hp", "train", "label"]
debug = False
_EXTREMA_SELECTION_METHODS: Final[tuple[ExtremaSelectionMethod, ...]] = (
"peak_values",
"extrema_rank",
+ "partition",
)
_OPTUNA_STORAGE_BACKENDS: Final[tuple[str, ...]] = ("sqlite", "file")
_OPTUNA_SAMPLERS: Final[tuple[str, ...]] = ("tpe", "auto")
if pred_extrema.empty:
return pd.Series(dtype=float), pd.Series(dtype=float)
- minima_indices = sp.signal.find_peaks(-pred_extrema)[0]
- maxima_indices = sp.signal.find_peaks(pred_extrema)[0]
-
if extrema_selection == QuickAdapterRegressorV3._EXTREMA_SELECTION_METHODS[0]:
+ minima_indices = sp.signal.find_peaks(-pred_extrema)[0]
+ maxima_indices = sp.signal.find_peaks(pred_extrema)[0]
+
pred_minima = (
pred_extrema.iloc[minima_indices]
if minima_indices.size > 0
else pd.Series(dtype=float)
)
elif extrema_selection == QuickAdapterRegressorV3._EXTREMA_SELECTION_METHODS[1]:
+ minima_indices = sp.signal.find_peaks(-pred_extrema)[0]
+ maxima_indices = sp.signal.find_peaks(pred_extrema)[0]
+
n_minima = minima_indices.size
n_maxima = maxima_indices.size
pred_maxima = pred_extrema.nlargest(n_maxima)
else:
pred_maxima = pd.Series(dtype=float)
+ elif extrema_selection == QuickAdapterRegressorV3._EXTREMA_SELECTION_METHODS[2]:
+ eps = np.finfo(float).eps
+
+ pred_maxima = pred_extrema[pred_extrema > eps]
+        pred_minima = pred_extrema[pred_extrema < -eps]
else:
raise ValueError(
f"Unsupported extrema selection method: {extrema_selection}. "
DfSignature = Tuple[int, Optional[datetime.datetime]]
CandleDeviationCacheKey = Tuple[
- str, DfSignature, float, float, int, Literal["direct", "inverse"], float
+ str, DfSignature, float, float, int, InterpolationDirection, float
]
CandleThresholdCacheKey = Tuple[str, DfSignature, str, int, float, float]