From 73a1946d96adb983917bb82abc88c23a4509a1c1 Mon Sep 17 00:00:00 2001
From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?=
Date: Wed, 19 Nov 2025 13:41:59 +0100
Subject: [PATCH] feat(qav3): add prediction extrema partition selection method
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

Signed-off-by: Jérôme Benoit
---
 .../reward_space_analysis.py                  | 87 +++++++++++++------
 .../test_reward_space_analysis_cli.py         |  2 -
 .../components/test_reward_components.py      |  5 +-
 .../tests/robustness/test_branch_coverage.py  |  8 --
 .../tests/robustness/test_robustness.py       |  3 +-
 ReforceXY/user_data/freqaimodels/ReforceXY.py | 21 +++--
 .../freqaimodels/QuickAdapterRegressorV3.py   | 17 +++-
 .../user_data/strategies/QuickAdapterV3.py    |  2 +-
 8 files changed, 90 insertions(+), 55 deletions(-)

diff --git a/ReforceXY/reward_space_analysis/reward_space_analysis.py b/ReforceXY/reward_space_analysis/reward_space_analysis.py
index 7e0da69..936ec9d 100644
--- a/ReforceXY/reward_space_analysis/reward_space_analysis.py
+++ b/ReforceXY/reward_space_analysis/reward_space_analysis.py
@@ -18,7 +18,7 @@ import random
 import warnings
 from enum import Enum, IntEnum
 from pathlib import Path
-from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
+from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple, Union
 
 import numpy as np
 import pandas as pd
@@ -39,6 +39,13 @@ except Exception:
     train_test_split = None
 
 
+AttenuationMode = Literal["sqrt", "linear", "power", "half_life"]
+TransformFunction = Literal["tanh", "softsign", "arctan", "sigmoid", "clip", "asinh"]
+ExitPotentialMode = Literal[
+    "canonical", "non_canonical", "progressive_release", "spike_cancel", "retain_previous"
+]
+
+
 class Actions(IntEnum):
     Neutral = 0
     Long_enter = 1
@@ -78,6 +85,7 @@ INTERNAL_GUARDS: dict[str, float] = {
     "sim_zero_reward_epsilon": 1e-12,
     "sim_extreme_pnl_threshold": 0.2,
     "histogram_epsilon": 1e-10,
+    "distribution_identity_epsilon": 1e-12,
 }
 
 # PBRS constants
@@ -97,6 +105,7 @@
     "retain_previous",
 }
 
