import numpy as np
import pandas as pd
-import pandas_ta as pta
import talib.abstract as ta
from scipy.signal import convolve
from scipy.signal.windows import gaussian
-from sys import float_info
from technical import qtpylib
def non_zero_range(s1: pd.Series, s2: pd.Series) -> pd.Series:
"""Returns the difference of two series and adds epsilon to any zero values."""
diff = s1 - s2
- if diff.eq(0).any().any():
- diff += float_info.epsilon
+ diff = diff.mask(diff == 0, other=diff + np.finfo(float).eps)
return diff
return vwap_low, vwap, vwap_high
-def get_ma_fn(mamode: str, zero_lag=False) -> callable:
+def zero_lag_series(series: pd.Series, timeperiod: int) -> pd.Series:
+ lag = int(0.5 * (timeperiod - 1))
+ return 2 * series - series.shift(lag)
+
+
+def get_ma_fn(mamode: str) -> callable:
mamodes: dict = {
"sma": ta.SMA,
"ema": ta.EMA,
"kama": ta.KAMA,
"t3": ta.T3,
}
- if zero_lag:
- ma_fn = lambda series, timeperiod: pta.zlma(
- series, length=timeperiod, mamode=mamode
- )
- else:
- ma_fn = mamodes.get(mamode, mamodes["sma"])
- return ma_fn
+ return mamodes.get(mamode, mamodes["sma"])
def fractal_dimension(
if len(series) < period:
return pd.Series(index=series.index, dtype=float)
+ if zero_lag:
+ series = zero_lag_series(series, timeperiod=period)
smma = series.copy()
smma[: period - 1] = np.nan
- ma_fn = get_ma_fn(mamode, zero_lag=zero_lag)
smma.iloc[period - 1] = pd.Series(
- ma_fn(series.iloc[:period], timeperiod=period)
+ get_ma_fn(mamode)(series.iloc[:period], timeperiod=period)
).iloc[-1]
for i in range(period, len(series)):
"""
Calculate the Elliott Wave Oscillator (EWO) using two moving averages.
"""
+ ma_fn = get_ma_fn(mamode)
price_series = get_price_fn(pricemode)(dataframe)
- ma_fn = get_ma_fn(mamode, zero_lag=zero_lag)
- ma1 = ma_fn(price_series, timeperiod=ma1_length)
- ma2 = ma_fn(price_series, timeperiod=ma2_length)
+ if zero_lag:
+ price_series_ma1_length = zero_lag_series(price_series, timeperiod=ma1_length)
+ price_series_ma2_length = zero_lag_series(price_series, timeperiod=ma2_length)
+ ma1 = ma_fn(price_series_ma1_length, timeperiod=ma1_length)
+ ma2 = ma_fn(price_series_ma2_length, timeperiod=ma2_length)
+ else:
+ ma1 = ma_fn(price_series, timeperiod=ma1_length)
+ ma2 = ma_fn(price_series, timeperiod=ma2_length)
madiff = ma1 - ma2
if normalize:
madiff = (madiff / price_series) * 100