import random
import time
import warnings
-from collections import Counter
from functools import lru_cache
from pathlib import Path
from typing import AbstractSet, Any, Callable, Final, Literal, Optional, Union, cast
) -> dict[str, Any]:
return split_builder(features, labels, weights, dk)
- weight_col_counts = Counter(
- label_weight_column_name(label) for label in dk.label_list
- )
- duplicates = {col: n for col, n in weight_col_counts.items() if n > 1}
- if duplicates:
- raise ValueError(
- f"Duplicate weight column names {duplicates!r} from labels "
- f"{dk.label_list}: each label must produce a unique weight_column_name"
- )
-
logger.info(f"Using data split method: {method}")
return self._train_common(unfiltered_df, pair, dk, split_fn, **kwargs)
) -> NDArray[np.floating]:
"""Build a per-row sample weight vector aligned to features_filtered.index.
- Composes freqtrade's temporal recency weight with the configured
- per-label aggregation (default ``arithmetic_mean``) of every
- per-target weight column present on ``unfiltered_df``. Alignment
- runs before any shuffle/split on ``features_filtered.index``
- (a subset of ``unfiltered_df.index``) to avoid post-hoc reindex
- against shuffled data. Iterates ``dk.label_list`` and only includes
- labels whose ``label_weight_column_name(label)`` exists on
- ``unfiltered_df``.
+ Multiplies freqtrade's per-row base weights (recency-decayed via
+ ``dk.set_weights_higher_recent`` when ``feature_parameters.weight_factor > 0``,
+ else ones) with the label importance weight column produced by
+ ``compute_label_weights`` and stored on ``unfiltered_df`` under
+ ``label_weight_column_name(LABEL_COLUMNS[0])``. Alignment runs before
+ any shuffle/split on ``features_filtered.index`` (a subset of
+ ``unfiltered_df.index``) to avoid post-hoc reindex against shuffled
+ data. When the weight column is absent, ``label_weights=None`` is
+ forwarded to ``compose_sample_weights`` and only the base weights
+ contribute.
"""
if not unfiltered_df.index.is_unique:
raise ValueError(
"unfiltered_df.index (filter_features should preserve original "
"row labels)"
)
+ if LABEL_COLUMNS[0] not in dk.label_list:
+ raise ValueError(
+ f"LABEL_COLUMNS[0]={LABEL_COLUMNS[0]!r} is not in "
+ f"dk.label_list={dk.label_list!r}: project label constant "
+ f"diverged from freqtrade's runtime label list"
+ )
n_rows = len(features_filtered)
feat_dict = self.freqai_info.get("feature_parameters", {})
weight_factor = feat_dict.get("weight_factor", 0)
and isinstance(weight_factor, (int, float))
and weight_factor > 0
):
- temporal = np.asarray(dk.set_weights_higher_recent(n_rows), dtype=float)
- else:
- temporal = np.ones(n_rows, dtype=float)
-
- per_label: dict[str, NDArray[np.floating]] = {}
- missing: list[str] = []
- for label in dk.label_list:
- col = label_weight_column_name(label)
- if col in unfiltered_df.columns:
- per_label[label] = unfiltered_df.loc[
- features_filtered.index, col
- ].to_numpy(dtype=float)
- else:
- missing.append(col)
- if per_label:
- logger.debug(
- f"per-label weight columns active: {sorted(per_label)}"
- + (f" (no weight column for: {sorted(missing)})" if missing else "")
+ base_weights = np.asarray(
+ dk.set_weights_higher_recent(n_rows), dtype=float
)
else:
+ base_weights = np.ones(n_rows, dtype=float)
+
+ weight_col = label_weight_column_name(LABEL_COLUMNS[0])
+ if weight_col in unfiltered_df.columns:
+ label_weights = unfiltered_df.loc[
+ features_filtered.index, weight_col
+ ].to_numpy(dtype=float)
+ logger.debug(f"label weight column active: {weight_col}")
+ else:
+ label_weights = None
logger.warning(
- f"no per-label weight columns found (expected: {sorted(missing)}); "
- f"falling back to temporal weights only"
+ f"label weight column not found ({weight_col!r}); "
+ f"falling back to base weights only"
)
return compose_sample_weights(
- temporal,
- per_label,
+ base_weights,
+ label_weights,
logger=logger,
)
def compose_sample_weights(
base_weights: NDArray[np.floating],
- label_weights_map: dict[str, NDArray[np.floating]],
+ label_weights: NDArray[np.floating] | None,
*,
logger: Logger,
- aggregation: CombinedAggregation = COMBINED_AGGREGATIONS[0],
- softmax_temperature: float = 1.0,
) -> NDArray[np.floating]:
- """Combine base sample weights with per-label importance weights.
+ """Combine base sample weights with the label importance weights.
- Returns w in R+^N with mean(w) == 1. Per-label arrays are sanitized
- (non-finite or <= 0 -> row dropped), individually mean-normalized,
- aggregated row-wise via ``aggregation`` (default arithmetic_mean),
- multiplied with base_weights, zeroed on dropped rows, and renormalized
- to mean=1.
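+ Returns w in R>=0^N with mean(w) == 1 (dropped rows carry weight 0). The
+ label weight vector is sanitized (non-finite or <= 0 -> row dropped) and
+ mean-normalized, multiplied with base_weights, zeroed on dropped rows, and
+ renormalized to mean=1. When ``label_weights`` is None, base_weights alone
+ are passed through ``sanitize_and_renormalize``.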
+
+ The label weight vector is the output of ``compute_label_weights`` (which
+ already aggregates the configured metric sources via ``label_weighting``),
+ co-smoothed with the label column in ``set_freqai_targets`` and clipped
+ to a finite non-negative range; a weight of exactly 0 therefore drops its
+ row here. ``LABEL_COLUMNS`` is single-target by design (one prediction
+ target per model).
Raises ValueError on shape mismatch or when every row is dropped.
- Default-weight imputation in compute_label_weights uses full-series
- median (bounded leakage; see AFML chapter 4).
"""
base_weights = np.asarray(base_weights, dtype=float)
- if not label_weights_map:
+ if label_weights is None:
return sanitize_and_renormalize(base_weights)
n = len(base_weights)
- for label, label_values in label_weights_map.items():
- arr = np.asarray(label_values, dtype=float)
- if arr.shape != (n,):
- raise ValueError(
- f"compose_sample_weights: label {label!r} has shape {arr.shape}, "
- f"expected ({n},)"
- )
- normalized_per_label: list[NDArray[np.floating]] = []
- drop_mask = np.zeros(n, dtype=bool)
- for label_values in label_weights_map.values():
- arr = np.asarray(label_values, dtype=float)
- invalid = ~np.isfinite(arr) | (arr <= 0.0)
- drop_mask |= invalid
- arr = np.where(invalid, 1.0, np.maximum(arr, np.finfo(float).tiny))
- normalized_per_label.append(sanitize_and_renormalize(arr))
+ arr = np.asarray(label_weights, dtype=float)
+ if arr.shape != (n,):
+ raise ValueError(
+ f"compose_sample_weights: label_weights has shape {arr.shape}, "
+ f"expected ({n},)"
+ )
+ drop_mask = ~np.isfinite(arr) | (arr <= 0.0)
if drop_mask.all():
raise ValueError(
- f"compose_sample_weights: all rows dropped by per-label zero weights "
- f"(labels={list(label_weights_map)}); no surviving training samples"
- )
- stacked = np.vstack(normalized_per_label)
- agg = _aggregate_metrics(
- stacked_metrics=stacked,
- coefficients=np.ones(stacked.shape[0], dtype=float),
- aggregation=aggregation,
- softmax_temperature=softmax_temperature,
- )
- combined = base_weights * agg
+ "compose_sample_weights: all rows dropped by zero or non-finite "
+ "label weights; no surviving training samples"
+ )
+ sanitized = np.where(drop_mask, 1.0, np.maximum(arr, np.finfo(float).tiny))
+ normalized = sanitize_and_renormalize(sanitized)
+ combined = base_weights * normalized
combined[drop_mask] = 0.0
combined_sum = combined.sum()
if combined_sum > 0 and np.isfinite(combined_sum):
if np.all(np.isfinite(scaled)):
return scaled
logger.warning(
- "compose_sample_weights: aggregated weights collapsed (labels=%s, "
- "aggregation=%s, combined_sum=%r); falling back to base weights",
- list(label_weights_map),
- aggregation,
+ "compose_sample_weights: composed weights collapsed "
+ "(combined_sum=%r); falling back to base weights",
combined_sum,
)
return sanitize_and_renormalize(base_weights, drop_mask=drop_mask)