return normalized_weights
-def _weights_array_to_series(
- index: pd.Index,
+def _build_weights_array(
+ n_extrema: int,
indices: list[int],
weights: NDArray[np.floating],
default_weight: float = DEFAULT_EXTREMA_WEIGHT,
-) -> pd.Series:
+) -> NDArray[np.floating]:
if len(indices) == 0 or weights.size == 0:
- return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=index)
+ return np.full(n_extrema, DEFAULT_EXTREMA_WEIGHT, dtype=float)
if len(indices) != weights.size:
raise ValueError(
f"Length mismatch: {len(indices)} indices but {weights.size} weights"
)
- weights_series = pd.Series(default_weight, index=index)
- mask = pd.Index(indices).isin(index)
+ weights_array = np.full(n_extrema, default_weight, dtype=float)
+
+ indices_array = np.array(indices)
+ mask = (indices_array >= 0) & (indices_array < n_extrema)
+
if not np.any(mask):
- return weights_series
+ return weights_array
- valid_indices = [idx for idx, is_valid in zip(indices, mask) if is_valid]
- weights_series.loc[valid_indices] = weights[mask]
- return weights_series
+ valid_indices = indices_array[mask]
+ weights_array[valid_indices] = weights[mask]
+ return weights_array
def calculate_hybrid_extrema_weights(
- extrema: pd.Series,
indices: list[int],
amplitudes: list[float],
amplitude_threshold_ratios: list[float],
rank_method: RankMethod = DEFAULTS_EXTREMA_WEIGHTING["rank_method"],
# Phase 3: Post-processing
gamma: float = DEFAULTS_EXTREMA_WEIGHTING["gamma"],
-) -> pd.Series:
+) -> NDArray[np.floating]:
n = len(indices)
if n == 0:
- return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=extrema.index)
+ return np.array([], dtype=float)
if not isinstance(source_weights, dict):
source_weights = {}
- weights_by_source: dict[HybridWeightSource, NDArray[np.floating]] = {
+ weights_array_by_source: dict[HybridWeightSource, NDArray[np.floating]] = {
"amplitude": np.asarray(amplitudes, dtype=float),
"amplitude_threshold_ratio": np.asarray(
amplitude_threshold_ratios, dtype=float
}
enabled_sources: list[HybridWeightSource] = []
- source_weights_values: list[float] = []
+ source_weights_list: list[float] = []
for source in HYBRID_WEIGHT_SOURCES:
source_weight = source_weights.get(source)
if source_weight is None:
):
continue
enabled_sources.append(source)
- source_weights_values.append(float(source_weight))
+ source_weights_list.append(float(source_weight))
if len(enabled_sources) == 0:
enabled_sources = list(HYBRID_WEIGHT_SOURCES)
- source_weights_values = [1.0 for _ in enabled_sources]
+ source_weights_list = [1.0 for _ in enabled_sources]
- if any(weights_by_source[s].size != n for s in enabled_sources):
+ if any(weights_array_by_source[s].size != n for s in enabled_sources):
raise ValueError(
f"Length mismatch: hybrid {n} indices but inconsistent weights lengths"
)
- np_source_weights: NDArray[np.floating] = np.asarray(
- source_weights_values, dtype=float
+ source_weights_array: NDArray[np.floating] = np.asarray(
+ source_weights_list, dtype=float
)
- source_weights_sum = np.nansum(np.abs(np_source_weights))
- if not np.isfinite(source_weights_sum) or source_weights_sum <= 0:
- return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=extrema.index)
- np_source_weights = np_source_weights / source_weights_sum
+ source_weights_array_sum = np.nansum(np.abs(source_weights_array))
+ if not np.isfinite(source_weights_array_sum) or source_weights_array_sum <= 0:
+ return np.array([], dtype=float)
+ source_weights_array = source_weights_array / source_weights_array_sum
- normalized_source_weights: list[NDArray[np.floating]] = []
+ normalized_source_weights_array: list[NDArray[np.floating]] = []
for source in enabled_sources:
- normalized_source_weights.append(
+ normalized_source_weights_array.append(
normalize_weights(
- weights_by_source[source],
+ weights_array_by_source[source],
standardization=standardization,
robust_quantiles=robust_quantiles,
mmad_scaling_factor=mmad_scaling_factor,
)
if aggregation == HYBRID_AGGREGATIONS[0]: # "weighted_sum"
- combined_source_weights = np.average(
- np.vstack(normalized_source_weights), axis=0, weights=np_source_weights
+ combined_source_weights_array: NDArray[np.floating] = np.average(
+ np.vstack(normalized_source_weights_array),
+ axis=0,
+ weights=source_weights_array,
)
elif aggregation == HYBRID_AGGREGATIONS[1]: # "geometric_mean"
- combined_source_weights = gmean(
- np.vstack([np.abs(values) for values in normalized_source_weights]),
+ combined_source_weights_array: NDArray[np.floating] = gmean(
+ np.vstack([np.abs(values) for values in normalized_source_weights_array]),
axis=0,
- weights=np_source_weights[:, np.newaxis],
+ weights=source_weights_array[:, np.newaxis],
)
else:
raise ValueError(f"Unknown hybrid aggregation method: {aggregation}")
if aggregation_normalization != NORMALIZATION_TYPES[6]: # "none"
- combined_source_weights = normalize_weights(
- combined_source_weights,
+ combined_source_weights_array = normalize_weights(
+ combined_source_weights_array,
standardization=STANDARDIZATION_TYPES[0],
robust_quantiles=robust_quantiles,
mmad_scaling_factor=mmad_scaling_factor,
)
if (
- combined_source_weights.size == 0
- or not np.isfinite(combined_source_weights).all()
+ combined_source_weights_array.size == 0
+ or not np.isfinite(combined_source_weights_array).all()
):
- return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=extrema.index)
+ return np.array([], dtype=float)
- return _weights_array_to_series(
- index=extrema.index,
- indices=indices,
- weights=combined_source_weights,
- default_weight=np.nanmedian(combined_source_weights),
- )
+ return combined_source_weights_array
def calculate_extrema_weights(
- extrema: pd.Series,
indices: list[int],
weights: NDArray[np.floating],
# Phase 1: Standardization
rank_method: RankMethod = DEFAULTS_EXTREMA_WEIGHTING["rank_method"],
# Phase 3: Post-processing
gamma: float = DEFAULTS_EXTREMA_WEIGHTING["gamma"],
-) -> pd.Series:
- """
- Calculate normalized weights for extrema points.
- Returns: Series with weights at extrema indices (rest filled with default).
- """
+) -> NDArray[np.floating]:
if len(indices) == 0 or len(weights) == 0:
- return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=extrema.index)
+ return np.array([], dtype=float)
normalized_weights = normalize_weights(
weights,
gamma=gamma,
)
- return _weights_array_to_series(
- index=extrema.index,
- indices=indices,
- weights=normalized_weights,
- default_weight=np.nanmedian(normalized_weights),
- )
+ return normalized_weights
def compute_extrema_weights(
- extrema: pd.Series,
+ n_extrema: int,
indices: list[int],
amplitudes: list[float],
amplitude_threshold_ratios: list[float],
rank_method: RankMethod = DEFAULTS_EXTREMA_WEIGHTING["rank_method"],
# Phase 3: Post-processing
gamma: float = DEFAULTS_EXTREMA_WEIGHTING["gamma"],
-) -> pd.Series:
+) -> NDArray[np.floating]:
if len(indices) == 0 or strategy == WEIGHT_STRATEGIES[0]: # "none"
- return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=extrema.index)
+ return np.full(n_extrema, DEFAULT_EXTREMA_WEIGHT, dtype=float)
+
+ normalized_weights: Optional[NDArray[np.floating]] = None
if (
strategy
weights = np.asarray([], dtype=float)
if weights.size == 0:
- return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=extrema.index)
+ return np.full(n_extrema, DEFAULT_EXTREMA_WEIGHT, dtype=float)
- return calculate_extrema_weights(
- extrema=extrema,
+ normalized_weights = calculate_extrema_weights(
indices=indices,
weights=weights,
standardization=standardization,
)
if strategy == WEIGHT_STRATEGIES[6]: # "hybrid"
- return calculate_hybrid_extrema_weights(
- extrema=extrema,
+ normalized_weights = calculate_hybrid_extrema_weights(
indices=indices,
amplitudes=amplitudes,
amplitude_threshold_ratios=amplitude_threshold_ratios,
gamma=gamma,
)
+ if normalized_weights is not None:
+ if normalized_weights.size == 0:
+ return np.full(n_extrema, DEFAULT_EXTREMA_WEIGHT, dtype=float)
+
+ return _build_weights_array(
+ n_extrema=n_extrema,
+ indices=indices,
+ weights=normalized_weights,
+ default_weight=np.nanmedian(normalized_weights),
+ )
+
raise ValueError(f"Unknown extrema weighting strategy: {strategy}")
-def _apply_weights(extrema: pd.Series, weights: pd.Series) -> pd.Series:
- weights_values = weights.to_numpy(dtype=float)
- if weights_values.size == 0:
+def _apply_weights(
+ extrema: NDArray[np.floating], weights: NDArray[np.floating]
+) -> NDArray[np.floating]:
+ if weights.size == 0:
return extrema
- if not np.isfinite(weights_values).all() or np.allclose(
- weights_values, weights_values[0]
- ):
+ if not np.isfinite(weights).all() or np.allclose(weights, weights[0]):
return extrema
- if np.allclose(weights_values, DEFAULT_EXTREMA_WEIGHT):
+ if np.allclose(weights, DEFAULT_EXTREMA_WEIGHT):
return extrema
return extrema * weights
# Phase 3: Post-processing
gamma: float = DEFAULTS_EXTREMA_WEIGHTING["gamma"],
) -> tuple[pd.Series, pd.Series]:
+ extrema_values = extrema.to_numpy(dtype=float)
+ extrema_index = extrema.index
+ n_extrema = len(extrema_values)
+
weights = compute_extrema_weights(
- extrema=extrema,
+ n_extrema=n_extrema,
indices=indices,
amplitudes=amplitudes,
amplitude_threshold_ratios=amplitude_threshold_ratios,
gamma=gamma,
)
- weighted_extrema = _apply_weights(extrema, weights)
- return weighted_extrema, weights
+ return pd.Series(
+ _apply_weights(extrema_values, weights), index=extrema_index
+ ), pd.Series(weights, index=extrema_index)
def get_callable_sha256(fn: Callable[..., Any]) -> str:
def soft_extremum(series: pd.Series, alpha: float) -> float:
- np_array = series.to_numpy()
- if np_array.size == 0:
+ values = series.to_numpy()
+ if values.size == 0:
return np.nan
- finite_mask = np.isfinite(np_array)
+ finite_mask = np.isfinite(values)
if not finite_mask.any():
return np.nan
if np.isclose(alpha, 0.0):
- return np.nanmean(np_array)
- scaled_np_array = alpha * np_array
- max_scaled_np_array = np.nanmax(scaled_np_array)
- if np.isinf(max_scaled_np_array) or np.isnan(max_scaled_np_array):
- return np_array[np.nanargmax(scaled_np_array)]
- shifted_exponentials = np.exp(scaled_np_array - max_scaled_np_array)
+ return np.nanmean(values)
+ scaled_values = alpha * values
+ max_scaled_values = np.nanmax(scaled_values)
+ if np.isinf(max_scaled_values) or np.isnan(max_scaled_values):
+ return values[np.nanargmax(scaled_values)]
+ shifted_exponentials = np.exp(scaled_values - max_scaled_values)
sum_exponentials = np.nansum(shifted_exponentials)
if sum_exponentials == 0:
- return np.nanmax(np_array)
- return nan_average(np_array, weights=shifted_exponentials)
+ return np.nanmax(values)
+ return nan_average(values, weights=shifted_exponentials)
@lru_cache(maxsize=8)