dataframe["%-bb_width"] = (
dataframe["bb_upperband"] - dataframe["bb_lowerband"]
) / dataframe["bb_middleband"]
- dataframe["%-ibs"] = (dataframe["close"] - dataframe["low"]) / (
- dataframe["high"] - dataframe["low"]
- )
+ dataframe["%-ibs"] = (
+ (dataframe["close"] - dataframe["low"])
+ / (dataframe["high"] - dataframe["low"])
+ ).fillna(0.0)
# dataframe["ema_50"] = ta.EMA(dataframe, timeperiod=50)
# dataframe["ema_12"] = ta.EMA(dataframe, timeperiod=12)
# dataframe["ema_26"] = ta.EMA(dataframe, timeperiod=26)
dataframe["close"], dataframe["vwap_lowerband"]
)
dataframe["%-body"] = dataframe["close"] - dataframe["open"]
- dataframe["%-tail"] = (dataframe["close"] - dataframe["low"]).abs()
- dataframe["%-wick"] = (dataframe["high"] - dataframe["close"]).abs()
+ dataframe["%-tail"] = (
+ np.minimum(dataframe["open"], dataframe["close"]) - dataframe["low"]
+ ).clip(lower=0)
+ dataframe["%-wick"] = (
+ dataframe["high"] - np.maximum(dataframe["open"], dataframe["close"])
+ ).clip(lower=0)
pp = pivots_points(dataframe)
dataframe["pivot"] = pp["pivot"]
dataframe["r1"] = pp["r1"]
}[extrema_smoothing]
-def top_percent_change(dataframe: DataFrame, length: int) -> float:
+def top_percent_change(dataframe: DataFrame, length: int) -> Series:
"""
- Percentage change of the current close from the range maximum Open price
+ Percentage change of the current close relative from the range maximum open price
:param dataframe: DataFrame The original OHLC dataframe
- :param length: int The length to look back
+ :param length: int The period length to look back
"""
if length == 0:
- return (dataframe["open"] - dataframe["close"]) / dataframe["close"]
+ return ((dataframe["close"] - dataframe["open"]) / dataframe["open"]).fillna(
+ 0.0
+ )
else:
- return (
- dataframe["open"].rolling(length).max() - dataframe["close"]
- ) / dataframe["close"]
+ open_max = dataframe["open"].rolling(length).max()
+ return ((dataframe["close"] - open_max) / open_max).fillna(0.0)
-def chaikin_mf(df: DataFrame, periods=20):
+def chaikin_mf(df: DataFrame, periods=20) -> Series:
close = df["close"]
low = df["low"]
high = df["high"]
mfv = ((close - low) - (high - close)) / (high - low)
mfv = mfv.fillna(0.0)
mfv *= volume
- cmf = mfv.rolling(periods).sum() / volume.rolling(periods).sum()
+ cmf = mfv.rolling(window=periods).sum() / volume.rolling(window=periods).sum()
return Series(cmf, name="cmf")
# VWAP bands
-def VWAPB(dataframe: DataFrame, window=20, num_of_std=1):
+def VWAPB(dataframe: DataFrame, window=20, num_of_std=1) -> tuple:
vwap = qtpylib.rolling_vwap(dataframe, window=window)
rolling_std = vwap.rolling(window=window).std()
vwap_low = vwap - (rolling_std * num_of_std)
return vwap_low, vwap, vwap_high
-def EWO(dataframe: DataFrame, sma1_length=5, sma2_length=35):
- sma1 = ta.EMA(dataframe, timeperiod=sma1_length)
- sma2 = ta.EMA(dataframe, timeperiod=sma2_length)
- smadif = (sma1 - sma2) / dataframe["close"] * 100
+def EWO(
+ dataframe: DataFrame, sma1_length=5, sma2_length=34, normalize=False
+) -> DataFrame:
+ sma1 = ta.SMA(dataframe, timeperiod=sma1_length)
+ sma2 = ta.SMA(dataframe, timeperiod=sma2_length)
+ smadif = sma1 - sma2
+ if normalize:
+ smadif = ((smadif / dataframe["close"]) * 100).fillna(
+ 0.0
+ ) # Optional normalization
return smadif
-def zlewma(series: Series, length: int):
+def zlewma(series: Series, length: int) -> Series:
"""
Calculate the ZLEWMA (Zero Lag Exponential Weighted Moving Average) of a series.
"""
return series.ewm(span=length).mean()
-def get_distance(p1: float, p2: float):
+def get_distance(p1: Series | float, p2: Series | float) -> Series | float:
return abs((p1) - (p2))