| freqai.label_pipeline.gamma | 1.0 | float (0,10] | Contrast exponent applied to labels after normalization: >1 emphasizes extrema, values between 0 and 1 soften. |
| _Feature parameters_ | | | |
| freqai.feature_parameters.label_period_candles | min/max midpoint | int >= 1 | Zigzag labeling NATR horizon. |
-| freqai.feature_parameters.label_horizon_candles | `label_period_candles` | int >= 1 | Number of candles after a label row before the label is considered known by causal split guards. Recommended: cover the label generator's lookahead (zigzag pivot confirmation lag plus any smoothing kernel half-width). Used by causal split guards and `<label>_known_at_index` metadata. When unset, falls back to `label_period_candles`. |
+| freqai.feature_parameters.label_horizon_candles | `label_period_candles` | int >= 1 | Number of candles after a label row before the label is considered known by causal split guards. Recommended: cover the zigzag pivot confirmation lag (the smoothing kernel half-width is added automatically by `set_freqai_targets`). Used by causal split guards and `<label>_known_at_index` metadata. When unset, falls back to `label_period_candles`. |
| freqai.feature_parameters.causal_mode | true | bool | Causal split guard toggle. When `true` (default): rejects `data_split_parameters.shuffle=true`, `shuffle_after_split=true`, `reverse_train_test_order=true`; for `timeseries_split` auto-sets `gap=label_horizon_candles` when unset/`0` (rejects explicit `gap<label_horizon_candles`); for `train_test_split` drops train rows where position `>=first_test_position-label_horizon_candles`; with `<label>_known_at_index` columns, additionally drops rows where row-wise max `>=first_test_position`. `false` is deprecated; acausal baselines only. |
| freqai.feature_parameters.min_label_period_candles | 12 | int >= 1 | Minimum labeling NATR horizon used for reversals labeling HPO. |
| freqai.feature_parameters.max_label_period_candles | 24 | int >= 1 | Maximum labeling NATR horizon used for reversals labeling HPO. |
| freqai.feature_parameters.label_p_order | None | float \| None | Lp exponent for parameterized metrics. Used by `minkowski` distance (default 2.0) and `power_mean` aggregation (default 1.0). Ignored by other metrics. |
| freqai.feature_parameters.label_method | `compromise_programming` | enum {`compromise_programming`,`topsis`,`kmeans`,`kmeans2`,`kmedoids`,`knn`,`medoid`} | HPO `label` Pareto front trial selection method. |
| freqai.feature_parameters.label_distance_metric | `euclidean` | enum {`euclidean`,`minkowski`,`chebyshev`,`cityblock`,`sqeuclidean`,`seuclidean`,`mahalanobis`,`harmonic_mean`,`geometric_mean`,`arithmetic_mean`,`quadratic_mean`,`cubic_mean`,`power_mean`,`weighted_sum`} | Distance metric for `compromise_programming` and `topsis` methods. Invalid values warn and fall back to `euclidean`. |
-| freqai.feature_parameters.label_cluster_metric | `euclidean` | enum {`euclidean`,`minkowski`,`chebyshev`,`cityblock`,`sqeuclidean`,`seuclidean`,`mahalanobis`,`harmonic_mean`,`geometric_mean`,`arithmetic_mean`,`quadratic_mean`,`cubic_mean`,`power_mean`,`weighted_sum`} | Distance metric for `kmeans`, `kmeans2`, and `kmedoids` methods. Invalid values warn and fall back to `euclidean`. |
+| freqai.feature_parameters.label_cluster_metric | `euclidean` | enum {`euclidean`,`minkowski`,`chebyshev`,`cityblock`,`sqeuclidean`,`seuclidean`,`mahalanobis`} | Distance metric for `kmeans`, `kmeans2`, and `kmedoids` methods. Invalid values warn and fall back to `euclidean`. |
| freqai.feature_parameters.label_cluster_selection_method | `topsis` | enum {`compromise_programming`,`topsis`} | Cluster selection method for clustering-based label methods. |
| freqai.feature_parameters.label_cluster_trial_selection_method | `topsis` | enum {`compromise_programming`,`topsis`} | Best cluster trial selection method for clustering-based label methods. |
-| freqai.feature_parameters.label_density_metric | method-dependent | enum {`euclidean`,`minkowski`,`chebyshev`,`cityblock`,`sqeuclidean`,`seuclidean`,`mahalanobis`,`harmonic_mean`,`geometric_mean`,`arithmetic_mean`,`quadratic_mean`,`cubic_mean`,`power_mean`,`weighted_sum`} | Distance metric for `knn` and `medoid` methods. Invalid values warn and fall back to the method's natural default (`minkowski` for `knn`, `euclidean` for `medoid`). |
+| freqai.feature_parameters.label_density_metric | method-dependent | enum {`euclidean`,`minkowski`,`chebyshev`,`cityblock`,`sqeuclidean`,`seuclidean`,`mahalanobis`} | Distance metric for `knn` and `medoid` methods. Invalid values warn and fall back to the method's natural default (`minkowski` for `knn`, `euclidean` for `medoid`). |
| freqai.feature_parameters.label_density_aggregation | `power_mean` | enum {`power_mean`,`quantile`,`min`,`max`} | Aggregation method for KNN neighbor distances. |
| freqai.feature_parameters.label_density_n_neighbors | 5 | int >= 1 | Number of neighbors for KNN. |
| freqai.feature_parameters.label_density_aggregation_param | aggregation-dependent | float \| None | Tunable for KNN neighbor distance aggregation: Lp exponent (`power_mean`) or quantile value (`quantile`). |
import copy
import logging
+import math
import random
import time
import warnings
LabelWeightSupportError,
REGRESSORS,
Regressor,
+ WEIGHT_STRATEGIES,
_OPTUNA_NAMESPACES,
+ _OPTUNA_LABEL_SELECTION_SCHEMA_VERSION,
OptunaNamespace,
compose_sample_weights,
ensure_datetime_series,
"shellinger",
)
- _OPTUNA_LABEL_SELECTION_SCHEMA_VERSION: Final[int] = 1
- """Version of the label-namespace Optuna best-trial selection algorithm.
-
- Incremented on any change to tie-break, normalization, distance-metric
- whitelist, or selection metadata. Independent of
- ``Utils._OPTUNA_LABEL_BEST_PARAMS_SCHEMA_VERSION``.
- """
-
# Absolute tolerance (rtol=0) for constant-column detection in
# `_non_constant_objective_indices`; valid on the [0,1]-normalized
# output of `_normalize_objective_values`.
def _power_mean_metrics_set() -> set[str]:
return set(QuickAdapterRegressorV3._POWER_MEAN_MAP.keys())
+ @staticmethod
+ @lru_cache(maxsize=None)
+ def _aggregate_distance_metrics_set() -> set[str]:
+ """Aggregate metrics: distance metrics computed by reduction over
+ objective coordinates rather than SciPy/sklearn pairwise routines.
+
+ Computed as the complement of SciPy and probability metrics over
+ ``_DISTANCE_METRICS``: ``harmonic_mean``, ``geometric_mean``,
+ ``arithmetic_mean``, ``quadratic_mean``, ``cubic_mean``,
+ ``power_mean``, ``weighted_sum``. Accepted by
+ ``compromise_programming``/``topsis`` via
+ ``_calculate_trial_distance_to_ideal``; rejected by
+ cluster/density categories that route to
+ ``pairwise_distances``/``KMeans``/``KMedoids``/``NearestNeighbors``.
+ """
+ return (
+ QuickAdapterRegressorV3._distance_metrics_set()
+ - QuickAdapterRegressorV3._scipy_metrics_set()
+ - QuickAdapterRegressorV3._probability_distance_metrics_set()
+ )
+
+ @staticmethod
+ @lru_cache(maxsize=None)
+ def _cluster_density_distance_metrics_set() -> set[str]:
+ """SciPy-compatible non-probability metrics.
+
+ Accepted by cluster/density categories that route to
+ ``pairwise_distances``/``KMeans``/``KMedoids``/``NearestNeighbors``;
+ rejected by the aggregate set and by probability metrics.
+ """
+ return (
+ QuickAdapterRegressorV3._scipy_metrics_set()
+ - QuickAdapterRegressorV3._probability_distance_metrics_set()
+ )
+
@staticmethod
def _coerce_int(value: Any, name: str, *, minimum: int) -> int:
if isinstance(value, bool) or not isinstance(value, int) or value < minimum:
*,
context: str,
) -> NDArray[np.floating]:
- if label_weights is None:
- return compose_sample_weights(base_weights, None, logger=logger)
-
policy = cast(
LabelWeightSupportPolicy, label_weighting_config["support_policy"]
)
+ if label_weights is None:
+ # Non-"none" label-weighting strategy with no available label
+ # weights (zigzag produced zero pivots): the support policy
+ # governs the contract -- ``raise`` raises, ``fallback``
+ # warns. A direct return to base weights would bypass the
+ # policy silently.
+ strategy = label_weighting_config.get("strategy", WEIGHT_STRATEGIES[0])
+ if strategy != WEIGHT_STRATEGIES[0]: # "none"
+ return QuickAdapterRegressorV3._apply_support_policy(
+ base_weights,
+ context=context,
+ policy=policy,
+ reasons=[
+ f"label_weighting.strategy={strategy!r} configured but "
+ f"no label weights available (no pivots detected)"
+ ],
+ )
+ return compose_sample_weights(base_weights, None, logger=logger)
+
try:
composed = compose_sample_weights(
base_weights, label_weights, logger=logger
*,
ctx: str,
default: str,
+ aggregate_allowed: bool,
mode: ValidationMode = "warn",
) -> str:
- valid_metrics = QuickAdapterRegressorV3._label_selection_distance_metrics_set()
+ if aggregate_allowed:
+ valid_metrics = (
+ QuickAdapterRegressorV3._label_selection_distance_metrics_set()
+ )
+ else:
+ # Cluster/density paths route the metric to SciPy/sklearn APIs
+ # (pairwise_distances, KMeans, KMedoids, NearestNeighbors) which
+ # reject `_aggregate_distance_metrics_set()`; restrict the
+ # valid set to SciPy-compatible non-probability metrics.
+ valid_metrics = (
+ QuickAdapterRegressorV3._cluster_density_distance_metrics_set()
+ )
valid_options = tuple(
candidate
for candidate in QuickAdapterRegressorV3._DISTANCE_METRICS
ctx="label_distance_metric",
mode="warn",
default=QuickAdapterRegressorV3.LABEL_DISTANCE_METRIC_DEFAULT,
+ aggregate_allowed=True,
)
config["distance_metric"] = distance_metric
elif category == "cluster":
ctx="label_cluster_metric",
mode="warn",
default=QuickAdapterRegressorV3.LABEL_CLUSTER_METRIC_DEFAULT,
+ aggregate_allowed=False,
)
config["distance_metric"] = distance_metric
ctx="label_density_metric",
mode="warn",
default=density_metric_default,
+ aggregate_allowed=False,
)
config["distance_metric"] = distance_metric
label_method = self.ft_params.get(
"label_method", QuickAdapterRegressorV3.LABEL_METHOD_DEFAULT
)
+ label_weights = self.ft_params.get("label_weights")
+ label_p_order = self.ft_params.get("label_p_order")
+ if label_weights is not None and not all(
+ math.isfinite(float(w)) for w in label_weights
+ ):
+ raise ValueError(
+ f"label_weights contains non-finite values: {label_weights!r}"
+ )
+ if label_p_order is not None and not math.isfinite(float(label_p_order)):
+ raise ValueError(f"label_p_order is non-finite: {label_p_order!r}")
return {
- "schema_version": QuickAdapterRegressorV3._OPTUNA_LABEL_SELECTION_SCHEMA_VERSION,
+ "schema_version": _OPTUNA_LABEL_SELECTION_SCHEMA_VERSION,
"method_config": self._resolve_label_method_config(label_method),
+ "label_weights": (
+ [float(w) for w in label_weights] if label_weights is not None else None
+ ),
+ "label_p_order": (
+ float(label_p_order) if label_p_order is not None else None
+ ),
}
@property
"label_period_candles",
default_label_period_candles,
),
- "label_horizon_candles": get_label_horizon_candles(
- self.ft_params, logger
- ),
"label_natr_multiplier": float(
self.ft_params.get(
"label_natr_multiplier",
label_weights = self.ft_params.get("label_weights")
if (
label_weights is not None
- and objective_indices is not None
and original_n_objectives is not None
+ and objective_indices is not None
and original_n_objectives != n_objectives
):
try:
label_weights_array = np.asarray(label_weights, dtype=float)
except (ValueError, TypeError):
label_weights_array = None
- if (
- label_weights_array is not None
- and label_weights_array.ndim == 1
- and label_weights_array.size == original_n_objectives
- ):
- label_weights = label_weights_array[objective_indices]
- logger.debug(
- "label_weights sliced to non-constant objectives "
- "(indices=%s, original_size=%d, sliced_size=%d)",
- objective_indices.tolist(),
- label_weights_array.size,
- label_weights.size,
- )
+ if label_weights_array is not None and label_weights_array.ndim == 1:
+ if label_weights_array.size != original_n_objectives:
+ raise ValueError(
+ f"Invalid label_weights size {label_weights_array.size}: "
+ f"must match original objective count "
+ f"{original_n_objectives}"
+ )
+ sliced_weights = label_weights_array[objective_indices]
+ if np.all(sliced_weights == 0.0):
+ # All user-positive weights project onto dropped
+ # (constant) objectives; uniform fallback keeps
+ # selection deterministic and avoids
+ # ``_validate_label_weights`` raising on sum-zero.
+ # Negative or non-finite slices flow through to the
+ # validator.
+ logger.warning(
+ "label_weights sliced to non-constant objectives "
+ "is all-zero (indices=%s, original=%s); "
+ "falling back to uniform weights",
+ objective_indices.tolist(),
+ label_weights_array.tolist(),
+ )
+ label_weights = None
+ else:
+ label_weights = sliced_weights
+ logger.debug(
+ "label_weights sliced to non-constant objectives "
+ "(indices=%s, original_size=%d, sliced_size=%d)",
+ objective_indices.tolist(),
+ label_weights_array.size,
+ sliced_weights.size,
+ )
weights = QuickAdapterRegressorV3._validate_label_weights(
label_weights,
n_objectives,
if isinstance(existing_selection_metadata, dict)
else None
)
- target_version = (
- QuickAdapterRegressorV3._OPTUNA_LABEL_SELECTION_SCHEMA_VERSION
- )
- if existing_schema_version is None:
- logger.info(
- f"[{pair}] Optuna {namespace} study {study_name}: "
- f"selection schema (none -> v{target_version}); "
- f"{len(existing_study.trials)} trial(s) preserved"
+ target_version = _OPTUNA_LABEL_SELECTION_SCHEMA_VERSION
+ if (
+ isinstance(existing_schema_version, bool)
+ or not isinstance(existing_schema_version, (int, np.integer))
+ or existing_schema_version != target_version
+ ):
+ version_repr = (
+ "none"
+ if existing_schema_version is None
+ else f"v{existing_schema_version}"
)
- elif existing_schema_version != target_version:
logger.warning(
f"[{pair}] Optuna {namespace} study {study_name}: "
- f"selection schema v{existing_schema_version!r} incompatible "
+ f"selection schema {version_repr} incompatible "
f"with v{target_version}; resetting study"
)
QuickAdapterRegressorV3.optuna_delete_study(
def optuna_load_best_params(
self, pair: str, namespace: OptunaNamespace
) -> Optional[dict[str, Any]]:
- return optuna_load_best_params(self.full_path, pair, namespace, logger)
+ expected = (
+ self._optuna_label_selection_metadata()
+ if namespace == _OPTUNA_NAMESPACES.label
+ else None
+ )
+ return optuna_load_best_params(
+ self.full_path,
+ pair,
+ namespace,
+ logger,
+ expected_selection_metadata=expected,
+ )
@staticmethod
def optuna_delete_study(
nan_average,
non_zero_diff,
optuna_load_best_params,
+ get_smoothing_kernel_half_width,
price_retracement_percent,
safe_divide,
smooth,
"label_period_candles",
default_label_period_candles,
),
- "label_horizon_candles": get_label_horizon_candles(
- feature_parameters, logger
- ),
"label_natr_multiplier": float(
feature_parameters.get(
"label_natr_multiplier",
self._label_params[pair]["label_period_candles"] = label_period_candles
def get_label_horizon_candles(self, pair: str) -> int:
+ period = self.get_label_period_candles(pair)
label_params = self._label_params.get(pair, {})
feature_parameters = self.freqai_info.get("feature_parameters", {})
- return get_label_horizon_candles({**feature_parameters, **label_params}, logger)
+ return get_label_horizon_candles(
+ {**feature_parameters, **label_params, "label_period_candles": period},
+ logger,
+ )
def get_label_natr_multiplier(self, pair: str) -> float:
label_natr_multiplier = self._label_params.get(pair, {}).get(
label_weighting = self.label_weighting
label_smoothing = self.label_smoothing
+ series_length = len(dataframe)
for label_col in LABEL_COLUMNS:
label_params = self.get_label_params(pair, label_col)
- label_data = generate_label_data(dataframe, label_col, label_params)
+ label_data = generate_label_data(dataframe, label_col, label_params, logger)
if len(label_data.indices) == 0:
logger.warning(
0.0,
)
+ # Zero-phase smoothing reads future candles within the kernel
+ # half-width; advance the known-at index so causal split guards
+ # account for the smoothing lookahead.
+ known_at_column = label_known_at_column_name(label_col)
+ if known_at_column in dataframe.columns:
+ kernel_half_width = get_smoothing_kernel_half_width(
+ col_smoothing_config, series_length=series_length
+ )
+ if kernel_half_width > 0:
+ dataframe[known_at_column] = (
+ dataframe[known_at_column] + kernel_half_width
+ )
+
if label_col == EXTREMA_COLUMN:
dataframe[EXTREMA_DIRECTION_SMOOTHED_COLUMN] = dataframe[label_col]
if is_weighting_active:
def optuna_load_best_params(
self, pair: str, namespace: OptunaNamespace
) -> Optional[dict[str, Any]]:
+ # Strategy consumes only output tunables (``label_period_candles``,
+ # ``label_horizon_candles``, ``label_natr_multiplier``);
+ # selection-metadata drift on cached label best-params is
+ # tolerable here. The regressor's ``optuna_load_best_params``
+ # passes ``expected_selection_metadata`` and rejects drift before
+ # re-running HPO selection.
return optuna_load_best_params(self.models_full_path, pair, namespace, logger)
import copy
import functools
import hashlib
+import inspect
import json
import math
import re
NamedTuple,
TypeVar,
assert_never,
+ cast,
)
import numpy as np
context: str = "safe_divide",
logger: Logger | None = None,
) -> Any:
- """Element-wise division with non-finite and near-zero denominator guards.
+ """Element-wise division with non-finite and zero denominator guards.
Replaces results from divisions whose numerator or denominator is non-finite,
- or whose denominator satisfies ``np.isclose(denom, 0.0)`` (default
- ``atol=1e-8``), with ``fallback``. The fallback is also substituted for
- any non-finite division output (e.g. ``inf`` from a subnormal denominator
- that escapes the ``np.isclose`` gate).
+ or whose denominator is exactly ``0.0``, with ``fallback``. Subnormal or
+ satoshi-scale denominators (e.g. ``1e-8`` price quotes) pass through and
+ any resulting non-finite division output (e.g. ``inf``) is then coerced
+ to ``fallback`` by the post-division finite mask.
Returns a ``pd.Series`` indexed on the first Series among the inputs when
shapes align, a Python ``float`` for 0-d results, otherwise an ``ndarray``.
valid_mask = (
np.isfinite(numerator_arr)
& np.isfinite(denominator_arr)
- & ~np.isclose(denominator_arr, 0.0)
+ & (denominator_arr != 0.0)
)
with np.errstate(divide="ignore", invalid="ignore"):
result = np.divide(
known_at_index: pd.Series | None = None
-LabelGenerator = Callable[[pd.DataFrame, dict[str, Any]], LabelData]
+LabelGenerator = Callable[[pd.DataFrame, dict[str, Any], Logger | None], LabelData]
_LABEL_GENERATORS: dict[str, LabelGenerator] = {}
-def register_label_generator(label_column: str, generator: LabelGenerator) -> None:
- _LABEL_GENERATORS[label_column] = generator
+def _adapt_label_generator(
+ generator: Callable[..., LabelData],
+) -> LabelGenerator:
+ """Adapt a label generator to the canonical 3-arg shape.
+
+ Detects the canonical ``(dataframe, params, logger) -> LabelData``
+ shape by a positional parameter named ``logger`` at index 2 (with or
+ without a default). Generators without such a parameter are wrapped
+ to drop the logger argument at dispatch; defaulted positionals after
+ index 1 stay at their defaults. ``*args``, ``**kwargs``, keyword-only
+ ``logger``, fewer than 2 required positionals, more than 3 required
+ positionals, and 3 required positionals whose third name is not
+ ``logger`` raise ``ValueError`` at registration. Inspection runs
+ once at registration; dispatch in ``generate_label_data`` is a
+ direct call.
+ """
+ sig = inspect.signature(generator)
+ params = list(sig.parameters.values())
+ if any(p.kind == inspect.Parameter.VAR_POSITIONAL for p in params):
+ raise ValueError(
+ f"Invalid label generator {generator!r}: ``*args`` is not "
+ f"supported; declare an explicit (dataframe, params) or "
+ f"(dataframe, params, logger) signature"
+ )
+ if any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params):
+ raise ValueError(
+ f"Invalid label generator {generator!r}: ``**kwargs`` is not "
+ f"supported; declare an explicit (dataframe, params) or "
+ f"(dataframe, params, logger) signature"
+ )
+ if any(
+ p.kind == inspect.Parameter.KEYWORD_ONLY and p.name == "logger" for p in params
+ ):
+ raise ValueError(
+ f"Invalid label generator {generator!r}: keyword-only "
+ f"``logger`` is not supported; declare ``logger`` as the "
+ f"third positional parameter"
+ )
+ positional = [
+ p
+ for p in params
+ if p.kind
+ in (
+ inspect.Parameter.POSITIONAL_ONLY,
+ inspect.Parameter.POSITIONAL_OR_KEYWORD,
+ )
+ ]
+ n_total = len(positional)
+ n_required = sum(1 for p in positional if p.default is inspect.Parameter.empty)
+ if n_required < 2:
+ raise ValueError(
+ f"Invalid label generator {generator!r}: {n_required} "
+ f"required positional parameter(s); expected at least 2 "
+ f"(dataframe, params)"
+ )
+ if n_required > 3:
+ raise ValueError(
+ f"Invalid label generator {generator!r}: {n_required} "
+ f"required positional parameter(s); expected 2 "
+ f"(dataframe, params) or 3 (dataframe, params, logger)"
+ )
+ has_logger_at_third = n_total >= 3 and positional[2].name == "logger"
+ if has_logger_at_third:
+ return cast(LabelGenerator, generator)
+ if n_required == 3:
+ raise ValueError(
+ f"Invalid label generator {generator!r}: third positional "
+ f"parameter is named {positional[2].name!r}, expected "
+ f"``logger``"
+ )
+
+ @functools.wraps(generator)
+ def adapted(
+ dataframe: pd.DataFrame,
+ params: dict[str, Any],
+ logger: Logger | None = None,
+ ) -> LabelData:
+ return generator(dataframe, params)
+
+ return adapted
+
+
+def register_label_generator(
+ label_column: str,
+ generator: Callable[..., LabelData],
+) -> None:
+ _LABEL_GENERATORS[label_column] = _adapt_label_generator(generator)
def _generate_extrema_label(
dataframe: pd.DataFrame,
params: dict[str, Any],
+ logger: Logger | None = None,
) -> LabelData:
natr_period = params.get("natr_period", 14)
natr_multiplier = params.get("natr_multiplier", 9.0)
dataframe: pd.DataFrame,
label_column: str,
params: dict[str, Any],
+ logger: Logger | None = None,
) -> LabelData:
generator = _LABEL_GENERATORS.get(label_column)
if generator is None:
f"No label generator registered for column '{label_column}'. "
f"Available columns: {list(_LABEL_GENERATORS.keys())}"
)
- return generator(dataframe, params)
+ return generator(dataframe, params, logger)
SmoothingKernel = Literal["gaussian", "kaiser", "kaiser_bessel_derived", "triang"]
-SMOOTHING_KERNELS: Final[tuple[SmoothingKernel, ...]] = (
+SMOOTHING_KERNELS: Final[tuple[SmoothingKernel, ...]] = SMOOTHING_METHODS[1:5]
+assert SMOOTHING_KERNELS == (
"gaussian",
"kaiser",
"kaiser_bessel_derived",
"triang",
+), (
+ f"SMOOTHING_KERNELS slice drift: {SMOOTHING_KERNELS}; "
+ "the SmoothingKernel Literal and SMOOTHING_METHODS[1:5] must agree"
)
+
+def get_smoothing_kernel_half_width(
+ config: dict[str, Any],
+ *,
+ series_length: int,
+) -> int:
+ """Half-width (in candles) of the smoothing kernel's lookahead.
+
+ Equals the lookahead applied to ``known_at_index`` after smoothing.
+ Mirrors ``smooth()`` window normalization and short-series gating
+ via shared primitives (``get_odd_window``, ``get_even_window``,
+ ``get_savgol_params``).
+
+ For zero-phase ``filtfilt``-routed kernels (members of
+ ``SMOOTHING_KERNELS``) the lookahead equals ``effective_window - 1``
+ because each forward+backward pass extends the dependency window to
+ the full filter length on both sides. For ``smm``/``sma``/``savgol``
+ (single-pass centered windows) the half-width equals
+ ``effective_window // 2``. For ``gaussian_filter1d`` the lookahead
+ matches ``scipy.ndimage`` default truncation at
+ ``int(4.0 * sigma + 0.5)`` (scipy's ``round`` form). Returns 0 for
+ ``method == "none"``, for ``series_length < max(window_candles, 3)``
+ (``smooth()`` top-level no-op), and for the filtfilt/savgol routes
+ when ``series_length < effective_window`` (downstream short-series
+ no-op in ``zero_phase_filter`` / ``savgol_filter``).
+ """
+ method = config.get("method", SMOOTHING_METHODS[0])
+ if method == SMOOTHING_METHODS[0]: # "none"
+ return 0
+ raw_window = max(
+ int(config.get("window_candles", DEFAULTS_LABEL_SMOOTHING["window_candles"])),
+ 3,
+ )
+ # ``smooth()`` top-level short-series gate (``if n < window_candles: return series``)
+ if series_length < raw_window:
+ return 0
+ if method == SMOOTHING_METHODS[8]: # "gaussian_filter1d"
+ sigma = max(float(config.get("sigma", DEFAULTS_LABEL_SMOOTHING["sigma"])), 0.0)
+ return int(4.0 * sigma + 0.5)
+ if method == SMOOTHING_METHODS[7]: # "savgol"
+ polyorder = max(
+ int(config.get("polyorder", DEFAULTS_LABEL_SMOOTHING["polyorder"])), 0
+ )
+ effective_window, _, _ = get_savgol_params(raw_window, polyorder, "mirror")
+ elif method == SMOOTHING_METHODS[3]: # "kaiser_bessel_derived"
+ effective_window = get_even_window(raw_window)
+ else:
+ effective_window = get_odd_window(raw_window)
+ # ``zero_phase_filter`` / ``savgol_filter`` short-series gate
+ if (
+ method in SMOOTHING_KERNELS or method == SMOOTHING_METHODS[7]
+ ) and series_length < effective_window:
+ return 0
+ if method in SMOOTHING_KERNELS:
+ return effective_window - 1
+ return effective_window // 2
+
+
TradePriceTarget = Literal[
"moving_average", "quantile_interpolation", "weighted_average"
]
"""
-def _is_unversioned_label_best_params_shape(best_params: Any) -> bool:
- """Detect an unversioned Optuna label best params dict.
+_OPTUNA_LABEL_SELECTION_SCHEMA_VERSION: Final[int] = 2
+"""Version of the label-namespace Optuna best-trial selection algorithm.
- An unversioned dict is a raw best params mapping missing
- ``schema_version``; its field shape matches the inner ``params`` of
- a schema-versioned ``{schema_version, params}`` dict.
- """
- return (
- isinstance(best_params, dict)
- and "schema_version" not in best_params
- and "label_period_candles" in best_params
- and "label_natr_multiplier" in best_params
- )
+Incremented on any change to tie-break, normalization, distance-metric
+whitelist, or selection metadata. Independent of
+``_OPTUNA_LABEL_BEST_PARAMS_SCHEMA_VERSION`` (on-disk JSON layout vs
+selection-algorithm semantics are versioned separately).
+"""
def _validate_optuna_label_best_params(
best_params: Any,
pair: str,
logger: Logger | None,
+ *,
+ expected_selection_metadata: dict[str, Any] | None = None,
) -> dict[str, Any] | None:
- if _is_unversioned_label_best_params_shape(best_params):
- if logger is not None:
- logger.info(
- f"[{pair}] Optuna label best params (no schema_version) "
- f"read as v{_OPTUNA_LABEL_BEST_PARAMS_SCHEMA_VERSION} in-memory."
- )
- best_params = {
- "schema_version": _OPTUNA_LABEL_BEST_PARAMS_SCHEMA_VERSION,
- "params": best_params,
- }
if not isinstance(best_params, dict):
if logger is not None:
logger.warning(f"[{pair}] Ignoring Optuna label best params: not a dict")
f"(expected {_OPTUNA_LABEL_BEST_PARAMS_SCHEMA_VERSION})"
)
return None
+ selection_metadata = best_params.get("selection_metadata")
+ if not isinstance(selection_metadata, dict):
+ if logger is not None:
+ logger.warning(
+ f"[{pair}] Ignoring Optuna label best params: missing or invalid "
+ f"selection_metadata"
+ )
+ return None
+ selection_schema_version = selection_metadata.get("schema_version")
+ if isinstance(selection_schema_version, bool) or not isinstance(
+ selection_schema_version, (int, np.integer)
+ ):
+ if logger is not None:
+ logger.warning(
+ f"[{pair}] Ignoring Optuna label best params: invalid "
+ f"selection_metadata.schema_version={selection_schema_version!r} "
+ f"(must be int)"
+ )
+ return None
+ if selection_schema_version != _OPTUNA_LABEL_SELECTION_SCHEMA_VERSION:
+ if logger is not None:
+ logger.warning(
+ f"[{pair}] Ignoring Optuna label best params: incompatible "
+ f"selection_metadata.schema_version={selection_schema_version!r} "
+ f"(expected {_OPTUNA_LABEL_SELECTION_SCHEMA_VERSION})"
+ )
+ return None
+ if (
+ expected_selection_metadata is not None
+ and selection_metadata != expected_selection_metadata
+ ):
+ if logger is not None:
+ logger.warning(
+ f"[{pair}] Ignoring Optuna label best params: "
+ f"selection_metadata drift "
+ f"(stored: {selection_metadata!r}, "
+ f"expected: {expected_selection_metadata!r})"
+ )
+ return None
params = best_params.get("params")
if not isinstance(params, dict):
if logger is not None:
pair: str,
namespace: OptunaNamespace,
logger: Logger | None = None,
+ *,
+ expected_selection_metadata: dict[str, Any] | None = None,
) -> dict[str, Any] | None:
best_params_path = (
base_path / f"optuna-{namespace}-best-params-{pair.split('/')[0]}.json"
with best_params_path.open("r", encoding="utf-8") as read_file:
best_params = json.load(read_file)
if namespace == _OPTUNA_NAMESPACES.label:
- return _validate_optuna_label_best_params(best_params, pair, logger)
+ return _validate_optuna_label_best_params(
+ best_params,
+ pair,
+ logger,
+ expected_selection_metadata=expected_selection_metadata,
+ )
return best_params
return None