| _Regressor model_ | | | |
| freqai.regressor | `xgboost` | enum {`xgboost`,`lightgbm`} | Machine learning regressor algorithm. |
| _Extrema smoothing_ | | | |
-| freqai.extrema_smoothing.method | `gaussian` | enum {`gaussian`,`kaiser`,`triang`,`smm`,`sma`} | Extrema smoothing kernel (smm=simple moving median, sma=simple moving average). |
-| freqai.extrema_smoothing.window | 5 | int >= 3 | Window size for extrema smoothing. |
-| freqai.extrema_smoothing.beta | 8.0 | float > 0 | Kaiser kernel shape parameter. |
+| freqai.extrema_smoothing.method | `gaussian` | enum {`gaussian`,`kaiser`,`triang`,`smm`,`sma`,`savgol`,`nadaraya_watson`} | Extrema smoothing method (`smm`=median, `sma`=mean, `savgol`=Savitzky–Golay, `nadaraya_watson`=Gaussian kernel regression). |
+| freqai.extrema_smoothing.window | 5 | int >= 3 | Smoothing window length (candles). |
+| freqai.extrema_smoothing.beta | 8.0 | float > 0 | Shape parameter for `kaiser` kernel. |
+| freqai.extrema_smoothing.polyorder | 3 | int >= 1 | Polynomial order for `savgol` smoothing. |
+| freqai.extrema_smoothing.mode | `mirror` | enum {`mirror`,`constant`,`nearest`,`wrap`,`interp`} | Boundary mode for `savgol` and `nadaraya_watson`. |
+| freqai.extrema_smoothing.bandwidth | 1.0 | float > 0 | Gaussian bandwidth for `nadaraya_watson`. |
| _Extrema weighting_ | | | |
| freqai.extrema_weighting.strategy | `none` | enum {`none`,`amplitude`,`amplitude_threshold_ratio`} | Extrema weighting source: unweighted (`none`), swing amplitude (`amplitude`), or volatility-threshold ratio adjusted swing amplitude (`amplitude_threshold_ratio`). |
| freqai.extrema_weighting.standardization | `none` | enum {`none`,`zscore`,`robust`} | Standardization method applied before normalization. `none`=no standardization, `zscore`=(w-μ)/σ, `robust`=(w-median)/IQR. |
-| freqai.extrema_weighting.robust_quantiles | [0.25, 0.75] | list[float] where 0 <= q_low < q_high <= 1 | Quantile range for robust standardization, Q1 and Q3. |
+| freqai.extrema_weighting.robust_quantiles | [0.25, 0.75] | list[float] where 0 <= Q1 < Q3 <= 1 | Quantile range for robust standardization, Q1 and Q3. |
| freqai.extrema_weighting.normalization | `minmax` | enum {`minmax`,`sigmoid`,`softmax`,`l1`,`l2`,`rank`,`none`} | Normalization method for weights. |
-| freqai.extrema_weighting.minmax_range | [0.0, 1.0] | list[float] | Target range for minmax normalization, min and max. |
-| freqai.extrema_weighting.sigmoid_scale | 1.0 | float > 0 | Scale parameter for sigmoid normalization, controls steepness. |
-| freqai.extrema_weighting.softmax_temperature | 1.0 | float > 0 | Temperature parameter for softmax normalization: lower values sharpen distribution, higher values flatten it. |
-| freqai.extrema_weighting.rank_method | `average` | enum {`average`,`min`,`max`,`dense`,`ordinal`} | Ranking method for rank normalization. |
+| freqai.extrema_weighting.minmax_range | [0.0, 1.0] | list[float] | Target range for `minmax` normalization, min and max. |
+| freqai.extrema_weighting.sigmoid_scale | 1.0 | float > 0 | Scale parameter for `sigmoid` normalization, controls steepness. |
+| freqai.extrema_weighting.softmax_temperature | 1.0 | float > 0 | Temperature parameter for `softmax` normalization: lower values sharpen distribution, higher values flatten it. |
+| freqai.extrema_weighting.rank_method | `average` | enum {`average`,`min`,`max`,`dense`,`ordinal`} | Ranking method for `rank` normalization. |
| freqai.extrema_weighting.gamma | 1.0 | float (0,10] | Contrast exponent applied after normalization: >1 emphasizes extrema, values between 0 and 1 soften. |
| _Feature parameters_ | | | |
| freqai.feature_parameters.label_period_candles | min/max midpoint | int >= 1 | Zigzag labeling NATR horizon. |
| freqai.predictions_extrema.thresholds_smoothing | `mean` | enum {`mean`,`isodata`,`li`,`minimum`,`otsu`,`triangle`,`yen`,`median`,`soft_extremum`} | Thresholding method for prediction thresholds smoothing. |
| freqai.predictions_extrema.thresholds_alpha | 12.0 | float > 0 | Alpha for `soft_extremum`. |
| freqai.predictions_extrema.threshold_outlier | 0.999 | float (0,1) | Quantile threshold for predictions outlier filtering. |
+| freqai.predictions_extrema.extrema_fraction | 1.0 | float (0,1] | Fraction of extrema used for thresholds. `1.0` uses all, lower values keep only most significant. Applies to `rank` and `values`; ignored for `partition`. |
| _Optuna / HPO_ | | | |
| freqai.optuna_hyperopt.enabled | true | bool | Enables HPO. |
| freqai.optuna_hyperopt.sampler | `tpe` | enum {`tpe`,`auto`} | HPO sampler algorithm. `tpe` uses TPESampler with multivariate and group, `auto` uses AutoSampler. |
if not isinstance(thresholds_alpha, (int, float)) or thresholds_alpha < 0:
thresholds_alpha = 12.0
+ extrema_fraction = predictions_extrema.get("extrema_fraction", 1.0)
+ if not isinstance(extrema_fraction, (int, float)) or not (
+ 0 < extrema_fraction <= 1
+ ):
+ extrema_fraction = 1.0
+
return {
"threshold_outlier": float(threshold_outlier),
"selection_method": selection_method,
"thresholds_smoothing": thresholds_smoothing,
"thresholds_alpha": float(thresholds_alpha),
+ "extrema_fraction": float(extrema_fraction),
}
@property
pred_extrema = pred_df.get(EXTREMA_COLUMN).iloc[-thresholds_candles:].copy()
- # Use cached parameters
extrema_selection = self.predictions_extrema["selection_method"]
thresholds_smoothing = self.predictions_extrema["thresholds_smoothing"]
+ extrema_fraction = self.predictions_extrema["extrema_fraction"]
if (
thresholds_smoothing == QuickAdapterRegressorV3._THRESHOLD_METHODS[7]
): # "median"
return QuickAdapterRegressorV3.median_min_max(
- pred_extrema, extrema_selection
+ pred_extrema, extrema_selection, extrema_fraction
)
elif (
thresholds_smoothing == QuickAdapterRegressorV3._THRESHOLD_METHODS[8]
pred_extrema,
self.predictions_extrema["thresholds_alpha"],
extrema_selection,
+ extrema_fraction,
)
elif (
thresholds_smoothing
in QuickAdapterRegressorV3._skimage_threshold_methods_set()
):
return QuickAdapterRegressorV3.skimage_min_max(
- pred_extrema, thresholds_smoothing, extrema_selection
+ pred_extrema, thresholds_smoothing, extrema_selection, extrema_fraction
)
+ @staticmethod
+ def _get_extrema_indices(
+ pred_extrema: pd.Series,
+ ) -> tuple[NDArray[np.intp], NDArray[np.intp]]:
+ minima_indices = sp.signal.find_peaks(-pred_extrema)[0]
+ maxima_indices = sp.signal.find_peaks(pred_extrema)[0]
+ return minima_indices, maxima_indices
+
+ @staticmethod
+ def _get_extrema_values(
+ pred_extrema: pd.Series,
+ minima_indices: NDArray[np.intp],
+ maxima_indices: NDArray[np.intp],
+ extrema_fraction: float = 1.0,
+ ) -> tuple[pd.Series, pd.Series]:
+ n_minima = (
+ max(1, int(round(minima_indices.size * extrema_fraction)))
+ if minima_indices.size > 0
+ else 0
+ )
+ n_maxima = (
+ max(1, int(round(maxima_indices.size * extrema_fraction)))
+ if maxima_indices.size > 0
+ else 0
+ )
+
+ minima = (
+ pred_extrema.loc[
+ pred_extrema.iloc[minima_indices].nsmallest(n_minima).index
+ ]
+ if n_minima > 0
+ else pd.Series(dtype=float)
+ )
+ maxima = (
+ pred_extrema.loc[pred_extrema.iloc[maxima_indices].nlargest(n_maxima).index]
+ if n_maxima > 0
+ else pd.Series(dtype=float)
+ )
+
+ return minima, maxima
+
+ @staticmethod
+ def _get_ranked_extrema(
+ pred_extrema: pd.Series,
+ n_minima: int,
+ n_maxima: int,
+ extrema_fraction: float = 1.0,
+ ) -> tuple[pd.Series, pd.Series]:
+ pred_minima = (
+ pred_extrema.nsmallest(max(1, int(round(n_minima * extrema_fraction))))
+ if n_minima > 0
+ else pd.Series(dtype=float)
+ )
+ pred_maxima = (
+ pred_extrema.nlargest(max(1, int(round(n_maxima * extrema_fraction))))
+ if n_maxima > 0
+ else pd.Series(dtype=float)
+ )
+
+ return pred_minima, pred_maxima
+
@staticmethod
def get_pred_min_max(
pred_extrema: pd.Series,
extrema_selection: ExtremaSelectionMethod,
+ extrema_fraction: float = 1.0,
) -> tuple[pd.Series, pd.Series]:
pred_extrema = (
pd.to_numeric(pred_extrema, errors="coerce")
if (
extrema_selection == QuickAdapterRegressorV3._EXTREMA_SELECTION_METHODS[0]
): # "rank"
- minima_indices = sp.signal.find_peaks(-pred_extrema)[0]
- maxima_indices = sp.signal.find_peaks(pred_extrema)[0]
-
- n_minima = minima_indices.size
- n_maxima = maxima_indices.size
-
- if n_minima > 0:
- pred_minima = pred_extrema.nsmallest(n_minima)
- else:
- pred_minima = pd.Series(dtype=float)
+ minima_indices, maxima_indices = (
+ QuickAdapterRegressorV3._get_extrema_indices(pred_extrema)
+ )
+ pred_minima, pred_maxima = QuickAdapterRegressorV3._get_ranked_extrema(
+ pred_extrema,
+ minima_indices.size,
+ maxima_indices.size,
+ extrema_fraction,
+ )
- if n_maxima > 0:
- pred_maxima = pred_extrema.nlargest(n_maxima)
- else:
- pred_maxima = pd.Series(dtype=float)
elif (
extrema_selection == QuickAdapterRegressorV3._EXTREMA_SELECTION_METHODS[1]
): # "values"
- minima_indices = sp.signal.find_peaks(-pred_extrema)[0]
- maxima_indices = sp.signal.find_peaks(pred_extrema)[0]
-
- pred_minima = (
- pred_extrema.iloc[minima_indices]
- if minima_indices.size > 0
- else pd.Series(dtype=float)
+ minima_indices, maxima_indices = (
+ QuickAdapterRegressorV3._get_extrema_indices(pred_extrema)
)
- pred_maxima = (
- pred_extrema.iloc[maxima_indices]
- if maxima_indices.size > 0
- else pd.Series(dtype=float)
+ pred_minima, pred_maxima = QuickAdapterRegressorV3._get_extrema_values(
+ pred_extrema, minima_indices, maxima_indices, extrema_fraction
)
elif (
pred_extrema: pd.Series,
alpha: float,
extrema_selection: ExtremaSelectionMethod,
+ extrema_fraction: float = 1.0,
) -> tuple[float, float]:
if alpha < 0:
raise ValueError("alpha must be non-negative")
pred_minima, pred_maxima = QuickAdapterRegressorV3.get_pred_min_max(
- pred_extrema, extrema_selection
+ pred_extrema, extrema_selection, extrema_fraction
)
soft_minimum = soft_extremum(pred_minima, alpha=-alpha)
if not np.isfinite(soft_minimum):
def median_min_max(
pred_extrema: pd.Series,
extrema_selection: ExtremaSelectionMethod,
+ extrema_fraction: float = 1.0,
) -> tuple[float, float]:
pred_minima, pred_maxima = QuickAdapterRegressorV3.get_pred_min_max(
- pred_extrema, extrema_selection
+ pred_extrema, extrema_selection, extrema_fraction
)
if pred_minima.empty:
pred_extrema: pd.Series,
method: str,
extrema_selection: ExtremaSelectionMethod,
+ extrema_fraction: float = 1.0,
) -> tuple[float, float]:
pred_minima, pred_maxima = QuickAdapterRegressorV3.get_pred_min_max(
- pred_extrema, extrema_selection
+ pred_extrema, extrema_selection, extrema_fraction
)
try:
min_val = min_func(pred_minima, threshold_func)
if not np.isfinite(min_val):
min_val = QuickAdapterRegressorV3.safe_min_pred(pred_extrema)
+
max_val = max_func(pred_maxima, threshold_func)
if not np.isfinite(max_val):
max_val = QuickAdapterRegressorV3.safe_max_pred(pred_extrema)
MINIMA_THRESHOLD_COLUMN,
NORMALIZATION_TYPES,
RANK_METHODS,
+ SMOOTHING_MODES,
SMOOTHING_METHODS,
STANDARDIZATION_TYPES,
WEIGHT_STRATEGIES,
)
smoothing_beta = DEFAULTS_EXTREMA_SMOOTHING["beta"]
+ smoothing_polyorder = extrema_smoothing.get(
+ "polyorder", DEFAULTS_EXTREMA_SMOOTHING["polyorder"]
+ )
+ if not isinstance(smoothing_polyorder, int) or smoothing_polyorder < 1:
+ logger.warning(
+ f"{pair}: invalid extrema_smoothing polyorder {smoothing_polyorder}, must be an integer >= 1, using default {DEFAULTS_EXTREMA_SMOOTHING['polyorder']}"
+ )
+ smoothing_polyorder = DEFAULTS_EXTREMA_SMOOTHING["polyorder"]
+
+ smoothing_mode = str(
+ extrema_smoothing.get("mode", DEFAULTS_EXTREMA_SMOOTHING["mode"])
+ )
+ if smoothing_mode not in set(SMOOTHING_MODES):
+ logger.warning(
+ f"{pair}: invalid extrema_smoothing mode '{smoothing_mode}', using default '{SMOOTHING_MODES[0]}'"
+ )
+ smoothing_mode = SMOOTHING_MODES[0]
+
+ smoothing_bandwidth = extrema_smoothing.get(
+ "bandwidth", DEFAULTS_EXTREMA_SMOOTHING["bandwidth"]
+ )
+ if (
+ not isinstance(smoothing_bandwidth, (int, float))
+ or smoothing_bandwidth <= 0
+ or not np.isfinite(smoothing_bandwidth)
+ ):
+ logger.warning(
+ f"{pair}: invalid extrema_smoothing bandwidth {smoothing_bandwidth}, must be a positive finite number, using default {DEFAULTS_EXTREMA_SMOOTHING['bandwidth']}"
+ )
+ smoothing_bandwidth = DEFAULTS_EXTREMA_SMOOTHING["bandwidth"]
+
return {
"method": smoothing_method,
"window": int(smoothing_window),
"beta": smoothing_beta,
+ "polyorder": int(smoothing_polyorder),
+ "mode": smoothing_mode,
+ "bandwidth": float(smoothing_bandwidth),
}
@staticmethod
self.extrema_smoothing["method"],
self.extrema_smoothing["window"],
self.extrema_smoothing["beta"],
+ self.extrema_smoothing["polyorder"],
+ self.extrema_smoothing["mode"],
+ self.extrema_smoothing["bandwidth"],
)
if debug:
extrema = dataframe[EXTREMA_COLUMN]
import scipy as sp
import talib.abstract as ta
from numpy.typing import NDArray
+from scipy.ndimage import gaussian_filter1d
from technical import qtpylib
T = TypeVar("T", pd.Series, float)
)
SmoothingKernel = Literal["gaussian", "kaiser", "triang"]
-SmoothingMethod = Union[SmoothingKernel, Literal["smm", "sma"]]
+SmoothingMethod = Union[
+ SmoothingKernel, Literal["smm", "sma", "savgol", "nadaraya_watson"]
+]
SMOOTHING_METHODS: Final[tuple[SmoothingMethod, ...]] = (
"gaussian",
"kaiser",
"triang",
"smm",
"sma",
+ "savgol",
+ "nadaraya_watson",
+)
+
+SmoothingMode = Literal["mirror", "constant", "nearest", "wrap", "interp"]
+SMOOTHING_MODES: Final[tuple[SmoothingMode, ...]] = (
+ "mirror",
+ "constant",
+ "nearest",
+ "wrap",
+ "interp",
)
"method": SMOOTHING_METHODS[0], # "gaussian"
"window": 5,
"beta": 8.0,
+ "polyorder": 3,
+ "mode": SMOOTHING_MODES[0], # "mirror"
+ "bandwidth": 1.0,
}
DEFAULTS_EXTREMA_WEIGHTING: Final[dict[str, Any]] = {
@lru_cache(maxsize=8)
def get_gaussian_std(window: int) -> float:
- # Assuming window = 6 * std + 1 => std = (window - 1) / 6
return (window - 1) / 6.0 if window > 1 else 0.5
+def get_smoothing_params(
+ window: int, polyorder: int, mode: SmoothingMode
+) -> tuple[int, int, str]:
+ if window <= polyorder:
+ window = polyorder + 1
+ window = get_odd_window(window)
+ return window, polyorder, mode
+
+
+def nadaraya_watson(
+ series: pd.Series, bandwidth: float, mode: SmoothingMode
+) -> pd.Series:
+ return pd.Series(
+ gaussian_filter1d(
+ series.to_numpy(),
+ sigma=bandwidth,
+ mode=mode, # type: ignore
+ ),
+ index=series.index,
+ )
+
+
@lru_cache(maxsize=8)
def _calculate_coeffs(
window: int,
method: SmoothingMethod = DEFAULTS_EXTREMA_SMOOTHING["method"],
window: int = DEFAULTS_EXTREMA_SMOOTHING["window"],
beta: float = DEFAULTS_EXTREMA_SMOOTHING["beta"],
+ polyorder: int = DEFAULTS_EXTREMA_SMOOTHING["polyorder"],
+ mode: SmoothingMode = DEFAULTS_EXTREMA_SMOOTHING["mode"],
+ bandwidth: float = DEFAULTS_EXTREMA_SMOOTHING["bandwidth"],
) -> pd.Series:
+ n = len(series)
+ if n == 0:
+ return series
if window < 3:
window = 3
+ if n < window:
+ return series
if beta <= 0 or not np.isfinite(beta):
beta = 1.0
return series.rolling(window=odd_window, center=True).median()
elif method == SMOOTHING_METHODS[4]: # "sma" (Simple Moving Average)
return series.rolling(window=odd_window, center=True).mean()
+ elif method == SMOOTHING_METHODS[5]: # "savgol" (Savitzky-Golay)
+ w, p, m = get_smoothing_params(odd_window, polyorder, mode)
+ if n < w:
+ return series
+ return pd.Series(
+ sp.signal.savgol_filter(
+ series.to_numpy(),
+ window_length=w,
+ polyorder=p,
+ mode=m, # type: ignore
+ ),
+ index=series.index,
+ )
+ elif method == SMOOTHING_METHODS[6]: # "nadaraya_watson"
+ return nadaraya_watson(series, bandwidth, mode)
else:
return zero_phase(
series=series,