+
 DEFAULT_MODEL_REWARD_PARAMETERS: RewardParams = {
     "invalid_action": -2.0,
     "base_factor": 100.0,
@@ -242,6 +251,31 @@ class RewardDiagnosticsWarning(RuntimeWarning):
     pass
 
 
+def _warn_unknown_mode(
+    mode_type: str,
+    provided_value: str,
+    valid_values: Iterable[str],
+    fallback_value: str,
+    stacklevel: int = 2,
+) -> None:
+    """Emit standardized warning for unknown mode values.
+
+    Args:
+        mode_type: Type of mode (e.g., "exit_attenuation_mode")
+        provided_value: The invalid value that was provided
+        valid_values: Iterable of valid values
+        fallback_value: The value being used as fallback
+        stacklevel: Stack level for warnings.warn
+    """
+    valid_sorted = sorted(valid_values)
+    warnings.warn(
+        f"Unknown {mode_type} '{provided_value}'. "
+        f"Expected one of: {valid_sorted}. Falling back to '{fallback_value}'.",
+        RewardDiagnosticsWarning,
+        stacklevel=stacklevel,
+    )
+
+
 def _to_bool(value: Any) -> bool:
     if isinstance(value, bool):
         return value
@@ -524,20 +558,6 @@ def validate_reward_parameters(
     return sanitized, adjustments
 
 
-def _normalize_and_validate_mode(params: RewardParams) -> None:
-    """Validate exit_attenuation_mode; silently fallback to 'linear' if invalid."""
-    if "exit_attenuation_mode" not in params:
-        return
-
-    exit_attenuation_mode = _get_str_param(
-        params,
-        "exit_attenuation_mode",
-        str(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_attenuation_mode", "linear")),
-    )
-    if exit_attenuation_mode not in ATTENUATION_MODES_WITH_LEGACY:
-        params["exit_attenuation_mode"] = "linear"
-
-
 def add_tunable_cli_args(parser: argparse.ArgumentParser) -> None:
     """Dynamically add CLI options for each tunable in DEFAULT_MODEL_REWARD_PARAMETERS.
@@ -755,12 +775,11 @@ def _get_exit_factor(
 
     kernel = kernels.get(exit_attenuation_mode, None)
     if kernel is None:
-        warnings.warn(
-            (
-                f"Unknown exit_attenuation_mode '{exit_attenuation_mode}'; defaulting to 'linear' "
-                f"(effective_dr={effective_dr:.5f})"
-            ),
-            RewardDiagnosticsWarning,
+        _warn_unknown_mode(
+            "exit_attenuation_mode",
+            exit_attenuation_mode,
+            ATTENUATION_MODES_WITH_LEGACY,
+            "linear",
             stacklevel=2,
         )
         kernel = _linear_kernel
@@ -2031,7 +2050,12 @@ def compute_distribution_shift_metrics(
         # Guard against degenerate distributions (all values identical)
         if not np.isfinite(min_val) or not np.isfinite(max_val):
             continue
-        if np.isclose(max_val, min_val, rtol=0, atol=1e-12):
+        if np.isclose(
+            max_val,
+            min_val,
+            rtol=0,
+            atol=float(INTERNAL_GUARDS.get("distribution_identity_epsilon", 1e-12)),
+        ):
             # All mass at a single point -> shift metrics are all zero by definition
             metrics[f"{feature}_kl_divergence"] = 0.0
             metrics[f"{feature}_js_distance"] = 0.0
@@ -2567,7 +2591,7 @@ def _apply_transform_clip(value: float) -> float:
     return float(np.clip(value, -1.0, 1.0))
 
 
-def apply_transform(transform_name: str, value: float, **kwargs: Any) -> float:
+def apply_transform(transform_name: TransformFunction | str, value: float, **kwargs: Any) -> float:
     """Apply named transform; unknown names fallback to tanh with warning."""
     transforms = {
         "tanh": _apply_transform_tanh,
@@ -2579,9 +2603,11 @@
     }
 
     if transform_name not in transforms:
-        warnings.warn(
-            f"Unknown potential transform '{transform_name}'; falling back to tanh",
-            RewardDiagnosticsWarning,
+        _warn_unknown_mode(
+            "potential_transform",
+            transform_name,
+            transforms.keys(),
+            "tanh",
             stacklevel=2,
         )
         return _apply_transform_tanh(value)
@@ -2723,6 +2749,13 @@ def _compute_exit_potential(last_potential: float, params: RewardParams) -> float:
     elif mode == "retain_previous":
         next_potential = last_potential
     else:
+        _warn_unknown_mode(
+            "exit_potential_mode",
+            mode,
+            sorted(ALLOWED_EXIT_POTENTIAL_MODES),
+            "canonical (via _fail_safely)",
+            stacklevel=2,
+        )
         next_potential = _fail_safely("invalid_exit_potential_mode")
 
     if not np.isfinite(next_potential):
@@ -3771,8 +3804,6 @@ def main() -> None:
             for k, v in adjustments.items()
         ]
         print("Parameter adjustments applied:\n" + "\n".join(adj_lines))
-    # Normalize attenuation mode
-    _normalize_and_validate_mode(params)
 
     base_factor = _get_float_param(params, "base_factor", float(args.base_factor))
     profit_target = _get_float_param(params, "profit_target", float(args.profit_target))
diff --git a/ReforceXY/reward_space_analysis/test_reward_space_analysis_cli.py b/ReforceXY/reward_space_analysis/test_reward_space_analysis_cli.py
index ec8bd2d..0fd48cc 100644
--- a/ReforceXY/reward_space_analysis/test_reward_space_analysis_cli.py
+++ b/ReforceXY/reward_space_analysis/test_reward_space_analysis_cli.py
@@ -104,8 +104,6 @@ def build_arg_matrix(
     max_scenarios: int = 40,
     shuffle_seed: Optional[int] = None,
 ) -> List[ConfigTuple]:
-    # Constants from reward_space_analysis.py
-    # ALLOWED_EXIT_POTENTIAL_MODES and ATTENUATION_MODES_WITH_LEGACY
     exit_potential_modes = [
         "canonical",
         "non_canonical",
diff --git a/ReforceXY/reward_space_analysis/tests/components/test_reward_components.py b/ReforceXY/reward_space_analysis/tests/components/test_reward_components.py
index d7914dc..9bd41b9 100644
--- a/ReforceXY/reward_space_analysis/tests/components/test_reward_components.py
+++ b/ReforceXY/reward_space_analysis/tests/components/test_reward_components.py
@@ -16,6 +16,7 @@ from reward_space_analysis import (
     calculate_reward,
 )
 
+from ..constants import PARAMS
 from ..helpers import (
     RewardScenarioConfig,
     ThresholdTestConfig,
@@ -243,7 +244,7 @@ class TestRewardComponents(RewardSpaceTestBase):
             context,
             params_large,
             base_factor=self.TEST_BASE_FACTOR,
-            profit_target=0.06,
+            profit_target=PARAMS.PROFIT_TARGET,
             risk_reward_ratio=self.TEST_RR,
             short_allowed=True,
             action_masking=True,
@@ -404,7 +405,7 @@ class TestRewardComponents(RewardSpaceTestBase):
         - Proportional scaling with idle duration
         """
         params = self.base_params(max_idle_duration_candles=None, max_trade_duration_candles=100)
-        base_factor = 90.0
+        base_factor = PARAMS.BASE_FACTOR
         profit_target = self.TEST_PROFIT_TARGET
         risk_reward_ratio = 1.0
 
diff --git a/ReforceXY/reward_space_analysis/tests/robustness/test_branch_coverage.py b/ReforceXY/reward_space_analysis/tests/robustness/test_branch_coverage.py
index d136fdd..fc062af 100644
--- a/ReforceXY/reward_space_analysis/tests/robustness/test_branch_coverage.py
+++ b/ReforceXY/reward_space_analysis/tests/robustness/test_branch_coverage.py
@@ -9,7 +9,6 @@ from reward_space_analysis import (
     RewardDiagnosticsWarning,
     _get_exit_factor,
     _hold_penalty,
-    _normalize_and_validate_mode,
     validate_reward_parameters,
 )
 
@@ -59,13 +58,6 @@ def test_validate_reward_parameters_relaxed_adjustment_batch():
     )
 
 
-@pytest.mark.robustness
-def test_normalize_and_validate_mode_fallback():
-    params = {"exit_attenuation_mode": "invalid_mode"}
-    _normalize_and_validate_mode(params)
-    assert params["exit_attenuation_mode"] == "linear"
-
-
 @pytest.mark.robustness
 def test_get_exit_factor_negative_plateau_grace_warning():
     params = {"exit_attenuation_mode": "linear", "exit_plateau": True, "exit_plateau_grace": -1.0}
diff --git a/ReforceXY/reward_space_analysis/tests/robustness/test_robustness.py b/ReforceXY/reward_space_analysis/tests/robustness/test_robustness.py
index c89d7c8..3d5ae5e 100644
--- a/ReforceXY/reward_space_analysis/tests/robustness/test_robustness.py
+++ b/ReforceXY/reward_space_analysis/tests/robustness/test_robustness.py
@@ -18,6 +18,7 @@ from reward_space_analysis import (
     simulate_samples,
 )
 
+from ..constants import PARAMS
 from ..helpers import (
     assert_diagnostic_warning,
     assert_exit_factor_attenuation_modes,
@@ -553,7 +554,7 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase):
             exit_plateau_grace=-2.0,
             exit_linear_slope=1.2,
         )
-        base_factor = 90.0
+        base_factor = PARAMS.BASE_FACTOR
         pnl = 0.03
         pnl_factor = 1.0
         duration_ratio = 0.5
diff --git a/ReforceXY/user_data/freqaimodels/ReforceXY.py b/ReforceXY/user_data/freqaimodels/ReforceXY.py
index b43f422..6a1edbd 100644
--- a/ReforceXY/user_data/freqaimodels/ReforceXY.py
+++ b/ReforceXY/user_data/freqaimodels/ReforceXY.py
@@ -74,6 +74,7 @@ from stable_baselines3.common.vec_env import (
 
 ModelType = Literal["PPO", "RecurrentPPO", "MaskablePPO", "DQN", "QRDQN"]
 ScheduleType = Literal["linear", "constant", "unknown"]
+ScheduleTypeKnown = Literal["linear", "constant"]  # Subset for get_schedule() function
 ExitPotentialMode = Literal[
     "canonical",
     "non_canonical",
@@ -495,7 +496,7 @@ class ReforceXY(BaseReinforcementLearningModel):
                 if isinstance(lr, (int, float)):
                     lr = float(lr)
                 model_params["learning_rate"] = get_schedule(
-                    self._SCHEDULE_TYPES[0], lr
+                    cast(ScheduleTypeKnown, self._SCHEDULE_TYPES[0]), lr
                 )
                 logger.info(
                     "Learning rate linear schedule enabled, initial value: %s", lr
@@ -510,7 +511,9 @@ class ReforceXY(BaseReinforcementLearningModel):
                 cr = model_params.get("clip_range", 0.2)
                 if isinstance(cr, (int, float)):
                     cr = float(cr)
-                model_params["clip_range"] = get_schedule(self._SCHEDULE_TYPES[0], cr)
+                model_params["clip_range"] = get_schedule(
+                    cast(ScheduleTypeKnown, self._SCHEDULE_TYPES[0]), cr
+                )
                 logger.info("Clip range linear schedule enabled, initial value: %s", cr)
 
         # "DQN"
@@ -529,7 +532,7 @@ class ReforceXY(BaseReinforcementLearningModel):
         net_arch: Union[
             List[int],
             Dict[str, List[int]],
-            Literal["small", "medium", "large", "extra_large"],
+            NetArchSize,
         ] = model_params.get("policy_kwargs", {}).get("net_arch", default_net_arch)
 
         # "PPO"
@@ -3737,7 +3740,7 @@ def steps_to_days(steps: int, timeframe: str) -> float:
 
 def get_schedule_type(
     schedule: Any,
-) -> Tuple[Literal["constant", "linear", "unknown"], float, float]:
+) -> Tuple[ScheduleType, float, float]:
     if isinstance(schedule, (int, float)):
         try:
             schedule = float(schedule)
@@ -3767,12 +3770,12 @@
 
 
 def get_schedule(
-    schedule_type: Literal["linear", "constant"],
+    schedule_type: ScheduleTypeKnown,
     initial_value: float,
 ) -> Callable[[float], float]:
-    if schedule_type == ReforceXY._SCHEDULE_TYPES[0]:
+    if schedule_type == ReforceXY._SCHEDULE_TYPES[0]:  # "linear"
         return SimpleLinearSchedule(initial_value)
-    elif schedule_type == ReforceXY._SCHEDULE_TYPES[1]:
+    elif schedule_type == ReforceXY._SCHEDULE_TYPES[1]:  # "constant"
         return ConstantSchedule(initial_value)
     else:
         return ConstantSchedule(initial_value)
@@ -3789,11 +3792,11 @@ def get_net_arch(
         ReforceXY._NET_ARCH_SIZES[0]: {
             "pi": [128, 128],
             "vf": [128, 128],
-        },  # ReforceXY._NET_ARCH_SIZES[0]
+        },  # "small"
         ReforceXY._NET_ARCH_SIZES[1]: {
             "pi": [256, 256],
             "vf": [256, 256],
-        },  # ReforceXY._NET_ARCH_SIZES[1]
+        },  # "medium"
         ReforceXY._NET_ARCH_SIZES[2]: {
             "pi": [512, 512],
             "vf": [512, 512],
diff --git a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
index 884a6a0..e214346 100644
--- a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
+++ b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
@@ -33,7 +33,7 @@ from Utils import (
     zigzag,
 )
 
-ExtremaSelectionMethod = Literal["peak_values", "extrema_rank"]
+ExtremaSelectionMethod = Literal["peak_values", "extrema_rank", "partition"]
 OptunaNamespace = Literal["hp", "train", "label"]
 
 debug = False
@@ -73,6 +73,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
     _EXTREMA_SELECTION_METHODS: Final[tuple[ExtremaSelectionMethod, ...]] = (
         "peak_values",
         "extrema_rank",
+        "partition",
     )
     _OPTUNA_STORAGE_BACKENDS: Final[tuple[str, ...]] = ("sqlite", "file")
= ("sqlite", "file") _OPTUNA_SAMPLERS: Final[tuple[str, ...]] = ("tpe", "auto") @@ -763,10 +764,10 @@ class QuickAdapterRegressorV3(BaseRegressionModel): if pred_extrema.empty: return pd.Series(dtype=float), pd.Series(dtype=float) - minima_indices = sp.signal.find_peaks(-pred_extrema)[0] - maxima_indices = sp.signal.find_peaks(pred_extrema)[0] - if extrema_selection == QuickAdapterRegressorV3._EXTREMA_SELECTION_METHODS[0]: + minima_indices = sp.signal.find_peaks(-pred_extrema)[0] + maxima_indices = sp.signal.find_peaks(pred_extrema)[0] + pred_minima = ( pred_extrema.iloc[minima_indices] if minima_indices.size > 0 @@ -778,6 +779,9 @@ class QuickAdapterRegressorV3(BaseRegressionModel): else pd.Series(dtype=float) ) elif extrema_selection == QuickAdapterRegressorV3._EXTREMA_SELECTION_METHODS[1]: + minima_indices = sp.signal.find_peaks(-pred_extrema)[0] + maxima_indices = sp.signal.find_peaks(pred_extrema)[0] + n_minima = minima_indices.size n_maxima = maxima_indices.size @@ -790,6 +794,11 @@ class QuickAdapterRegressorV3(BaseRegressionModel): pred_maxima = pred_extrema.nlargest(n_maxima) else: pred_maxima = pd.Series(dtype=float) + elif extrema_selection == QuickAdapterRegressorV3._EXTREMA_SELECTION_METHODS[2]: + eps = np.finfo(float).eps + + pred_maxima = pred_extrema[pred_extrema > eps] + pred_minima = pred_extrema[pred_extrema < eps] else: raise ValueError( f"Unsupported extrema selection method: {extrema_selection}. " diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py index 4821de1..a7ac5ef 100644 --- a/quickadapter/user_data/strategies/QuickAdapterV3.py +++ b/quickadapter/user_data/strategies/QuickAdapterV3.py @@ -47,7 +47,7 @@ TradingMode = Literal["spot", "margin", "futures"] DfSignature = Tuple[int, Optional[datetime.datetime]] CandleDeviationCacheKey = Tuple[ - str, DfSignature, float, float, int, Literal["direct", "inverse"], float + str, DfSignature, float, float, int, InterpolationDirection, float ] CandleThresholdCacheKey = Tuple[str, DfSignature, str, int, float, float] -- 2.43.0