From d1712c7c0ddd18070c98f0b0a64496410d59cab2 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 24 Mar 2025 00:02:32 +0100 Subject: [PATCH] refactor(qav3): factor out some code in a Utils.py file MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- .../user_data/strategies/QuickAdapterV3.py | 55 +----- quickadapter/user_data/strategies/Utils.py | 175 ++++++++++++++++++ 2 files changed, 182 insertions(+), 48 deletions(-) create mode 100644 quickadapter/user_data/strategies/Utils.py diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py index 5fcce28..b20a312 100644 --- a/quickadapter/user_data/strategies/QuickAdapterV3.py +++ b/quickadapter/user_data/strategies/QuickAdapterV3.py @@ -7,7 +7,6 @@ from pathlib import Path from statistics import harmonic_mean import talib.abstract as ta from pandas import DataFrame, Series, isna -from technical import qtpylib from typing import Optional from freqtrade.exchange import timeframe_to_minutes, timeframe_to_prev_date from freqtrade.strategy.interface import IStrategy @@ -19,6 +18,8 @@ from scipy.signal.windows import gaussian import numpy as np import pandas_ta as pta +from Utils import ewo, vwapb + logger = logging.getLogger(__name__) EXTREMA_COLUMN = "&s-extrema" @@ -208,7 +209,9 @@ class QuickAdapterV3(IStrategy): dataframe["%-pct-change"] = dataframe["close"].pct_change() dataframe["%-raw_volume"] = dataframe["volume"] dataframe["%-obv"] = ta.OBV(dataframe) - dataframe["%-ewo"] = EWO(dataframe=dataframe, ma_mode="zlewma", normalize=True) + dataframe["%-ewo"] = ewo( + dataframe=dataframe, mamode="ema", zero_lag=True, normalize=True + ) psar = ta.SAR(dataframe, acceleration=0.02, maximum=0.2) dataframe["%-diff_to_psar"] = dataframe["close"] - psar kc = pta.kc( @@ -261,12 +264,12 @@ class QuickAdapterV3(IStrategy): dataframe["%-macd"], dataframe["%-macdsignal"] ) dataframe["%-dist_to_zerohist"] = get_distance(0, dataframe["%-macdhist"]) - # VWAP + # VWAP bands ( dataframe["vwap_lowerband"], dataframe["vwap_middleband"], dataframe["vwap_upperband"], - ) = VWAPB(dataframe, 20, 1) + ) = vwapb(dataframe, 20, 1) dataframe["%-vwap_width"] = ( dataframe["vwap_upperband"] - dataframe["vwap_lowerband"] ) / dataframe["vwap_middleband"] @@ -730,50 +733,6 @@ def price_retracement_percent(dataframe: DataFrame, period: int) -> Series: ).fillna(0.0) -# VWAP bands -def VWAPB(dataframe: DataFrame, window=20, num_of_std=1) -> tuple: - vwap = qtpylib.rolling_vwap(dataframe, window=window) - rolling_std = vwap.rolling(window=window).std() - vwap_low = vwap - (rolling_std * num_of_std) - vwap_high = vwap + (rolling_std * num_of_std) - return vwap_low, vwap, vwap_high - - -def EWO( - dataframe: DataFrame, ma1_length=5, ma2_length=34, ma_mode="sma", normalize=False -) -> Series: - ma_modes: dict = { - "sma": ta.SMA, - "ema": ta.EMA, - "wma": ta.WMA, - "dema": ta.DEMA, - "tema": ta.TEMA, - "zlewma": ZLEWMA, - "trima": ta.TRIMA, - "kama": ta.KAMA, - "t3": ta.T3, - } - ma_fn = ma_modes.get(ma_mode, ma_modes["sma"]) - ma1 = ma_fn(dataframe, timeperiod=ma1_length) - ma2 = ma_fn(dataframe, timeperiod=ma2_length) - madiff = ma1 - ma2 - if normalize: - madiff = ( - madiff / dataframe["close"] - ) * 100 # Optional normalization with close price - return madiff - - -def ZLEWMA(dataframe: DataFrame, timeperiod: int) -> Series: - """ - Calculate the close price ZLEWMA (Zero Lag Exponential Weighted Moving Average) series of an OHLCV dataframe. - :param dataframe: DataFrame The original OHLCV dataframe - :param timeperiod: int The period to look back - :return: Series The close price ZLEWMA series - """ - return pta.zlma(dataframe["close"], length=timeperiod, mamode="ema") - - def zero_phase_gaussian(series: Series, window: int, std: float): kernel = gaussian(window, std=std) kernel /= kernel.sum() diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py new file mode 100644 index 0000000..7f282df --- /dev/null +++ b/quickadapter/user_data/strategies/Utils.py @@ -0,0 +1,175 @@ +import numpy as np +import pandas as pd +import pandas_ta as pta +import talib.abstract as ta +from technical import qtpylib + + +# VWAP bands +def vwapb(dataframe: pd.DataFrame, window=20, num_of_std=1) -> tuple: + vwap = qtpylib.rolling_vwap(dataframe, window=window) + rolling_std = vwap.rolling(window=window).std() + vwap_low = vwap - (rolling_std * num_of_std) + vwap_high = vwap + (rolling_std * num_of_std) + return vwap_low, vwap, vwap_high + + +def get_ma_fn(mamode: str, zero_lag=False) -> callable: + mamodes: dict = { + "sma": ta.SMA, + "ema": ta.EMA, + "wma": ta.WMA, + "dema": ta.DEMA, + "tema": ta.TEMA, + "trima": ta.TRIMA, + "kama": ta.KAMA, + "t3": ta.T3, + } + if zero_lag: + ma_fn = lambda df, timeperiod: pta.zlma( + df["close"], length=timeperiod, mamode=mamode + ) + else: + ma_fn = mamodes.get(mamode, mamodes["sma"]) + return ma_fn + + +def ewo( + dataframe: pd.DataFrame, + ma1_length=5, + ma2_length=34, + mamode="sma", + zero_lag=False, + normalize=False, +) -> pd.Series: + ma_fn = get_ma_fn(mamode, zero_lag=zero_lag) + ma1 = ma_fn(dataframe, timeperiod=ma1_length) + ma2 = ma_fn(dataframe, timeperiod=ma2_length) + madiff = ma1 - ma2 + if normalize: + madiff = ( + madiff / dataframe["close"] + ) * 100 # Optional normalization with close price + return madiff + + +def smma( + df: pd.DataFrame, period: int, mamode="sma", zero_lag=False, offset=0 +) -> pd.Series: + """ + SMoothed Moving Average (SMMA). + """ + close = df["close"] + if len(close) < period: + return pd.Series(index=close.index, dtype=float) + + smma = close.copy() + smma[: period - 1] = np.nan + ma_fn = get_ma_fn(mamode, zero_lag=zero_lag) + smma.iloc[period - 1] = ma_fn(close[:period], timeperiod=period).iloc[-1] + + for i in range(period, len(close)): + smma.iat[i] = ((period - 1) * smma.iat[i - 1] + smma.iat[i]) / period + + if offset != 0: + smma = smma.shift(offset) + + return smma + + +def alligator( + df: pd.DataFrame, + jaw_period=13, + teeth_period=8, + lips_period=5, + jaw_shift=8, + teeth_shift=5, + lips_shift=3, + mamode="sma", + zero_lag=False, +) -> tuple[pd.Series, pd.Series, pd.Series]: + """ + Calculate Bill Williams' Alligator indicator lines. + """ + median_price = (df["high"] + df["low"]) / 2 + + jaw = smma(median_price, period=jaw_period, mamode=mamode, zero_lag=zero_lag).shift( + jaw_shift + ) + teeth = smma( + median_price, period=teeth_period, mamode=mamode, zero_lag=zero_lag + ).shift(teeth_shift) + lips = smma( + median_price, period=lips_period, mamode=mamode, zero_lag=zero_lag + ).shift(lips_shift) + + return jaw, teeth, lips + + +def fractal_dimension( + prices_array: np.ndarray, period: int, normalize: bool = False +) -> float: + """ + Calculate fractal dimension of a price window, with optional normalization. + + Args: + window: Array of prices for the current window. + period: Window size (must be even). + normalize: If True, normalize HL values by their window lengths. + + Returns: + Fractal dimension (D) clipped between 1.0 and 2.0. + """ + if period % 2 != 0: + raise ValueError("FRAMA period must be even") + + half_period = period // 2 + if half_period < 1 or len(prices_array) < period: + return 1.0 + prices_first_half = prices_array[:half_period] + prices_second_half = prices_array[half_period:] + + HL1 = np.max(prices_first_half) - np.min(prices_first_half) + HL2 = np.max(prices_second_half) - np.min(prices_second_half) + HL3 = np.max(prices_array) - np.min(prices_array) + + if normalize: + HL1 /= half_period + HL2 /= half_period + HL3 /= period + + if HL1 + HL2 == 0 or HL3 == 0: + return 1.0 + + D = (np.log(HL1 + HL2) - np.log(HL3)) / np.log(2) + return np.clip(D, 1.0, 2.0) + + +def frama(prices: pd.Series, period: int = 16, normalize: bool = False) -> pd.Series: + """ + Calculate FRAMA with optional normalization. + + Args: + prices: Pandas Series of closing prices. + period: Lookback window (default=16). + normalize: Enable range normalization (default=False). + + Returns: + FRAMA values as a Pandas Series. + """ + if period % 2 != 0: + raise ValueError("FRAMA period must be even") + + frama = np.full(len(prices), np.nan) + + for i in range(period - 1, len(prices)): + prices_array = prices.iloc[i - period + 1 : i + 1].to_numpy() + D = fractal_dimension(prices_array, period, normalize) + alpha = np.exp(-4.6 * (D - 1)) + + if np.isnan(frama[i - 1]): + frama[i] = prices_array[-1] + else: + frama[i] = alpha * prices_array[-1] + (1 - alpha) * frama[i - 1] + + return pd.Series(frama, index=prices.index) -- 2.43.0