from freqtrade.persistence import Trade
from freqtrade.strategy import stoploss_from_absolute
from freqtrade.strategy.interface import IStrategy
from numpy.typing import NDArray
from pandas import DataFrame, Series, isna
from scipy.stats import t
from technical.pivots_points import pivots_points
DEFAULTS_EXTREMA_SMOOTHING,
DEFAULTS_EXTREMA_WEIGHTING,
EXTREMA_COLUMN,
+ HYBRID_AGGREGATIONS,
+ HYBRID_WEIGHT_SOURCES,
MAXIMA_THRESHOLD_COLUMN,
MINIMA_THRESHOLD_COLUMN,
NORMALIZATION_TYPES,
STANDARDIZATION_TYPES,
WEIGHT_STRATEGIES,
TrendDirection,
- WeightStrategy,
alligator,
bottom_change_percent,
calculate_n_extrema,
extrema_weighting = self.freqai_info.get("extrema_weighting", {})
if not isinstance(extrema_weighting, dict):
extrema_weighting = {}
- return QuickAdapterV3._get_extrema_weighting_params(extrema_weighting, {})
+ return QuickAdapterV3._get_extrema_weighting_params(extrema_weighting)
@cached_property
def extrema_smoothing(self) -> dict[str, Any]:
extrema_smoothing = self.freqai_info.get("extrema_smoothing", {})
if not isinstance(extrema_smoothing, dict):
extrema_smoothing = {}
- return QuickAdapterV3._get_extrema_smoothing_params(extrema_smoothing, {})
+ return QuickAdapterV3._get_extrema_smoothing_params(extrema_smoothing)
def bot_start(self, **kwargs) -> None:
self.pairs: list[str] = self.config.get("exchange", {}).get("pair_whitelist")
@staticmethod
def _get_extrema_weighting_params(
- extrema_weighting: dict[str, Any], pair: str
+ extrema_weighting: dict[str, Any],
) -> dict[str, Any]:
# Strategy
weighting_strategy = str(
)
if weighting_strategy not in set(WEIGHT_STRATEGIES):
logger.warning(
- f"{pair}: invalid extrema_weighting strategy '{weighting_strategy}', using default '{WEIGHT_STRATEGIES[0]}'"
+ f"Invalid extrema_weighting strategy '{weighting_strategy}', using default '{WEIGHT_STRATEGIES[0]}'"
)
weighting_strategy = WEIGHT_STRATEGIES[0]
)
if weighting_standardization not in set(STANDARDIZATION_TYPES):
logger.warning(
- f"{pair}: invalid extrema_weighting standardization '{weighting_standardization}', using default '{STANDARDIZATION_TYPES[0]}'"
+ f"Invalid extrema_weighting standardization '{weighting_standardization}', using default '{STANDARDIZATION_TYPES[0]}'"
)
weighting_standardization = STANDARDIZATION_TYPES[0]
or weighting_robust_quantiles[0] >= weighting_robust_quantiles[1]
):
logger.warning(
- f"{pair}: invalid extrema_weighting robust_quantiles {weighting_robust_quantiles}, must be (q1, q3) with 0 <= q1 < q3 <= 1, using default {DEFAULTS_EXTREMA_WEIGHTING['robust_quantiles']}"
+ f"Invalid extrema_weighting robust_quantiles {weighting_robust_quantiles}, must be (q1, q3) with 0 <= q1 < q3 <= 1, using default {DEFAULTS_EXTREMA_WEIGHTING['robust_quantiles']}"
)
weighting_robust_quantiles = DEFAULTS_EXTREMA_WEIGHTING["robust_quantiles"]
else:
or weighting_mmad_scaling_factor <= 0
):
logger.warning(
- f"{pair}: invalid extrema_weighting mmad_scaling_factor {weighting_mmad_scaling_factor}, must be > 0, using default {DEFAULTS_EXTREMA_WEIGHTING['mmad_scaling_factor']}"
+ f"Invalid extrema_weighting mmad_scaling_factor {weighting_mmad_scaling_factor}, must be > 0, using default {DEFAULTS_EXTREMA_WEIGHTING['mmad_scaling_factor']}"
)
weighting_mmad_scaling_factor = DEFAULTS_EXTREMA_WEIGHTING[
"mmad_scaling_factor"
)
if weighting_normalization not in set(NORMALIZATION_TYPES):
logger.warning(
- f"{pair}: invalid extrema_weighting normalization '{weighting_normalization}', using default '{NORMALIZATION_TYPES[0]}'"
+ f"Invalid extrema_weighting normalization '{weighting_normalization}', using default '{NORMALIZATION_TYPES[0]}'"
)
weighting_normalization = NORMALIZATION_TYPES[0]
}
):
raise ValueError(
- f"{pair}: invalid extrema_weighting configuration: "
+ f"Invalid extrema_weighting configuration: "
f"standardization='{weighting_standardization}' with normalization='{weighting_normalization}' "
"can produce negative weights and flip ternary extrema labels. "
f"Use normalization in {{'{NORMALIZATION_TYPES[0]}','{NORMALIZATION_TYPES[1]}','{NORMALIZATION_TYPES[2]}','{NORMALIZATION_TYPES[5]}'}} "
or weighting_minmax_range[0] >= weighting_minmax_range[1]
):
logger.warning(
- f"{pair}: invalid extrema_weighting minmax_range {weighting_minmax_range}, must be (min, max) with min < max, using default {DEFAULTS_EXTREMA_WEIGHTING['minmax_range']}"
+ f"Invalid extrema_weighting minmax_range {weighting_minmax_range}, must be (min, max) with min < max, using default {DEFAULTS_EXTREMA_WEIGHTING['minmax_range']}"
)
weighting_minmax_range = DEFAULTS_EXTREMA_WEIGHTING["minmax_range"]
else:
or weighting_sigmoid_scale <= 0
):
logger.warning(
- f"{pair}: invalid extrema_weighting sigmoid_scale {weighting_sigmoid_scale}, must be > 0, using default {DEFAULTS_EXTREMA_WEIGHTING['sigmoid_scale']}"
+ f"Invalid extrema_weighting sigmoid_scale {weighting_sigmoid_scale}, must be > 0, using default {DEFAULTS_EXTREMA_WEIGHTING['sigmoid_scale']}"
)
weighting_sigmoid_scale = DEFAULTS_EXTREMA_WEIGHTING["sigmoid_scale"]
or weighting_softmax_temperature <= 0
):
logger.warning(
- f"{pair}: invalid extrema_weighting softmax_temperature {weighting_softmax_temperature}, must be > 0, using default {DEFAULTS_EXTREMA_WEIGHTING['softmax_temperature']}"
+ f"Invalid extrema_weighting softmax_temperature {weighting_softmax_temperature}, must be > 0, using default {DEFAULTS_EXTREMA_WEIGHTING['softmax_temperature']}"
)
weighting_softmax_temperature = DEFAULTS_EXTREMA_WEIGHTING[
"softmax_temperature"
)
if weighting_rank_method not in set(RANK_METHODS):
logger.warning(
- f"{pair}: invalid extrema_weighting rank_method '{weighting_rank_method}', using default '{RANK_METHODS[0]}'"
+ f"Invalid extrema_weighting rank_method '{weighting_rank_method}', using default '{RANK_METHODS[0]}'"
)
weighting_rank_method = RANK_METHODS[0]
or not (0 < weighting_gamma <= 10.0)
):
logger.warning(
- f"{pair}: invalid extrema_weighting gamma {weighting_gamma}, must be a finite number in (0, 10], using default {DEFAULTS_EXTREMA_WEIGHTING['gamma']}"
+ f"Invalid extrema_weighting gamma {weighting_gamma}, must be a finite number in (0, 10], using default {DEFAULTS_EXTREMA_WEIGHTING['gamma']}"
)
weighting_gamma = DEFAULTS_EXTREMA_WEIGHTING["gamma"]
+ weighting_source_weights = extrema_weighting.get(
+ "source_weights", DEFAULTS_EXTREMA_WEIGHTING["source_weights"]
+ )
+ if not isinstance(weighting_source_weights, dict):
+ logger.warning(
+ f"Invalid extrema_weighting source_weights {weighting_source_weights}, must be a dict of source name to weight, using default {DEFAULTS_EXTREMA_WEIGHTING['source_weights']}"
+ )
+ weighting_source_weights = DEFAULTS_EXTREMA_WEIGHTING["source_weights"]
+ else:
+ sanitized_source_weights: dict[str, float] = {}
+ for source, weight in weighting_source_weights.items():
+ if source not in set(HYBRID_WEIGHT_SOURCES):
+ continue
+ if (
+ not isinstance(weight, (int, float))
+ or not np.isfinite(weight)
+ or weight < 0
+ ):
+ continue
+ sanitized_source_weights[str(source)] = float(weight)
+ if not sanitized_source_weights:
+ logger.warning(
+ f"Invalid/empty extrema_weighting source_weights, using default {DEFAULTS_EXTREMA_WEIGHTING['source_weights']}"
+ )
+ weighting_source_weights = DEFAULTS_EXTREMA_WEIGHTING["source_weights"]
+ else:
+ weighting_source_weights = sanitized_source_weights
+ weighting_aggregation = str(
+ extrema_weighting.get(
+ "aggregation",
+ DEFAULTS_EXTREMA_WEIGHTING["aggregation"],
+ )
+ )
+ if weighting_aggregation not in set(HYBRID_AGGREGATIONS):
+ logger.warning(
+ f"Invalid extrema_weighting aggregation '{weighting_aggregation}', using default '{HYBRID_AGGREGATIONS[0]}'"
+ )
+ weighting_aggregation = DEFAULTS_EXTREMA_WEIGHTING["aggregation"]
+ weighting_aggregation_normalization = str(
+ extrema_weighting.get(
+ "aggregation_normalization",
+ DEFAULTS_EXTREMA_WEIGHTING["aggregation_normalization"],
+ )
+ )
+ if weighting_aggregation_normalization not in set(NORMALIZATION_TYPES):
+ logger.warning(
+ f"Invalid extrema_weighting aggregation_normalization '{weighting_aggregation_normalization}', using default '{NORMALIZATION_TYPES[6]}'"
+ )
+ weighting_aggregation_normalization = DEFAULTS_EXTREMA_WEIGHTING[
+ "aggregation_normalization"
+ ]
+
return {
"strategy": weighting_strategy,
+ "source_weights": weighting_source_weights,
+ "aggregation": weighting_aggregation,
+ "aggregation_normalization": weighting_aggregation_normalization,
+ # Phase 1: Standardization
"standardization": weighting_standardization,
"robust_quantiles": weighting_robust_quantiles,
"mmad_scaling_factor": weighting_mmad_scaling_factor,
+ # Phase 2: Normalization
"normalization": weighting_normalization,
"minmax_range": weighting_minmax_range,
"sigmoid_scale": weighting_sigmoid_scale,
"softmax_temperature": weighting_softmax_temperature,
"rank_method": weighting_rank_method,
+ # Phase 3: Post-processing
"gamma": weighting_gamma,
}
@staticmethod
def _get_extrema_smoothing_params(
- extrema_smoothing: dict[str, Any], pair: str
+ extrema_smoothing: dict[str, Any],
) -> dict[str, Any]:
smoothing_method = str(
extrema_smoothing.get("method", DEFAULTS_EXTREMA_SMOOTHING["method"])
)
if smoothing_method not in set(SMOOTHING_METHODS):
logger.warning(
- f"{pair}: invalid extrema_smoothing method '{smoothing_method}', using default '{SMOOTHING_METHODS[0]}'"
+ f"Invalid extrema_smoothing method '{smoothing_method}', using default '{SMOOTHING_METHODS[0]}'"
)
smoothing_method = SMOOTHING_METHODS[0]
)
if not isinstance(smoothing_window, int) or smoothing_window < 3:
logger.warning(
- f"{pair}: invalid extrema_smoothing window {smoothing_window}, must be an integer >= 3, using default {DEFAULTS_EXTREMA_SMOOTHING['window']}"
+ f"Invalid extrema_smoothing window {smoothing_window}, must be an integer >= 3, using default {DEFAULTS_EXTREMA_SMOOTHING['window']}"
)
smoothing_window = DEFAULTS_EXTREMA_SMOOTHING["window"]
or smoothing_beta <= 0
):
logger.warning(
- f"{pair}: invalid extrema_smoothing beta {smoothing_beta}, must be a finite number > 0, using default {DEFAULTS_EXTREMA_SMOOTHING['beta']}"
+ f"Invalid extrema_smoothing beta {smoothing_beta}, must be a finite number > 0, using default {DEFAULTS_EXTREMA_SMOOTHING['beta']}"
)
smoothing_beta = DEFAULTS_EXTREMA_SMOOTHING["beta"]
)
if not isinstance(smoothing_polyorder, int) or smoothing_polyorder < 1:
logger.warning(
- f"{pair}: invalid extrema_smoothing polyorder {smoothing_polyorder}, must be an integer >= 1, using default {DEFAULTS_EXTREMA_SMOOTHING['polyorder']}"
+ f"Invalid extrema_smoothing polyorder {smoothing_polyorder}, must be an integer >= 1, using default {DEFAULTS_EXTREMA_SMOOTHING['polyorder']}"
)
smoothing_polyorder = DEFAULTS_EXTREMA_SMOOTHING["polyorder"]
)
if smoothing_mode not in set(SMOOTHING_MODES):
logger.warning(
- f"{pair}: invalid extrema_smoothing mode '{smoothing_mode}', using default '{SMOOTHING_MODES[0]}'"
+ f"Invalid extrema_smoothing mode '{smoothing_mode}', using default '{SMOOTHING_MODES[0]}'"
)
smoothing_mode = SMOOTHING_MODES[0]
or not np.isfinite(smoothing_bandwidth)
):
logger.warning(
- f"{pair}: invalid extrema_smoothing bandwidth {smoothing_bandwidth}, must be a positive finite number, using default {DEFAULTS_EXTREMA_SMOOTHING['bandwidth']}"
+ f"Invalid extrema_smoothing bandwidth {smoothing_bandwidth}, must be a positive finite number, using default {DEFAULTS_EXTREMA_SMOOTHING['bandwidth']}"
)
smoothing_bandwidth = DEFAULTS_EXTREMA_SMOOTHING["bandwidth"]
except (KeyError, ValueError) as e:
raise ValueError(f"Invalid pattern '{pattern}': {repr(e)}")
- @staticmethod
- def _get_weights(
- strategy: WeightStrategy,
- amplitudes: list[float],
- amplitude_threshold_ratios: list[float],
- volumes: list[float],
- speeds: list[float],
- efficiency_ratios: list[float],
- ) -> NDArray[np.floating]:
- if strategy == WEIGHT_STRATEGIES[1]: # "amplitude"
- return np.array(amplitudes)
- if strategy == WEIGHT_STRATEGIES[2]: # "amplitude_threshold_ratio"
- return np.array(amplitude_threshold_ratios)
- if strategy == WEIGHT_STRATEGIES[3]: # "volume"
- return np.array(volumes)
- if strategy == WEIGHT_STRATEGIES[4]: # "speed"
- return np.array(speeds)
- if strategy == WEIGHT_STRATEGIES[5]: # "efficiency_ratio"
- return np.array(efficiency_ratios)
- return np.array([])
-
def set_freqai_targets(
self, dataframe: DataFrame, metadata: dict[str, Any], **kwargs
) -> DataFrame:
f"{pair}: labeled {len(pivots_indices)} extrema (label_period={QuickAdapterV3._td_format(label_period)} / {label_period_candles=} / {label_natr_ratio=:.2f})"
)
- pivot_weights = QuickAdapterV3._get_weights(
- self.extrema_weighting["strategy"],
- pivots_amplitudes,
- pivots_amplitude_threshold_ratios,
- pivots_volumes,
- pivots_speeds,
- pivots_efficiency_ratios,
- )
weighted_extrema, _ = get_weighted_extrema(
- extrema=dataframe[EXTREMA_COLUMN],
+ series=dataframe[EXTREMA_COLUMN],
indices=pivots_indices,
- weights=pivot_weights,
+ amplitudes=pivots_amplitudes,
+ amplitude_threshold_ratios=pivots_amplitude_threshold_ratios,
+ volumes=pivots_volumes,
+ speeds=pivots_speeds,
+ efficiency_ratios=pivots_efficiency_ratios,
+ source_weights=self.extrema_weighting["source_weights"],
strategy=self.extrema_weighting["strategy"],
+ aggregation=self.extrema_weighting["aggregation"],
+ aggregation_normalization=self.extrema_weighting[
+ "aggregation_normalization"
+ ],
standardization=self.extrema_weighting["standardization"],
robust_quantiles=self.extrema_weighting["robust_quantiles"],
mmad_scaling_factor=self.extrema_weighting["mmad_scaling_factor"],
"volume",
"speed",
"efficiency_ratio",
+ "hybrid",
]
WEIGHT_STRATEGIES: Final[tuple[WeightStrategy, ...]] = (
"none",
"volume",
"speed",
"efficiency_ratio",
+ "hybrid",
+)
+
# Metric streams that can feed the "hybrid" extrema weighting strategy.
HybridWeightSource = Literal[
    "amplitude",
    "amplitude_threshold_ratio",
    "volume",
    "speed",
    "efficiency_ratio",
]
# Canonical ordering of hybrid sources; also used to sanitize user config.
HYBRID_WEIGHT_SOURCES: Final[tuple[HybridWeightSource, ...]] = (
    "amplitude",
    "amplitude_threshold_ratio",
    "volume",
    "speed",
    "efficiency_ratio",
)

# How the per-source normalized weights are combined into one weight vector.
HybridAggregation = Literal["weighted_sum", "geometric_mean"]
HYBRID_AGGREGATIONS: Final[tuple[HybridAggregation, ...]] = (
    "weighted_sum",
    "geometric_mean",
)
EXTREMA_COLUMN: Final = "&s-extrema"
DEFAULTS_EXTREMA_WEIGHTING: Final[dict[str, Any]] = {
"strategy": WEIGHT_STRATEGIES[0], # "none"
+ "source_weights": {s: 1.0 for s in HYBRID_WEIGHT_SOURCES},
+ "aggregation": HYBRID_AGGREGATIONS[0], # "weighted_sum"
+ "aggregation_normalization": NORMALIZATION_TYPES[6], # "none"
# Phase 1: Standardization
"standardization": STANDARDIZATION_TYPES[0], # "none"
"robust_quantiles": (0.25, 0.75),
def _normalize_l1(weights: NDArray[np.floating]) -> NDArray[np.floating]:
"""L1 normalization: w / Σ|w| → Σ|w| = 1"""
- weights_sum = np.sum(np.abs(weights))
+ weights_sum = np.nansum(np.abs(weights))
if weights_sum <= 0 or not np.isfinite(weights_sum):
return np.full_like(weights, DEFAULT_EXTREMA_WEIGHT, dtype=float)
return weights / weights_sum
return normalized_weights
def _weights_array_to_series(
    index: pd.Index,
    indices: list[int],
    weights: NDArray[np.floating],
    default_weight: float = DEFAULT_EXTREMA_WEIGHT,
) -> pd.Series:
    """Scatter per-extremum weights onto a full series index.

    Positions listed in ``indices`` (and present in ``index``) receive the
    matching entry of ``weights``; every other position receives
    ``default_weight``.

    Raises:
        ValueError: if ``indices`` and ``weights`` have different lengths.
    """
    out = pd.Series(default_weight, index=index)

    n_indices = len(indices)
    if n_indices == 0 or weights.size == 0:
        return out
    if n_indices != weights.size:
        raise ValueError(
            f"Length mismatch: {n_indices} indices but {weights.size} weights"
        )

    # Keep only the extrema labels that actually exist in the target index.
    present = pd.Index(indices).isin(index)
    if present.any():
        kept_labels = [label for label, ok in zip(indices, present) if ok]
        out.loc[kept_labels] = weights[present]
    return out
+
+
def calculate_hybrid_extrema_weights(
    series: pd.Series,
    indices: list[int],
    amplitudes: list[float],
    amplitude_threshold_ratios: list[float],
    volumes: list[float],
    speeds: list[float],
    efficiency_ratios: list[float],
    source_weights: dict[str, float],
    aggregation: HybridAggregation = DEFAULTS_EXTREMA_WEIGHTING["aggregation"],
    aggregation_normalization: NormalizationType = DEFAULTS_EXTREMA_WEIGHTING[
        "aggregation_normalization"
    ],
    # Phase 1: Standardization
    standardization: StandardizationType = DEFAULTS_EXTREMA_WEIGHTING[
        "standardization"
    ],
    robust_quantiles: tuple[float, float] = DEFAULTS_EXTREMA_WEIGHTING[
        "robust_quantiles"
    ],
    mmad_scaling_factor: float = DEFAULTS_EXTREMA_WEIGHTING["mmad_scaling_factor"],
    # Phase 2: Normalization
    normalization: NormalizationType = DEFAULTS_EXTREMA_WEIGHTING["normalization"],
    minmax_range: tuple[float, float] = DEFAULTS_EXTREMA_WEIGHTING["minmax_range"],
    sigmoid_scale: float = DEFAULTS_EXTREMA_WEIGHTING["sigmoid_scale"],
    softmax_temperature: float = DEFAULTS_EXTREMA_WEIGHTING["softmax_temperature"],
    rank_method: RankMethod = DEFAULTS_EXTREMA_WEIGHTING["rank_method"],
    # Phase 3: Post-processing
    gamma: float = DEFAULTS_EXTREMA_WEIGHTING["gamma"],
) -> pd.Series:
    """Combine several per-extremum metric streams into one weight series.

    Each enabled source (amplitude, amplitude_threshold_ratio, volume,
    speed, efficiency_ratio) is normalized independently via
    ``normalize_weights``, then the normalized vectors are aggregated —
    either as a weighted sum or a weighted geometric mean — using the
    L1-normalized per-source weights from ``source_weights``. The combined
    weights are scattered onto ``series.index`` at ``indices``; all other
    positions receive the median combined weight. Degenerate inputs fall
    back to a constant ``DEFAULT_EXTREMA_WEIGHT`` series.
    """
    n = len(indices)
    if n == 0:
        return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=series.index)

    if not isinstance(source_weights, dict):
        source_weights = {}

    weights_by_source: dict[HybridWeightSource, NDArray[np.floating]] = {
        "amplitude": np.asarray(amplitudes, dtype=float),
        "amplitude_threshold_ratio": np.asarray(
            amplitude_threshold_ratios, dtype=float
        ),
        "volume": np.asarray(volumes, dtype=float),
        "speed": np.asarray(speeds, dtype=float),
        "efficiency_ratio": np.asarray(efficiency_ratios, dtype=float),
    }

    # Keep only sources with a finite, non-negative user-supplied weight.
    enabled_sources: list[HybridWeightSource] = []
    source_weights_values: list[float] = []
    for source in HYBRID_WEIGHT_SOURCES:
        source_weight = source_weights.get(source)
        if source_weight is None:
            continue
        if (
            not isinstance(source_weight, (int, float))
            or not np.isfinite(source_weight)
            or source_weight < 0
        ):
            continue
        enabled_sources.append(source)
        source_weights_values.append(float(source_weight))

    # No valid entry supplied: fall back to all sources, equally weighted.
    if len(enabled_sources) == 0:
        enabled_sources = list(HYBRID_WEIGHT_SOURCES)
        source_weights_values = [1.0 for _ in enabled_sources]

    if any(weights_by_source[s].size != n for s in enabled_sources):
        raise ValueError(
            f"Length mismatch: hybrid {n} indices but inconsistent weights lengths"
        )

    # L1-normalize the per-source weights so they sum to 1.
    np_source_weights: NDArray[np.floating] = np.asarray(
        source_weights_values, dtype=float
    )
    source_weights_sum = np.nansum(np.abs(np_source_weights))
    if not np.isfinite(source_weights_sum) or source_weights_sum <= 0:
        return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=series.index)
    np_source_weights = np_source_weights / source_weights_sum

    # Phase 1+2+3 pipeline applied to each enabled source independently.
    normalized_source_weights: list[NDArray[np.floating]] = []
    for source in enabled_sources:
        normalized_source_weights.append(
            normalize_weights(
                weights_by_source[source],
                standardization=standardization,
                robust_quantiles=robust_quantiles,
                mmad_scaling_factor=mmad_scaling_factor,
                normalization=normalization,
                minmax_range=minmax_range,
                sigmoid_scale=sigmoid_scale,
                softmax_temperature=softmax_temperature,
                rank_method=rank_method,
                gamma=gamma,
            )
        )

    if aggregation == HYBRID_AGGREGATIONS[0]:  # "weighted_sum"
        combined_source_weights = np.zeros(n, dtype=float)
        for source_weight, values in zip(np_source_weights, normalized_source_weights):
            combined_source_weights = combined_source_weights + source_weight * values
    elif aggregation == HYBRID_AGGREGATIONS[1]:  # "geometric_mean"
        # abs() guards gmean against negative inputs from some normalizations.
        combined_source_weights = sp.stats.gmean(
            np.vstack([np.abs(values) for values in normalized_source_weights]),
            axis=0,
            weights=np_source_weights,
        )
    else:
        raise ValueError(f"Unknown hybrid aggregation method: {aggregation}")

    # Optional extra normalization pass over the aggregated vector
    # (standardization and gamma deliberately disabled for this pass).
    if aggregation_normalization != NORMALIZATION_TYPES[6]:  # "none"
        combined_source_weights = normalize_weights(
            combined_source_weights,
            standardization=STANDARDIZATION_TYPES[0],
            robust_quantiles=robust_quantiles,
            mmad_scaling_factor=mmad_scaling_factor,
            normalization=aggregation_normalization,
            minmax_range=minmax_range,
            sigmoid_scale=sigmoid_scale,
            softmax_temperature=softmax_temperature,
            rank_method=rank_method,
            gamma=1.0,
        )

    if (
        combined_source_weights.size == 0
        or not np.isfinite(combined_source_weights).all()
    ):
        return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=series.index)

    return _weights_array_to_series(
        index=series.index,
        indices=indices,
        weights=combined_source_weights,
        default_weight=np.nanmedian(combined_source_weights),
    )
+
+
def calculate_extrema_weights(
series: pd.Series,
indices: list[int],
):
normalized_weights = np.full_like(normalized_weights, DEFAULT_EXTREMA_WEIGHT)
- weights_series = pd.Series(DEFAULT_EXTREMA_WEIGHT, index=series.index)
- mask = pd.Index(indices).isin(series.index)
- normalized_weights = normalized_weights[mask]
- valid_indices = [idx for idx, is_valid in zip(indices, mask) if is_valid]
- if len(valid_indices) > 0:
- weights_series.loc[valid_indices] = normalized_weights
- return weights_series
+ return _weights_array_to_series(
+ index=series.index,
+ indices=indices,
+ weights=normalized_weights,
+ default_weight=np.nanmedian(normalized_weights),
+ )
-def get_weighted_extrema(
- extrema: pd.Series,
+def compute_extrema_weights(
+ series: pd.Series,
indices: list[int],
- weights: NDArray[np.floating],
+ amplitudes: list[float],
+ amplitude_threshold_ratios: list[float],
+ volumes: list[float],
+ speeds: list[float],
+ efficiency_ratios: list[float],
+ source_weights: dict[str, float],
strategy: WeightStrategy = DEFAULTS_EXTREMA_WEIGHTING["strategy"],
+ aggregation: HybridAggregation = DEFAULTS_EXTREMA_WEIGHTING["aggregation"],
+ aggregation_normalization: NormalizationType = DEFAULTS_EXTREMA_WEIGHTING[
+ "aggregation_normalization"
+ ],
# Phase 1: Standardization
standardization: StandardizationType = DEFAULTS_EXTREMA_WEIGHTING[
"standardization"
rank_method: RankMethod = DEFAULTS_EXTREMA_WEIGHTING["rank_method"],
# Phase 3: Post-processing
gamma: float = DEFAULTS_EXTREMA_WEIGHTING["gamma"],
-) -> tuple[pd.Series, pd.Series]:
- """
- Apply weighted normalization to extrema series.
-
- Args:
- extrema: Extrema series
- indices: Indices of extrema points
- weights: Raw weights for each extremum
- strategy: Weight strategy ("none", "amplitude", "amplitude_threshold_ratio", "volume", "speed", "efficiency_ratio")
- standardization: Standardization method
- robust_quantiles: Quantiles for robust standardization
- mmad_scaling_factor: Scaling factor for MMAD standardization
- normalization: Normalization method
- minmax_range: Target range for minmax
- sigmoid_scale: Scale for sigmoid
- softmax_temperature: Temperature for softmax
- rank_method: Method for rank normalization
- gamma: Gamma correction
-
- Returns:
- Tuple of (weighted_extrema, extrema_weights)
- """
- default_weights = pd.Series(DEFAULT_EXTREMA_WEIGHT, index=extrema.index)
- if (
- len(indices) == 0 or len(weights) == 0 or strategy == WEIGHT_STRATEGIES[0]
- ): # "none"
- return extrema, default_weights
+) -> pd.Series:
+ if len(indices) == 0 or strategy == WEIGHT_STRATEGIES[0]: # "none"
+ return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=series.index)
- if (
- strategy
- in {
- WEIGHT_STRATEGIES[1],
- WEIGHT_STRATEGIES[2],
- WEIGHT_STRATEGIES[3],
- WEIGHT_STRATEGIES[4],
- WEIGHT_STRATEGIES[5],
- }
- ): # "amplitude" / "amplitude_threshold_ratio" / "volume" / "speed" / "efficiency_ratio"
- extrema_weights = calculate_extrema_weights(
- series=extrema,
+ if strategy in {
+ WEIGHT_STRATEGIES[1],
+ WEIGHT_STRATEGIES[2],
+ WEIGHT_STRATEGIES[3],
+ WEIGHT_STRATEGIES[4],
+ WEIGHT_STRATEGIES[5],
+ }:
+ if strategy == WEIGHT_STRATEGIES[1]: # "amplitude"
+ weights = np.asarray(amplitudes, dtype=float)
+ elif strategy == WEIGHT_STRATEGIES[2]: # "amplitude_threshold_ratio"
+ weights = np.asarray(amplitude_threshold_ratios, dtype=float)
+ elif strategy == WEIGHT_STRATEGIES[3]: # "volume"
+ weights = np.asarray(volumes, dtype=float)
+ elif strategy == WEIGHT_STRATEGIES[4]: # "speed"
+ weights = np.asarray(speeds, dtype=float)
+ elif strategy == WEIGHT_STRATEGIES[5]: # "efficiency_ratio"
+ weights = np.asarray(efficiency_ratios, dtype=float)
+ else:
+ weights = np.asarray([], dtype=float)
+
+ if weights.size == 0:
+ return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=series.index)
+
+ return calculate_extrema_weights(
+ series=series,
indices=indices,
weights=weights,
standardization=standardization,
rank_method=rank_method,
gamma=gamma,
)
- if np.allclose(extrema_weights, DEFAULT_EXTREMA_WEIGHT):
- return extrema, default_weights
- return extrema * extrema_weights, extrema_weights
- raise ValueError(f"Unknown weight strategy: {strategy}")
+ if strategy == WEIGHT_STRATEGIES[6]: # "hybrid"
+ return calculate_hybrid_extrema_weights(
+ series=series,
+ indices=indices,
+ amplitudes=amplitudes,
+ amplitude_threshold_ratios=amplitude_threshold_ratios,
+ volumes=volumes,
+ speeds=speeds,
+ efficiency_ratios=efficiency_ratios,
+ source_weights=source_weights,
+ aggregation=aggregation,
+ aggregation_normalization=aggregation_normalization,
+ standardization=standardization,
+ robust_quantiles=robust_quantiles,
+ mmad_scaling_factor=mmad_scaling_factor,
+ normalization=normalization,
+ minmax_range=minmax_range,
+ sigmoid_scale=sigmoid_scale,
+ softmax_temperature=softmax_temperature,
+ rank_method=rank_method,
+ gamma=gamma,
+ )
+
+ raise ValueError(f"Unknown extrema weighting strategy: {strategy}")
+
+
def apply_weights(series: pd.Series, weights: pd.Series) -> pd.Series:
    """Multiply ``series`` by ``weights``, skipping no-op weight vectors."""
    if weights.empty:
        return series
    weight_values = weights.to_numpy(dtype=float)
    # All weights equal to the neutral default: multiplication would be a no-op.
    if np.allclose(weight_values, DEFAULT_EXTREMA_WEIGHT):
        return series
    return series * weights
+
+
def get_weighted_extrema(
    series: pd.Series,
    indices: list[int],
    amplitudes: list[float],
    amplitude_threshold_ratios: list[float],
    volumes: list[float],
    speeds: list[float],
    efficiency_ratios: list[float],
    source_weights: dict[str, float],
    strategy: WeightStrategy = DEFAULTS_EXTREMA_WEIGHTING["strategy"],
    aggregation: HybridAggregation = DEFAULTS_EXTREMA_WEIGHTING["aggregation"],
    aggregation_normalization: NormalizationType = DEFAULTS_EXTREMA_WEIGHTING[
        "aggregation_normalization"
    ],
    # Phase 1: Standardization
    standardization: StandardizationType = DEFAULTS_EXTREMA_WEIGHTING[
        "standardization"
    ],
    robust_quantiles: tuple[float, float] = DEFAULTS_EXTREMA_WEIGHTING[
        "robust_quantiles"
    ],
    mmad_scaling_factor: float = DEFAULTS_EXTREMA_WEIGHTING["mmad_scaling_factor"],
    # Phase 2: Normalization
    normalization: NormalizationType = DEFAULTS_EXTREMA_WEIGHTING["normalization"],
    minmax_range: tuple[float, float] = DEFAULTS_EXTREMA_WEIGHTING["minmax_range"],
    sigmoid_scale: float = DEFAULTS_EXTREMA_WEIGHTING["sigmoid_scale"],
    softmax_temperature: float = DEFAULTS_EXTREMA_WEIGHTING["softmax_temperature"],
    rank_method: RankMethod = DEFAULTS_EXTREMA_WEIGHTING["rank_method"],
    # Phase 3: Post-processing
    gamma: float = DEFAULTS_EXTREMA_WEIGHTING["gamma"],
) -> tuple[pd.Series, pd.Series]:
    """Apply extrema weighting and return (weighted_extrema, extrema_weights)."""

    # Delegate the per-strategy weight computation, then scale the extrema.
    extrema_weights = compute_extrema_weights(
        series=series,
        indices=indices,
        amplitudes=amplitudes,
        amplitude_threshold_ratios=amplitude_threshold_ratios,
        volumes=volumes,
        speeds=speeds,
        efficiency_ratios=efficiency_ratios,
        source_weights=source_weights,
        strategy=strategy,
        aggregation=aggregation,
        aggregation_normalization=aggregation_normalization,
        standardization=standardization,
        robust_quantiles=robust_quantiles,
        mmad_scaling_factor=mmad_scaling_factor,
        normalization=normalization,
        minmax_range=minmax_range,
        sigmoid_scale=sigmoid_scale,
        softmax_temperature=softmax_temperature,
        rank_method=rank_method,
        gamma=gamma,
    )
    return apply_weights(series, extrema_weights), extrema_weights
def get_callable_sha256(fn: Callable[..., Any]) -> str: