]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
refactor(qav3): factor out some code in a Utils.py file
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 23 Mar 2025 23:02:32 +0000 (00:02 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 23 Mar 2025 23:02:32 +0000 (00:02 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
quickadapter/user_data/strategies/QuickAdapterV3.py
quickadapter/user_data/strategies/Utils.py [new file with mode: 0644]

index 5fcce28c4b9013a3915b5f482ba5f6cdeca38e82..b20a3129830a272ba882cfa4d0ac0ab30eb023f0 100644 (file)
@@ -7,7 +7,6 @@ from pathlib import Path
 from statistics import harmonic_mean
 import talib.abstract as ta
 from pandas import DataFrame, Series, isna
-from technical import qtpylib
 from typing import Optional
 from freqtrade.exchange import timeframe_to_minutes, timeframe_to_prev_date
 from freqtrade.strategy.interface import IStrategy
@@ -19,6 +18,8 @@ from scipy.signal.windows import gaussian
 import numpy as np
 import pandas_ta as pta
 
+from Utils import ewo, vwapb
+
 logger = logging.getLogger(__name__)
 
 EXTREMA_COLUMN = "&s-extrema"
@@ -208,7 +209,9 @@ class QuickAdapterV3(IStrategy):
         dataframe["%-pct-change"] = dataframe["close"].pct_change()
         dataframe["%-raw_volume"] = dataframe["volume"]
         dataframe["%-obv"] = ta.OBV(dataframe)
-        dataframe["%-ewo"] = EWO(dataframe=dataframe, ma_mode="zlewma", normalize=True)
+        dataframe["%-ewo"] = ewo(
+            dataframe=dataframe, mamode="ema", zero_lag=True, normalize=True
+        )
         psar = ta.SAR(dataframe, acceleration=0.02, maximum=0.2)
         dataframe["%-diff_to_psar"] = dataframe["close"] - psar
         kc = pta.kc(
@@ -261,12 +264,12 @@ class QuickAdapterV3(IStrategy):
             dataframe["%-macd"], dataframe["%-macdsignal"]
         )
         dataframe["%-dist_to_zerohist"] = get_distance(0, dataframe["%-macdhist"])
-        # VWAP
+        # VWAP bands
         (
             dataframe["vwap_lowerband"],
             dataframe["vwap_middleband"],
             dataframe["vwap_upperband"],
-        ) = VWAPB(dataframe, 20, 1)
+        ) = vwapb(dataframe, 20, 1)
         dataframe["%-vwap_width"] = (
             dataframe["vwap_upperband"] - dataframe["vwap_lowerband"]
         ) / dataframe["vwap_middleband"]
@@ -730,50 +733,6 @@ def price_retracement_percent(dataframe: DataFrame, period: int) -> Series:
     ).fillna(0.0)
 
 
-# VWAP bands
-def VWAPB(dataframe: DataFrame, window=20, num_of_std=1) -> tuple:
-    vwap = qtpylib.rolling_vwap(dataframe, window=window)
-    rolling_std = vwap.rolling(window=window).std()
-    vwap_low = vwap - (rolling_std * num_of_std)
-    vwap_high = vwap + (rolling_std * num_of_std)
-    return vwap_low, vwap, vwap_high
-
-
-def EWO(
-    dataframe: DataFrame, ma1_length=5, ma2_length=34, ma_mode="sma", normalize=False
-) -> Series:
-    ma_modes: dict = {
-        "sma": ta.SMA,
-        "ema": ta.EMA,
-        "wma": ta.WMA,
-        "dema": ta.DEMA,
-        "tema": ta.TEMA,
-        "zlewma": ZLEWMA,
-        "trima": ta.TRIMA,
-        "kama": ta.KAMA,
-        "t3": ta.T3,
-    }
-    ma_fn = ma_modes.get(ma_mode, ma_modes["sma"])
-    ma1 = ma_fn(dataframe, timeperiod=ma1_length)
-    ma2 = ma_fn(dataframe, timeperiod=ma2_length)
-    madiff = ma1 - ma2
-    if normalize:
-        madiff = (
-            madiff / dataframe["close"]
-        ) * 100  # Optional normalization with close price
-    return madiff
-
-
-def ZLEWMA(dataframe: DataFrame, timeperiod: int) -> Series:
-    """
-    Calculate the close price ZLEWMA (Zero Lag Exponential Weighted Moving Average) series of an OHLCV dataframe.
-    :param dataframe: DataFrame The original OHLCV dataframe
-    :param timeperiod: int The period to look back
-    :return: Series The close price ZLEWMA series
-    """
-    return pta.zlma(dataframe["close"], length=timeperiod, mamode="ema")
-
-
 def zero_phase_gaussian(series: Series, window: int, std: float):
     kernel = gaussian(window, std=std)
     kernel /= kernel.sum()
diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py
new file mode 100644 (file)
index 0000000..7f282df
--- /dev/null
@@ -0,0 +1,175 @@
+import numpy as np
+import pandas as pd
+import pandas_ta as pta
+import talib.abstract as ta
+from technical import qtpylib
+
+
+# VWAP bands
+def vwapb(dataframe: pd.DataFrame, window=20, num_of_std=1) -> tuple:
+    vwap = qtpylib.rolling_vwap(dataframe, window=window)
+    rolling_std = vwap.rolling(window=window).std()
+    vwap_low = vwap - (rolling_std * num_of_std)
+    vwap_high = vwap + (rolling_std * num_of_std)
+    return vwap_low, vwap, vwap_high
+
+
+def get_ma_fn(mamode: str, zero_lag=False) -> callable:
+    mamodes: dict = {
+        "sma": ta.SMA,
+        "ema": ta.EMA,
+        "wma": ta.WMA,
+        "dema": ta.DEMA,
+        "tema": ta.TEMA,
+        "trima": ta.TRIMA,
+        "kama": ta.KAMA,
+        "t3": ta.T3,
+    }
+    if zero_lag:
+        ma_fn = lambda df, timeperiod: pta.zlma(
+            df["close"], length=timeperiod, mamode=mamode
+        )
+    else:
+        ma_fn = mamodes.get(mamode, mamodes["sma"])
+    return ma_fn
+
+
+def ewo(
+    dataframe: pd.DataFrame,
+    ma1_length=5,
+    ma2_length=34,
+    mamode="sma",
+    zero_lag=False,
+    normalize=False,
+) -> pd.Series:
+    ma_fn = get_ma_fn(mamode, zero_lag=zero_lag)
+    ma1 = ma_fn(dataframe, timeperiod=ma1_length)
+    ma2 = ma_fn(dataframe, timeperiod=ma2_length)
+    madiff = ma1 - ma2
+    if normalize:
+        madiff = (
+            madiff / dataframe["close"]
+        ) * 100  # Optional normalization with close price
+    return madiff
+
+
+def smma(
+    df: pd.DataFrame, period: int, mamode="sma", zero_lag=False, offset=0
+) -> pd.Series:
+    """
+    SMoothed Moving Average (SMMA).
+    """
+    close = df["close"]
+    if len(close) < period:
+        return pd.Series(index=close.index, dtype=float)
+
+    smma = close.copy()
+    smma[: period - 1] = np.nan
+    ma_fn = get_ma_fn(mamode, zero_lag=zero_lag)
+    smma.iloc[period - 1] = ma_fn(close[:period], timeperiod=period).iloc[-1]
+
+    for i in range(period, len(close)):
+        smma.iat[i] = ((period - 1) * smma.iat[i - 1] + smma.iat[i]) / period
+
+    if offset != 0:
+        smma = smma.shift(offset)
+
+    return smma
+
+
+def alligator(
+    df: pd.DataFrame,
+    jaw_period=13,
+    teeth_period=8,
+    lips_period=5,
+    jaw_shift=8,
+    teeth_shift=5,
+    lips_shift=3,
+    mamode="sma",
+    zero_lag=False,
+) -> tuple[pd.Series, pd.Series, pd.Series]:
+    """
+    Calculate Bill Williams' Alligator indicator lines.
+    """
+    median_price = (df["high"] + df["low"]) / 2
+
+    jaw = smma(median_price, period=jaw_period, mamode=mamode, zero_lag=zero_lag).shift(
+        jaw_shift
+    )
+    teeth = smma(
+        median_price, period=teeth_period, mamode=mamode, zero_lag=zero_lag
+    ).shift(teeth_shift)
+    lips = smma(
+        median_price, period=lips_period, mamode=mamode, zero_lag=zero_lag
+    ).shift(lips_shift)
+
+    return jaw, teeth, lips
+
+
+def fractal_dimension(
+    prices_array: np.ndarray, period: int, normalize: bool = False
+) -> float:
+    """
+    Calculate fractal dimension of a price window, with optional normalization.
+
+    Args:
+        window: Array of prices for the current window.
+        period: Window size (must be even).
+        normalize: If True, normalize HL values by their window lengths.
+
+    Returns:
+        Fractal dimension (D) clipped between 1.0 and 2.0.
+    """
+    if period % 2 != 0:
+        raise ValueError("FRAMA period must be even")
+
+    half_period = period // 2
+    if half_period < 1 or len(prices_array) < period:
+        return 1.0
+    prices_first_half = prices_array[:half_period]
+    prices_second_half = prices_array[half_period:]
+
+    HL1 = np.max(prices_first_half) - np.min(prices_first_half)
+    HL2 = np.max(prices_second_half) - np.min(prices_second_half)
+    HL3 = np.max(prices_array) - np.min(prices_array)
+
+    if normalize:
+        HL1 /= half_period
+        HL2 /= half_period
+        HL3 /= period
+
+    if HL1 + HL2 == 0 or HL3 == 0:
+        return 1.0
+
+    D = (np.log(HL1 + HL2) - np.log(HL3)) / np.log(2)
+    return np.clip(D, 1.0, 2.0)
+
+
+def frama(prices: pd.Series, period: int = 16, normalize: bool = False) -> pd.Series:
+    """
+    Calculate FRAMA with optional normalization.
+
+    Args:
+        prices: Pandas Series of closing prices.
+        period: Lookback window (default=16).
+        normalize: Enable range normalization (default=False).
+
+    Returns:
+        FRAMA values as a Pandas Series.
+    """
+    if period % 2 != 0:
+        raise ValueError("FRAMA period must be even")
+
+    frama = np.full(len(prices), np.nan)
+
+    for i in range(period - 1, len(prices)):
+        prices_array = prices.iloc[i - period + 1 : i + 1].to_numpy()
+        D = fractal_dimension(prices_array, period, normalize)
+        alpha = np.exp(-4.6 * (D - 1))
+
+        if np.isnan(frama[i - 1]):
+            frama[i] = prices_array[-1]
+        else:
+            frama[i] = alpha * prices_array[-1] + (1 - alpha) * frama[i - 1]
+
+    return pd.Series(frama, index=prices.index)