import datetime
import math
from pathlib import Path
-from statistics import harmonic_mean
+from statistics import geometric_mean
import talib.abstract as ta
from pandas import DataFrame, Series, isna
from typing import Optional
from freqtrade.strategy import stoploss_from_absolute
from technical.pivots_points import pivots_points
from freqtrade.persistence import Trade
-from scipy.signal import find_peaks
import numpy as np
import pandas_ta as pta
from Utils import (
alligator,
bottom_change_percent,
+ dynamic_zigzag,
ewo,
non_zero_range,
price_retracement_percent,
INTERFACE_VERSION = 3
def version(self) -> str:
- return "3.2.14"
+ return "3.2.15"
timeframe = "5m"
@cached_property
def label_natr_ratio(self) -> float:
- return self.freqai_info["feature_parameters"].get("label_natr_ratio", 0.0125)
+ return self.freqai_info["feature_parameters"].get("label_natr_ratio", 0.075)
@cached_property
def entry_natr_ratio(self) -> float:
def set_freqai_targets(self, dataframe, metadata, **kwargs):
label_period_candles = self.get_label_period_candles(str(metadata.get("pair")))
- peaks_distance = label_period_candles
- peaks_width = label_period_candles // 4
- # To match current market condition, use the current close price and NATR to evaluate peaks prominence
- peaks_prominence = (
- dataframe["close"].iloc[-1]
- * ta.NATR(dataframe, timeperiod=label_period_candles).iloc[-1]
- * self.label_natr_ratio
- )
- min_peaks, _ = find_peaks(
- -dataframe["low"].values,
- distance=peaks_distance,
- width=peaks_width,
- prominence=peaks_prominence,
- )
- max_peaks, _ = find_peaks(
- dataframe["high"].values,
- distance=peaks_distance,
- width=peaks_width,
- prominence=peaks_prominence,
+ peak_indices, _, peak_directions = dynamic_zigzag(
+ dataframe, timeperiod=label_period_candles, ratio=self.label_natr_ratio
)
dataframe[EXTREMA_COLUMN] = 0
- for mp in min_peaks:
- dataframe.at[mp, EXTREMA_COLUMN] = -1
- for mp in max_peaks:
- dataframe.at[mp, EXTREMA_COLUMN] = 1
+ for peak_idx, peak_dir in zip(peak_indices, peak_directions):
+ dataframe.at[peak_idx, EXTREMA_COLUMN] = peak_dir
dataframe["minima"] = np.where(dataframe[EXTREMA_COLUMN] == -1, -1, 0)
dataframe["maxima"] = np.where(dataframe[EXTREMA_COLUMN] == 1, 1, 0)
dataframe[EXTREMA_COLUMN] = self.smooth_extrema(
* (1 / math.log10(1 + 0.25 * trade_duration_candles))
)
- def get_take_profit_distance(self, df: DataFrame, trade: Trade) -> Optional[float]:
+ def get_take_profit_distance(
+ self, df: DataFrame, trade: Trade, current_rate: float
+ ) -> Optional[float]:
trade_duration_candles = self.get_trade_duration_candles(df, trade)
if QuickAdapterV3.is_trade_duration_valid(trade_duration_candles) is False:
return None
if isna(current_natr):
return None
return (
- trade.open_rate
- * max(entry_natr, harmonic_mean([entry_natr, current_natr]))
+ geometric_mean(
+ [trade.open_rate * entry_natr, 0.5 * current_rate * current_natr]
+ )
* self.trailing_stoploss_natr_ratio
* math.log10(9 + trade_duration_candles)
* self.reward_risk_ratio
):
return "maxima_detected_long"
- take_profit_distance = self.get_take_profit_distance(df, trade)
+ take_profit_distance = self.get_take_profit_distance(df, trade, current_rate)
if isna(take_profit_distance):
return None
if take_profit_distance == 0:
lips = smma(price_series, period=lips_period, zero_lag=zero_lag, offset=lips_shift)
return jaw, teeth, lips
+
+
+def zigzag(
+ df: pd.DataFrame,
+ threshold: float = 0.05,
+) -> tuple[list, list, list]:
+ """
+ Calculate the ZigZag indicator for a OHLCV DataFrame.
+
+ Parameters:
+ df (pd.DataFrame): OHLCV DataFrame.
+ threshold (float): Percentage threshold for reversal (default 0.05 for 5%).
+
+ Returns:
+ tuple: Lists of indices, extrema, and directions.
+ """
+ if df.empty:
+ return [], [], []
+
+ indices = []
+ extrema = []
+ directions = []
+
+ current_dir = 0
+ current_extreme = None
+ current_extreme_idx = None
+
+ first_high = df["high"].iloc[0]
+ first_low = df["low"].iloc[0]
+
+ if (first_high - first_low) / first_low >= threshold:
+ current_dir = 1
+ current_extreme = first_high
+ else:
+ current_dir = -1
+ current_extreme = first_low
+ current_extreme_idx = df.index[0]
+
+ indices.append(current_extreme_idx)
+ extrema.append(current_extreme)
+ directions.append(current_dir)
+ last_idx = current_extreme_idx
+
+ for i in range(1, len(df)):
+ current_idx = df.index[i]
+ h = df.at[current_idx, "high"]
+ l = df.at[current_idx, "low"]
+
+ if current_dir == 1: # Looking for higher high
+ if h > current_extreme:
+ current_extreme = h
+ current_extreme_idx = current_idx
+ elif (current_extreme - l) / current_extreme >= threshold:
+ if current_extreme_idx != last_idx:
+ indices.append(current_extreme_idx)
+ extrema.append(current_extreme)
+ directions.append(current_dir)
+ last_idx = current_extreme_idx
+
+ current_dir = -1
+ current_extreme = l
+ current_extreme_idx = current_idx
+
+ elif current_dir == -1: # Looking for lower low
+ if l < current_extreme:
+ current_extreme = l
+ current_extreme_idx = current_idx
+ elif (h - current_extreme) / current_extreme >= threshold:
+ if current_extreme_idx != last_idx:
+ indices.append(current_extreme_idx)
+ extrema.append(current_extreme)
+ directions.append(current_dir)
+ last_idx = current_extreme_idx
+
+ current_dir = 1
+ current_extreme = h
+ current_extreme_idx = current_idx
+
+ if current_extreme_idx != last_idx:
+ indices.append(current_extreme_idx)
+ extrema.append(current_extreme)
+ directions.append(current_dir)
+
+ return indices, extrema, directions
+
+
+def dynamic_zigzag(
+ df: pd.DataFrame,
+ timeperiod: int = 14,
+ natr: bool = True,
+ ratio: float = 1.0,
+) -> tuple[list, list, list]:
+ """
+ Calculate the ZigZag indicator for a OHLCV DataFrame with dynamic threshold using ATR/NATR.
+
+ Parameters:
+ df (pd.DataFrame): OHLCV DataFrame.
+ timeperiod (int): Period for ATR/NATR calculation (default: 14).
+ natr (bool): Use NATR (True) or ATR (False) (default: True).
+ ratio (float): ratio for dynamic threshold (default: 1.0).
+
+ Returns:
+ tuple: Lists of indices, extrema, and directions.
+ """
+ if df.empty:
+ return [], [], []
+
+ if natr:
+ thresholds = ta.NATR(df, timeperiod=timeperiod)
+ else:
+ thresholds = ta.ATR(df, timeperiod=timeperiod)
+ thresholds = thresholds.ffill().bfill()
+
+ indices = []
+ extrema = []
+ directions = []
+
+ current_dir = 0
+ current_extreme = None
+ current_extreme_idx = None
+
+ first_high = df["high"].iloc[0]
+ first_low = df["low"].iloc[0]
+ first_threshold = thresholds.iloc[0] * ratio
+
+ if natr:
+ first_move = (first_high - first_low) / first_low
+ else:
+ first_move = first_high - first_low
+ if first_move >= first_threshold:
+ current_dir = 1
+ current_extreme = first_high
+ else:
+ current_dir = -1
+ current_extreme = first_low
+ current_extreme_idx = df.index[0]
+
+ indices.append(current_extreme_idx)
+ extrema.append(current_extreme)
+ directions.append(current_dir)
+ last_idx = current_extreme_idx
+
+ for i in range(1, len(df)):
+ current_idx = df.index[i]
+ h = df.at[current_idx, "high"]
+ l = df.at[current_idx, "low"]
+ threshold = thresholds.iloc[i] * ratio
+
+ if current_dir == 1: # Looking for higher high
+ if h > current_extreme:
+ current_extreme = h
+ current_extreme_idx = current_idx
+ else:
+ if natr:
+ reversal = (current_extreme - l) / current_extreme >= threshold
+ else:
+ reversal = (current_extreme - l) >= threshold
+ if reversal:
+ if current_extreme_idx != last_idx:
+ indices.append(current_extreme_idx)
+ extrema.append(current_extreme)
+ directions.append(current_dir)
+ last_idx = current_extreme_idx
+
+ current_dir = -1
+ current_extreme = l
+ current_extreme_idx = current_idx
+
+ elif current_dir == -1: # Looking for lower low
+ if l < current_extreme:
+ current_extreme = l
+ current_extreme_idx = current_idx
+ else:
+ if natr:
+ reversal = (h - current_extreme) / current_extreme >= threshold
+ else:
+ reversal = (h - current_extreme) >= threshold
+ if reversal:
+ if current_extreme_idx != last_idx:
+ indices.append(current_extreme_idx)
+ extrema.append(current_extreme)
+ directions.append(current_dir)
+ last_idx = current_extreme_idx
+
+ current_dir = 1
+ current_extreme = h
+ current_extreme_idx = current_idx
+
+ if current_extreme_idx != last_idx:
+ indices.append(current_extreme_idx)
+ extrema.append(current_extreme)
+ directions.append(current_dir)
+
+ return indices, extrema, directions