from enum import IntEnum
+from functools import lru_cache
import numpy as np
import pandas as pd
import scipy as sp
return (window - 1) / 6.0 if window > 1 else 0.5
+@lru_cache(maxsize=64)
+def _calculate_gaussian_coeffs(window: int, std: float) -> np.ndarray:
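+ """Return Gaussian window coefficients normalized to unit sum, cached per (window, std)."""
+ # lru_cache returns the same ndarray object on repeated calls; treat it as read-only.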
+ gaussian_coeffs = sp.signal.windows.gaussian(M=window, std=std, sym=True)
+ return gaussian_coeffs / np.sum(gaussian_coeffs)
+
+
def zero_phase_gaussian(series: pd.Series, window: int, std: float) -> pd.Series:
if len(series) == 0:
return series
if len(series) < window:
raise ValueError("Series length must be greater than or equal to window size")
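+ # NOTE: filtfilt pads with 3 * max(len(a), len(b)) samples by default (3 * window here),
+ # so a series only slightly longer than the window can still raise inside filtfilt.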
series_values = series.to_numpy()
- gaussian_coeffs = sp.signal.windows.gaussian(M=window, std=std, sym=True)
- b = gaussian_coeffs / np.sum(gaussian_coeffs)
+ b = _calculate_gaussian_coeffs(window, std)
a = 1.0
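+ # filtfilt runs the FIR filter forward and then backward, so the smoothed series has zero phase lag.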
filtered_values = sp.signal.filtfilt(b, a, series_values)
return pd.Series(filtered_values, index=series.index)
for i in range(period, n):
window_highs = highs.iloc[i - period : i]
window_lows = lows.iloc[i - period : i]
- fd.iloc[i] = _fractal_dimension(window_highs.values, window_lows.values, period)
+ fd.iloc[i] = _fractal_dimension(
+ window_highs.to_numpy(), window_lows.to_numpy(), period
+ )
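+ # FRAMA-style adaptive factor: alpha = exp(-4.6 * (D - 1)), clipped to [0.01, 1].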
alpha = np.exp(-4.6 * (fd - 1)).clip(0.01, 1)
prices = get_price_fn(pricemode)(dataframe)
if zero_lag:
- prices_ma1 = calculate_zero_lag(prices, period=ma1_length)
- prices_ma2 = calculate_zero_lag(prices, period=ma2_length)
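+ # "ema" uses the dedicated zlema() helper; other modes go through the generic zero-lag wrapper.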
+ if mamode == "ema":
+ ma_fn = lambda series, timeperiod: zlema(series, period=timeperiod)
+ else:
+ ma_fn = get_zl_ma_fn(mamode)
else:
- prices_ma1 = prices
- prices_ma2 = prices
+ ma_fn = get_ma_fn(mamode)
- ma_fn = get_ma_fn(mamode)
- ma1 = ma_fn(prices_ma1, timeperiod=ma1_length)
- ma2 = ma_fn(prices_ma2, timeperiod=ma2_length)
+ ma1 = ma_fn(prices, timeperiod=ma1_length)
+ ma2 = ma_fn(prices, timeperiod=ma2_length)
madiff = ma1 - ma2
if normalize:
madiff = (madiff / prices) * 100.0
if n < 2 * period + 1:
return [], []
- highs = df.get("high").values
- lows = df.get("low").values
-
- fractal_candidate_indices = np.arange(period, n - period)
-
- fractal_candidate_indices_length = len(fractal_candidate_indices)
- is_fractal_high = np.ones(fractal_candidate_indices_length, dtype=bool)
- is_fractal_low = np.ones(fractal_candidate_indices_length, dtype=bool)
+ highs = df.get("high").to_numpy()
+ lows = df.get("low").to_numpy()
- for i in range(1, period + 1):
- is_fractal_high &= (
- highs[fractal_candidate_indices] > highs[fractal_candidate_indices - i]
- ) & (highs[fractal_candidate_indices] > highs[fractal_candidate_indices + i])
+ fractal_highs = []
+ fractal_lows = []
- is_fractal_low &= (
- lows[fractal_candidate_indices] < lows[fractal_candidate_indices - i]
- ) & (lows[fractal_candidate_indices] < lows[fractal_candidate_indices + i])
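+ # A fractal high/low is a bar strictly above/below every bar within period bars on each side.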
+ for i in range(period, n - period):
+ is_high_fractal = all(
+ highs[i] > highs[i - j] and highs[i] > highs[i + j]
+ for j in range(1, period + 1)
+ )
+ is_low_fractal = all(
+ lows[i] < lows[i - j] and lows[i] < lows[i + j]
+ for j in range(1, period + 1)
+ )
- if not np.any(is_fractal_high) and not np.any(is_fractal_low):
- break
+ if is_high_fractal:
+ fractal_highs.append(i)
+ if is_low_fractal:
+ fractal_lows.append(i)
- return (
- fractal_candidate_indices[is_fractal_high].tolist(),
- fractal_candidate_indices[is_fractal_low].tolist(),
- )
+ return fractal_highs, fractal_lows
def calculate_quantile(values: np.ndarray, value: float) -> float:
if period not in natr_values_cache:
natr_values_cache[period] = (
- ta.NATR(df, timeperiod=period).fillna(method="bfill") / 100.0
- ).values
+ ta.NATR(df, timeperiod=period).bfill() / 100.0
+ ).to_numpy()
return natr_values_cache[period]
indices = df.index.tolist()
thresholds = get_natr_values(natr_period) * natr_ratio
- closes = df.get("close").values
- highs = df.get("high").values
- lows = df.get("low").values
+ closes = df.get("close").to_numpy()
+ highs = df.get("high").to_numpy()
+ lows = df.get("low").to_numpy()
state: TrendDirection = TrendDirection.NEUTRAL
depth = -1