From 6aa73cb8963cf0273d345d110d6e8a9cafd689fd Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sat, 13 Dec 2025 21:10:56 +0100 Subject: [PATCH] feat(qav3): add hybrid extrema weighting strategy MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- README.md | 5 +- .../user_data/strategies/QuickAdapterV3.py | 146 ++++--- quickadapter/user_data/strategies/Utils.py | 381 +++++++++++++++--- 3 files changed, 423 insertions(+), 109 deletions(-) diff --git a/README.md b/README.md index 3855def..d572cab 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,10 @@ docker compose up -d --build | freqai.extrema_smoothing.mode | `mirror` | enum {`mirror`,`constant`,`nearest`,`wrap`,`interp`} | Boundary mode for `savgol` and `nadaraya_watson`. | | freqai.extrema_smoothing.bandwidth | 1.0 | float > 0 | Gaussian bandwidth for `nadaraya_watson`. | | _Extrema weighting_ | | | | -| freqai.extrema_weighting.strategy | `none` | enum {`none`,`amplitude`,`amplitude_threshold_ratio`,`volume`,`speed`,`efficiency_ratio`} | Extrema weighting source: unweighted (`none`), swing amplitude (`amplitude`), swing amplitude / median volatility-threshold ratio (`amplitude_threshold_ratio`), swing volume (`volume`), swing speed (`speed`), or swing efficiency ratio (`efficiency_ratio`). | +| freqai.extrema_weighting.strategy | `none` | enum {`none`,`amplitude`,`amplitude_threshold_ratio`,`volume`,`speed`,`efficiency_ratio`,`hybrid`} | Extrema weighting source: unweighted (`none`), single-source (`amplitude`,`amplitude_threshold_ratio`,`volume`,`speed`,`efficiency_ratio`), or `hybrid` (combine multiple sources via `freqai.extrema_weighting.source_weights`). | +| freqai.extrema_weighting.source_weights | all sources = 1.0 | object with keys in {`amplitude`,`amplitude_threshold_ratio`,`volume`,`speed`,`efficiency_ratio`} and float >= 0 | Hybrid-only: per-source coefficients. If not set (or invalid/empty), defaults to equal weights. Coefficients are L1-normalized (sum to 1.0) before aggregation; zero/NaN/Inf entries are ignored. | +| freqai.extrema_weighting.aggregation | `weighted_sum` | enum {`weighted_sum`,`geometric_mean`} | Hybrid-only: how normalized per-source weights are combined into a single per-extremum weight. `geometric_mean` uses abs(values) for numerical stability. | +| freqai.extrema_weighting.aggregation_normalization | `none` | enum {`minmax`,`sigmoid`,`softmax`,`l1`,`l2`,`rank`,`none`} | Hybrid-only: optional post-aggregation normalization. Applies after aggregation without re-standardization and without a second gamma correction. | | freqai.extrema_weighting.standardization | `none` | enum {`none`,`zscore`,`robust`,`mmad`} | Standardization method applied before normalization. `none`=no standardization, `zscore`=(w-μ)/σ, `robust`=(w-median)/IQR, `mmad`=(w-median)/MAD. | | freqai.extrema_weighting.robust_quantiles | [0.25, 0.75] | list[float] where 0 <= Q1 < Q3 <= 1 | Quantile range for robust standardization, Q1 and Q3. | | freqai.extrema_weighting.mmad_scaling_factor | 1.4826 | float > 0 | Scaling factor for MMAD standardization. | diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py index 52da129..537243e 100644 --- a/quickadapter/user_data/strategies/QuickAdapterV3.py +++ b/quickadapter/user_data/strategies/QuickAdapterV3.py @@ -22,7 +22,6 @@ from freqtrade.exchange import timeframe_to_minutes, timeframe_to_prev_date from freqtrade.persistence import Trade from freqtrade.strategy import stoploss_from_absolute from freqtrade.strategy.interface import IStrategy -from numpy.typing import NDArray from pandas import DataFrame, Series, isna from scipy.stats import t from technical.pivots_points import pivots_points @@ -31,6 +30,8 @@ from Utils import ( DEFAULTS_EXTREMA_SMOOTHING, DEFAULTS_EXTREMA_WEIGHTING, EXTREMA_COLUMN, + HYBRID_AGGREGATIONS, + HYBRID_WEIGHT_SOURCES, MAXIMA_THRESHOLD_COLUMN, MINIMA_THRESHOLD_COLUMN, NORMALIZATION_TYPES, @@ -40,7 +41,6 @@ from Utils import ( STANDARDIZATION_TYPES, WEIGHT_STRATEGIES, TrendDirection, - WeightStrategy, alligator, bottom_change_percent, calculate_n_extrema, @@ -287,14 +287,14 @@ class QuickAdapterV3(IStrategy): extrema_weighting = self.freqai_info.get("extrema_weighting", {}) if not isinstance(extrema_weighting, dict): extrema_weighting = {} - return QuickAdapterV3._get_extrema_weighting_params(extrema_weighting, {}) + return QuickAdapterV3._get_extrema_weighting_params(extrema_weighting) @cached_property def extrema_smoothing(self) -> dict[str, Any]: extrema_smoothing = self.freqai_info.get("extrema_smoothing", {}) if not isinstance(extrema_smoothing, dict): extrema_smoothing = {} - return QuickAdapterV3._get_extrema_smoothing_params(extrema_smoothing, {}) + return QuickAdapterV3._get_extrema_smoothing_params(extrema_smoothing) def bot_start(self, **kwargs) -> None: self.pairs: list[str] = self.config.get("exchange", {}).get("pair_whitelist") @@ -629,7 +629,7 @@ class QuickAdapterV3(IStrategy): @staticmethod def _get_extrema_weighting_params( - extrema_weighting: dict[str, Any], pair: str + extrema_weighting: dict[str, Any], ) -> dict[str, Any]: # Strategy weighting_strategy = str( @@ -637,7 +637,7 @@ class QuickAdapterV3(IStrategy): ) if weighting_strategy not in set(WEIGHT_STRATEGIES): logger.warning( - f"{pair}: invalid extrema_weighting strategy '{weighting_strategy}', using default '{WEIGHT_STRATEGIES[0]}'" + f"Invalid extrema_weighting strategy '{weighting_strategy}', using default '{WEIGHT_STRATEGIES[0]}'" ) weighting_strategy = WEIGHT_STRATEGIES[0] @@ -649,7 +649,7 @@ class QuickAdapterV3(IStrategy): ) if weighting_standardization not in set(STANDARDIZATION_TYPES): logger.warning( - f"{pair}: invalid extrema_weighting standardization '{weighting_standardization}', using default '{STANDARDIZATION_TYPES[0]}'" + f"Invalid extrema_weighting standardization '{weighting_standardization}', using default '{STANDARDIZATION_TYPES[0]}'" ) weighting_standardization = STANDARDIZATION_TYPES[0] @@ -666,7 +666,7 @@ class QuickAdapterV3(IStrategy): or weighting_robust_quantiles[0] >= weighting_robust_quantiles[1] ): logger.warning( - f"{pair}: invalid extrema_weighting robust_quantiles {weighting_robust_quantiles}, must be (q1, q3) with 0 <= q1 < q3 <= 1, using default {DEFAULTS_EXTREMA_WEIGHTING['robust_quantiles']}" + f"Invalid extrema_weighting robust_quantiles {weighting_robust_quantiles}, must be (q1, q3) with 0 <= q1 < q3 <= 1, using default {DEFAULTS_EXTREMA_WEIGHTING['robust_quantiles']}" ) weighting_robust_quantiles = DEFAULTS_EXTREMA_WEIGHTING["robust_quantiles"] else: @@ -684,7 +684,7 @@ class QuickAdapterV3(IStrategy): or weighting_mmad_scaling_factor <= 0 ): logger.warning( - f"{pair}: invalid extrema_weighting mmad_scaling_factor {weighting_mmad_scaling_factor}, must be > 0, using default {DEFAULTS_EXTREMA_WEIGHTING['mmad_scaling_factor']}" + f"Invalid extrema_weighting mmad_scaling_factor {weighting_mmad_scaling_factor}, must be > 0, using default {DEFAULTS_EXTREMA_WEIGHTING['mmad_scaling_factor']}" ) weighting_mmad_scaling_factor = DEFAULTS_EXTREMA_WEIGHTING[ "mmad_scaling_factor" @@ -698,7 +698,7 @@ class QuickAdapterV3(IStrategy): ) if weighting_normalization not in set(NORMALIZATION_TYPES): logger.warning( - f"{pair}: invalid extrema_weighting normalization '{weighting_normalization}', using default '{NORMALIZATION_TYPES[0]}'" + f"Invalid extrema_weighting normalization '{weighting_normalization}', using default '{NORMALIZATION_TYPES[0]}'" ) weighting_normalization = NORMALIZATION_TYPES[0] @@ -713,7 +713,7 @@ class QuickAdapterV3(IStrategy): } ): raise ValueError( - f"{pair}: invalid extrema_weighting configuration: " + f"Invalid extrema_weighting configuration: " f"standardization='{weighting_standardization}' with normalization='{weighting_normalization}' " "can produce negative weights and flip ternary extrema labels. " f"Use normalization in {{'{NORMALIZATION_TYPES[0]}','{NORMALIZATION_TYPES[1]}','{NORMALIZATION_TYPES[2]}','{NORMALIZATION_TYPES[5]}'}} " @@ -733,7 +733,7 @@ class QuickAdapterV3(IStrategy): or weighting_minmax_range[0] >= weighting_minmax_range[1] ): logger.warning( - f"{pair}: invalid extrema_weighting minmax_range {weighting_minmax_range}, must be (min, max) with min < max, using default {DEFAULTS_EXTREMA_WEIGHTING['minmax_range']}" + f"Invalid extrema_weighting minmax_range {weighting_minmax_range}, must be (min, max) with min < max, using default {DEFAULTS_EXTREMA_WEIGHTING['minmax_range']}" ) weighting_minmax_range = DEFAULTS_EXTREMA_WEIGHTING["minmax_range"] else: @@ -751,7 +751,7 @@ class QuickAdapterV3(IStrategy): or weighting_sigmoid_scale <= 0 ): logger.warning( - f"{pair}: invalid extrema_weighting sigmoid_scale {weighting_sigmoid_scale}, must be > 0, using default {DEFAULTS_EXTREMA_WEIGHTING['sigmoid_scale']}" + f"Invalid extrema_weighting sigmoid_scale {weighting_sigmoid_scale}, must be > 0, using default {DEFAULTS_EXTREMA_WEIGHTING['sigmoid_scale']}" ) weighting_sigmoid_scale = DEFAULTS_EXTREMA_WEIGHTING["sigmoid_scale"] @@ -764,7 +764,7 @@ class QuickAdapterV3(IStrategy): or weighting_softmax_temperature <= 0 ): logger.warning( - f"{pair}: invalid extrema_weighting softmax_temperature {weighting_softmax_temperature}, must be > 0, using default {DEFAULTS_EXTREMA_WEIGHTING['softmax_temperature']}" + f"Invalid extrema_weighting softmax_temperature {weighting_softmax_temperature}, must be > 0, using default {DEFAULTS_EXTREMA_WEIGHTING['softmax_temperature']}" ) weighting_softmax_temperature = DEFAULTS_EXTREMA_WEIGHTING[ "softmax_temperature" @@ -777,7 +777,7 @@ class QuickAdapterV3(IStrategy): ) if weighting_rank_method not in set(RANK_METHODS): logger.warning( - f"{pair}: invalid extrema_weighting rank_method '{weighting_rank_method}', using default '{RANK_METHODS[0]}'" + f"Invalid extrema_weighting rank_method '{weighting_rank_method}', using default '{RANK_METHODS[0]}'" ) weighting_rank_method = RANK_METHODS[0] @@ -791,33 +791,91 @@ class QuickAdapterV3(IStrategy): or not (0 < weighting_gamma <= 10.0) ): logger.warning( - f"{pair}: invalid extrema_weighting gamma {weighting_gamma}, must be a finite number in (0, 10], using default {DEFAULTS_EXTREMA_WEIGHTING['gamma']}" + f"Invalid extrema_weighting gamma {weighting_gamma}, must be a finite number in (0, 10], using default {DEFAULTS_EXTREMA_WEIGHTING['gamma']}" ) weighting_gamma = DEFAULTS_EXTREMA_WEIGHTING["gamma"] + weighting_source_weights = extrema_weighting.get( + "source_weights", DEFAULTS_EXTREMA_WEIGHTING["source_weights"] + ) + if not isinstance(weighting_source_weights, dict): + logger.warning( + f"Invalid extrema_weighting source_weights {weighting_source_weights}, must be a dict of source name to weight, using default {DEFAULTS_EXTREMA_WEIGHTING['source_weights']}" + ) + weighting_source_weights = DEFAULTS_EXTREMA_WEIGHTING["source_weights"] + else: + sanitized_source_weights: dict[str, float] = {} + for source, weight in weighting_source_weights.items(): + if source not in set(HYBRID_WEIGHT_SOURCES): + continue + if ( + not isinstance(weight, (int, float)) + or not np.isfinite(weight) + or weight < 0 + ): + continue + sanitized_source_weights[str(source)] = float(weight) + if not sanitized_source_weights: + logger.warning( + f"Invalid/empty extrema_weighting source_weights, using default {DEFAULTS_EXTREMA_WEIGHTING['source_weights']}" + ) + weighting_source_weights = DEFAULTS_EXTREMA_WEIGHTING["source_weights"] + else: + weighting_source_weights = sanitized_source_weights + weighting_aggregation = str( + extrema_weighting.get( + "aggregation", + DEFAULTS_EXTREMA_WEIGHTING["aggregation"], + ) + ) + if weighting_aggregation not in set(HYBRID_AGGREGATIONS): + logger.warning( + f"Invalid extrema_weighting aggregation '{weighting_aggregation}', using default '{HYBRID_AGGREGATIONS[0]}'" + ) + weighting_aggregation = DEFAULTS_EXTREMA_WEIGHTING["aggregation"] + weighting_aggregation_normalization = str( + extrema_weighting.get( + "aggregation_normalization", + DEFAULTS_EXTREMA_WEIGHTING["aggregation_normalization"], + ) + ) + if weighting_aggregation_normalization not in set(NORMALIZATION_TYPES): + logger.warning( + f"Invalid extrema_weighting aggregation_normalization '{weighting_aggregation_normalization}', using default '{NORMALIZATION_TYPES[6]}'" + ) + weighting_aggregation_normalization = DEFAULTS_EXTREMA_WEIGHTING[ + "aggregation_normalization" + ] + return { "strategy": weighting_strategy, + "source_weights": weighting_source_weights, + "aggregation": weighting_aggregation, + "aggregation_normalization": weighting_aggregation_normalization, + # Phase 1: Standardization "standardization": weighting_standardization, "robust_quantiles": weighting_robust_quantiles, "mmad_scaling_factor": weighting_mmad_scaling_factor, + # Phase 2: Normalization "normalization": weighting_normalization, "minmax_range": weighting_minmax_range, "sigmoid_scale": weighting_sigmoid_scale, "softmax_temperature": weighting_softmax_temperature, "rank_method": weighting_rank_method, + # Phase 3: Post-processing "gamma": weighting_gamma, } @staticmethod def _get_extrema_smoothing_params( - extrema_smoothing: dict[str, Any], pair: str + extrema_smoothing: dict[str, Any], ) -> dict[str, Any]: smoothing_method = str( extrema_smoothing.get("method", DEFAULTS_EXTREMA_SMOOTHING["method"]) ) if smoothing_method not in set(SMOOTHING_METHODS): logger.warning( - f"{pair}: invalid extrema_smoothing method '{smoothing_method}', using default '{SMOOTHING_METHODS[0]}'" + f"Invalid extrema_smoothing method '{smoothing_method}', using default '{SMOOTHING_METHODS[0]}'" ) smoothing_method = SMOOTHING_METHODS[0] @@ -826,7 +884,7 @@ class QuickAdapterV3(IStrategy): ) if not isinstance(smoothing_window, int) or smoothing_window < 3: logger.warning( - f"{pair}: invalid extrema_smoothing window {smoothing_window}, must be an integer >= 3, using default {DEFAULTS_EXTREMA_SMOOTHING['window']}" + f"Invalid extrema_smoothing window {smoothing_window}, must be an integer >= 3, using default {DEFAULTS_EXTREMA_SMOOTHING['window']}" ) smoothing_window = DEFAULTS_EXTREMA_SMOOTHING["window"] @@ -839,7 +897,7 @@ class QuickAdapterV3(IStrategy): or smoothing_beta <= 0 ): logger.warning( - f"{pair}: invalid extrema_smoothing beta {smoothing_beta}, must be a finite number > 0, using default {DEFAULTS_EXTREMA_SMOOTHING['beta']}" + f"Invalid extrema_smoothing beta {smoothing_beta}, must be a finite number > 0, using default {DEFAULTS_EXTREMA_SMOOTHING['beta']}" ) smoothing_beta = DEFAULTS_EXTREMA_SMOOTHING["beta"] @@ -848,7 +906,7 @@ class QuickAdapterV3(IStrategy): ) if not isinstance(smoothing_polyorder, int) or smoothing_polyorder < 1: logger.warning( - f"{pair}: invalid extrema_smoothing polyorder {smoothing_polyorder}, must be an integer >= 1, using default {DEFAULTS_EXTREMA_SMOOTHING['polyorder']}" + f"Invalid extrema_smoothing polyorder {smoothing_polyorder}, must be an integer >= 1, using default {DEFAULTS_EXTREMA_SMOOTHING['polyorder']}" ) smoothing_polyorder = DEFAULTS_EXTREMA_SMOOTHING["polyorder"] @@ -857,7 +915,7 @@ class QuickAdapterV3(IStrategy): ) if smoothing_mode not in set(SMOOTHING_MODES): logger.warning( - f"{pair}: invalid extrema_smoothing mode '{smoothing_mode}', using default '{SMOOTHING_MODES[0]}'" + f"Invalid extrema_smoothing mode '{smoothing_mode}', using default '{SMOOTHING_MODES[0]}'" ) smoothing_mode = SMOOTHING_MODES[0] @@ -870,7 +928,7 @@ class QuickAdapterV3(IStrategy): or not np.isfinite(smoothing_bandwidth) ): logger.warning( - f"{pair}: invalid extrema_smoothing bandwidth {smoothing_bandwidth}, must be a positive finite number, using default {DEFAULTS_EXTREMA_SMOOTHING['bandwidth']}" + f"Invalid extrema_smoothing bandwidth {smoothing_bandwidth}, must be a positive finite number, using default {DEFAULTS_EXTREMA_SMOOTHING['bandwidth']}" ) smoothing_bandwidth = DEFAULTS_EXTREMA_SMOOTHING["bandwidth"] @@ -900,27 +958,6 @@ class QuickAdapterV3(IStrategy): except (KeyError, ValueError) as e: raise ValueError(f"Invalid pattern '{pattern}': {repr(e)}") - @staticmethod - def _get_weights( - strategy: WeightStrategy, - amplitudes: list[float], - amplitude_threshold_ratios: list[float], - volumes: list[float], - speeds: list[float], - efficiency_ratios: list[float], - ) -> NDArray[np.floating]: - if strategy == WEIGHT_STRATEGIES[1]: # "amplitude" - return np.array(amplitudes) - if strategy == WEIGHT_STRATEGIES[2]: # "amplitude_threshold_ratio" - return np.array(amplitude_threshold_ratios) - if strategy == WEIGHT_STRATEGIES[3]: # "volume" - return np.array(volumes) - if strategy == WEIGHT_STRATEGIES[4]: # "speed" - return np.array(speeds) - if strategy == WEIGHT_STRATEGIES[5]: # "efficiency_ratio" - return np.array(efficiency_ratios) - return np.array([]) - def set_freqai_targets( self, dataframe: DataFrame, metadata: dict[str, Any], **kwargs ) -> DataFrame: @@ -962,19 +999,20 @@ class QuickAdapterV3(IStrategy): f"{pair}: labeled {len(pivots_indices)} extrema (label_period={QuickAdapterV3._td_format(label_period)} / {label_period_candles=} / {label_natr_ratio=:.2f})" ) - pivot_weights = QuickAdapterV3._get_weights( - self.extrema_weighting["strategy"], - pivots_amplitudes, - pivots_amplitude_threshold_ratios, - pivots_volumes, - pivots_speeds, - pivots_efficiency_ratios, - ) weighted_extrema, _ = get_weighted_extrema( - extrema=dataframe[EXTREMA_COLUMN], + series=dataframe[EXTREMA_COLUMN], indices=pivots_indices, - weights=pivot_weights, + amplitudes=pivots_amplitudes, + amplitude_threshold_ratios=pivots_amplitude_threshold_ratios, + volumes=pivots_volumes, + speeds=pivots_speeds, + efficiency_ratios=pivots_efficiency_ratios, + source_weights=self.extrema_weighting["source_weights"], strategy=self.extrema_weighting["strategy"], + aggregation=self.extrema_weighting["aggregation"], + aggregation_normalization=self.extrema_weighting[ + "aggregation_normalization" + ], standardization=self.extrema_weighting["standardization"], robust_quantiles=self.extrema_weighting["robust_quantiles"], mmad_scaling_factor=self.extrema_weighting["mmad_scaling_factor"], diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py index 823a4cf..250f7e6 100644 --- a/quickadapter/user_data/strategies/Utils.py +++ b/quickadapter/user_data/strategies/Utils.py @@ -26,6 +26,7 @@ WeightStrategy = Literal[ "volume", "speed", "efficiency_ratio", + "hybrid", ] WEIGHT_STRATEGIES: Final[tuple[WeightStrategy, ...]] = ( "none", @@ -34,6 +35,28 @@ WEIGHT_STRATEGIES: Final[tuple[WeightStrategy, ...]] = ( "volume", "speed", "efficiency_ratio", + "hybrid", +) + +HybridWeightSource = Literal[ + "amplitude", + "amplitude_threshold_ratio", + "volume", + "speed", + "efficiency_ratio", +] +HYBRID_WEIGHT_SOURCES: Final[tuple[HybridWeightSource, ...]] = ( + "amplitude", + "amplitude_threshold_ratio", + "volume", + "speed", + "efficiency_ratio", +) + +HybridAggregation = Literal["weighted_sum", "geometric_mean"] +HYBRID_AGGREGATIONS: Final[tuple[HybridAggregation, ...]] = ( + "weighted_sum", + "geometric_mean", ) EXTREMA_COLUMN: Final = "&s-extrema" @@ -103,6 +126,9 @@ DEFAULTS_EXTREMA_SMOOTHING: Final[dict[str, Any]] = { DEFAULTS_EXTREMA_WEIGHTING: Final[dict[str, Any]] = { "strategy": WEIGHT_STRATEGIES[0], # "none" + "source_weights": {s: 1.0 for s in HYBRID_WEIGHT_SOURCES}, + "aggregation": HYBRID_AGGREGATIONS[0], # "weighted_sum" + "aggregation_normalization": NORMALIZATION_TYPES[6], # "none" # Phase 1: Standardization "standardization": STANDARDIZATION_TYPES[0], # "none" "robust_quantiles": (0.25, 0.75), @@ -427,7 +453,7 @@ def _normalize_minmax( def _normalize_l1(weights: NDArray[np.floating]) -> NDArray[np.floating]: """L1 normalization: w / Σ|w| → Σ|w| = 1""" - weights_sum = np.sum(np.abs(weights)) + weights_sum = np.nansum(np.abs(weights)) if weights_sum <= 0 or not np.isfinite(weights_sum): return np.full_like(weights, DEFAULT_EXTREMA_WEIGHT, dtype=float) return weights / weights_sum @@ -579,6 +605,168 @@ def normalize_weights( return normalized_weights +def _weights_array_to_series( + index: pd.Index, + indices: list[int], + weights: NDArray[np.floating], + default_weight: float = DEFAULT_EXTREMA_WEIGHT, +) -> pd.Series: + weights_series = pd.Series(default_weight, index=index) + + if len(indices) == 0 or weights.size == 0: + return weights_series + + if len(indices) != weights.size: + raise ValueError( + f"Length mismatch: {len(indices)} indices but {weights.size} weights" + ) + + mask = pd.Index(indices).isin(index) + if not np.any(mask): + return weights_series + + valid_indices = [idx for idx, is_valid in zip(indices, mask) if is_valid] + weights_series.loc[valid_indices] = weights[mask] + return weights_series + + +def calculate_hybrid_extrema_weights( + series: pd.Series, + indices: list[int], + amplitudes: list[float], + amplitude_threshold_ratios: list[float], + volumes: list[float], + speeds: list[float], + efficiency_ratios: list[float], + source_weights: dict[str, float], + aggregation: HybridAggregation = DEFAULTS_EXTREMA_WEIGHTING["aggregation"], + aggregation_normalization: NormalizationType = DEFAULTS_EXTREMA_WEIGHTING[ + "aggregation_normalization" + ], + # Phase 1: Standardization + standardization: StandardizationType = DEFAULTS_EXTREMA_WEIGHTING[ + "standardization" + ], + robust_quantiles: tuple[float, float] = DEFAULTS_EXTREMA_WEIGHTING[ + "robust_quantiles" + ], + mmad_scaling_factor: float = DEFAULTS_EXTREMA_WEIGHTING["mmad_scaling_factor"], + # Phase 2: Normalization + normalization: NormalizationType = DEFAULTS_EXTREMA_WEIGHTING["normalization"], + minmax_range: tuple[float, float] = DEFAULTS_EXTREMA_WEIGHTING["minmax_range"], + sigmoid_scale: float = DEFAULTS_EXTREMA_WEIGHTING["sigmoid_scale"], + softmax_temperature: float = DEFAULTS_EXTREMA_WEIGHTING["softmax_temperature"], + rank_method: RankMethod = DEFAULTS_EXTREMA_WEIGHTING["rank_method"], + # Phase 3: Post-processing + gamma: float = DEFAULTS_EXTREMA_WEIGHTING["gamma"], +) -> pd.Series: + n = len(indices) + if n == 0: + return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=series.index) + + if not isinstance(source_weights, dict): + source_weights = {} + + weights_by_source: dict[HybridWeightSource, NDArray[np.floating]] = { + "amplitude": np.asarray(amplitudes, dtype=float), + "amplitude_threshold_ratio": np.asarray( + amplitude_threshold_ratios, dtype=float + ), + "volume": np.asarray(volumes, dtype=float), + "speed": np.asarray(speeds, dtype=float), + "efficiency_ratio": np.asarray(efficiency_ratios, dtype=float), + } + + enabled_sources: list[HybridWeightSource] = [] + source_weights_values: list[float] = [] + for source in HYBRID_WEIGHT_SOURCES: + source_weight = source_weights.get(source) + if source_weight is None: + continue + if ( + not isinstance(source_weight, (int, float)) + or not np.isfinite(source_weight) + or source_weight < 0 + ): + continue + enabled_sources.append(source) + source_weights_values.append(float(source_weight)) + + if len(enabled_sources) == 0: + enabled_sources = list(HYBRID_WEIGHT_SOURCES) + source_weights_values = [1.0 for _ in enabled_sources] + + if any(weights_by_source[s].size != n for s in enabled_sources): + raise ValueError( + f"Length mismatch: hybrid {n} indices but inconsistent weights lengths" + ) + + np_source_weights: NDArray[np.floating] = np.asarray( + source_weights_values, dtype=float + ) + source_weights_sum = np.nansum(np.abs(np_source_weights)) + if not np.isfinite(source_weights_sum) or source_weights_sum <= 0: + return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=series.index) + np_source_weights = np_source_weights / source_weights_sum + + normalized_source_weights: list[NDArray[np.floating]] = [] + for source in enabled_sources: + normalized_source_weights.append( + normalize_weights( + weights_by_source[source], + standardization=standardization, + robust_quantiles=robust_quantiles, + mmad_scaling_factor=mmad_scaling_factor, + normalization=normalization, + minmax_range=minmax_range, + sigmoid_scale=sigmoid_scale, + softmax_temperature=softmax_temperature, + rank_method=rank_method, + gamma=gamma, + ) + ) + + if aggregation == HYBRID_AGGREGATIONS[0]: # "weighted_sum" + combined_source_weights = np.zeros(n, dtype=float) + for source_weight, values in zip(np_source_weights, normalized_source_weights): + combined_source_weights = combined_source_weights + source_weight * values + elif aggregation == HYBRID_AGGREGATIONS[1]: # "geometric_mean" + combined_source_weights = sp.stats.gmean( + np.vstack([np.abs(values) for values in normalized_source_weights]), + axis=0, + weights=np_source_weights, + ) + else: + raise ValueError(f"Unknown hybrid aggregation method: {aggregation}") + + if aggregation_normalization != NORMALIZATION_TYPES[6]: # "none" + combined_source_weights = normalize_weights( + combined_source_weights, + standardization=STANDARDIZATION_TYPES[0], + robust_quantiles=robust_quantiles, + mmad_scaling_factor=mmad_scaling_factor, + normalization=aggregation_normalization, + minmax_range=minmax_range, + sigmoid_scale=sigmoid_scale, + softmax_temperature=softmax_temperature, + rank_method=rank_method, + gamma=1.0, + ) + + if ( + combined_source_weights.size == 0 + or not np.isfinite(combined_source_weights).all() + ): + return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=series.index) + + return _weights_array_to_series( + index=series.index, + indices=indices, + weights=combined_source_weights, + default_weight=np.nanmedian(combined_source_weights), + ) + + def calculate_extrema_weights( series: pd.Series, indices: list[int], @@ -630,20 +818,28 @@ def calculate_extrema_weights( ): normalized_weights = np.full_like(normalized_weights, DEFAULT_EXTREMA_WEIGHT) - weights_series = pd.Series(DEFAULT_EXTREMA_WEIGHT, index=series.index) - mask = pd.Index(indices).isin(series.index) - normalized_weights = normalized_weights[mask] - valid_indices = [idx for idx, is_valid in zip(indices, mask) if is_valid] - if len(valid_indices) > 0: - weights_series.loc[valid_indices] = normalized_weights - return weights_series + return _weights_array_to_series( + index=series.index, + indices=indices, + weights=normalized_weights, + default_weight=np.nanmedian(normalized_weights), + ) -def get_weighted_extrema( - extrema: pd.Series, +def compute_extrema_weights( + series: pd.Series, indices: list[int], - weights: NDArray[np.floating], + amplitudes: list[float], + amplitude_threshold_ratios: list[float], + volumes: list[float], + speeds: list[float], + efficiency_ratios: list[float], + source_weights: dict[str, float], strategy: WeightStrategy = DEFAULTS_EXTREMA_WEIGHTING["strategy"], + aggregation: HybridAggregation = DEFAULTS_EXTREMA_WEIGHTING["aggregation"], + aggregation_normalization: NormalizationType = DEFAULTS_EXTREMA_WEIGHTING[ + "aggregation_normalization" + ], # Phase 1: Standardization standardization: StandardizationType = DEFAULTS_EXTREMA_WEIGHTING[ "standardization" @@ -660,46 +856,35 @@ def get_weighted_extrema( rank_method: RankMethod = DEFAULTS_EXTREMA_WEIGHTING["rank_method"], # Phase 3: Post-processing gamma: float = DEFAULTS_EXTREMA_WEIGHTING["gamma"], -) -> tuple[pd.Series, pd.Series]: - """ - Apply weighted normalization to extrema series. - - Args: - extrema: Extrema series - indices: Indices of extrema points - weights: Raw weights for each extremum - strategy: Weight strategy ("none", "amplitude", "amplitude_threshold_ratio", "volume", "speed", "efficiency_ratio") - standardization: Standardization method - robust_quantiles: Quantiles for robust standardization - mmad_scaling_factor: Scaling factor for MMAD standardization - normalization: Normalization method - minmax_range: Target range for minmax - sigmoid_scale: Scale for sigmoid - softmax_temperature: Temperature for softmax - rank_method: Method for rank normalization - gamma: Gamma correction - - Returns: - Tuple of (weighted_extrema, extrema_weights) - """ - default_weights = pd.Series(DEFAULT_EXTREMA_WEIGHT, index=extrema.index) - if ( - len(indices) == 0 or len(weights) == 0 or strategy == WEIGHT_STRATEGIES[0] - ): # "none" - return extrema, default_weights +) -> pd.Series: + if len(indices) == 0 or strategy == WEIGHT_STRATEGIES[0]: # "none" + return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=series.index) - if ( - strategy - in { - WEIGHT_STRATEGIES[1], - WEIGHT_STRATEGIES[2], - WEIGHT_STRATEGIES[3], - WEIGHT_STRATEGIES[4], - WEIGHT_STRATEGIES[5], - } - ): # "amplitude" / "amplitude_threshold_ratio" / "volume" / "speed" / "efficiency_ratio" - extrema_weights = calculate_extrema_weights( - series=extrema, + if strategy in { + WEIGHT_STRATEGIES[1], + WEIGHT_STRATEGIES[2], + WEIGHT_STRATEGIES[3], + WEIGHT_STRATEGIES[4], + WEIGHT_STRATEGIES[5], + }: + if strategy == WEIGHT_STRATEGIES[1]: # "amplitude" + weights = np.asarray(amplitudes, dtype=float) + elif strategy == WEIGHT_STRATEGIES[2]: # "amplitude_threshold_ratio" + weights = np.asarray(amplitude_threshold_ratios, dtype=float) + elif strategy == WEIGHT_STRATEGIES[3]: # "volume" + weights = np.asarray(volumes, dtype=float) + elif strategy == WEIGHT_STRATEGIES[4]: # "speed" + weights = np.asarray(speeds, dtype=float) + elif strategy == WEIGHT_STRATEGIES[5]: # "efficiency_ratio" + weights = np.asarray(efficiency_ratios, dtype=float) + else: + weights = np.asarray([], dtype=float) + + if weights.size == 0: + return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=series.index) + + return calculate_extrema_weights( + series=series, indices=indices, weights=weights, standardization=standardization, @@ -712,11 +897,99 @@ def get_weighted_extrema( rank_method=rank_method, gamma=gamma, ) - if np.allclose(extrema_weights, DEFAULT_EXTREMA_WEIGHT): - return extrema, default_weights - return extrema * extrema_weights, extrema_weights - raise ValueError(f"Unknown weight strategy: {strategy}") + if strategy == WEIGHT_STRATEGIES[6]: # "hybrid" + return calculate_hybrid_extrema_weights( + series=series, + indices=indices, + amplitudes=amplitudes, + amplitude_threshold_ratios=amplitude_threshold_ratios, + volumes=volumes, + speeds=speeds, + efficiency_ratios=efficiency_ratios, + source_weights=source_weights, + aggregation=aggregation, + aggregation_normalization=aggregation_normalization, + standardization=standardization, + robust_quantiles=robust_quantiles, + mmad_scaling_factor=mmad_scaling_factor, + normalization=normalization, + minmax_range=minmax_range, + sigmoid_scale=sigmoid_scale, + softmax_temperature=softmax_temperature, + rank_method=rank_method, + gamma=gamma, + ) + + raise ValueError(f"Unknown extrema weighting strategy: {strategy}") + + +def apply_weights(series: pd.Series, weights: pd.Series) -> pd.Series: + if weights.empty: + return series + if np.allclose(weights.to_numpy(dtype=float), DEFAULT_EXTREMA_WEIGHT): + return series + return series * weights + + +def get_weighted_extrema( + series: pd.Series, + indices: list[int], + amplitudes: list[float], + amplitude_threshold_ratios: list[float], + volumes: list[float], + speeds: list[float], + efficiency_ratios: list[float], + source_weights: dict[str, float], + strategy: WeightStrategy = DEFAULTS_EXTREMA_WEIGHTING["strategy"], + aggregation: HybridAggregation = DEFAULTS_EXTREMA_WEIGHTING["aggregation"], + aggregation_normalization: NormalizationType = DEFAULTS_EXTREMA_WEIGHTING[ + "aggregation_normalization" + ], + # Phase 1: Standardization + standardization: StandardizationType = DEFAULTS_EXTREMA_WEIGHTING[ + "standardization" + ], + robust_quantiles: tuple[float, float] = DEFAULTS_EXTREMA_WEIGHTING[ + "robust_quantiles" + ], + mmad_scaling_factor: float = DEFAULTS_EXTREMA_WEIGHTING["mmad_scaling_factor"], + # Phase 2: Normalization + normalization: NormalizationType = DEFAULTS_EXTREMA_WEIGHTING["normalization"], + minmax_range: tuple[float, float] = DEFAULTS_EXTREMA_WEIGHTING["minmax_range"], + sigmoid_scale: float = DEFAULTS_EXTREMA_WEIGHTING["sigmoid_scale"], + softmax_temperature: float = DEFAULTS_EXTREMA_WEIGHTING["softmax_temperature"], + rank_method: RankMethod = DEFAULTS_EXTREMA_WEIGHTING["rank_method"], + # Phase 3: Post-processing + gamma: float = DEFAULTS_EXTREMA_WEIGHTING["gamma"], +) -> tuple[pd.Series, pd.Series]: + """Apply extrema weighting and return (weighted_extrema, extrema_weights).""" + + weights = compute_extrema_weights( + series=series, + indices=indices, + amplitudes=amplitudes, + amplitude_threshold_ratios=amplitude_threshold_ratios, + volumes=volumes, + speeds=speeds, + efficiency_ratios=efficiency_ratios, + source_weights=source_weights, + strategy=strategy, + aggregation=aggregation, + aggregation_normalization=aggregation_normalization, + standardization=standardization, + robust_quantiles=robust_quantiles, + mmad_scaling_factor=mmad_scaling_factor, + normalization=normalization, + minmax_range=minmax_range, + sigmoid_scale=sigmoid_scale, + softmax_temperature=softmax_temperature, + rank_method=rank_method, + gamma=gamma, + ) + + weighted_extrema = apply_weights(series, weights) + return weighted_extrema, weights def get_callable_sha256(fn: Callable[..., Any]) -> str: -- 2.43.0