import math
from functools import cached_property, lru_cache, reduce
from pathlib import Path
-from typing import Any, Callable, Literal, Optional
+from typing import Any, Callable, Literal, Optional, Sequence
import numpy as np
import pandas_ta as pta
INTERFACE_VERSION = 3
def version(self) -> str:
- return "3.3.154"
+ return "3.3.155"
timeframe = "5m"
}
default_exit_thresholds: dict[str, float] = {
- "k_spike_v": 2.0,
- "k_spike_a": 1.5,
- "k_decl_v": 1.0,
- "k_decl_a": 0.5,
+ "k_spike_v": 1.2,
+ "k_spike_a": 1.0,
+ "k_decl_v": 0.6,
+ "k_decl_a": 0.4,
}
default_exit_thresholds_calibration: dict[str, float] = {
"spike_quantile": 0.95,
"decline_quantile": 0.90,
- "min_k_spike": 0.15,
- "min_k_decline": 0.5,
}
position_adjustment_enable = True
return True
- def get_trade_pnl_momentum(
- self, trade: Trade
+ @staticmethod
+ def get_pnl_momentum(
+ unrealized_pnl_history: Sequence[float], window_size: int
) -> tuple[float, float, float, float, float, float, float, float]:
- unrealized_pnl_history = np.asarray(
- QuickAdapterV3.get_trade_unrealized_pnl_history(trade)
- )
+ unrealized_pnl_history = np.asarray(unrealized_pnl_history)
velocity = np.diff(unrealized_pnl_history)
velocity_std = np.std(velocity, ddof=1) if velocity.size > 1 else 0.0
mean_velocity = np.mean(velocity) if velocity.size > 0 else 0.0
mean_acceleration = np.mean(acceleration) if acceleration.size > 0 else 0.0
- recent_unrealized_pnl_history = (
- unrealized_pnl_history[-self._pnl_momentum_window_size :]
- if len(unrealized_pnl_history) > self._pnl_momentum_window_size
- else unrealized_pnl_history
- )
+ if window_size > 0 and len(unrealized_pnl_history) > window_size:
+ recent_unrealized_pnl_history = unrealized_pnl_history[-window_size:]
+ else:
+ recent_unrealized_pnl_history = unrealized_pnl_history
recent_velocity = np.diff(recent_unrealized_pnl_history)
recent_velocity_std = (
return False
return True
- def _get_exit_thresholds(self, trade: Trade) -> dict[str, float]:
+ def _get_exit_thresholds(
+ self,
+ hist_len: int,
+ velocity_std_global: float,
+ acceleration_std_global: float,
+ velocity_std_recent: float,
+ acceleration_std_recent: float,
+ ) -> dict[str, float]:
q_spike = float(self._exit_thresholds_calibration.get("spike_quantile"))
q_decl = float(self._exit_thresholds_calibration.get("decline_quantile"))
- hist_len = len(QuickAdapterV3.get_trade_unrealized_pnl_history(trade))
-
- n_v = max(0, hist_len - 1)
- n_a = max(0, hist_len - 2)
-
recent_hist_len = min(hist_len, self._pnl_momentum_window_size)
- n_rv = max(0, recent_hist_len - 1)
- n_ra = max(0, recent_hist_len - 2)
- def t_k(n: int, q: float, default_k: float, min_k: float) -> float:
- if not (0.0 < q < 1.0) or n < 1:
+ n_v_global = max(0, hist_len - 1)
+ n_a_global = max(0, hist_len - 2)
+ n_v_recent = max(0, recent_hist_len - 1)
+ n_a_recent = max(0, recent_hist_len - 2)
+
+ if hist_len <= 0:
+ alpha_len = 1.0
+ else:
+ alpha_len = recent_hist_len / hist_len if hist_len > 0 else 1.0
+ if alpha_len < 0.05:
+ alpha_len = 0.05
+
+ def volatility_adjusted_alpha(
+ alpha_base: float,
+ sigma_global: float,
+ sigma_recent: float,
+ gamma=1.25,
+ min_alpha=0.05,
+ ) -> float:
+ if not (np.isfinite(sigma_global) and np.isfinite(sigma_recent)):
+ return alpha_base
+ if sigma_global <= 0 and sigma_recent <= 0:
+ return alpha_base
+ sigma_total = sigma_global + sigma_recent
+ if sigma_total <= 0:
+ return alpha_base
+ ratio = sigma_global / sigma_total
+ alpha_vol = alpha_base * (ratio**gamma)
+ return max(min_alpha, alpha_vol)
+
+ alpha_v = volatility_adjusted_alpha(
+ alpha_len, velocity_std_global, velocity_std_recent
+ )
+ alpha_a = volatility_adjusted_alpha(
+ alpha_len, acceleration_std_global, acceleration_std_recent
+ )
+ n_eff_v = alpha_v * n_v_recent + (1.0 - alpha_v) * n_v_global
+ n_eff_a = alpha_a * n_a_recent + (1.0 - alpha_a) * n_a_global
+
+ def effective_k(
+ q: float,
+ n_eff: float,
+ default_k: float,
+ ) -> float:
+ if not (0.0 < q < 1.0) or np.isclose(q, 0.0) or np.isclose(q, 1.0):
return default_k
try:
- df = max(n - 1, 1)
- return max(float(t.ppf(q, df)) / math.sqrt(n), min_k)
+ if n_eff < 2:
+ return default_k
+ df_eff = max(n_eff - 1.0, 1.0)
+ k = float(t.ppf(q, df_eff)) / math.sqrt(n_eff)
+ if not np.isfinite(k):
+ return default_k
+ return k
except Exception:
return default_k
- k_spike_v = t_k(
- n_rv,
- q_spike,
- self.default_exit_thresholds["k_spike_v"],
- self._exit_thresholds_calibration["min_k_spike"],
+ k_spike_v = effective_k(
+ q_spike, n_eff_v, self.default_exit_thresholds["k_spike_v"]
)
- k_spike_a = t_k(
- n_ra,
- q_spike,
- self.default_exit_thresholds["k_spike_a"],
- self._exit_thresholds_calibration["min_k_spike"],
+ k_spike_a = effective_k(
+ q_spike, n_eff_a, self.default_exit_thresholds["k_spike_a"]
)
- k_decl_v = t_k(
- n_v,
- q_decl,
- self.default_exit_thresholds["k_decl_v"],
- self._exit_thresholds_calibration["min_k_decline"],
+ k_decl_v = effective_k(
+ q_decl, n_eff_v, self.default_exit_thresholds["k_decl_v"]
)
- k_decl_a = t_k(
- n_a,
- q_decl,
- self.default_exit_thresholds["k_decl_a"],
- self._exit_thresholds_calibration["min_k_decline"],
+ k_decl_a = effective_k(
+ q_decl, n_eff_a, self.default_exit_thresholds["k_decl_a"]
)
if debug:
logger.info(
- f"n_(rv,ra,v,a)=({n_rv},{n_ra},{n_v},{n_a}) | "
- f"q_(spike,decl)=({format_number(q_spike)},{format_number(q_decl)}) | "
- f"k_spike_(v,a)=({format_number(k_spike_v)},{format_number(k_spike_a)}), k_decl_(v,a)=({format_number(k_decl_v)},{format_number(k_decl_a)})"
+ (
+ "hist_len=%s recent_len=%s | alpha_len=%s | q_spike=%s q_decl=%s | "
+ "n_v_(global,recent)=(%s,%s) n_a_(global,recent)=(%s,%s) | "
+ "std_v_(global,recent)=(%s,%s) std_a_(global,recent)=(%s,%s) | "
+ "alpha_(v,a)=(%s,%s) | n_eff_(v,a)=(%s,%s) | "
+ "k_spike_(v,a)=(%s,%s) k_decl_(v,a)=(%s,%s)"
+ ),
+ hist_len,
+ recent_hist_len,
+ format_number(alpha_len),
+ format_number(q_spike),
+ format_number(q_decl),
+ n_v_global,
+ n_v_recent,
+ n_a_global,
+ n_a_recent,
+ format_number(velocity_std_global),
+ format_number(velocity_std_recent),
+ format_number(acceleration_std_global),
+ format_number(acceleration_std_recent),
+ format_number(alpha_v),
+ format_number(alpha_a),
+ format_number(n_eff_v),
+ format_number(n_eff_a),
+ format_number(k_spike_v),
+ format_number(k_spike_a),
+ format_number(k_decl_v),
+ format_number(k_decl_a),
)
return {
)
return None
+ trade_unrealized_pnl_history = QuickAdapterV3.get_trade_unrealized_pnl_history(
+ trade
+ )
(
trade_pnl_velocity,
trade_pnl_velocity_std,
trade_recent_pnl_velocity_std,
trade_recent_pnl_acceleration,
trade_recent_pnl_acceleration_std,
- ) = self.get_trade_pnl_momentum(trade)
+ ) = QuickAdapterV3.get_pnl_momentum(
+ trade_unrealized_pnl_history, self._pnl_momentum_window_size
+ )
z_dv = QuickAdapterV3._zscore(trade_pnl_velocity, trade_pnl_velocity_std)
z_da = QuickAdapterV3._zscore(
trade_recent_pnl_acceleration, trade_recent_pnl_acceleration_std
)
- trade_exit_thresholds = self._get_exit_thresholds(trade)
+ trade_hist_len = len(trade_unrealized_pnl_history)
+ trade_exit_thresholds = self._get_exit_thresholds(
+ hist_len=trade_hist_len,
+ velocity_std_global=trade_pnl_velocity_std,
+ acceleration_std_global=trade_pnl_acceleration_std,
+ velocity_std_recent=trade_recent_pnl_velocity_std,
+ acceleration_std_recent=trade_recent_pnl_acceleration_std,
+ )
k_spike_v = trade_exit_thresholds.get("k_spike_v")
k_spike_a = trade_exit_thresholds.get("k_spike_a")
k_decl_v = trade_exit_thresholds.get("k_decl_v")