]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
perf(qav3): make extrema weights computation fully numpy-based
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 15 Dec 2025 13:14:28 +0000 (14:14 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 15 Dec 2025 13:15:37 +0000 (14:15 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
quickadapter/user_data/strategies/QuickAdapterV3.py
quickadapter/user_data/strategies/Utils.py

index adfc70baba6b8f8c40b8c34481bae62d80b34c1f..67e5e341c403aa7539e144fc6f0dc32754c0ab05 100644 (file)
@@ -73,7 +73,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
     https://github.com/sponsors/robcaulk
     """
 
-    version = "3.7.132"
+    version = "3.7.133"
 
     _SQRT_2: Final[float] = np.sqrt(2.0)
 
index dfe887182eeb14e492250ec5bb0ef49f17244ded..9ee2965bf38ee0e5a17b371c85675fef222fde79 100644 (file)
@@ -1162,10 +1162,10 @@ class QuickAdapterV3(IStrategy):
             return None
         median_natr = trade_label_natr.median()
 
-        np_trade_label_natr = trade_label_natr.to_numpy()
-        entry_quantile = calculate_quantile(np_trade_label_natr, entry_natr)
-        current_quantile = calculate_quantile(np_trade_label_natr, current_natr)
-        median_quantile = calculate_quantile(np_trade_label_natr, median_natr)
+        trade_label_natr_values = trade_label_natr.to_numpy()
+        entry_quantile = calculate_quantile(trade_label_natr_values, entry_natr)
+        current_quantile = calculate_quantile(trade_label_natr_values, current_natr)
+        median_quantile = calculate_quantile(trade_label_natr_values, median_natr)
 
         if isna(entry_quantile) or isna(current_quantile) or isna(median_quantile):
             return None
index 706874807ef8e1b8710e60d2cdca11c29de55f77..44885a7808db6e89b464eb2fe239efd29b555fcd 100644 (file)
@@ -622,32 +622,34 @@ def normalize_weights(
     return normalized_weights
 
 
-def _weights_array_to_series(
-    index: pd.Index,
+def _build_weights_array(
+    n_extrema: int,
     indices: list[int],
     weights: NDArray[np.floating],
     default_weight: float = DEFAULT_EXTREMA_WEIGHT,
-) -> pd.Series:
+) -> NDArray[np.floating]:
     if len(indices) == 0 or weights.size == 0:
-        return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=index)
+        return np.full(n_extrema, DEFAULT_EXTREMA_WEIGHT, dtype=float)
 
     if len(indices) != weights.size:
         raise ValueError(
             f"Length mismatch: {len(indices)} indices but {weights.size} weights"
         )
 
-    weights_series = pd.Series(default_weight, index=index)
-    mask = pd.Index(indices).isin(index)
+    weights_array = np.full(n_extrema, default_weight, dtype=float)
+
+    indices_array = np.array(indices)
+    mask = (indices_array >= 0) & (indices_array < n_extrema)
+
     if not np.any(mask):
-        return weights_series
+        return weights_array
 
-    valid_indices = [idx for idx, is_valid in zip(indices, mask) if is_valid]
-    weights_series.loc[valid_indices] = weights[mask]
-    return weights_series
+    valid_indices = indices_array[mask]
+    weights_array[valid_indices] = weights[mask]
+    return weights_array
 
 
 def calculate_hybrid_extrema_weights(
-    extrema: pd.Series,
     indices: list[int],
     amplitudes: list[float],
     amplitude_threshold_ratios: list[float],
@@ -675,15 +677,15 @@ def calculate_hybrid_extrema_weights(
     rank_method: RankMethod = DEFAULTS_EXTREMA_WEIGHTING["rank_method"],
     # Phase 3: Post-processing
     gamma: float = DEFAULTS_EXTREMA_WEIGHTING["gamma"],
-) -> pd.Series:
+) -> NDArray[np.floating]:
     n = len(indices)
     if n == 0:
-        return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=extrema.index)
+        return np.array([], dtype=float)
 
     if not isinstance(source_weights, dict):
         source_weights = {}
 
-    weights_by_source: dict[HybridWeightSource, NDArray[np.floating]] = {
+    weights_array_by_source: dict[HybridWeightSource, NDArray[np.floating]] = {
         "amplitude": np.asarray(amplitudes, dtype=float),
         "amplitude_threshold_ratio": np.asarray(
             amplitude_threshold_ratios, dtype=float
@@ -694,7 +696,7 @@ def calculate_hybrid_extrema_weights(
     }
 
     enabled_sources: list[HybridWeightSource] = []
-    source_weights_values: list[float] = []
+    source_weights_list: list[float] = []
     for source in HYBRID_WEIGHT_SOURCES:
         source_weight = source_weights.get(source)
         if source_weight is None:
@@ -706,30 +708,30 @@ def calculate_hybrid_extrema_weights(
         ):
             continue
         enabled_sources.append(source)
-        source_weights_values.append(float(source_weight))
+        source_weights_list.append(float(source_weight))
 
     if len(enabled_sources) == 0:
         enabled_sources = list(HYBRID_WEIGHT_SOURCES)
-        source_weights_values = [1.0 for _ in enabled_sources]
+        source_weights_list = [1.0 for _ in enabled_sources]
 
-    if any(weights_by_source[s].size != n for s in enabled_sources):
+    if any(weights_array_by_source[s].size != n for s in enabled_sources):
         raise ValueError(
             f"Length mismatch: hybrid {n} indices but inconsistent weights lengths"
         )
 
-    np_source_weights: NDArray[np.floating] = np.asarray(
-        source_weights_values, dtype=float
+    source_weights_array: NDArray[np.floating] = np.asarray(
+        source_weights_list, dtype=float
     )
-    source_weights_sum = np.nansum(np.abs(np_source_weights))
-    if not np.isfinite(source_weights_sum) or source_weights_sum <= 0:
-        return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=extrema.index)
-    np_source_weights = np_source_weights / source_weights_sum
+    source_weights_array_sum = np.nansum(np.abs(source_weights_array))
+    if not np.isfinite(source_weights_array_sum) or source_weights_array_sum <= 0:
+        return np.array([], dtype=float)
+    source_weights_array = source_weights_array / source_weights_array_sum
 
-    normalized_source_weights: list[NDArray[np.floating]] = []
+    normalized_source_weights_array: list[NDArray[np.floating]] = []
     for source in enabled_sources:
-        normalized_source_weights.append(
+        normalized_source_weights_array.append(
             normalize_weights(
-                weights_by_source[source],
+                weights_array_by_source[source],
                 standardization=standardization,
                 robust_quantiles=robust_quantiles,
                 mmad_scaling_factor=mmad_scaling_factor,
@@ -743,21 +745,23 @@ def calculate_hybrid_extrema_weights(
         )
 
     if aggregation == HYBRID_AGGREGATIONS[0]:  # "weighted_sum"
-        combined_source_weights = np.average(
-            np.vstack(normalized_source_weights), axis=0, weights=np_source_weights
+        combined_source_weights_array: NDArray[np.floating] = np.average(
+            np.vstack(normalized_source_weights_array),
+            axis=0,
+            weights=source_weights_array,
         )
     elif aggregation == HYBRID_AGGREGATIONS[1]:  # "geometric_mean"
-        combined_source_weights = gmean(
-            np.vstack([np.abs(values) for values in normalized_source_weights]),
+        combined_source_weights_array: NDArray[np.floating] = gmean(
+            np.vstack([np.abs(values) for values in normalized_source_weights_array]),
             axis=0,
-            weights=np_source_weights[:, np.newaxis],
+            weights=source_weights_array[:, np.newaxis],
         )
     else:
         raise ValueError(f"Unknown hybrid aggregation method: {aggregation}")
 
     if aggregation_normalization != NORMALIZATION_TYPES[6]:  # "none"
-        combined_source_weights = normalize_weights(
-            combined_source_weights,
+        combined_source_weights_array = normalize_weights(
+            combined_source_weights_array,
             standardization=STANDARDIZATION_TYPES[0],
             robust_quantiles=robust_quantiles,
             mmad_scaling_factor=mmad_scaling_factor,
@@ -770,21 +774,15 @@ def calculate_hybrid_extrema_weights(
         )
 
     if (
-        combined_source_weights.size == 0
-        or not np.isfinite(combined_source_weights).all()
+        combined_source_weights_array.size == 0
+        or not np.isfinite(combined_source_weights_array).all()
     ):
-        return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=extrema.index)
+        return np.array([], dtype=float)
 
-    return _weights_array_to_series(
-        index=extrema.index,
-        indices=indices,
-        weights=combined_source_weights,
-        default_weight=np.nanmedian(combined_source_weights),
-    )
+    return combined_source_weights_array
 
 
 def calculate_extrema_weights(
-    extrema: pd.Series,
     indices: list[int],
     weights: NDArray[np.floating],
     # Phase 1: Standardization
@@ -803,13 +801,9 @@ def calculate_extrema_weights(
     rank_method: RankMethod = DEFAULTS_EXTREMA_WEIGHTING["rank_method"],
     # Phase 3: Post-processing
     gamma: float = DEFAULTS_EXTREMA_WEIGHTING["gamma"],
-) -> pd.Series:
-    """
-    Calculate normalized weights for extrema points.
-    Returns: Series with weights at extrema indices (rest filled with default).
-    """
+) -> NDArray[np.floating]:
     if len(indices) == 0 or len(weights) == 0:
-        return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=extrema.index)
+        return np.array([], dtype=float)
 
     normalized_weights = normalize_weights(
         weights,
@@ -824,16 +818,11 @@ def calculate_extrema_weights(
         gamma=gamma,
     )
 
-    return _weights_array_to_series(
-        index=extrema.index,
-        indices=indices,
-        weights=normalized_weights,
-        default_weight=np.nanmedian(normalized_weights),
-    )
+    return normalized_weights
 
 
 def compute_extrema_weights(
-    extrema: pd.Series,
+    n_extrema: int,
     indices: list[int],
     amplitudes: list[float],
     amplitude_threshold_ratios: list[float],
@@ -862,9 +851,11 @@ def compute_extrema_weights(
     rank_method: RankMethod = DEFAULTS_EXTREMA_WEIGHTING["rank_method"],
     # Phase 3: Post-processing
     gamma: float = DEFAULTS_EXTREMA_WEIGHTING["gamma"],
-) -> pd.Series:
+) -> NDArray[np.floating]:
     if len(indices) == 0 or strategy == WEIGHT_STRATEGIES[0]:  # "none"
-        return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=extrema.index)
+        return np.full(n_extrema, DEFAULT_EXTREMA_WEIGHT, dtype=float)
+
+    normalized_weights: Optional[NDArray[np.floating]] = None
 
     if (
         strategy
@@ -890,10 +881,9 @@ def compute_extrema_weights(
             weights = np.asarray([], dtype=float)
 
         if weights.size == 0:
-            return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=extrema.index)
+            return np.full(n_extrema, DEFAULT_EXTREMA_WEIGHT, dtype=float)
 
-        return calculate_extrema_weights(
-            extrema=extrema,
+        normalized_weights = calculate_extrema_weights(
             indices=indices,
             weights=weights,
             standardization=standardization,
@@ -908,8 +898,7 @@ def compute_extrema_weights(
         )
 
     if strategy == WEIGHT_STRATEGIES[6]:  # "hybrid"
-        return calculate_hybrid_extrema_weights(
-            extrema=extrema,
+        normalized_weights = calculate_hybrid_extrema_weights(
             indices=indices,
             amplitudes=amplitudes,
             amplitude_threshold_ratios=amplitude_threshold_ratios,
@@ -930,20 +919,30 @@ def compute_extrema_weights(
             gamma=gamma,
         )
 
+    if normalized_weights is not None:
+        if normalized_weights.size == 0:
+            return np.full(n_extrema, DEFAULT_EXTREMA_WEIGHT, dtype=float)
+
+        return _build_weights_array(
+            n_extrema=n_extrema,
+            indices=indices,
+            weights=normalized_weights,
+            default_weight=np.nanmedian(normalized_weights),
+        )
+
     raise ValueError(f"Unknown extrema weighting strategy: {strategy}")
 
 
-def _apply_weights(extrema: pd.Series, weights: pd.Series) -> pd.Series:
-    weights_values = weights.to_numpy(dtype=float)
-    if weights_values.size == 0:
+def _apply_weights(
+    extrema: NDArray[np.floating], weights: NDArray[np.floating]
+) -> NDArray[np.floating]:
+    if weights.size == 0:
         return extrema
 
-    if not np.isfinite(weights_values).all() or np.allclose(
-        weights_values, weights_values[0]
-    ):
+    if not np.isfinite(weights).all() or np.allclose(weights, weights[0]):
         return extrema
 
-    if np.allclose(weights_values, DEFAULT_EXTREMA_WEIGHT):
+    if np.allclose(weights, DEFAULT_EXTREMA_WEIGHT):
         return extrema
 
     return extrema * weights
@@ -980,8 +979,12 @@ def get_weighted_extrema(
     # Phase 3: Post-processing
     gamma: float = DEFAULTS_EXTREMA_WEIGHTING["gamma"],
 ) -> tuple[pd.Series, pd.Series]:
+    extrema_values = extrema.to_numpy(dtype=float)
+    extrema_index = extrema.index
+    n_extrema = len(extrema_values)
+
     weights = compute_extrema_weights(
-        extrema=extrema,
+        n_extrema=n_extrema,
         indices=indices,
         amplitudes=amplitudes,
         amplitude_threshold_ratios=amplitude_threshold_ratios,
@@ -1003,8 +1006,9 @@ def get_weighted_extrema(
         gamma=gamma,
     )
 
-    weighted_extrema = _apply_weights(extrema, weights)
-    return weighted_extrema, weights
+    return pd.Series(
+        _apply_weights(extrema_values, weights), index=extrema_index
+    ), pd.Series(weights, index=extrema_index)
 
 
 def get_callable_sha256(fn: Callable[..., Any]) -> str:
@@ -2071,23 +2075,23 @@ def largest_divisor_to_step(integer: int, step: int) -> Optional[int]:
 
 
 def soft_extremum(series: pd.Series, alpha: float) -> float:
-    np_array = series.to_numpy()
-    if np_array.size == 0:
+    values = series.to_numpy()
+    if values.size == 0:
         return np.nan
-    finite_mask = np.isfinite(np_array)
+    finite_mask = np.isfinite(values)
     if not finite_mask.any():
         return np.nan
     if np.isclose(alpha, 0.0):
-        return np.nanmean(np_array)
-    scaled_np_array = alpha * np_array
-    max_scaled_np_array = np.nanmax(scaled_np_array)
-    if np.isinf(max_scaled_np_array) or np.isnan(max_scaled_np_array):
-        return np_array[np.nanargmax(scaled_np_array)]
-    shifted_exponentials = np.exp(scaled_np_array - max_scaled_np_array)
+        return np.nanmean(values)
+    scaled_values = alpha * values
+    max_scaled_values = np.nanmax(scaled_values)
+    if np.isinf(max_scaled_values) or np.isnan(max_scaled_values):
+        return values[np.nanargmax(scaled_values)]
+    shifted_exponentials = np.exp(scaled_values - max_scaled_values)
     sum_exponentials = np.nansum(shifted_exponentials)
     if sum_exponentials == 0:
-        return np.nanmax(np_array)
-    return nan_average(np_array, weights=shifted_exponentials)
+        return np.nanmax(values)
+    return nan_average(values, weights=shifted_exponentials)
 
 
 @lru_cache(maxsize=8)