From c19e78954328ba9239a3251f63c46ec034bc38a3 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sat, 5 Apr 2025 23:54:46 +0200 Subject: [PATCH] perf(qav3): use custom zigzag algo to label extrema MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- .../user_data/strategies/QuickAdapterV3.py | 46 ++--- quickadapter/user_data/strategies/Utils.py | 194 ++++++++++++++++++ 2 files changed, 209 insertions(+), 31 deletions(-) diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py index 6d22d8f..c905915 100644 --- a/quickadapter/user_data/strategies/QuickAdapterV3.py +++ b/quickadapter/user_data/strategies/QuickAdapterV3.py @@ -4,7 +4,7 @@ from functools import reduce, cached_property import datetime import math from pathlib import Path -from statistics import harmonic_mean +from statistics import geometric_mean import talib.abstract as ta from pandas import DataFrame, Series, isna from typing import Optional @@ -13,13 +13,13 @@ from freqtrade.strategy.interface import IStrategy from freqtrade.strategy import stoploss_from_absolute from technical.pivots_points import pivots_points from freqtrade.persistence import Trade -from scipy.signal import find_peaks import numpy as np import pandas_ta as pta from Utils import ( alligator, bottom_change_percent, + dynamic_zigzag, ewo, non_zero_range, price_retracement_percent, @@ -59,7 +59,7 @@ class QuickAdapterV3(IStrategy): INTERFACE_VERSION = 3 def version(self) -> str: - return "3.2.14" + return "3.2.15" timeframe = "5m" @@ -78,7 +78,7 @@ class QuickAdapterV3(IStrategy): @cached_property def label_natr_ratio(self) -> float: - return self.freqai_info["feature_parameters"].get("label_natr_ratio", 0.0125) + return self.freqai_info["feature_parameters"].get("label_natr_ratio", 0.075) @cached_property def entry_natr_ratio(self) -> float: @@ -353,31 +353,12 @@ class QuickAdapterV3(IStrategy): def set_freqai_targets(self, dataframe, metadata, **kwargs): label_period_candles = self.get_label_period_candles(str(metadata.get("pair"))) - peaks_distance = label_period_candles - peaks_width = label_period_candles // 4 - # To match current market condition, use the current close price and NATR to evaluate peaks prominence - peaks_prominence = ( - dataframe["close"].iloc[-1] - * ta.NATR(dataframe, timeperiod=label_period_candles).iloc[-1] - * self.label_natr_ratio - ) - min_peaks, _ = find_peaks( - -dataframe["low"].values, - distance=peaks_distance, - width=peaks_width, - prominence=peaks_prominence, - ) - max_peaks, _ = find_peaks( - dataframe["high"].values, - distance=peaks_distance, - width=peaks_width, - prominence=peaks_prominence, + peak_indices, _, peak_directions = dynamic_zigzag( + dataframe, timeperiod=label_period_candles, ratio=self.label_natr_ratio ) dataframe[EXTREMA_COLUMN] = 0 - for mp in min_peaks: - dataframe.at[mp, EXTREMA_COLUMN] = -1 - for mp in max_peaks: - dataframe.at[mp, EXTREMA_COLUMN] = 1 + for peak_idx, peak_dir in zip(peak_indices, peak_directions): + dataframe.at[peak_idx, EXTREMA_COLUMN] = peak_dir dataframe["minima"] = np.where(dataframe[EXTREMA_COLUMN] == -1, -1, 0) dataframe["maxima"] = np.where(dataframe[EXTREMA_COLUMN] == 1, 1, 0) dataframe[EXTREMA_COLUMN] = self.smooth_extrema( @@ -495,7 +476,9 @@ class QuickAdapterV3(IStrategy): * (1 / math.log10(1 + 0.25 * trade_duration_candles)) ) - def get_take_profit_distance(self, df: DataFrame, trade: Trade) -> Optional[float]: + def get_take_profit_distance( + self, df: DataFrame, trade: Trade, current_rate: float + ) -> Optional[float]: trade_duration_candles = self.get_trade_duration_candles(df, trade) if QuickAdapterV3.is_trade_duration_valid(trade_duration_candles) is False: return None @@ -506,8 +489,9 @@ class QuickAdapterV3(IStrategy): if isna(current_natr): return None return ( - trade.open_rate - * max(entry_natr, harmonic_mean([entry_natr, current_natr])) + geometric_mean( + [trade.open_rate * entry_natr, 0.5 * current_rate * current_natr] + ) * self.trailing_stoploss_natr_ratio * math.log10(9 + trade_duration_candles) * self.reward_risk_ratio @@ -575,7 +559,7 @@ class QuickAdapterV3(IStrategy): ): return "maxima_detected_long" - take_profit_distance = self.get_take_profit_distance(df, trade) + take_profit_distance = self.get_take_profit_distance(df, trade, current_rate) if isna(take_profit_distance): return None if take_profit_distance == 0: diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py index 967ade1..457ffc0 100644 --- a/quickadapter/user_data/strategies/Utils.py +++ b/quickadapter/user_data/strategies/Utils.py @@ -296,3 +296,197 @@ def alligator( lips = smma(price_series, period=lips_period, zero_lag=zero_lag, offset=lips_shift) return jaw, teeth, lips + + +def zigzag( + df: pd.DataFrame, + threshold: float = 0.05, +) -> tuple[list, list, list]: + """ + Calculate the ZigZag indicator for a OHLCV DataFrame. + + Parameters: + df (pd.DataFrame): OHLCV DataFrame. + threshold (float): Percentage threshold for reversal (default 0.05 for 5%). + + Returns: + tuple: Lists of indices, extrema, and directions. + """ + if df.empty: + return [], [], [] + + indices = [] + extrema = [] + directions = [] + + current_dir = 0 + current_extreme = None + current_extreme_idx = None + + first_high = df["high"].iloc[0] + first_low = df["low"].iloc[0] + + if (first_high - first_low) / first_low >= threshold: + current_dir = 1 + current_extreme = first_high + else: + current_dir = -1 + current_extreme = first_low + current_extreme_idx = df.index[0] + + indices.append(current_extreme_idx) + extrema.append(current_extreme) + directions.append(current_dir) + last_idx = current_extreme_idx + + for i in range(1, len(df)): + current_idx = df.index[i] + h = df.at[current_idx, "high"] + l = df.at[current_idx, "low"] + + if current_dir == 1: # Looking for higher high + if h > current_extreme: + current_extreme = h + current_extreme_idx = current_idx + elif (current_extreme - l) / current_extreme >= threshold: + if current_extreme_idx != last_idx: + indices.append(current_extreme_idx) + extrema.append(current_extreme) + directions.append(current_dir) + last_idx = current_extreme_idx + + current_dir = -1 + current_extreme = l + current_extreme_idx = current_idx + + elif current_dir == -1: # Looking for lower low + if l < current_extreme: + current_extreme = l + current_extreme_idx = current_idx + elif (h - current_extreme) / current_extreme >= threshold: + if current_extreme_idx != last_idx: + indices.append(current_extreme_idx) + extrema.append(current_extreme) + directions.append(current_dir) + last_idx = current_extreme_idx + + current_dir = 1 + current_extreme = h + current_extreme_idx = current_idx + + if current_extreme_idx != last_idx: + indices.append(current_extreme_idx) + extrema.append(current_extreme) + directions.append(current_dir) + + return indices, extrema, directions + + +def dynamic_zigzag( + df: pd.DataFrame, + timeperiod: int = 14, + natr: bool = True, + ratio: float = 1.0, +) -> tuple[list, list, list]: + """ + Calculate the ZigZag indicator for a OHLCV DataFrame with dynamic threshold using ATR/NATR. + + Parameters: + df (pd.DataFrame): OHLCV DataFrame. + timeperiod (int): Period for ATR/NATR calculation (default: 14). + natr (bool): Use NATR (True) or ATR (False) (default: True). + ratio (float): ratio for dynamic threshold (default: 1.0). + + Returns: + tuple: Lists of indices, extrema, and directions. + """ + if df.empty: + return [], [], [] + + if natr: + thresholds = ta.NATR(df, timeperiod=timeperiod) + else: + thresholds = ta.ATR(df, timeperiod=timeperiod) + thresholds = thresholds.ffill().bfill() + + indices = [] + extrema = [] + directions = [] + + current_dir = 0 + current_extreme = None + current_extreme_idx = None + + first_high = df["high"].iloc[0] + first_low = df["low"].iloc[0] + first_threshold = thresholds.iloc[0] * ratio + + if natr: + first_move = (first_high - first_low) / first_low + else: + first_move = first_high - first_low + if first_move >= first_threshold: + current_dir = 1 + current_extreme = first_high + else: + current_dir = -1 + current_extreme = first_low + current_extreme_idx = df.index[0] + + indices.append(current_extreme_idx) + extrema.append(current_extreme) + directions.append(current_dir) + last_idx = current_extreme_idx + + for i in range(1, len(df)): + current_idx = df.index[i] + h = df.at[current_idx, "high"] + l = df.at[current_idx, "low"] + threshold = thresholds.iloc[i] * ratio + + if current_dir == 1: # Looking for higher high + if h > current_extreme: + current_extreme = h + current_extreme_idx = current_idx + else: + if natr: + reversal = (current_extreme - l) / current_extreme >= threshold + else: + reversal = (current_extreme - l) >= threshold + if reversal: + if current_extreme_idx != last_idx: + indices.append(current_extreme_idx) + extrema.append(current_extreme) + directions.append(current_dir) + last_idx = current_extreme_idx + + current_dir = -1 + current_extreme = l + current_extreme_idx = current_idx + + elif current_dir == -1: # Looking for lower low + if l < current_extreme: + current_extreme = l + current_extreme_idx = current_idx + else: + if natr: + reversal = (h - current_extreme) / current_extreme >= threshold + else: + reversal = (h - current_extreme) >= threshold + if reversal: + if current_extreme_idx != last_idx: + indices.append(current_extreme_idx) + extrema.append(current_extreme) + directions.append(current_dir) + last_idx = current_extreme_idx + + current_dir = 1 + current_extreme = h + current_extreme_idx = current_idx + + if current_extreme_idx != last_idx: + indices.append(current_extreme_idx) + extrema.append(current_extreme) + directions.append(current_dir) + + return indices, extrema, directions -- 2.43.0