candles_step: int,
model_training_parameters: dict,
) -> float:
+ def calculate_min_extrema(
+ length: int, fit_live_predictions_candles: int, min_extrema: float = 2.0
+ ) -> int:
+ return int(round((length / fit_live_predictions_candles) * min_extrema))
+
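For reference, the new helper simply scales the minimum expected extrema count to the length of the evaluated window; a quick arithmetic check (the numbers below are illustrative, not taken from the change):

    # illustrative values only
    assert calculate_min_extrema(1000, 100) == 20        # (1000 / 100) * 2.0
    assert calculate_min_extrema(1000, 100, 3.0) == 30    # custom min_extrema factor
    assert calculate_min_extrema(250, 300) == 2           # round(0.833... * 2.0) -> 2
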
test_ok = True
test_length = len(X_test)
if debug:
n_test_minima: int = sp.signal.find_peaks(-y_test[EXTREMA_COLUMN])[0].size
n_test_maxima: int = sp.signal.find_peaks(y_test[EXTREMA_COLUMN])[0].size
n_test_extrema: int = n_test_minima + n_test_maxima
- min_test_extrema: int = int(
- round((test_length / fit_live_predictions_candles) * 2)
+ min_test_extrema: int = calculate_min_extrema(
+ test_length, fit_live_predictions_candles
)
logger.info(
f"{test_length=}, {n_test_minima=}, {n_test_maxima=}, {n_test_extrema=}, {min_test_extrema=}"
n_test_minima = sp.signal.find_peaks(-y_test[EXTREMA_COLUMN])[0].size
n_test_maxima = sp.signal.find_peaks(y_test[EXTREMA_COLUMN])[0].size
n_test_extrema = n_test_minima + n_test_maxima
- min_test_extrema: int = int(round((test_window / fit_live_predictions_candles) * 2))
+ min_test_extrema: int = calculate_min_extrema(
+ test_window, fit_live_predictions_candles
+ )
if n_test_extrema < min_test_extrema:
if debug:
logger.warning(
- f"Insufficient extrema in test data with {test_window}: {n_test_extrema} < {min_test_extrema}"
+ f"Insufficient extrema in test data with {test_window=}: {n_test_extrema=} < {min_test_extrema=}"
)
test_ok = False
test_weights = test_weights[-test_window:]
n_train_minima: int = sp.signal.find_peaks(-y[EXTREMA_COLUMN])[0].size
n_train_maxima: int = sp.signal.find_peaks(y[EXTREMA_COLUMN])[0].size
n_train_extrema: int = n_train_minima + n_train_maxima
- min_train_extrema: int = int(
- round((train_length / fit_live_predictions_candles) * 2)
+ min_train_extrema: int = calculate_min_extrema(
+ train_length, fit_live_predictions_candles
)
logger.info(
f"{train_length=}, {n_train_minima=}, {n_train_maxima=}, {n_train_extrema=}, {min_train_extrema=}"
n_train_minima = sp.signal.find_peaks(-y[EXTREMA_COLUMN])[0].size
n_train_maxima = sp.signal.find_peaks(y[EXTREMA_COLUMN])[0].size
n_train_extrema = n_train_minima + n_train_maxima
- min_train_extrema: int = int(
- round((train_window / fit_live_predictions_candles) * 2)
+ min_train_extrema: int = calculate_min_extrema(
+ train_window, fit_live_predictions_candles
)
if n_train_extrema < min_train_extrema:
if debug:
logger.warning(
- f"Insufficient extrema in train data with {train_window} : {n_train_extrema} < {min_train_extrema}"
+ f"Insufficient extrema in train data with {train_window=}: {n_train_extrema=} < {min_train_extrema=}"
)
train_ok = False
train_weights = train_weights[-train_window:]
bottom_change_percent,
calculate_quantile,
get_zl_ma_fn,
+ zero_phase,
zigzag,
ewo,
non_zero_diff,
get_gaussian_window,
get_odd_window,
derive_gaussian_std_from_window,
- zero_phase_gaussian,
zlema,
)
logger.error(
f"Failed to calculate KAMA for pair {pair}: {str(e)}", exc_info=True
)
- return zlema(label_natr, period=trade_duration_candles).iloc[-1]
+ return label_natr.iloc[-1]
def get_trade_natr(
self, df: DataFrame, trade: Trade, trade_duration_candles: int
self,
series: Series,
window: int,
- std: Optional[float] = None,
) -> Series:
extrema_smoothing = self.freqai_info.get("extrema_smoothing", "gaussian")
- if std is None:
- std = derive_gaussian_std_from_window(window)
+ extrema_smoothing_zero_phase = self.freqai_info.get(
+ "extrema_smoothing_zero_phase", True
+ )
+ extrema_smoothing_beta = float(
+ self.freqai_info.get("extrema_smoothing_beta", 10.0)
+ )
+ std = derive_gaussian_std_from_window(window)
+ if debug:
+ logger.info(
+ f"{extrema_smoothing=}, {extrema_smoothing_zero_phase=}, {extrema_smoothing_beta=}, {window=}, {std=}"
+ )
gaussian_window = get_gaussian_window(std, True)
odd_window = get_odd_window(window)
smoothing_methods: dict[str, Series] = {
- "gaussian": series.rolling(
+ "gaussian": zero_phase(
+ series=series,
+ window=window,
+ win_type="gaussian",
+ std=std,
+ beta=extrema_smoothing_beta,
+ )
+ if extrema_smoothing_zero_phase
+ else series.rolling(
window=gaussian_window,
win_type="gaussian",
center=True,
).mean(std=std),
- "zero_phase_gaussian": zero_phase_gaussian(
- series=series, window=window, std=std
- ),
- "boxcar": series.rolling(
- window=odd_window, win_type="boxcar", center=True
- ).mean(),
- "triang": series.rolling(
- window=odd_window, win_type="triang", center=True
- ).mean(),
+ "kaiser": zero_phase(
+ series=series,
+ window=window,
+ win_type="kaiser",
+ std=std,
+ beta=extrema_smoothing_beta,
+ )
+ if extrema_smoothing_zero_phase
+ else series.rolling(
+ window=odd_window,
+ win_type="kaiser",
+ center=True,
+ ).mean(beta=extrema_smoothing_beta),
"smm": series.rolling(window=odd_window, center=True).median(),
"sma": series.rolling(window=odd_window, center=True).mean(),
"ewma": series.ewm(span=window).mean(),
- "zlewma": zlema(series, period=window),
}
return smoothing_methods.get(
extrema_smoothing,
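The smoothing behaviour is now driven by the three freqai settings read above. A hypothetical excerpt of those settings (the key names, defaults and method names come from this change; the dict name and surrounding layout are assumed):

    # hypothetical values; keys and defaults match the code above
    freqai_extrema_settings = {
        "extrema_smoothing": "kaiser",           # also: "gaussian", "smm", "sma", "ewma"
        "extrema_smoothing_zero_phase": True,    # forward-backward filtering (no lag)
        "extrema_smoothing_beta": 10.0,          # Kaiser window shape parameter
    }
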
import pandas as pd
import scipy as sp
import talib.abstract as ta
-from typing import Callable, Union
+from typing import Callable, Literal, Union
from technical import qtpylib
@lru_cache(maxsize=64)
-def _calculate_gaussian_coeffs(window: int, std: float) -> np.ndarray:
- gaussian_coeffs = sp.signal.windows.gaussian(M=window, std=std, sym=True)
- return gaussian_coeffs / np.sum(gaussian_coeffs)
+def _calculate_coeffs(
+ window: int, win_type: Literal["gaussian", "kaiser"], std: float, beta: float
+) -> np.ndarray:
+ if win_type == "gaussian":
+ coeffs = sp.signal.windows.gaussian(M=window, std=std, sym=True)
+ elif win_type == "kaiser":
+ coeffs = sp.signal.windows.kaiser(M=window, beta=beta, sym=True)
+ else:
+ raise ValueError(f"Unknown window type: {win_type}")
+ return coeffs / np.sum(coeffs)
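A small sanity check of the generalized coefficient helper (values arbitrary): whichever window family is selected, the kernel is normalized to unit sum, so the resulting smoother preserves the level of the input series.

    import numpy as np

    b_gauss = _calculate_coeffs(window=9, win_type="gaussian", std=1.5, beta=10.0)
    b_kaiser = _calculate_coeffs(window=9, win_type="kaiser", std=1.5, beta=10.0)
    assert np.isclose(b_gauss.sum(), 1.0) and np.isclose(b_kaiser.sum(), 1.0)
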
-def zero_phase_gaussian(series: pd.Series, window: int, std: float) -> pd.Series:
+def zero_phase(
+ series: pd.Series,
+ window: int,
+ win_type: Literal["gaussian", "kaiser"],
+ std: float,
+ beta: float,
+) -> pd.Series:
if len(series) == 0:
return series
if len(series) < window:
raise ValueError("Series length must be greater than or equal to window size")
series_values = series.to_numpy()
- b = _calculate_gaussian_coeffs(window, std)
+ b = _calculate_coeffs(window=window, win_type=win_type, std=std, beta=beta)
a = 1.0
filtered_values = sp.signal.filtfilt(b, a, series_values)
return pd.Series(filtered_values, index=series.index)
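A minimal usage sketch of the renamed zero_phase filter (values illustrative): filtfilt runs the FIR kernel once forward and once backward, which cancels the phase lag a causal rolling mean would introduce, at the cost of applying the smoothing twice.

    import numpy as np
    import pandas as pd

    s = pd.Series(np.sin(np.linspace(0, 6, 200)) + np.random.normal(0, 0.05, 200))
    smoothed = zero_phase(s, window=15, win_type="kaiser", std=3.0, beta=10.0)
    assert smoothed.index.equals(s.index)  # index and length are preserved
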
lag = max((period - 1) / 2, 0)
alpha = 2 / (period + 1)
zl_series = 2 * series - series.shift(int(lag))
- return zl_series.ewm(alpha=alpha).mean()
+ return zl_series.ewm(alpha=alpha, adjust=False).mean()
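For reference (standard pandas semantics, not part of the change): with adjust=False the exponential average uses the conventional recursion y[t] = (1 - alpha) * y[t-1] + alpha * x[t], whereas the previous default adjust=True re-weights early observations, so the two mostly differ near the start of the series.

    import pandas as pd

    x = pd.Series([1.0, 2.0, 3.0])
    print(x.ewm(alpha=0.5, adjust=True).mean().tolist())   # [1.0, 1.666..., 2.428...]
    print(x.ewm(alpha=0.5, adjust=False).mean().tolist())  # [1.0, 1.5, 2.25]
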
def _fractal_dimension(highs: np.ndarray, lows: np.ndarray, period: int) -> float: