| freqai.label_weighting.fill_method | `zero` | enum {`zero`,`epsilon`,`gaussian`} | Off-pivot weighting scheme. `zero` hard-zeros off-pivot rows; `epsilon` applies a flat baseline `fill_epsilon * <fill_epsilon_baseline>(pivot_weights)`; `gaussian` applies heatmap-style decay around each pivot. Switching away from `zero` may require retuning tree-leaf regularization (`min_child_weight`, `lambda`) and resetting any prior Optuna study. Changing this parameter requires deleting trained models. |
| freqai.label_weighting.fill_epsilon | 0.001 | float [0,1] | Off-pivot fraction of the pivot baseline. Ignored when `fill_method != "epsilon"`. |
| freqai.label_weighting.fill_epsilon_baseline | `mean` | enum {`mean`,`median`} | Pivot baseline statistic. `mean` tracks central tendency; `median` is robust against pivot-weight skew. Ignored when `fill_method != "epsilon"`. |
-| freqai.label_weighting.fill_sigma_candles | 3.0 | float >= 0.5 | Gaussian standard deviation in candles for `fill_method == "gaussian"`. Lower bound 0.5 prevents underflow that silently degrades to `zero` mode. Ignored when `fill_method != "gaussian"`. |
+| freqai.label_weighting.fill_sigma_candles | 3.0 | float >= 0.5 | Gaussian standard deviation in candles for `fill_method == "gaussian"`. Acts as the upper bound on per-pivot sigma when `fill_bandwidth == "knn"`. Lower bound 0.5 prevents severe underflow in the Gaussian tail. Ignored when `fill_method != "gaussian"`. |
+| freqai.label_weighting.fill_sigma_min_candles | 0.5 | float >= 0.5 | Lower bound on per-pivot sigma in candles when `fill_bandwidth == "knn"`. Clipped to `fill_sigma_candles` when larger. Ignored when `fill_method != "gaussian"` or `fill_bandwidth != "knn"`. |
+| freqai.label_weighting.fill_bandwidth | `fixed` | enum {`fixed`,`knn`} | Per-pivot Gaussian bandwidth selector. `fixed` applies a constant `fill_sigma_candles` to every pivot (legacy behavior). `knn` adapts each pivot's sigma to local pivot density via `sigma_p = clip(fill_bandwidth_alpha * d_k(p), fill_sigma_min_candles, fill_sigma_candles)` where `d_k(p)` is the index distance to the `k`-th nearest pivot neighbor (Loftsgaarden & Quesenberry 1965; Silverman 1986, §5.2). Mitigates the crushing of weaker pivots by stronger neighbors in dense clusters. Ignored when `fill_method != "gaussian"`. |
+| freqai.label_weighting.fill_bandwidth_neighbors | 1 | int >= 1 | `k` for the k-nearest-neighbor bandwidth selector. Ignored when `fill_method != "gaussian"` or `fill_bandwidth != "knn"`. |
+| freqai.label_weighting.fill_bandwidth_alpha | 1.0 | float > 0 | Multiplicative factor on the k-th neighbor distance. Smaller values produce sharper, more separated Gaussians; larger values approach the `fixed` behavior. Ignored when `fill_method != "gaussian"` or `fill_bandwidth != "knn"`. |
| _Label pipeline_ | | | |
| freqai.label_pipeline.standardization | `none` | enum {`none`,`zscore`,`robust`,`mmad`,`power_yj`} | Standardization method applied to labels before normalization. `none`=w, `zscore`=(w-μ)/σ, `robust`=(w-median)/(Q₃-Q₁), `mmad`=(w-median)/(MAD·k), `power_yj`=YJ(w). |
| freqai.label_pipeline.robust_quantiles | [0.25, 0.75] | list[float] where 0 <= Q1 < Q3 <= 1 | Quantile range for robust standardization, Q1 and Q3. |
DEFAULTS_LABEL_SMOOTHING,
DEFAULTS_LABEL_WEIGHTING,
EXTREMA_SELECTION_METHODS,
+ FILL_BANDWIDTHS,
FILL_EPSILON_BASELINES,
FILL_METHODS,
NORMALIZATION_TYPES,
"fill_sigma_candles": _ParamSpec(
_NumericValidator(min_value=0.5), output_type=float
),
+ "fill_sigma_min_candles": _ParamSpec(
+ _NumericValidator(min_value=0.5), output_type=float
+ ),
+ "fill_bandwidth": _ParamSpec(_EnumValidator(FILL_BANDWIDTHS)),
+ "fill_bandwidth_neighbors": _ParamSpec(
+ _NumericValidator(min_value=1, require_int=True), output_type=int
+ ),
+ "fill_bandwidth_alpha": _ParamSpec(
+ _NumericValidator(min_value=0, min_exclusive=True), output_type=float
+ ),
}
_PIPELINE_SPECS: Final[dict[str, _ParamSpec]] = {
_GAUSSIAN_FILL_DENSITY_WARN: Final[float] = 0.1
+def _compute_pivot_sigmas(
+ pivot_indices: NDArray[np.floating],
+ sigma_candles: float,
+ bandwidth: str,
+ neighbors: int,
+ alpha: float,
+ sigma_min_candles: float,
+) -> NDArray[np.floating]:
+ """Per-pivot Gaussian standard deviation in candles.
+
+ For ``bandwidth == "fixed"`` returns a scalar broadcast (constant ``sigma_candles``).
+ For ``bandwidth == "knn"`` applies a k-nearest-neighbor bandwidth selector
+ (Loftsgaarden & Quesenberry 1965; Silverman 1986, §5.2):
+
+ sigma_p = clip( alpha * d_k(p), sigma_min_candles, sigma_candles )
+
+ where ``d_k(p)`` is the index distance from pivot ``p`` to its ``k``-th
+ nearest pivot neighbor. Only the ``k`` candidates on either side can contain
+ the ``k``-th nearest neighbor on the 1D candle index.
+ """
+ M = pivot_indices.size
+ if bandwidth == FILL_BANDWIDTHS[0] or M <= 1: # "fixed" or trivial
+ return np.full(M, float(sigma_candles), dtype=float)
+ if bandwidth != FILL_BANDWIDTHS[1]: # "knn"
+ raise ValueError(
+ f"Invalid fill_bandwidth value {bandwidth!r}: "
+ f"supported values are {', '.join(FILL_BANDWIDTHS)}"
+ )
+
+ sorted_idx = np.argsort(pivot_indices, kind="stable")
+ sorted_positions = pivot_indices[sorted_idx]
+ k = min(int(neighbors), M - 1)
+
+ d_k_sorted = np.empty(M, dtype=float)
+ for i, position in enumerate(sorted_positions):
+ left = max(0, i - k)
+ right = min(M, i + k + 1)
+ candidate_distances = np.abs(
+ np.concatenate(
+ (
+ sorted_positions[left:i] - position,
+ sorted_positions[i + 1 : right] - position,
+ )
+ )
+ )
+ d_k_sorted[i] = np.partition(candidate_distances, k - 1)[k - 1]
+ d_k = np.empty(M, dtype=float)
+ d_k[sorted_idx] = d_k_sorted
+
+ sigmas = float(alpha) * d_k
+ sigma_max = float(sigma_candles)
+ sigma_min = float(sigma_min_candles)
+ if sigma_min > sigma_max:
+ sigma_min = sigma_max
+ return np.clip(sigmas, sigma_min, sigma_max)
+
+
def _gaussian_fill_weights(
n_values: int,
pivot_indices: NDArray[np.integer],
pivot_weights: NDArray[np.floating],
sigma_candles: float,
*,
+ bandwidth: str = FILL_BANDWIDTHS[0],
+ bandwidth_neighbors: int = 1,
+ bandwidth_alpha: float = 1.0,
+ sigma_min_candles: float = 0.5,
logger: Logger | None = None,
) -> NDArray[np.floating]:
"""Per-row max of Gaussian-decayed pivot weights.
- Out[i] = max over p of ``w_p * exp(-(i - p)**2 / (2 * sigma**2))``.
- With clustered pivots within ``~sigma_candles``, the per-row max
- lets a stronger neighbor dominate weaker ones; pick
- ``sigma_candles <= label_period_candles / 2`` to preserve pivot
- identity.
+ Out[i] = max over p of ``w_p * exp(-(i - p)**2 / (2 * sigma_p**2))``.
+
+ With ``bandwidth == "fixed"``, ``sigma_p == sigma_candles`` for every
+ pivot. Clustered pivots within ``~sigma_candles`` then let the strongest
+ neighbor dominate weaker ones in the per-row max ("crushing" effect):
+ pick ``sigma_candles <= label_period_candles / 2`` to mitigate.
+
+ With ``bandwidth == "knn"``, ``sigma_p`` contracts to ``alpha * d_k(p)``
+ (clipped to ``[sigma_min_candles, sigma_candles]``) so neighboring
+ Gaussians overlap less in dense regions, mitigating the crushing effect
+ while preserving the upper bound ``Out[i] <= max_p w_p``.
"""
if sigma_candles < 0.5:
raise ValueError(
)
pivot_indices_array = pivot_indices.astype(float)
pivot_weights_row = pivot_weights.astype(float)[np.newaxis, :]
- inv_two_sigma_sq = 0.5 / (sigma_candles * sigma_candles)
+ pivot_sigmas = _compute_pivot_sigmas(
+ pivot_indices=pivot_indices_array,
+ sigma_candles=sigma_candles,
+ bandwidth=bandwidth,
+ neighbors=bandwidth_neighbors,
+ alpha=bandwidth_alpha,
+ sigma_min_candles=sigma_min_candles,
+ )
+ inv_two_sigma_sq_row = (0.5 / (pivot_sigmas * pivot_sigmas))[np.newaxis, :]
M = pivot_indices_array.size
if (
logger is not None
chunk = max(1, _GAUSSIAN_FILL_CHUNK_BUDGET // max(M, 1))
if logger is not None and chunk < n_values:
logger.debug(
- "gaussian_fill: N=%d, M=%d, chunk=%d, ~%.0f MB peak buffer",
+ "gaussian_fill: N=%d, M=%d, chunk=%d, ~%.0f MB peak buffer, "
+ "bandwidth=%s, sigma=[%.2f, %.2f]",
n_values,
M,
chunk,
chunk * M * 8 / 1e6,
+ bandwidth,
+ float(pivot_sigmas.min()),
+ float(pivot_sigmas.max()),
)
out = np.zeros(n_values, dtype=float)
for start in range(0, n_values, chunk):
positions = np.arange(start, stop, dtype=float)
buf = positions[:, np.newaxis] - pivot_indices_array[np.newaxis, :]
np.multiply(buf, buf, out=buf)
- np.multiply(buf, -inv_two_sigma_sq, out=buf)
+ np.multiply(buf, -inv_two_sigma_sq_row, out=buf)
np.exp(buf, out=buf)
np.multiply(buf, pivot_weights_row, out=buf)
np.max(buf, axis=1, out=out[start:stop])
pivot_indices=indices_array[valid_mask],
pivot_weights=weights[valid_mask],
sigma_candles=label_weighting["fill_sigma_candles"],
+ bandwidth=label_weighting["fill_bandwidth"],
+ bandwidth_neighbors=label_weighting["fill_bandwidth_neighbors"],
+ bandwidth_alpha=label_weighting["fill_bandwidth_alpha"],
+ sigma_min_candles=label_weighting["fill_sigma_min_candles"],
logger=logger,
)
else: