from freqtrade.strategy import AnnotationType, stoploss_from_absolute
from freqtrade.strategy.interface import IStrategy
from pandas import DataFrame, Series, isna
-from scipy.stats import t
+from scipy.stats import pearsonr, t
from technical.pivots_points import pivots_points
from Utils import (
use_custom_stoploss = True
default_exit_thresholds: ClassVar[dict[str, float]] = {
- "k_decl_v": 0.6,
- "k_decl_a": 0.4,
+ "t_decl_v": 0.675,
+ "t_decl_a": 0.675,
}
default_exit_thresholds_calibration: ClassVar[dict[str, float]] = {
@staticmethod
def get_pnl_momentum(
unrealized_pnl_history: Sequence[float], window_size: int
- ) -> tuple[float, float, float, float, float, float, float, float]:
- unrealized_pnl_history = np.asarray(unrealized_pnl_history)
+ ) -> tuple[
+ tuple[float, ...],
+ float,
+ float,
+ tuple[float, ...],
+ float,
+ float,
+ ]:
+ """Compute velocity and acceleration from PnL history.
+
+ Velocity is the first derivative of PnL, acceleration is the second.
+
+ Args:
+ unrealized_pnl_history: PnL values sequence.
+ window_size: Recent window size (0 = no windowing).
+
+ Returns:
+ (velocity_values, velocity_mean, velocity_std,
+ acceleration_values, acceleration_mean, acceleration_std)
+ """
+ unrealized_pnl_history_array = np.asarray(unrealized_pnl_history)
+
+ if window_size > 0 and len(unrealized_pnl_history_array) > window_size:
+ unrealized_pnl_history_array = unrealized_pnl_history_array[-window_size:]
- velocity = np.diff(unrealized_pnl_history)
+ velocity = np.diff(unrealized_pnl_history_array)
+ velocity_mean = np.nanmean(velocity) if velocity.size > 0 else 0.0
velocity_std = np.nanstd(velocity, ddof=1) if velocity.size > 1 else 0.0
+
acceleration = np.diff(velocity)
+ acceleration_mean = np.nanmean(acceleration) if acceleration.size > 0 else 0.0
acceleration_std = (
np.nanstd(acceleration, ddof=1) if acceleration.size > 1 else 0.0
)
- mean_velocity = np.nanmean(velocity) if velocity.size > 0 else 0.0
- mean_acceleration = np.nanmean(acceleration) if acceleration.size > 0 else 0.0
-
- if window_size > 0 and len(unrealized_pnl_history) > window_size:
- recent_unrealized_pnl_history = unrealized_pnl_history[-window_size:]
- else:
- recent_unrealized_pnl_history = unrealized_pnl_history
-
- recent_velocity = np.diff(recent_unrealized_pnl_history)
- recent_velocity_std = (
- np.nanstd(recent_velocity, ddof=1) if recent_velocity.size > 1 else 0.0
- )
- recent_acceleration = np.diff(recent_velocity)
- recent_acceleration_std = (
- np.nanstd(recent_acceleration, ddof=1)
- if recent_acceleration.size > 1
- else 0.0
- )
-
- recent_mean_velocity = (
- np.nanmean(recent_velocity) if recent_velocity.size > 0 else 0.0
- )
- recent_mean_acceleration = (
- np.nanmean(recent_acceleration) if recent_acceleration.size > 0 else 0.0
- )
-
return (
- mean_velocity,
+ tuple(velocity.tolist()),
+ velocity_mean,
velocity_std,
- mean_acceleration,
+ tuple(acceleration.tolist()),
+ acceleration_mean,
acceleration_std,
- recent_mean_velocity,
- recent_velocity_std,
- recent_mean_acceleration,
- recent_acceleration_std,
)
@staticmethod
@lru_cache(maxsize=128)
- def _zscore(mean: float, std: float) -> float:
+ def _t_statistic(mean: float, std: float, n: int) -> float:
+ """Compute t-statistic for H₀: μ = 0.
+
+ Formula: t = mean * √n / std
+
+ Args:
+ mean: Sample mean.
+ std: Sample standard deviation (ddof=1).
+ n: Sample size.
+
+ Returns:
+ t-statistic, or NaN if n < 2 or std ≈ 0.
+ """
+ if n < 2:
+ return np.nan
if not np.isfinite(mean) or not np.isfinite(std):
return np.nan
if np.isclose(std, 0.0):
return np.nan
- return mean / std
+ return mean * math.sqrt(n) / std
@staticmethod
@lru_cache(maxsize=128)
return False
return True
- def _get_exit_thresholds(
- self,
- hist_len: int,
- std_v_global: float,
- std_a_global: float,
- std_v_recent: float,
- std_a_recent: float,
- min_alpha: float = 0.05,
- ) -> dict[str, float]:
- q_decl = float(self._exit_thresholds_calibration.get("decline_quantile"))
+ @staticmethod
+ @lru_cache(maxsize=128)
+ def _effective_df(x: tuple[float, ...]) -> float:
+ """Compute effective degrees of freedom with Bartlett's autocorrelation correction.
- recent_hist_len = min(hist_len, self._pnl_momentum_window_size)
+ Formula: df_eff = (n - 1) * (1 - ρ₁) / (1 + ρ₁), where ρ₁ is lag-1 autocorrelation.
- n_v_global = max(0, hist_len - 1)
- n_a_global = max(0, hist_len - 2)
- n_v_recent = max(0, recent_hist_len - 1)
- n_a_recent = max(0, recent_hist_len - 2)
+ Args:
+ x: Observations tuple.
- if hist_len <= 0:
- alpha_len = 1.0
- else:
- alpha_len = recent_hist_len / hist_len
- alpha_len = max(min_alpha, alpha_len)
-
- def volatility_adjusted_alpha(
- alpha_base: float,
- sigma_global: float,
- sigma_recent: float,
- gamma: float = 1.25,
- min_alpha: float = 0.05,
- ) -> float:
- if not (np.isfinite(sigma_global) and np.isfinite(sigma_recent)):
- return alpha_base
- if sigma_global <= 0 and sigma_recent <= 0:
- return alpha_base
- sigma_total = sigma_global + sigma_recent
- if sigma_total <= 0:
- return alpha_base
- return max(min_alpha, alpha_base * ((sigma_global / sigma_total) ** gamma))
-
- alpha_v = volatility_adjusted_alpha(
- alpha_len, std_v_global, std_v_recent, min_alpha=min_alpha
- )
- alpha_a = volatility_adjusted_alpha(
- alpha_len, std_a_global, std_a_recent, min_alpha=min_alpha
- )
- n_eff_v = alpha_v * n_v_recent + (1.0 - alpha_v) * n_v_global
- n_eff_a = alpha_a * n_a_recent + (1.0 - alpha_a) * n_a_global
-
- def effective_k(
- q: float,
- n_eff: float,
- default_k: float,
- ) -> float:
- if not (0.0 < q < 1.0) or np.isclose(q, 0.0) or np.isclose(q, 1.0):
- return default_k
- try:
- if n_eff < 2:
- return default_k
- df_eff = max(n_eff - 1.0, 1.0)
- k = float(t.ppf(q, df_eff)) / math.sqrt(n_eff)
- if not np.isfinite(k):
- return default_k
- return k
- except Exception:
- return default_k
+ Returns:
+ Effective df (≥ 1). Falls back to n - 1 if n < 4 or on error.
+ """
+ n = len(x)
+ if n < 4:
+ return max(1.0, n - 1)
- k_decl_v = effective_k(
- q_decl, n_eff_v, QuickAdapterV3.default_exit_thresholds["k_decl_v"]
- )
- k_decl_a = effective_k(
- q_decl, n_eff_a, QuickAdapterV3.default_exit_thresholds["k_decl_a"]
- )
+ x_arr = np.asarray(x)
+ x_centered = x_arr - np.nanmean(x_arr)
- if debug:
- logger.info(
- (
- "hist_len=%s recent_len=%s | alpha_len=%s | q_decl=%s | "
- "n_v_(global,recent)=(%s,%s) n_a_(global,recent)=(%s,%s) | "
- "std_v_(global,recent)=(%s,%s) std_a_(global,recent)=(%s,%s) | "
- "alpha_(v,a)=(%s,%s) | n_eff_(v,a)=(%s,%s) | "
- "k_decl_(v,a)=(%s,%s)"
- ),
- hist_len,
- recent_hist_len,
- format_number(alpha_len),
- format_number(q_decl),
- n_v_global,
- n_v_recent,
- n_a_global,
- n_a_recent,
- format_number(std_v_global),
- format_number(std_v_recent),
- format_number(std_a_global),
- format_number(std_a_recent),
- format_number(alpha_v),
- format_number(alpha_a),
- format_number(n_eff_v),
- format_number(n_eff_a),
- format_number(k_decl_v),
- format_number(k_decl_a),
- )
+ try:
+ rho1, _ = pearsonr(x_centered[:-1], x_centered[1:])
+ except Exception:
+ return n - 1
- return {
- "k_decl_v": k_decl_v,
- "k_decl_a": k_decl_a,
- }
+ if not np.isfinite(rho1):
+ return n - 1
+
+ # Clamp to avoid division by zero or negative n_eff
+ rho1 = np.clip(rho1, -0.99, 0.99)
+ correction_factor = (1 - rho1) / (1 + rho1)
+
+ n_eff = n * correction_factor
+ df_eff = max(1.0, n_eff - 1)
+
+ return df_eff
+
+ @staticmethod
+ @lru_cache(maxsize=128)
+ def _t_critical(q: float, df: float, default_t: float) -> float:
+ """Compute critical t-value from Student's t-distribution.
+
+ Args:
+ q: Quantile in (0, 1), e.g. 0.75.
+ df: Degrees of freedom (can be fractional).
+ default_t: Fallback value on error.
+
+ Returns:
+ t.ppf(q, df), or default_t if invalid inputs.
+ """
+ if not (0.0 < q < 1.0):
+ return default_t
+ if df < 1:
+ return default_t
+ try:
+ t_crit = float(t.ppf(q, df))
+ if not np.isfinite(t_crit):
+ return default_t
+ return t_crit
+ except Exception:
+ return default_t
def custom_exit(
self,
trade
)
(
- _,
- trade_global_pnl_velocity_std,
- _,
- trade_global_pnl_acceleration_std,
- trade_recent_pnl_velocity,
- trade_recent_pnl_velocity_std,
- trade_recent_pnl_acceleration,
- trade_recent_pnl_acceleration_std,
+ trade_recent_velocity_values,
+ trade_recent_velocity_mean,
+ trade_recent_velocity_std,
+ trade_recent_acceleration_values,
+ trade_recent_acceleration_mean,
+ trade_recent_acceleration_std,
) = QuickAdapterV3.get_pnl_momentum(
trade_unrealized_pnl_history, self._pnl_momentum_window_size
)
- z_recent_v = QuickAdapterV3._zscore(
- trade_recent_pnl_velocity, trade_recent_pnl_velocity_std
+ q_decl = float(self._exit_thresholds_calibration.get("decline_quantile"))
+
+ n_trade_recent_velocity = len(trade_recent_velocity_values)
+ n_trade_recent_acceleration = len(trade_recent_acceleration_values)
+
+ t_trade_recent_velocity = QuickAdapterV3._t_statistic(
+ trade_recent_velocity_mean,
+ trade_recent_velocity_std,
+ n_trade_recent_velocity,
+ )
+ t_trade_recent_acceleration = QuickAdapterV3._t_statistic(
+ trade_recent_acceleration_mean,
+ trade_recent_acceleration_std,
+ n_trade_recent_acceleration,
+ )
+
+ df_eff_trade_recent_velocity = QuickAdapterV3._effective_df(
+ trade_recent_velocity_values
)
- z_recent_a = QuickAdapterV3._zscore(
- trade_recent_pnl_acceleration, trade_recent_pnl_acceleration_std
+ df_eff_trade_recent_acceleration = QuickAdapterV3._effective_df(
+ trade_recent_acceleration_values
)
- trade_hist_len = len(trade_unrealized_pnl_history)
- trade_exit_thresholds = self._get_exit_thresholds(
- hist_len=trade_hist_len,
- std_v_global=trade_global_pnl_velocity_std,
- std_a_global=trade_global_pnl_acceleration_std,
- std_v_recent=trade_recent_pnl_velocity_std,
- std_a_recent=trade_recent_pnl_acceleration_std,
+ t_crit_trade_recent_velocity = QuickAdapterV3._t_critical(
+ q_decl,
+ df_eff_trade_recent_velocity,
+ QuickAdapterV3.default_exit_thresholds["t_decl_v"],
+ )
+ t_crit_trade_recent_acceleration = QuickAdapterV3._t_critical(
+ q_decl,
+ df_eff_trade_recent_acceleration,
+ QuickAdapterV3.default_exit_thresholds["t_decl_a"],
)
- k_decl_v = trade_exit_thresholds.get("k_decl_v")
- k_decl_a = trade_exit_thresholds.get("k_decl_a")
+ # Declining if t_stat ≤ -t_crit (one-sided test for μ < 0)
decl_checks: list[bool] = []
- if np.isfinite(z_recent_v):
- decl_checks.append(z_recent_v <= -k_decl_v)
- if np.isfinite(z_recent_a):
- decl_checks.append(z_recent_a <= -k_decl_a)
+ if np.isfinite(t_trade_recent_velocity):
+ decl_checks.append(t_trade_recent_velocity <= -t_crit_trade_recent_velocity)
+ if np.isfinite(t_trade_recent_acceleration):
+ decl_checks.append(
+ t_trade_recent_acceleration <= -t_crit_trade_recent_acceleration
+ )
if len(decl_checks) == 0:
trade_recent_pnl_declining = True
else:
f"Trade {trade.trade_direction} {trade.pair} stage {trade_exit_stage} | "
f"Take Profit: {format_number(trade_take_profit_price)}, Rate: {format_number(current_rate)} | "
f"Declining: {trade_recent_pnl_declining} "
- f"(zV:{format_number(z_recent_v)}<=-k:{format_number(-k_decl_v)}, zA:{format_number(z_recent_a)}<=-k:{format_number(-k_decl_a)})"
+ f"(tV:{format_number(t_trade_recent_velocity)}<=-t:{format_number(-t_crit_trade_recent_velocity)}, tA:{format_number(t_trade_recent_acceleration)}<=-t:{format_number(-t_crit_trade_recent_acceleration)})"
),
)