]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
refactor(quickadapter): cleanup PnL momentum declining trade exit logic
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Fri, 26 Dec 2025 14:19:55 +0000 (15:19 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Fri, 26 Dec 2025 14:19:55 +0000 (15:19 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
quickadapter/user_data/strategies/QuickAdapterV3.py

index 1a18924bba4ff7577a63a284a6fa7c816e9a1aba..6f0f58b9975168c8bdd2600386d6fa7370bb290b 100644 (file)
@@ -23,7 +23,7 @@ from freqtrade.persistence import Trade
 from freqtrade.strategy import AnnotationType, stoploss_from_absolute
 from freqtrade.strategy.interface import IStrategy
 from pandas import DataFrame, Series, isna
-from scipy.stats import t
+from scipy.stats import pearsonr, t
 from technical.pivots_points import pivots_points
 
 from Utils import (
@@ -115,8 +115,8 @@ class QuickAdapterV3(IStrategy):
     use_custom_stoploss = True
 
     default_exit_thresholds: ClassVar[dict[str, float]] = {
-        "k_decl_v": 0.6,
-        "k_decl_a": 0.4,
+        "t_decl_v": 0.675,
+        "t_decl_a": 0.675,
     }
 
     default_exit_thresholds_calibration: ClassVar[dict[str, float]] = {
@@ -2078,61 +2078,72 @@ class QuickAdapterV3(IStrategy):
     @staticmethod
     def get_pnl_momentum(
         unrealized_pnl_history: Sequence[float], window_size: int
-    ) -> tuple[float, float, float, float, float, float, float, float]:
-        unrealized_pnl_history = np.asarray(unrealized_pnl_history)
+    ) -> tuple[
+        tuple[float, ...],
+        float,
+        float,
+        tuple[float, ...],
+        float,
+        float,
+    ]:
+        """Compute velocity and acceleration from PnL history.
+
+        Velocity is the first derivative of PnL, acceleration is the second.
+
+        Args:
+            unrealized_pnl_history: PnL values sequence.
+            window_size: Recent window size (0 = no windowing).
+
+        Returns:
+            (velocity_values, velocity_mean, velocity_std,
+             acceleration_values, acceleration_mean, acceleration_std)
+        """
+        unrealized_pnl_history_array = np.asarray(unrealized_pnl_history)
+
+        if window_size > 0 and len(unrealized_pnl_history_array) > window_size:
+            unrealized_pnl_history_array = unrealized_pnl_history_array[-window_size:]
 
-        velocity = np.diff(unrealized_pnl_history)
+        velocity = np.diff(unrealized_pnl_history_array)
+        velocity_mean = np.nanmean(velocity) if velocity.size > 0 else 0.0
         velocity_std = np.nanstd(velocity, ddof=1) if velocity.size > 1 else 0.0
+
         acceleration = np.diff(velocity)
+        acceleration_mean = np.nanmean(acceleration) if acceleration.size > 0 else 0.0
         acceleration_std = (
             np.nanstd(acceleration, ddof=1) if acceleration.size > 1 else 0.0
         )
 
-        mean_velocity = np.nanmean(velocity) if velocity.size > 0 else 0.0
-        mean_acceleration = np.nanmean(acceleration) if acceleration.size > 0 else 0.0
-
-        if window_size > 0 and len(unrealized_pnl_history) > window_size:
-            recent_unrealized_pnl_history = unrealized_pnl_history[-window_size:]
-        else:
-            recent_unrealized_pnl_history = unrealized_pnl_history
-
-        recent_velocity = np.diff(recent_unrealized_pnl_history)
-        recent_velocity_std = (
-            np.nanstd(recent_velocity, ddof=1) if recent_velocity.size > 1 else 0.0
-        )
-        recent_acceleration = np.diff(recent_velocity)
-        recent_acceleration_std = (
-            np.nanstd(recent_acceleration, ddof=1)
-            if recent_acceleration.size > 1
-            else 0.0
-        )
-
-        recent_mean_velocity = (
-            np.nanmean(recent_velocity) if recent_velocity.size > 0 else 0.0
-        )
-        recent_mean_acceleration = (
-            np.nanmean(recent_acceleration) if recent_acceleration.size > 0 else 0.0
-        )
-
         return (
-            mean_velocity,
+            tuple(velocity.tolist()),
+            velocity_mean,
             velocity_std,
-            mean_acceleration,
+            tuple(acceleration.tolist()),
+            acceleration_mean,
             acceleration_std,
-            recent_mean_velocity,
-            recent_velocity_std,
-            recent_mean_acceleration,
-            recent_acceleration_std,
         )
 
     @staticmethod
     @lru_cache(maxsize=128)
-    def _zscore(mean: float, std: float) -> float:
+    def _t_statistic(mean: float, std: float, n: int) -> float:
+        """Compute t-statistic for H₀: μ = 0.
+
+        Formula: t = mean * √n / std
+
+        Args:
+            mean: Sample mean.
+            std: Sample standard deviation (ddof=1).
+            n: Sample size.
+
+        Returns:
+            t-statistic, or NaN if n < 2 or std ≈ 0.
+        """
+        if n < 2:
+            return np.nan
         if not np.isfinite(mean) or not np.isfinite(std):
             return np.nan
         if np.isclose(std, 0.0):
             return np.nan
-        return mean / std
+        return mean * math.sqrt(n) / std
 
     @staticmethod
     @lru_cache(maxsize=128)
@@ -2145,113 +2156,67 @@ class QuickAdapterV3(IStrategy):
             return False
         return True
 
-    def _get_exit_thresholds(
-        self,
-        hist_len: int,
-        std_v_global: float,
-        std_a_global: float,
-        std_v_recent: float,
-        std_a_recent: float,
-        min_alpha: float = 0.05,
-    ) -> dict[str, float]:
-        q_decl = float(self._exit_thresholds_calibration.get("decline_quantile"))
+    @staticmethod
+    @lru_cache(maxsize=128)
+    def _effective_df(x: tuple[float, ...]) -> float:
+        """Compute effective degrees of freedom with Bartlett's autocorrelation correction.
 
-        recent_hist_len = min(hist_len, self._pnl_momentum_window_size)
+        Formula: df_eff = (n - 1) * (1 - ρ₁) / (1 + ρ₁), where ρ₁ is lag-1 autocorrelation.
 
-        n_v_global = max(0, hist_len - 1)
-        n_a_global = max(0, hist_len - 2)
-        n_v_recent = max(0, recent_hist_len - 1)
-        n_a_recent = max(0, recent_hist_len - 2)
+        Args:
+            x: Observations tuple.
 
-        if hist_len <= 0:
-            alpha_len = 1.0
-        else:
-            alpha_len = recent_hist_len / hist_len
-        alpha_len = max(min_alpha, alpha_len)
-
-        def volatility_adjusted_alpha(
-            alpha_base: float,
-            sigma_global: float,
-            sigma_recent: float,
-            gamma: float = 1.25,
-            min_alpha: float = 0.05,
-        ) -> float:
-            if not (np.isfinite(sigma_global) and np.isfinite(sigma_recent)):
-                return alpha_base
-            if sigma_global <= 0 and sigma_recent <= 0:
-                return alpha_base
-            sigma_total = sigma_global + sigma_recent
-            if sigma_total <= 0:
-                return alpha_base
-            return max(min_alpha, alpha_base * ((sigma_global / sigma_total) ** gamma))
-
-        alpha_v = volatility_adjusted_alpha(
-            alpha_len, std_v_global, std_v_recent, min_alpha=min_alpha
-        )
-        alpha_a = volatility_adjusted_alpha(
-            alpha_len, std_a_global, std_a_recent, min_alpha=min_alpha
-        )
-        n_eff_v = alpha_v * n_v_recent + (1.0 - alpha_v) * n_v_global
-        n_eff_a = alpha_a * n_a_recent + (1.0 - alpha_a) * n_a_global
-
-        def effective_k(
-            q: float,
-            n_eff: float,
-            default_k: float,
-        ) -> float:
-            if not (0.0 < q < 1.0) or np.isclose(q, 0.0) or np.isclose(q, 1.0):
-                return default_k
-            try:
-                if n_eff < 2:
-                    return default_k
-                df_eff = max(n_eff - 1.0, 1.0)
-                k = float(t.ppf(q, df_eff)) / math.sqrt(n_eff)
-                if not np.isfinite(k):
-                    return default_k
-                return k
-            except Exception:
-                return default_k
+        Returns:
+            Effective df (≥ 1). Falls back to n - 1 if n < 4 or on error.
+        """
+        n = len(x)
+        if n < 4:
+            return max(1.0, n - 1)
 
-        k_decl_v = effective_k(
-            q_decl, n_eff_v, QuickAdapterV3.default_exit_thresholds["k_decl_v"]
-        )
-        k_decl_a = effective_k(
-            q_decl, n_eff_a, QuickAdapterV3.default_exit_thresholds["k_decl_a"]
-        )
+        x_arr = np.asarray(x)
+        x_centered = x_arr - np.nanmean(x_arr)
 
-        if debug:
-            logger.info(
-                (
-                    "hist_len=%s recent_len=%s | alpha_len=%s | q_decl=%s | "
-                    "n_v_(global,recent)=(%s,%s) n_a_(global,recent)=(%s,%s) | "
-                    "std_v_(global,recent)=(%s,%s) std_a_(global,recent)=(%s,%s) | "
-                    "alpha_(v,a)=(%s,%s) | n_eff_(v,a)=(%s,%s) | "
-                    "k_decl_(v,a)=(%s,%s)"
-                ),
-                hist_len,
-                recent_hist_len,
-                format_number(alpha_len),
-                format_number(q_decl),
-                n_v_global,
-                n_v_recent,
-                n_a_global,
-                n_a_recent,
-                format_number(std_v_global),
-                format_number(std_v_recent),
-                format_number(std_a_global),
-                format_number(std_a_recent),
-                format_number(alpha_v),
-                format_number(alpha_a),
-                format_number(n_eff_v),
-                format_number(n_eff_a),
-                format_number(k_decl_v),
-                format_number(k_decl_a),
-            )
+        try:
+            rho1, _ = pearsonr(x_centered[:-1], x_centered[1:])
+        except Exception:
+            return n - 1
 
-        return {
-            "k_decl_v": k_decl_v,
-            "k_decl_a": k_decl_a,
-        }
+        if not np.isfinite(rho1):
+            return n - 1
+
+        # Clamp to avoid division by zero or negative n_eff
+        rho1 = np.clip(rho1, -0.99, 0.99)
+        correction_factor = (1 - rho1) / (1 + rho1)
+
+        n_eff = n * correction_factor
+        df_eff = max(1.0, n_eff - 1)
+
+        return df_eff
+
+    @staticmethod
+    @lru_cache(maxsize=128)
+    def _t_critical(q: float, df: float, default_t: float) -> float:
+        """Compute critical t-value from Student's t-distribution.
+
+        Args:
+            q: Quantile in (0, 1), e.g. 0.75.
+            df: Degrees of freedom (can be fractional).
+            default_t: Fallback value on error.
+
+        Returns:
+            t.ppf(q, df), or default_t if invalid inputs.
+        """
+        if not (0.0 < q < 1.0):
+            return default_t
+        if df < 1:
+            return default_t
+        try:
+            t_crit = float(t.ppf(q, df))
+            if not np.isfinite(t_crit):
+                return default_t
+            return t_crit
+        except Exception:
+            return default_t
 
     def custom_exit(
         self,
@@ -2360,41 +2325,58 @@ class QuickAdapterV3(IStrategy):
             trade
         )
         (
-            _,
-            trade_global_pnl_velocity_std,
-            _,
-            trade_global_pnl_acceleration_std,
-            trade_recent_pnl_velocity,
-            trade_recent_pnl_velocity_std,
-            trade_recent_pnl_acceleration,
-            trade_recent_pnl_acceleration_std,
+            trade_recent_velocity_values,
+            trade_recent_velocity_mean,
+            trade_recent_velocity_std,
+            trade_recent_acceleration_values,
+            trade_recent_acceleration_mean,
+            trade_recent_acceleration_std,
         ) = QuickAdapterV3.get_pnl_momentum(
             trade_unrealized_pnl_history, self._pnl_momentum_window_size
         )
 
-        z_recent_v = QuickAdapterV3._zscore(
-            trade_recent_pnl_velocity, trade_recent_pnl_velocity_std
+        q_decl = float(self._exit_thresholds_calibration.get("decline_quantile"))
+
+        n_trade_recent_velocity = len(trade_recent_velocity_values)
+        n_trade_recent_acceleration = len(trade_recent_acceleration_values)
+
+        t_trade_recent_velocity = QuickAdapterV3._t_statistic(
+            trade_recent_velocity_mean,
+            trade_recent_velocity_std,
+            n_trade_recent_velocity,
+        )
+        t_trade_recent_acceleration = QuickAdapterV3._t_statistic(
+            trade_recent_acceleration_mean,
+            trade_recent_acceleration_std,
+            n_trade_recent_acceleration,
+        )
+
+        df_eff_trade_recent_velocity = QuickAdapterV3._effective_df(
+            trade_recent_velocity_values
         )
-        z_recent_a = QuickAdapterV3._zscore(
-            trade_recent_pnl_acceleration, trade_recent_pnl_acceleration_std
+        df_eff_trade_recent_acceleration = QuickAdapterV3._effective_df(
+            trade_recent_acceleration_values
         )
 
-        trade_hist_len = len(trade_unrealized_pnl_history)
-        trade_exit_thresholds = self._get_exit_thresholds(
-            hist_len=trade_hist_len,
-            std_v_global=trade_global_pnl_velocity_std,
-            std_a_global=trade_global_pnl_acceleration_std,
-            std_v_recent=trade_recent_pnl_velocity_std,
-            std_a_recent=trade_recent_pnl_acceleration_std,
+        t_crit_trade_recent_velocity = QuickAdapterV3._t_critical(
+            q_decl,
+            df_eff_trade_recent_velocity,
+            QuickAdapterV3.default_exit_thresholds["t_decl_v"],
+        )
+        t_crit_trade_recent_acceleration = QuickAdapterV3._t_critical(
+            q_decl,
+            df_eff_trade_recent_acceleration,
+            QuickAdapterV3.default_exit_thresholds["t_decl_a"],
         )
-        k_decl_v = trade_exit_thresholds.get("k_decl_v")
-        k_decl_a = trade_exit_thresholds.get("k_decl_a")
 
+        # Declining if t_stat ≤ -t_crit (one-sided test for μ < 0)
         decl_checks: list[bool] = []
-        if np.isfinite(z_recent_v):
-            decl_checks.append(z_recent_v <= -k_decl_v)
-        if np.isfinite(z_recent_a):
-            decl_checks.append(z_recent_a <= -k_decl_a)
+        if np.isfinite(t_trade_recent_velocity):
+            decl_checks.append(t_trade_recent_velocity <= -t_crit_trade_recent_velocity)
+        if np.isfinite(t_trade_recent_acceleration):
+            decl_checks.append(
+                t_trade_recent_acceleration <= -t_crit_trade_recent_acceleration
+            )
         if len(decl_checks) == 0:
             trade_recent_pnl_declining = True
         else:
@@ -2410,7 +2392,7 @@ class QuickAdapterV3(IStrategy):
                     f"Trade {trade.trade_direction} {trade.pair} stage {trade_exit_stage} | "
                     f"Take Profit: {format_number(trade_take_profit_price)}, Rate: {format_number(current_rate)} | "
                     f"Declining: {trade_recent_pnl_declining} "
-                    f"(zV:{format_number(z_recent_v)}<=-k:{format_number(-k_decl_v)}, zA:{format_number(z_recent_a)}<=-k:{format_number(-k_decl_a)})"
+                    f"(tV:{format_number(t_trade_recent_velocity)}<=-t:{format_number(-t_crit_trade_recent_velocity)}, tA:{format_number(t_trade_recent_acceleration)}<=-t:{format_number(-t_crit_trade_recent_acceleration)})"
                 ),
             )