From 05bfe10b35c0a22059ae911b66edd0ffceaac675 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 21 Nov 2025 16:54:10 +0100 Subject: [PATCH] feat(qav3): add more extrema weighting normalization methods MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- README.md | 5 +- .../user_data/strategies/QuickAdapterV3.py | 142 ++++++++--- quickadapter/user_data/strategies/Utils.py | 229 +++++++++++++----- 3 files changed, 284 insertions(+), 92 deletions(-) diff --git a/README.md b/README.md index cddb991..c54a82d 100644 --- a/README.md +++ b/README.md @@ -57,8 +57,11 @@ docker compose up -d --build | freqai.extrema_smoothing.beta | 8.0 | float > 0 | Kaiser kernel shape parameter. | | _Extrema weighting_ | | | | | freqai.extrema_weighting.strategy | `none` | enum {`none`,`threshold`} | Weighting strategy applied before smoothing. | -| freqai.extrema_weighting.normalization | `minmax` | enum {`minmax`,`zscore`,`l1`,`l2`,`none`} | Normalization method for weights. | +| freqai.extrema_weighting.normalization | `minmax` | enum {`minmax`,`zscore`,`l1`,`l2`,`robust`,`softmax`,`tanh`,`rank`,`none`} | Normalization method for weights. | | freqai.extrema_weighting.gamma | 1.0 | float (0,10] | Contrast exponent applied after normalization (>1 emphasizes extremes, 0 0 | Temperature parameter for softmax normalization (lower values sharpen distribution, higher values flatten it). | +| freqai.extrema_weighting.robust_quantiles | [0.25, 0.75] | tuple[float, float] where 0 <= q_low < q_high <= 1 | Quantile range for robust normalization (IQR-based scaling). | +| freqai.extrema_weighting.rank_method | `average` | enum {`average`,`min`,`max`,`dense`,`ordinal`} | Ranking method for rank normalization (how tied values are handled). | | _Feature parameters_ | | | | | freqai.feature_parameters.label_period_candles | min/max midpoint | int >= 1 | Zigzag labeling NATR horizon. | | freqai.feature_parameters.min_label_period_candles | 12 | int >= 1 | Minimum labeling NATR horizon used for reversals labeling HPO. | diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py index d2ec3a4..517cb58 100644 --- a/quickadapter/user_data/strategies/QuickAdapterV3.py +++ b/quickadapter/user_data/strategies/QuickAdapterV3.py @@ -31,6 +31,7 @@ from Utils import ( DEFAULTS_EXTREMA_SMOOTHING, DEFAULTS_EXTREMA_WEIGHTING, NORMALIZATION_TYPES, + RANK_METHODS, SMOOTHING_METHODS, WEIGHT_STRATEGIES, TrendDirection, @@ -582,6 +583,104 @@ class QuickAdapterV3(IStrategy): ) return self.get_label_natr_ratio(pair) * percent + @staticmethod + def _get_extrema_weighting_params( + extrema_weighting: dict[str, Any], pair: str + ) -> dict[str, Any]: + weighting_strategy = str( + extrema_weighting.get("strategy", DEFAULTS_EXTREMA_WEIGHTING["strategy"]) + ) + if weighting_strategy not in set(WEIGHT_STRATEGIES): + logger.warning( + f"{pair}: invalid extrema_weighting strategy '{weighting_strategy}', using default '{WEIGHT_STRATEGIES[0]}'" + ) + weighting_strategy = WEIGHT_STRATEGIES[0] + + weighting_normalization = str( + extrema_weighting.get( + "normalization", DEFAULTS_EXTREMA_WEIGHTING["normalization"] + ) + ) + if weighting_normalization not in set(NORMALIZATION_TYPES): + logger.warning( + f"{pair}: invalid extrema_weighting normalization '{weighting_normalization}', using default '{NORMALIZATION_TYPES[0]}'" + ) + weighting_normalization = NORMALIZATION_TYPES[0] + + weighting_gamma = extrema_weighting.get( + "gamma", DEFAULTS_EXTREMA_WEIGHTING["gamma"] + ) + if ( + not isinstance(weighting_gamma, (int, float)) + or not np.isfinite(weighting_gamma) + or not (0 < float(weighting_gamma) <= 10.0) + ): + logger.warning( + f"{pair}: invalid extrema_weighting gamma {weighting_gamma}, must be a finite number in (0, 10], using default 1.0" + ) + weighting_gamma = DEFAULTS_EXTREMA_WEIGHTING["gamma"] + else: + weighting_gamma = float(weighting_gamma) + + weighting_softmax_temperature = extrema_weighting.get( + "softmax_temperature", DEFAULTS_EXTREMA_WEIGHTING["softmax_temperature"] + ) + if ( + not isinstance(weighting_softmax_temperature, (int, float)) + or not np.isfinite(weighting_softmax_temperature) + or weighting_softmax_temperature <= 0 + ): + logger.warning( + f"{pair}: invalid extrema_weighting softmax_temperature {weighting_softmax_temperature}, must be > 0, using default 1.0" + ) + weighting_softmax_temperature = DEFAULTS_EXTREMA_WEIGHTING[ + "softmax_temperature" + ] + else: + weighting_softmax_temperature = float(weighting_softmax_temperature) + + weighting_robust_quantiles = extrema_weighting.get( + "robust_quantiles", DEFAULTS_EXTREMA_WEIGHTING["robust_quantiles"] + ) + if ( + not isinstance(weighting_robust_quantiles, (list, tuple)) + or len(weighting_robust_quantiles) != 2 + or not all( + isinstance(q, (int, float)) and np.isfinite(q) and 0 <= q <= 1 + for q in weighting_robust_quantiles + ) + or weighting_robust_quantiles[0] >= weighting_robust_quantiles[1] + ): + logger.warning( + f"{pair}: invalid extrema_weighting robust_quantiles {weighting_robust_quantiles}, must be (q_low, q_high) with 0 <= q_low < q_high <= 1, using default (0.25, 0.75)" + ) + weighting_robust_quantiles = DEFAULTS_EXTREMA_WEIGHTING["robust_quantiles"] + else: + weighting_robust_quantiles = ( + float(weighting_robust_quantiles[0]), + float(weighting_robust_quantiles[1]), + ) + + weighting_rank_method = str( + extrema_weighting.get( + "rank_method", DEFAULTS_EXTREMA_WEIGHTING["rank_method"] + ) + ) + if weighting_rank_method not in set(RANK_METHODS): + logger.warning( + f"{pair}: invalid extrema_weighting rank_method '{weighting_rank_method}', using default '{RANK_METHODS[0]}'" + ) + weighting_rank_method = RANK_METHODS[0] + + return { + "strategy": weighting_strategy, + "normalization": weighting_normalization, + "gamma": weighting_gamma, + "softmax_temperature": weighting_softmax_temperature, + "robust_quantiles": weighting_robust_quantiles, + "rank_method": weighting_rank_method, + } + @staticmethod @lru_cache(maxsize=128) def td_format( @@ -652,47 +751,20 @@ class QuickAdapterV3(IStrategy): extrema_weighting = self.freqai_info.get("extrema_weighting", {}) if not isinstance(extrema_weighting, dict): extrema_weighting = {} - - weighting_strategy = str( - extrema_weighting.get("strategy", DEFAULTS_EXTREMA_WEIGHTING["strategy"]) - ) - if weighting_strategy not in set(WEIGHT_STRATEGIES): - logger.warning( - f"{pair}: invalid extrema_weighting strategy '{weighting_strategy}', using default '{WEIGHT_STRATEGIES[0]}'" - ) - weighting_strategy = WEIGHT_STRATEGIES[0] - weighting_normalization = str( - extrema_weighting.get( - "normalization", DEFAULTS_EXTREMA_WEIGHTING["normalization"] - ) + extrema_weighting_params = QuickAdapterV3._get_extrema_weighting_params( + extrema_weighting, pair ) - if weighting_normalization not in set(NORMALIZATION_TYPES): - logger.warning( - f"{pair}: invalid extrema_weighting normalization '{weighting_normalization}', using default '{NORMALIZATION_TYPES[0]}'" - ) - weighting_normalization = NORMALIZATION_TYPES[0] - weighting_gamma = extrema_weighting.get( - "gamma", DEFAULTS_EXTREMA_WEIGHTING["gamma"] - ) - if ( - not isinstance(weighting_gamma, (int, float)) - or not np.isfinite(weighting_gamma) - or not (0 < float(weighting_gamma) <= 10.0) - ): - logger.warning( - f"{pair}: invalid extrema_weighting gamma {weighting_gamma}, must be a finite number in (0, 10], using default 1.0" - ) - weighting_gamma = 1.0 - else: - weighting_gamma = float(weighting_gamma) weighted_extrema, _ = get_weighted_extrema( extrema=dataframe[EXTREMA_COLUMN], indices=pivots_indices, weights=np.array(pivots_thresholds), - strategy=weighting_strategy, - normalization=weighting_normalization, - gamma=weighting_gamma, + strategy=extrema_weighting_params["strategy"], + normalization=extrema_weighting_params["normalization"], + gamma=extrema_weighting_params["gamma"], + softmax_temperature=extrema_weighting_params["softmax_temperature"], + robust_quantiles=extrema_weighting_params["robust_quantiles"], + rank_method=extrema_weighting_params["rank_method"], ) dataframe[EXTREMA_COLUMN] = smooth_extrema( diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py index 6b5cbda..20b86e8 100644 --- a/quickadapter/user_data/strategies/Utils.py +++ b/quickadapter/user_data/strategies/Utils.py @@ -21,13 +21,28 @@ T = TypeVar("T", pd.Series, float) WeightStrategy = Literal["none", "threshold"] WEIGHT_STRATEGIES: Final[tuple[WeightStrategy, ...]] = ("none", "threshold") -NormalizationType = Literal["minmax", "zscore", "l1", "l2", "none"] +NormalizationType = Literal[ + "minmax", "zscore", "l1", "l2", "robust", "softmax", "tanh", "rank", "none" +] NORMALIZATION_TYPES: Final[tuple[NormalizationType, ...]] = ( "minmax", # 0 "zscore", # 1 "l1", # 2 "l2", # 3 - "none", # 4 + "robust", # 4 + "softmax", # 5 + "tanh", # 6 + "rank", # 7 + "none", # 8 +) + +RankMethod = Literal["average", "min", "max", "dense", "ordinal"] +RANK_METHODS: Final[tuple[RankMethod, ...]] = ( + "average", + "min", + "max", + "dense", + "ordinal", ) SmoothingKernel = Literal["gaussian", "kaiser", "triang"] @@ -51,9 +66,12 @@ DEFAULTS_EXTREMA_WEIGHTING: Final[dict[str, Any]] = { "normalization": NORMALIZATION_TYPES[0], # "minmax" "gamma": 1.0, "strategy": WEIGHT_STRATEGIES[0], # "none" + "softmax_temperature": 1.0, + "robust_quantiles": (0.25, 0.75), + "rank_method": RANK_METHODS[0], # "average" } -DEFAULT_EXTREMA_WEIGHT = 1.0 +DEFAULT_EXTREMA_WEIGHT: Final[float] = 1.0 def get_distance(p1: T, p2: T) -> T: @@ -172,7 +190,7 @@ def smooth_extrema( ) -def zscore_normalize_weights( +def _normalize_zscore( weights: NDArray[np.floating], rescale_to_unit_range: bool = True, ) -> NDArray[np.floating]: @@ -182,21 +200,21 @@ def zscore_normalize_weights( weights = weights.astype(float, copy=False) if np.isnan(weights).any(): - return np.full_like(weights, 1.0, dtype=float) + return np.full_like(weights, float(DEFAULT_EXTREMA_WEIGHT), dtype=float) if weights.size == 1 or np.allclose(weights, weights[0]): if rescale_to_unit_range: - return np.full_like(weights, 1.0, dtype=float) + return np.full_like(weights, float(DEFAULT_EXTREMA_WEIGHT), dtype=float) else: return np.zeros_like(weights, dtype=float) try: z_scores = sp.stats.zscore(weights, ddof=1, nan_policy="raise") except Exception: - return np.full_like(weights, 1.0, dtype=float) + return np.full_like(weights, float(DEFAULT_EXTREMA_WEIGHT), dtype=float) if np.isnan(z_scores).any() or not np.isfinite(z_scores).all(): - return np.full_like(weights, 1.0, dtype=float) + return np.full_like(weights, float(DEFAULT_EXTREMA_WEIGHT), dtype=float) if not rescale_to_unit_range: return z_scores @@ -206,13 +224,105 @@ def zscore_normalize_weights( z_range = z_max - z_min if np.isclose(z_range, 0.0): - return np.full_like(weights, 1.0, dtype=float) + return np.full_like(weights, float(DEFAULT_EXTREMA_WEIGHT), dtype=float) normalized_weights = (z_scores - z_min) / z_range if np.isnan(normalized_weights).any(): - return np.full_like(weights, 1.0, dtype=float) + return np.full_like(weights, float(DEFAULT_EXTREMA_WEIGHT), dtype=float) + + return normalized_weights + + +def _normalize_minmax(weights: NDArray[np.floating]) -> NDArray[np.floating]: + weights = weights.astype(float, copy=False) + if np.isnan(weights).any(): + return np.full_like(weights, float(DEFAULT_EXTREMA_WEIGHT), dtype=float) + w_min = np.min(weights) + w_max = np.max(weights) + if not (np.isfinite(w_min) and np.isfinite(w_max)): + return np.full_like(weights, float(DEFAULT_EXTREMA_WEIGHT), dtype=float) + w_range = w_max - w_min + if np.isclose(w_range, 0.0): + return np.full_like(weights, float(DEFAULT_EXTREMA_WEIGHT), dtype=float) + normalized_weights = (weights - w_min) / w_range + return normalized_weights + + +def _normalize_l1(weights: NDArray[np.floating]) -> NDArray[np.floating]: + weights_sum = np.sum(np.abs(weights)) + if weights_sum <= 0 or not np.isfinite(weights_sum): + return np.full_like(weights, float(DEFAULT_EXTREMA_WEIGHT), dtype=float) + normalized_weights = weights / weights_sum + return normalized_weights + + +def _normalize_l2(weights: NDArray[np.floating]) -> NDArray[np.floating]: + weights = weights.astype(float, copy=False) + if np.isnan(weights).any(): + return np.full_like(weights, float(DEFAULT_EXTREMA_WEIGHT), dtype=float) + + l2_norm = np.linalg.norm(weights, ord=2) + + if l2_norm <= 0 or not np.isfinite(l2_norm): + return np.full_like(weights, float(DEFAULT_EXTREMA_WEIGHT), dtype=float) + + normalized_weights = weights / l2_norm + return normalized_weights + + +def _normalize_robust( + weights: NDArray[np.floating], quantiles: tuple[float, float] = (0.25, 0.75) +) -> NDArray[np.floating]: + weights = weights.astype(float, copy=False) + if np.isnan(weights).any(): + return np.full_like(weights, float(DEFAULT_EXTREMA_WEIGHT), dtype=float) + + median = np.median(weights) + q_low, q_high = np.quantile(weights, quantiles) + iqr = q_high - q_low + if np.isclose(iqr, 0.0): + return np.full_like(weights, float(DEFAULT_EXTREMA_WEIGHT), dtype=float) + + normalized_weights = (weights - median) / iqr + return normalized_weights + + +def _normalize_softmax( + weights: NDArray[np.floating], temperature: float = 1.0 +) -> NDArray[np.floating]: + weights = weights.astype(float, copy=False) + if np.isnan(weights).any(): + return np.full_like(weights, float(DEFAULT_EXTREMA_WEIGHT), dtype=float) + if not np.isclose(temperature, 1.0) and temperature > 0: + weights = weights / temperature + return sp.special.softmax(weights) + + +def _normalize_tanh(weights: NDArray[np.floating]) -> NDArray[np.floating]: + weights = weights.astype(float, copy=False) + if np.isnan(weights).any(): + return np.full_like(weights, float(DEFAULT_EXTREMA_WEIGHT), dtype=float) + + z_scores = _normalize_zscore(weights, rescale_to_unit_range=False) + normalized_weights = 0.5 * (np.tanh(z_scores) + 1.0) + return normalized_weights + + +def _normalize_rank( + weights: NDArray[np.floating], method: RankMethod = "average" +) -> NDArray[np.floating]: + weights = weights.astype(float, copy=False) + if np.isnan(weights).any(): + return np.full_like(weights, float(DEFAULT_EXTREMA_WEIGHT), dtype=float) + + ranks = sp.stats.rankdata(weights, method=method) + n = len(weights) + if n <= 1: + return np.full_like(weights, float(DEFAULT_EXTREMA_WEIGHT), dtype=float) + + normalized_weights = (ranks - 1) / (n - 1) return normalized_weights @@ -220,56 +330,44 @@ def normalize_weights( weights: NDArray[np.floating], normalization: NormalizationType = DEFAULTS_EXTREMA_WEIGHTING["normalization"], gamma: float = DEFAULTS_EXTREMA_WEIGHTING["gamma"], + softmax_temperature: float = DEFAULTS_EXTREMA_WEIGHTING["softmax_temperature"], + robust_quantiles: tuple[float, float] = DEFAULTS_EXTREMA_WEIGHTING[ + "robust_quantiles" + ], + rank_method: RankMethod = DEFAULTS_EXTREMA_WEIGHTING["rank_method"], ) -> NDArray[np.floating]: if weights.size == 0: return weights - if normalization == NORMALIZATION_TYPES[4]: # "none" + if normalization == NORMALIZATION_TYPES[8]: # "none" return weights normalized_weights: NDArray[np.floating] if normalization == NORMALIZATION_TYPES[0]: # "minmax" - weights = weights.astype(float, copy=False) - if np.isnan(weights).any(): - return np.full_like(weights, 1.0, dtype=float) - w_min = np.min(weights) - w_max = np.max(weights) - if not (np.isfinite(w_min) and np.isfinite(w_max)): - return np.full_like(weights, 1.0, dtype=float) - w_range = w_max - w_min - if np.isclose(w_range, 0.0): - return np.full_like(weights, 1.0, dtype=float) - normalized_weights = (weights - w_min) / w_range - if np.isnan(normalized_weights).any(): - return np.full_like(weights, 1.0, dtype=float) + normalized_weights = _normalize_minmax(weights) elif normalization == NORMALIZATION_TYPES[1]: # "zscore" - normalized_weights = zscore_normalize_weights( - weights, rescale_to_unit_range=True - ) + normalized_weights = _normalize_zscore(weights, rescale_to_unit_range=True) elif normalization == NORMALIZATION_TYPES[2]: # "l1" - weights_sum = np.sum(np.abs(weights)) - if weights_sum <= 0 or not np.isfinite(weights_sum): - return np.full_like(weights, 1.0, dtype=float) - normalized_weights = weights / weights_sum - if np.isnan(normalized_weights).any(): - return np.full_like(weights, 1.0, dtype=float) + normalized_weights = _normalize_l1(weights) elif normalization == NORMALIZATION_TYPES[3]: # "l2" - weights = weights.astype(float, copy=False) - if np.isnan(weights).any(): - return np.full_like(weights, 1.0, dtype=float) + normalized_weights = _normalize_l2(weights) - l2_norm = np.linalg.norm(weights, ord=2) + elif normalization == NORMALIZATION_TYPES[4]: # "robust" + normalized_weights = _normalize_robust(weights, quantiles=robust_quantiles) - if l2_norm <= 0 or not np.isfinite(l2_norm): - return np.full_like(weights, 1.0, dtype=float) + elif normalization == NORMALIZATION_TYPES[5]: # "softmax" + normalized_weights = _normalize_softmax( + weights, temperature=softmax_temperature + ) - normalized_weights = weights / l2_norm + elif normalization == NORMALIZATION_TYPES[6]: # "tanh" + normalized_weights = _normalize_tanh(weights) - if np.isnan(normalized_weights).any(): - return np.full_like(weights, 1.0, dtype=float) + elif normalization == NORMALIZATION_TYPES[7]: # "rank" + normalized_weights = _normalize_rank(weights, method=rank_method) else: raise ValueError(f"Unknown normalization method: {normalization}") @@ -278,8 +376,9 @@ def normalize_weights( normalized_weights = np.power(np.abs(normalized_weights), gamma) * np.sign( normalized_weights ) - if np.isnan(normalized_weights).any(): - return np.full_like(weights, 1.0, dtype=float) + + if np.isnan(normalized_weights).any(): + return np.full_like(weights, float(DEFAULT_EXTREMA_WEIGHT), dtype=float) return normalized_weights @@ -290,6 +389,11 @@ def calculate_extrema_weights( weights: NDArray[np.floating], normalization: NormalizationType = DEFAULTS_EXTREMA_WEIGHTING["normalization"], gamma: float = DEFAULTS_EXTREMA_WEIGHTING["gamma"], + softmax_temperature: float = DEFAULTS_EXTREMA_WEIGHTING["softmax_temperature"], + robust_quantiles: tuple[float, float] = DEFAULTS_EXTREMA_WEIGHTING[ + "robust_quantiles" + ], + rank_method: RankMethod = DEFAULTS_EXTREMA_WEIGHTING["rank_method"], ) -> pd.Series: if len(indices) == 0 or len(weights) == 0: return pd.Series(float(DEFAULT_EXTREMA_WEIGHT), index=series.index) @@ -299,7 +403,14 @@ def calculate_extrema_weights( f"Length mismatch: {len(indices)} indices but {len(weights)} weights" ) - normalized_weights = normalize_weights(weights, normalization, gamma) + normalized_weights = normalize_weights( + weights, + normalization, + gamma, + softmax_temperature, + robust_quantiles, + rank_method, + ) if normalized_weights.size == 0 or np.allclose( normalized_weights, normalized_weights[0] @@ -324,6 +435,11 @@ def get_weighted_extrema( strategy: WeightStrategy = DEFAULTS_EXTREMA_WEIGHTING["strategy"], normalization: NormalizationType = DEFAULTS_EXTREMA_WEIGHTING["normalization"], gamma: float = DEFAULTS_EXTREMA_WEIGHTING["gamma"], + softmax_temperature: float = DEFAULTS_EXTREMA_WEIGHTING["softmax_temperature"], + robust_quantiles: tuple[float, float] = DEFAULTS_EXTREMA_WEIGHTING[ + "robust_quantiles" + ], + rank_method: RankMethod = DEFAULTS_EXTREMA_WEIGHTING["rank_method"], ) -> tuple[pd.Series, pd.Series]: default_weights = pd.Series(float(DEFAULT_EXTREMA_WEIGHT), index=extrema.index) if ( @@ -338,6 +454,9 @@ def get_weighted_extrema( weights=weights, normalization=normalization, gamma=gamma, + softmax_temperature=softmax_temperature, + robust_quantiles=robust_quantiles, + rank_method=rank_method, ) if np.allclose(extrema_weights, DEFAULT_EXTREMA_WEIGHT): return extrema, default_weights @@ -660,7 +779,9 @@ def ewo( if zero_lag: if mamode == "ema": - ma_fn = lambda series, timeperiod: zlema(series, period=timeperiod) + + def ma_fn(series, timeperiod): + return zlema(series, period=timeperiod) else: ma_fn = get_zl_ma_fn(mamode) else: @@ -1369,16 +1490,12 @@ def validate_range( def _validate_component( value: float | int | None, name: str, default_value: float | int ) -> float | int: - ok = True - if not isinstance(value, (int, float)): - ok = False - elif isinstance(value, bool): - ok = False - elif finite_only and not np.isfinite(value): - ok = False - elif non_negative and value < 0: - ok = False - if not ok: + if ( + not isinstance(value, (int, float)) + or isinstance(value, bool) + or (finite_only and not np.isfinite(value)) + or (non_negative and value < 0) + ): logger.warning( f"{name}: invalid value {value!r}, using default {default_value}" ) -- 2.43.0