Piment Noir Git Repositories - freqai-strategies.git/commitdiff
feat(qav3): add prediction extrema partition selection method
author: Jérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 19 Nov 2025 12:41:59 +0000 (13:41 +0100)
committer: Jérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 19 Nov 2025 12:41:59 +0000 (13:41 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
ReforceXY/reward_space_analysis/reward_space_analysis.py
ReforceXY/reward_space_analysis/test_reward_space_analysis_cli.py
ReforceXY/reward_space_analysis/tests/components/test_reward_components.py
ReforceXY/reward_space_analysis/tests/robustness/test_branch_coverage.py
ReforceXY/reward_space_analysis/tests/robustness/test_robustness.py
ReforceXY/user_data/freqaimodels/ReforceXY.py
quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
quickadapter/user_data/strategies/QuickAdapterV3.py

index 7e0da6949073cd6ab97f60e9ac0f7c7a7697b7f0..936ec9d92afff3c888e100a09a040425f90c6206 100644 (file)
@@ -18,7 +18,7 @@ import random
 import warnings
 from enum import Enum, IntEnum
 from pathlib import Path
-from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
+from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple, Union
 
 import numpy as np
 import pandas as pd
@@ -39,6 +39,13 @@ except Exception:
     train_test_split = None
 
 
+AttenuationMode = Literal["sqrt", "linear", "power", "half_life"]
+TransformFunction = Literal["tanh", "softsign", "arctan", "sigmoid", "clip", "asinh"]
+ExitPotentialMode = Literal[
+    "canonical", "non_canonical", "progressive_release", "spike_cancel", "retain_previous"
+]
+
+
 class Actions(IntEnum):
     Neutral = 0
     Long_enter = 1
@@ -78,6 +85,7 @@ INTERNAL_GUARDS: dict[str, float] = {
     "sim_zero_reward_epsilon": 1e-12,
     "sim_extreme_pnl_threshold": 0.2,
     "histogram_epsilon": 1e-10,
+    "distribution_identity_epsilon": 1e-12,
 }
 
 # PBRS constants
@@ -97,6 +105,7 @@ ALLOWED_EXIT_POTENTIAL_MODES = {
     "retain_previous",
 }
 
+
 DEFAULT_MODEL_REWARD_PARAMETERS: RewardParams = {
     "invalid_action": -2.0,
     "base_factor": 100.0,
@@ -242,6 +251,31 @@ class RewardDiagnosticsWarning(RuntimeWarning):
     pass
 
 
+def _warn_unknown_mode(
+    mode_type: str,
+    provided_value: str,
+    valid_values: Iterable[str],
+    fallback_value: str,
+    stacklevel: int = 2,
+) -> None:
+    """Emit standardized warning for unknown mode values.
+
+    Args:
+        mode_type: Type of mode (e.g., "exit_attenuation_mode")
+        provided_value: The invalid value that was provided
+        valid_values: Iterable of valid values
+        fallback_value: The value being used as fallback
+        stacklevel: Stack level for warnings.warn
+    """
+    valid_sorted = sorted(valid_values)
+    warnings.warn(
+        f"Unknown {mode_type} '{provided_value}'. "
+        f"Expected one of: {valid_sorted}. Falling back to '{fallback_value}'.",
+        RewardDiagnosticsWarning,
+        stacklevel=stacklevel,
+    )
+
+
 def _to_bool(value: Any) -> bool:
     if isinstance(value, bool):
         return value
@@ -524,20 +558,6 @@ def validate_reward_parameters(
     return sanitized, adjustments
 
 
-def _normalize_and_validate_mode(params: RewardParams) -> None:
-    """Validate exit_attenuation_mode; silently fallback to 'linear' if invalid."""
-    if "exit_attenuation_mode" not in params:
-        return
-
-    exit_attenuation_mode = _get_str_param(
-        params,
-        "exit_attenuation_mode",
-        str(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_attenuation_mode", "linear")),
-    )
-    if exit_attenuation_mode not in ATTENUATION_MODES_WITH_LEGACY:
-        params["exit_attenuation_mode"] = "linear"
-
-
 def add_tunable_cli_args(parser: argparse.ArgumentParser) -> None:
     """Dynamically add CLI options for each tunable in DEFAULT_MODEL_REWARD_PARAMETERS.
 
@@ -755,12 +775,11 @@ def _get_exit_factor(
 
     kernel = kernels.get(exit_attenuation_mode, None)
     if kernel is None:
-        warnings.warn(
-            (
-                f"Unknown exit_attenuation_mode '{exit_attenuation_mode}'; defaulting to 'linear' "
-                f"(effective_dr={effective_dr:.5f})"
-            ),
-            RewardDiagnosticsWarning,
+        _warn_unknown_mode(
+            "exit_attenuation_mode",
+            exit_attenuation_mode,
+            ATTENUATION_MODES_WITH_LEGACY,
+            "linear",
             stacklevel=2,
         )
         kernel = _linear_kernel
@@ -2031,7 +2050,12 @@ def compute_distribution_shift_metrics(
         # Guard against degenerate distributions (all values identical)
         if not np.isfinite(min_val) or not np.isfinite(max_val):
             continue
-        if np.isclose(max_val, min_val, rtol=0, atol=1e-12):
+        if np.isclose(
+            max_val,
+            min_val,
+            rtol=0,
+            atol=float(INTERNAL_GUARDS.get("distribution_identity_epsilon", 1e-12)),
+        ):
             # All mass at a single point -> shift metrics are all zero by definition
             metrics[f"{feature}_kl_divergence"] = 0.0
             metrics[f"{feature}_js_distance"] = 0.0
@@ -2567,7 +2591,7 @@ def _apply_transform_clip(value: float) -> float:
     return float(np.clip(value, -1.0, 1.0))
 
 
-def apply_transform(transform_name: str, value: float, **kwargs: Any) -> float:
+def apply_transform(transform_name: TransformFunction | str, value: float, **kwargs: Any) -> float:
     """Apply named transform; unknown names fallback to tanh with warning."""
     transforms = {
         "tanh": _apply_transform_tanh,
@@ -2579,9 +2603,11 @@ def apply_transform(transform_name: str, value: float, **kwargs: Any) -> float:
     }
 
     if transform_name not in transforms:
-        warnings.warn(
-            f"Unknown potential transform '{transform_name}'; falling back to tanh",
-            RewardDiagnosticsWarning,
+        _warn_unknown_mode(
+            "potential_transform",
+            transform_name,
+            transforms.keys(),
+            "tanh",
             stacklevel=2,
         )
         return _apply_transform_tanh(value)
@@ -2723,6 +2749,13 @@ def _compute_exit_potential(last_potential: float, params: RewardParams) -> floa
     elif mode == "retain_previous":
         next_potential = last_potential
     else:
+        _warn_unknown_mode(
+            "exit_potential_mode",
+            mode,
+            sorted(ALLOWED_EXIT_POTENTIAL_MODES),
+            "canonical (via _fail_safely)",
+            stacklevel=2,
+        )
         next_potential = _fail_safely("invalid_exit_potential_mode")
 
     if not np.isfinite(next_potential):
@@ -3771,8 +3804,6 @@ def main() -> None:
             for k, v in adjustments.items()
         ]
         print("Parameter adjustments applied:\n" + "\n".join(adj_lines))
-    # Normalize attenuation mode
-    _normalize_and_validate_mode(params)
 
     base_factor = _get_float_param(params, "base_factor", float(args.base_factor))
     profit_target = _get_float_param(params, "profit_target", float(args.profit_target))
index ec8bd2dbe7921e7e8beb125db49425c11275e201..0fd48cc962d381eaeef160e5781356d939f1791b 100644 (file)
@@ -104,8 +104,6 @@ def build_arg_matrix(
     max_scenarios: int = 40,
     shuffle_seed: Optional[int] = None,
 ) -> List[ConfigTuple]:
-    # Constants from reward_space_analysis.py
-    # ALLOWED_EXIT_POTENTIAL_MODES and ATTENUATION_MODES_WITH_LEGACY
     exit_potential_modes = [
         "canonical",
         "non_canonical",
index d7914dcfd20bc5ad0f703210691db105699a8d7e..9bd41b97cf4ba3e4d3210cfe787924eb43aa5c90 100644 (file)
@@ -16,6 +16,7 @@ from reward_space_analysis import (
     calculate_reward,
 )
 
+from ..constants import PARAMS
 from ..helpers import (
     RewardScenarioConfig,
     ThresholdTestConfig,
@@ -243,7 +244,7 @@ class TestRewardComponents(RewardSpaceTestBase):
             context,
             params_large,
             base_factor=self.TEST_BASE_FACTOR,
-            profit_target=0.06,
+            profit_target=PARAMS.PROFIT_TARGET,
             risk_reward_ratio=self.TEST_RR,
             short_allowed=True,
             action_masking=True,
@@ -404,7 +405,7 @@ class TestRewardComponents(RewardSpaceTestBase):
         - Proportional scaling with idle duration
         """
         params = self.base_params(max_idle_duration_candles=None, max_trade_duration_candles=100)
-        base_factor = 90.0
+        base_factor = PARAMS.BASE_FACTOR
         profit_target = self.TEST_PROFIT_TARGET
         risk_reward_ratio = 1.0
 
index d136fddcf5f043475ff492e2dd45891848f6766e..fc062af2681528f0a0a583f3f218dab99e8f6485 100644 (file)
@@ -9,7 +9,6 @@ from reward_space_analysis import (
     RewardDiagnosticsWarning,
     _get_exit_factor,
     _hold_penalty,
-    _normalize_and_validate_mode,
     validate_reward_parameters,
 )
 
@@ -59,13 +58,6 @@ def test_validate_reward_parameters_relaxed_adjustment_batch():
     )
 
 
-@pytest.mark.robustness
-def test_normalize_and_validate_mode_fallback():
-    params = {"exit_attenuation_mode": "invalid_mode"}
-    _normalize_and_validate_mode(params)
-    assert params["exit_attenuation_mode"] == "linear"
-
-
 @pytest.mark.robustness
 def test_get_exit_factor_negative_plateau_grace_warning():
     params = {"exit_attenuation_mode": "linear", "exit_plateau": True, "exit_plateau_grace": -1.0}
index c89d7c884bb7e96cd83d7567975d1ffb10a30d24..3d5ae5eb4be8bce8516d008941bd11d888803ce8 100644 (file)
@@ -18,6 +18,7 @@ from reward_space_analysis import (
     simulate_samples,
 )
 
+from ..constants import PARAMS
 from ..helpers import (
     assert_diagnostic_warning,
     assert_exit_factor_attenuation_modes,
@@ -553,7 +554,7 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase):
             exit_plateau_grace=-2.0,
             exit_linear_slope=1.2,
         )
-        base_factor = 90.0
+        base_factor = PARAMS.BASE_FACTOR
         pnl = 0.03
         pnl_factor = 1.0
         duration_ratio = 0.5
index b43f422ca0add068e88b4677a1f2bd05ce848197..6a1edbdf19038abdbb31cf75be37f5712c672251 100644 (file)
@@ -74,6 +74,7 @@ from stable_baselines3.common.vec_env import (
 
 ModelType = Literal["PPO", "RecurrentPPO", "MaskablePPO", "DQN", "QRDQN"]
 ScheduleType = Literal["linear", "constant", "unknown"]
+ScheduleTypeKnown = Literal["linear", "constant"]  # Subset for get_schedule() function
 ExitPotentialMode = Literal[
     "canonical",
     "non_canonical",
@@ -495,7 +496,7 @@ class ReforceXY(BaseReinforcementLearningModel):
             if isinstance(lr, (int, float)):
                 lr = float(lr)
                 model_params["learning_rate"] = get_schedule(
-                    self._SCHEDULE_TYPES[0], lr
+                    cast(ScheduleTypeKnown, self._SCHEDULE_TYPES[0]), lr
                 )
                 logger.info(
                     "Learning rate linear schedule enabled, initial value: %s", lr
@@ -510,7 +511,9 @@ class ReforceXY(BaseReinforcementLearningModel):
             cr = model_params.get("clip_range", 0.2)
             if isinstance(cr, (int, float)):
                 cr = float(cr)
-                model_params["clip_range"] = get_schedule(self._SCHEDULE_TYPES[0], cr)
+                model_params["clip_range"] = get_schedule(
+                    cast(ScheduleTypeKnown, self._SCHEDULE_TYPES[0]), cr
+                )
                 logger.info("Clip range linear schedule enabled, initial value: %s", cr)
 
         # "DQN"
@@ -529,7 +532,7 @@ class ReforceXY(BaseReinforcementLearningModel):
         net_arch: Union[
             List[int],
             Dict[str, List[int]],
-            Literal["small", "medium", "large", "extra_large"],
+            NetArchSize,
         ] = model_params.get("policy_kwargs", {}).get("net_arch", default_net_arch)
 
         # "PPO"
@@ -3737,7 +3740,7 @@ def steps_to_days(steps: int, timeframe: str) -> float:
 
 def get_schedule_type(
     schedule: Any,
-) -> Tuple[Literal["constant", "linear", "unknown"], float, float]:
+) -> Tuple[ScheduleType, float, float]:
     if isinstance(schedule, (int, float)):
         try:
             schedule = float(schedule)
@@ -3767,12 +3770,12 @@ def get_schedule_type(
 
 
 def get_schedule(
-    schedule_type: Literal["linear", "constant"],
+    schedule_type: ScheduleTypeKnown,
     initial_value: float,
 ) -> Callable[[float], float]:
-    if schedule_type == ReforceXY._SCHEDULE_TYPES[0]:
+    if schedule_type == ReforceXY._SCHEDULE_TYPES[0]:  # "linear"
         return SimpleLinearSchedule(initial_value)
-    elif schedule_type == ReforceXY._SCHEDULE_TYPES[1]:
+    elif schedule_type == ReforceXY._SCHEDULE_TYPES[1]:  # "constant"
         return ConstantSchedule(initial_value)
     else:
         return ConstantSchedule(initial_value)
@@ -3789,11 +3792,11 @@ def get_net_arch(
             ReforceXY._NET_ARCH_SIZES[0]: {
                 "pi": [128, 128],
                 "vf": [128, 128],
-            },  # ReforceXY._NET_ARCH_SIZES[0]
+            },  # "small"
             ReforceXY._NET_ARCH_SIZES[1]: {
                 "pi": [256, 256],
                 "vf": [256, 256],
-            },  # ReforceXY._NET_ARCH_SIZES[1]
+            },  # "medium"
             ReforceXY._NET_ARCH_SIZES[2]: {
                 "pi": [512, 512],
                 "vf": [512, 512],
index 884a6a046cbef1d92774638468b3cdb32862ca7e..e2143465da756f8086ffd423b1bc3bc65573a149 100644 (file)
@@ -33,7 +33,7 @@ from Utils import (
     zigzag,
 )
 
-ExtremaSelectionMethod = Literal["peak_values", "extrema_rank"]
+ExtremaSelectionMethod = Literal["peak_values", "extrema_rank", "partition"]
 OptunaNamespace = Literal["hp", "train", "label"]
 
 debug = False
@@ -73,6 +73,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
     _EXTREMA_SELECTION_METHODS: Final[tuple[ExtremaSelectionMethod, ...]] = (
         "peak_values",
         "extrema_rank",
+        "partition",
     )
     _OPTUNA_STORAGE_BACKENDS: Final[tuple[str, ...]] = ("sqlite", "file")
     _OPTUNA_SAMPLERS: Final[tuple[str, ...]] = ("tpe", "auto")
@@ -763,10 +764,10 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
         if pred_extrema.empty:
             return pd.Series(dtype=float), pd.Series(dtype=float)
 
-        minima_indices = sp.signal.find_peaks(-pred_extrema)[0]
-        maxima_indices = sp.signal.find_peaks(pred_extrema)[0]
-
         if extrema_selection == QuickAdapterRegressorV3._EXTREMA_SELECTION_METHODS[0]:
+            minima_indices = sp.signal.find_peaks(-pred_extrema)[0]
+            maxima_indices = sp.signal.find_peaks(pred_extrema)[0]
+
             pred_minima = (
                 pred_extrema.iloc[minima_indices]
                 if minima_indices.size > 0
@@ -778,6 +779,9 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
                 else pd.Series(dtype=float)
             )
         elif extrema_selection == QuickAdapterRegressorV3._EXTREMA_SELECTION_METHODS[1]:
+            minima_indices = sp.signal.find_peaks(-pred_extrema)[0]
+            maxima_indices = sp.signal.find_peaks(pred_extrema)[0]
+
             n_minima = minima_indices.size
             n_maxima = maxima_indices.size
 
@@ -790,6 +794,11 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
                 pred_maxima = pred_extrema.nlargest(n_maxima)
             else:
                 pred_maxima = pd.Series(dtype=float)
+        elif extrema_selection == QuickAdapterRegressorV3._EXTREMA_SELECTION_METHODS[2]:
+            eps = np.finfo(float).eps
+
+            pred_maxima = pred_extrema[pred_extrema > eps]
+            pred_minima = pred_extrema[pred_extrema < eps]
         else:
             raise ValueError(
                 f"Unsupported extrema selection method: {extrema_selection}. "
index 4821de199cba25690f18a6754b2cd8a68e7988e3..a7ac5efe306f29f2b08366a1043e629d1d4719de 100644 (file)
@@ -47,7 +47,7 @@ TradingMode = Literal["spot", "margin", "futures"]
 
 DfSignature = Tuple[int, Optional[datetime.datetime]]
 CandleDeviationCacheKey = Tuple[
-    str, DfSignature, float, float, int, Literal["direct", "inverse"], float
+    str, DfSignature, float, float, int, InterpolationDirection, float
 ]
 CandleThresholdCacheKey = Tuple[str, DfSignature, str, int, float, float]