| freqai.data_split_parameters.method | `train_test_split` | enum {`train_test_split`,`timeseries_split`} | Data splitting strategy. `train_test_split` for sequential split, `timeseries_split` for chronological split with configurable gap. |
| freqai.data_split_parameters.test_size | 0.1 / None | float (0,1) \| int >= 1 \| None | Test set size. Float for fraction, int for count. Default: 0.1 for `train_test_split`, None for `timeseries_split` (sklearn dynamic sizing). |
| freqai.data_split_parameters.n_splits | 5 | int >= 2 | Controls train/test proportions for `timeseries_split` (higher = larger train set). |
-| freqai.data_split_parameters.gap | 0 | int >= 0 | Samples to exclude between train/test for `timeseries_split`. When 0, auto-calculated from `label_period_candles` to prevent look-ahead bias. |
+| freqai.data_split_parameters.gap | 0 | int >= 0 | Samples to exclude between train/test for `timeseries_split`. When `0` and `causal_mode=true` (default), auto-set from `label_horizon_candles`; when `0` and `causal_mode=false`, auto-set from `label_period_candles`. Under `causal_mode=true`, an explicit `gap<label_horizon_candles` is rejected. |
| freqai.data_split_parameters.max_train_size | None | int >= 1 \| None | Maximum training set size for `timeseries_split`. When set, creates a sliding window instead of expanding train set. None = no limit. |
| _Label smoothing_ | | | |
| freqai.label_smoothing.method | `gaussian` | enum {`none`,`gaussian`,`kaiser`,`kaiser_bessel_derived`,`triang`,`smm`,`sma`,`savgol`,`gaussian_filter1d`} | Label smoothing method (`kaiser_bessel_derived` uses an even-length Kaiser-Bessel-derived zero-phase kernel; `smm`=median, `sma`=mean, `savgol`=Savitzky–Golay). |
| freqai.label_pipeline.gamma | 1.0 | float (0,10] | Contrast exponent applied to labels after normalization: >1 emphasizes extrema, values between 0 and 1 soften. |
| _Feature parameters_ | | | |
| freqai.feature_parameters.label_period_candles | min/max midpoint | int >= 1 | Zigzag labeling NATR horizon. |
+| freqai.feature_parameters.label_horizon_candles | `label_period_candles` | int >= 1 | Number of candles after a label row before the label is considered known by causal split guards. Recommended: cover the label generator's lookahead (zigzag pivot confirmation lag plus any smoothing kernel half-width). Used by causal split guards and `<label>_known_at_index` metadata. When unset, falls back to `label_period_candles`. |
+| freqai.feature_parameters.causal_mode | true | bool | Causal split guard toggle. When `true` (default): rejects `data_split_parameters.shuffle=true`, `shuffle_after_split=true`, `reverse_train_test_order=true`; for `timeseries_split` auto-sets `gap=label_horizon_candles` when unset/`0` (rejects explicit `gap<label_horizon_candles`); for `train_test_split` drops train rows where position `>=first_test_position-label_horizon_candles`; with `<label>_known_at_index` columns, additionally drops rows where row-wise max `>=first_test_position`. `false` is deprecated; acausal baselines only. |
| freqai.feature_parameters.min_label_period_candles | 12 | int >= 1 | Minimum labeling NATR horizon used for reversals labeling HPO. |
| freqai.feature_parameters.max_label_period_candles | 24 | int >= 1 | Maximum labeling NATR horizon used for reversals labeling HPO. |
| freqai.feature_parameters.label_natr_multiplier | min/max midpoint | float > 0 | Zigzag labeling NATR multiplier. |
"feature_parameters": {
"include_corr_pairlist": ["BTC/USDT", "ETH/USDT"],
"include_timeframes": ["5m", "15m", "1h", "4h"],
+ "causal_mode": true,
"label_period_candles": 18,
+ "label_horizon_candles": 18,
"label_natr_multiplier": 10.5,
"label_method": "topsis",
"label_weights": [0.5, 5.0, 3.0, 1.0, 0.5, 5.0, 3.0],
fit_regressor,
format_dict,
format_number,
+ get_causal_mode,
get_label_defaults,
+ get_label_horizon_candles,
get_label_pipeline_config,
get_label_prediction_config,
get_min_max_label_period_candles,
get_optuna_study_model_parameters,
+ label_known_at_column_name,
label_weight_column_name,
migrate_config,
optuna_load_best_params,
DensityMethod = Literal["knn", "medoid"]
SelectionMethod = Union[DistanceMethod, ClusterMethod, DensityMethod]
ValidationMode = Literal["warn", "raise", "none"]
-SplitFn = Callable[[pd.DataFrame, pd.DataFrame, NDArray[np.floating]], dict[str, Any]]
+SplitFn = Callable[
+ [pd.DataFrame, pd.DataFrame, NDArray[np.floating], pd.DataFrame], dict[str, Any]
+]
warnings.simplefilter(action="ignore", category=FutureWarning)
logger = logging.getLogger(__name__)
+_KNOWN_AT_NONE_LOGGED: set[tuple[str, str]] = set()
+
+
+def _log_known_at_none_once(pair: str, context: str) -> None:
+ key = (pair, context)
+ if key in _KNOWN_AT_NONE_LOGGED:
+ return
+ _KNOWN_AT_NONE_LOGGED.add(key)
+ logger.info(
+ f"[{pair}] {context}: no <label>_known_at_index column present; "
+ "causal guards use position-based purge only (label-aware filtering disabled)"
+ )
+
class QuickAdapterRegressorV3(BaseRegressionModel):
"""
https://github.com/sponsors/robcaulk
"""
- version = "3.11.13"
+ version = "3.12.0"
_TEST_SIZE: Final[float] = 0.1
return None
return QuickAdapterRegressorV3._coerce_int(value, name, minimum=minimum)
+ @staticmethod
+ def _validate_index_alignment(
+ filtered_dataframe: pd.DataFrame,
+ unfiltered_df: pd.DataFrame,
+ ) -> None:
+ if not unfiltered_df.index.is_unique:
+ raise ValueError("unfiltered_df.index must be unique for causal split guards")
+ if not filtered_dataframe.index.isin(unfiltered_df.index).all():
+ raise ValueError(
+ "filtered_dataframe.index must be a subset of unfiltered_df.index"
+ )
+
+ @staticmethod
+ def _row_positions(
+ filtered_dataframe: pd.DataFrame,
+ unfiltered_df: pd.DataFrame,
+ ) -> pd.Series:
+ QuickAdapterRegressorV3._validate_index_alignment(
+ filtered_dataframe, unfiltered_df
+ )
+ positions = pd.Series(np.arange(len(unfiltered_df), dtype=np.int64), index=unfiltered_df.index)
+ return positions.loc[filtered_dataframe.index]
+
+ @staticmethod
+ def _known_at_index(
+ filtered_dataframe: pd.DataFrame,
+ unfiltered_df: pd.DataFrame,
+ ) -> pd.Series | None:
+ """Per-row leak boundary across all registered labels.
+
+ Returns the row-wise ``max`` of every present
+ ``<label>_known_at_index`` column. A label whose column is missing
+ or contains any NaN is skipped (silently — labels can opt in by
+ emitting the column). Returns ``None`` only when no label exposes
+ a usable column, in which case the caller falls back to the
+ position-based purge.
+ """
+ QuickAdapterRegressorV3._validate_index_alignment(
+ filtered_dataframe, unfiltered_df
+ )
+ series_list: list[pd.Series] = []
+ for label_col in LABEL_COLUMNS:
+ known_at_col = label_known_at_column_name(label_col)
+ if known_at_col not in unfiltered_df.columns:
+ continue
+ known_at = unfiltered_df.loc[filtered_dataframe.index, known_at_col]
+ if known_at.isna().any():
+ continue
+ series_list.append(pd.to_numeric(known_at, errors="raise"))
+ if not series_list:
+ return None
+ if len(series_list) == 1:
+ return series_list[0]
+ return pd.concat(series_list, axis=1).max(axis=1).astype(np.int64)
+
+ @staticmethod
+ def _filter_train_by_mask(
+ train_features: pd.DataFrame,
+ train_labels: pd.DataFrame,
+ train_weights: NDArray[np.floating],
+ keep_mask: NDArray[np.bool_],
+ context: str,
+ ) -> tuple[pd.DataFrame, pd.DataFrame, NDArray[np.floating]]:
+ removed = int((~keep_mask).sum())
+ if removed:
+ logger.info(f"{context}: removed {removed} causal-unsafe train rows")
+ if not keep_mask.any():
+ raise ValueError(f"{context}: causal guard removed all train rows")
+ return (
+ train_features.loc[keep_mask],
+ train_labels.loc[keep_mask],
+ train_weights[keep_mask],
+ )
+
@staticmethod
def _get_selection_category(method: str) -> Optional[str]:
for (
def _label_defaults(self) -> tuple[int, float]:
return get_label_defaults(self.ft_params, logger)
+ @property
+ def _causal_mode(self) -> bool:
+ return get_causal_mode(self.ft_params, logger)
+
+ def _label_horizon_candles(self, pair: str | None = None) -> int:
+ label_params = self.get_optuna_params(pair, "label") if pair else {}
+ return get_label_horizon_candles({**self.ft_params, **label_params}, logger)
+
@property
def _optuna_label_candle_pool_full(self) -> list[int]:
label_frequency_candles = self._label_frequency_candles
"label_period_candles",
default_label_period_candles,
),
+ "label_horizon_candles": get_label_horizon_candles(
+ self.ft_params, logger
+ ),
"label_natr_multiplier": float(
self.ft_params.get(
"label_natr_multiplier",
features: pd.DataFrame,
labels: pd.DataFrame,
weights: NDArray[np.floating],
+ unfiltered: pd.DataFrame,
) -> dict[str, Any]:
- return split_builder(features, labels, weights, dk)
+ return split_builder(features, labels, weights, dk, unfiltered)
logger.info(f"Using data split method: {method}")
return self._train_common(unfiltered_df, pair, dk, split_fn, **kwargs)
labels: pd.DataFrame,
weights: NDArray[np.floating],
dk: FreqaiDataKitchen,
+ unfiltered_df: pd.DataFrame,
) -> dict[str, Any]:
"""Train/test split via sklearn's ``train_test_split``.
dsp = dict(self.data_split_parameters)
dsp.setdefault("shuffle", False)
dsp.setdefault("test_size", QuickAdapterRegressorV3._TEST_SIZE)
+ causal_mode = self._causal_mode
+ if causal_mode and dsp.get("shuffle", False):
+ raise ValueError(
+ "feature_parameters.causal_mode=True is incompatible with "
+ "data_split_parameters.shuffle=True"
+ )
+ if causal_mode and feat_dict.get("shuffle_after_split", False):
+ raise ValueError(
+ "feature_parameters.causal_mode=True is incompatible with "
+ "feature_parameters.shuffle_after_split=True"
+ )
+ if causal_mode and feat_dict.get("reverse_train_test_order", False):
+ raise ValueError(
+ "feature_parameters.causal_mode=True is incompatible with "
+ "feature_parameters.reverse_train_test_order=True"
+ )
sklearn_kwargs = {
k: v
for k, v in dsp.items()
train_weights,
test_weights,
) = train_test_split(features, labels, weights, **sklearn_kwargs)
+ if causal_mode:
+ row_positions = QuickAdapterRegressorV3._row_positions(
+ features, unfiltered_df
+ )
+ first_test_position = int(row_positions.loc[test_features.index].min())
+ label_horizon_candles = self._label_horizon_candles(dk.pair)
+ train_positions = row_positions.loc[train_features.index]
+ keep_mask = (
+ train_positions.to_numpy(dtype=np.int64)
+ < first_test_position - label_horizon_candles
+ )
+ known_at_index = QuickAdapterRegressorV3._known_at_index(
+ features, unfiltered_df
+ )
+ if known_at_index is not None:
+ known_at_train = known_at_index.loc[train_features.index]
+ keep_mask &= (
+ known_at_train.to_numpy(dtype=np.int64) < first_test_position
+ )
+ else:
+ _log_known_at_none_once(
+ dk.pair, "train_test_split causal guard"
+ )
+ train_features, train_labels, train_weights = (
+ QuickAdapterRegressorV3._filter_train_by_mask(
+ train_features,
+ train_labels,
+ train_weights,
+ keep_mask,
+ f"[{dk.pair}] train_test_split causal guard",
+ )
+ )
else:
train_features = features
train_labels = labels
f"-------------------- Training on data from {start_date} to "
f"{end_date} --------------------"
)
- dd = split_fn(features_filtered, labels_filtered, weights)
+ dd = split_fn(features_filtered, labels_filtered, weights, unfiltered_df)
if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live:
dk.fit_labels()
dd = self._apply_pipelines(dd, dk, pair)
labels: pd.DataFrame,
weights: NDArray[np.floating],
dk: FreqaiDataKitchen,
+ unfiltered_df: pd.DataFrame,
) -> dict:
"""Chronological train/test split using sklearn's TimeSeriesSplit final fold.
``test_idx``.
"""
feat_dict = self.ft_params
+ causal_mode = self._causal_mode
if feat_dict.get("shuffle_after_split", False):
raise ValueError(
"feature_parameters.shuffle_after_split=True is incompatible "
"with data_split_parameters.method='timeseries_split': "
"chronological split must preserve temporal ordering"
)
+ if causal_mode and self.data_split_parameters.get("shuffle", False):
+ raise ValueError(
+ "feature_parameters.causal_mode=True is incompatible with "
+ "data_split_parameters.shuffle=True"
+ )
+ if causal_mode and feat_dict.get("reverse_train_test_order", False):
+ raise ValueError(
+ "feature_parameters.causal_mode=True is incompatible with "
+ "feature_parameters.reverse_train_test_order=True"
+ )
n_splits = QuickAdapterRegressorV3._coerce_int(
self.data_split_parameters.get(
"n_splits", QuickAdapterRegressorV3.TIMESERIES_N_SPLITS_DEFAULT
"n_splits",
minimum=2,
)
+ raw_gap = self.data_split_parameters.get("gap", None)
gap = QuickAdapterRegressorV3._coerce_int(
- self.data_split_parameters.get(
- "gap", QuickAdapterRegressorV3.TIMESERIES_GAP_DEFAULT
- ),
+ raw_gap
+ if raw_gap is not None
+ else QuickAdapterRegressorV3.TIMESERIES_GAP_DEFAULT,
"gap",
minimum=0,
)
f"Increase test_size or provide more data."
)
- if gap == 0:
+ if causal_mode:
+ label_horizon_candles = self._label_horizon_candles(dk.pair)
+ if raw_gap is None or gap == 0:
+ gap = label_horizon_candles
+ logger.info(
+ f"[{dk.pair}] TimeSeriesSplit gap auto-set from label_horizon_candles: {gap}"
+ )
+ elif gap < label_horizon_candles:
+ raise ValueError(
+ f"data_split_parameters.gap={gap!r} is smaller than "
+ f"label_horizon_candles={label_horizon_candles!r} while "
+ "feature_parameters.causal_mode=True"
+ )
+ elif gap == 0:
gap = self.get_optuna_params(
dk.pair,
QuickAdapterRegressorV3._OPTUNA_NAMESPACES[1], # "label"
).get("label_period_candles")
logger.info(
- f"[{dk.pair}] TimeSeriesSplit gap auto-calculated from label_period_candles: {gap}"
+ f"[{dk.pair}] TimeSeriesSplit gap auto-set from label_period_candles: {gap}"
)
tscv = TimeSeriesSplit(
test_features = filtered_dataframe.iloc[test_idx]
train_labels = labels.iloc[train_idx]
test_labels = labels.iloc[test_idx]
- train_weights = sanitize_and_renormalize(
- weights[train_idx], logger=logger, context="timeseries_split:train"
- )
+ train_weights = weights[train_idx]
test_weights = sanitize_and_renormalize(
weights[test_idx], logger=logger, context="timeseries_split:test"
)
+ if causal_mode:
+ row_positions = QuickAdapterRegressorV3._row_positions(
+ filtered_dataframe, unfiltered_df
+ )
+ first_test_position = int(row_positions.iloc[test_idx].min())
+ known_at_index = QuickAdapterRegressorV3._known_at_index(
+ filtered_dataframe, unfiltered_df
+ )
+ if known_at_index is not None:
+ known_at_train = known_at_index.iloc[train_idx]
+ keep_mask = known_at_train.to_numpy(dtype=np.int64) < first_test_position
+ train_features, train_labels, train_weights = (
+ QuickAdapterRegressorV3._filter_train_by_mask(
+ train_features,
+ train_labels,
+ train_weights,
+ keep_mask,
+ f"[{dk.pair}] timeseries_split causal guard",
+ )
+ )
+ else:
+ _log_known_at_none_once(
+ dk.pair, "timeseries_split causal guard"
+ )
+
+ train_weights = sanitize_and_renormalize(
+ train_weights, logger=logger, context="timeseries_split:train"
+ )
+
if feat_dict.get("reverse_train_test_order", False):
return dk.build_data_dictionary(
test_features,
def optuna_load_best_params(
self, pair: str, namespace: OptunaNamespace
) -> Optional[dict[str, Any]]:
- return optuna_load_best_params(self.full_path, pair, namespace)
+ return optuna_load_best_params(self.full_path, pair, namespace, logger)
@staticmethod
def optuna_delete_study(
get_callable_sha256,
get_distance,
get_label_defaults,
+ get_label_horizon_candles,
get_label_smoothing_config,
get_label_weighting_config,
get_zl_ma_fn,
+ label_known_at_column_name,
label_weight_column_name,
migrate_config,
nan_average,
)
self._label_params: dict[str, dict[str, Any]] = {}
for pair in self.pairs:
+ label_best_params = self.optuna_load_best_params(pair, "label")
self._label_params[pair] = (
- self.optuna_load_best_params(pair, "label")
- if self.optuna_load_best_params(pair, "label")
+ label_best_params
+ if label_best_params
else {
"label_period_candles": feature_parameters.get(
"label_period_candles",
default_label_period_candles,
),
+ "label_horizon_candles": get_label_horizon_candles(
+ feature_parameters, logger
+ ),
"label_natr_multiplier": float(
feature_parameters.get(
"label_natr_multiplier",
if isinstance(label_period_candles, int):
self._label_params[pair]["label_period_candles"] = label_period_candles
+ def get_label_horizon_candles(self, pair: str) -> int:
+ label_params = self._label_params.get(pair, {})
+ feature_parameters = self.freqai_info.get("feature_parameters", {})
+ return get_label_horizon_candles({**feature_parameters, **label_params}, logger)
+
def get_label_natr_multiplier(self, pair: str) -> float:
label_natr_multiplier = self._label_params.get(pair, {}).get(
"label_natr_multiplier"
return {
"natr_period": self.get_label_period_candles(pair),
"natr_multiplier": self.get_label_natr_multiplier(pair),
+ "label_horizon_candles": self.get_label_horizon_candles(pair),
}
return {}
dataframe[label_col] = label_data.series
+ if label_data.known_at_index is not None:
+ dataframe[label_known_at_column_name(label_col)] = label_data.known_at_index
+
label_weight_col = label_weight_column_name(label_col)
if is_weighting_active:
dataframe[label_weight_col] = compute_label_weights(
def optuna_load_best_params(
self, pair: str, namespace: str
) -> Optional[dict[str, Any]]:
- return optuna_load_best_params(self.models_full_path, pair, namespace)
+ return optuna_load_best_params(self.models_full_path, pair, namespace, logger)
EXTREMA_DIRECTION_SMOOTHED_COLUMN: Final[str] = "extrema_direction_smoothed"
EXTREMA_WEIGHT_COLUMN: Final[str] = "extrema_weight"
EXTREMA_WEIGHT_SMOOTHED_COLUMN: Final[str] = "extrema_weight_smoothed"
+_LABEL_KNOWN_AT_SUFFIX: Final[str] = "_known_at_index"
LABEL_WEIGHT_SUFFIX: Final[str] = "_weight"
_FREQAI_LABEL_SIGIL_PATTERN: Final[re.Pattern[str]] = re.compile(r"^&-?")
-@lru_cache(maxsize=16)
-def label_weight_column_name(label_col: str) -> str:
- """Return the weight column name for a label column.
+@lru_cache(maxsize=64)
+def _label_aux_column_name(label_col: str, suffix: str) -> str:
+ """Derive a freqtrade-safe auxiliary column name from a label column.
Strips the freqtrade label sigil (``&`` and its optional immediate ``-``
separator) so the resulting column does NOT collide with
Raises ``ValueError`` if the result still contains ``&`` or ``%``.
Examples:
- ``"&s-extrema"`` -> ``"s-extrema_weight"`` (smoothed marker preserved)
- ``"&-amplitude"`` -> ``"amplitude_weight"`` (raw target)
- ``"&-time_to_pivot"`` -> ``"time_to_pivot_weight"`` (raw target)
- ``"&-natr"`` -> ``"natr_weight"`` (raw target)
+ ``("&s-extrema", "_weight")`` -> ``"s-extrema_weight"``
+ ``("&-amplitude", "_weight")`` -> ``"amplitude_weight"``
+ ``("&s-extrema", "_known_at_index")`` -> ``"s-extrema_known_at_index"``
"""
stripped = _FREQAI_LABEL_SIGIL_PATTERN.sub("", label_col, count=1)
- result = f"{stripped}{LABEL_WEIGHT_SUFFIX}"
+ if not stripped or not any(c.isalpha() for c in stripped):
+ raise ValueError(
+ f"Auxiliary label column name derived from {label_col!r} with "
+ f"suffix {suffix!r} has empty or non-alphabetic stem after "
+ f"sigil strip"
+ )
+ result = f"{stripped}{suffix}"
if "&" in result or "%" in result:
raise ValueError(
- f"label_weight_column_name produced collision-prone name {result!r} "
- f"from {label_col!r}; weight columns must not contain '&' or '%'"
+ f"Auxiliary label column name {result!r} (derived from "
+ f"{label_col!r} with suffix {suffix!r}) must not contain '&' or '%'"
)
return result
+def label_weight_column_name(label_col: str) -> str:
+ """Return the weight column name for a label column."""
+ return _label_aux_column_name(label_col, LABEL_WEIGHT_SUFFIX)
+
+
+def label_known_at_column_name(label_col: str) -> str:
+ """Return the known-at-index column name for a label column."""
+ return _label_aux_column_name(label_col, _LABEL_KNOWN_AT_SUFFIX)
+
+
@dataclass
class LabelData:
series: pd.Series
indices: list[int]
metrics: dict[str, list[float]]
+ known_at_index: pd.Series | None = None
LabelGenerator = Callable[[pd.DataFrame, dict[str, Any]], LabelData]
) -> LabelData:
natr_period = params.get("natr_period", 14)
natr_multiplier = params.get("natr_multiplier", 9.0)
+ label_horizon_candles = get_label_horizon_candles(params, logger)
(
pivots_indices,
"volume_weighted_efficiency_ratio": pivots_volume_weighted_efficiency_ratios,
}
- return LabelData(series=series, indices=pivots_indices, metrics=metrics)
+ known_at_index = pd.Series(
+ np.arange(len(dataframe), dtype=np.int64) + label_horizon_candles,
+ index=dataframe.index,
+ )
+
+ return LabelData(
+ series=series,
+ indices=pivots_indices,
+ metrics=metrics,
+ known_at_index=known_at_index,
+ )
register_label_generator(EXTREMA_COLUMN, _generate_extrema_label)
return get_label_kind_config("label_prediction", config, logger)
+_CAUSAL_MODE_FALSE_WARNED: bool = False
+
+
+def get_causal_mode(config: dict[str, Any], logger: Logger) -> bool:
+ causal_mode = config.get("causal_mode", True)
+ if not isinstance(causal_mode, bool):
+ logger.warning(
+ f"Invalid causal_mode value {causal_mode!r}: must be bool, using True"
+ )
+ return True
+ global _CAUSAL_MODE_FALSE_WARNED
+ if causal_mode is False and not _CAUSAL_MODE_FALSE_WARNED:
+ logger.warning(
+ "feature_parameters.causal_mode=false is deprecated: "
+ "causal split guards disabled; label lookahead leakage possible. "
+ "Default causal_mode=true; causal_mode=false for acausal baselines only."
+ )
+ _CAUSAL_MODE_FALSE_WARNED = True
+ return causal_mode
+
+
+def get_label_horizon_candles(config: dict[str, Any], logger: Logger) -> int:
+ def _is_positive_int(value: Any) -> bool:
+ return (
+ not isinstance(value, bool)
+ and isinstance(value, (int, np.integer))
+ and value >= 1
+ )
+
+ fallback = config.get("label_period_candles", 1)
+ if not _is_positive_int(fallback):
+ fallback = 1
+ label_horizon_candles = config.get("label_horizon_candles", fallback)
+ if not _is_positive_int(label_horizon_candles):
+ logger.warning(
+ f"Invalid label_horizon_candles value {label_horizon_candles!r}: "
+ f"must be int >= 1, using {fallback!r}"
+ )
+ return fallback
+ return int(label_horizon_candles)
+
+
_EPOCH_MS_MIN = 1_262_304_000_000 # 2010-01-01T00:00:00Z
_EPOCH_MS_MAX = 2_051_222_400_000 # 2035-01-01T00:00:00Z
return trial.suggest_int(name, int_range[0], int_range[1], log=log)
+_OPTUNA_LABEL_BEST_PARAMS_SCHEMA_VERSION: Final[int] = 2
+"""Wire format version of optuna-label-best-params-{pair}.json.
+
+Incremented on every on-disk JSON shape change (top-level keys, params layout).
+"""
+
+
+def _is_unversioned_label_best_params_shape(best_params: Any) -> bool:
+ """Detect an unversioned Optuna label best-params dict.
+
+ An unversioned dict is a raw best-params mapping missing
+ ``schema_version``; its field shape matches the inner ``params`` of
+ a schema-versioned ``{schema_version, params}`` dict.
+ """
+ return (
+ isinstance(best_params, dict)
+ and "schema_version" not in best_params
+ and "label_period_candles" in best_params
+ and "label_natr_multiplier" in best_params
+ )
+
+
+def _validate_optuna_label_best_params(
+ best_params: Any,
+ pair: str,
+ logger: Logger | None,
+) -> dict[str, Any] | None:
+ if _is_unversioned_label_best_params_shape(best_params):
+ if logger is not None:
+ logger.info(
+ f"[{pair}] Optuna label best-params (no schema_version) "
+ f"read as v{_OPTUNA_LABEL_BEST_PARAMS_SCHEMA_VERSION} in-memory."
+ )
+ best_params = {
+ "schema_version": _OPTUNA_LABEL_BEST_PARAMS_SCHEMA_VERSION,
+ "params": best_params,
+ }
+ if not isinstance(best_params, dict):
+ if logger is not None:
+ logger.warning(
+ f"[{pair}] Ignoring Optuna label best-params: not a dict"
+ )
+ return None
+ schema_version = best_params.get("schema_version")
+ if schema_version is None:
+ if logger is not None:
+ logger.warning(
+ f"[{pair}] Ignoring Optuna label best-params: missing schema_version"
+ )
+ return None
+ if isinstance(schema_version, bool) or not isinstance(
+ schema_version, (int, np.integer)
+ ):
+ if logger is not None:
+ logger.warning(
+ f"[{pair}] Ignoring Optuna label best-params: invalid "
+ f"schema_version={schema_version!r} type "
+ f"(must be int)"
+ )
+ return None
+ if schema_version != _OPTUNA_LABEL_BEST_PARAMS_SCHEMA_VERSION:
+ if logger is not None:
+ logger.warning(
+ f"[{pair}] Ignoring Optuna label best-params: incompatible "
+ f"schema_version={schema_version!r} "
+ f"(expected {_OPTUNA_LABEL_BEST_PARAMS_SCHEMA_VERSION})"
+ )
+ return None
+ params = best_params.get("params")
+ if not isinstance(params, dict):
+ if logger is not None:
+ logger.warning(f"[{pair}] Ignoring Optuna label best-params without params")
+ return None
+ label_period_candles = params.get("label_period_candles")
+ label_natr_multiplier = params.get("label_natr_multiplier")
+ if (
+ isinstance(label_period_candles, bool)
+ or not isinstance(label_period_candles, (int, np.integer))
+ or label_period_candles < 1
+ ):
+ if logger is not None:
+ logger.warning(
+ f"[{pair}] Ignoring Optuna label best-params: invalid "
+ f"label_period_candles={label_period_candles!r} (must be int >= 1)"
+ )
+ return None
+ if (
+ isinstance(label_natr_multiplier, bool)
+ or not isinstance(label_natr_multiplier, (int, float, np.integer, np.floating))
+ or not np.isfinite(label_natr_multiplier)
+ or label_natr_multiplier <= 0
+ ):
+ if logger is not None:
+ logger.warning(
+ f"[{pair}] Ignoring Optuna label best-params: invalid "
+ f"label_natr_multiplier={label_natr_multiplier!r} "
+ f"(must be finite number > 0)"
+ )
+ return None
+ label_horizon_candles = params.get("label_horizon_candles")
+ if label_horizon_candles is not None and (
+ isinstance(label_horizon_candles, bool)
+ or not isinstance(label_horizon_candles, (int, np.integer))
+ or label_horizon_candles < 1
+ ):
+ if logger is not None:
+ logger.warning(
+ f"[{pair}] Ignoring Optuna label best-params: invalid "
+ f"label_horizon_candles={label_horizon_candles!r} (must be int >= 1)"
+ )
+ return None
+ return params
+
+
def optuna_load_best_params(
- base_path: Path, pair: str, namespace: str
+ base_path: Path, pair: str, namespace: str, logger: Logger | None = None
) -> dict[str, Any] | None:
best_params_path = (
base_path / f"optuna-{namespace}-best-params-{pair.split('/')[0]}.json"
)
if best_params_path.is_file():
with best_params_path.open("r", encoding="utf-8") as read_file:
- return json.load(read_file)
+ best_params = json.load(read_file)
+ if namespace == "label":
+ return _validate_optuna_label_best_params(best_params, pair, logger)
+ return best_params
return None
base_path / f"optuna-{namespace}-best-params-{pair.split('/')[0]}.json"
)
try:
+ if namespace == "label":
+ best_params: dict[str, Any] = {
+ "schema_version": _OPTUNA_LABEL_BEST_PARAMS_SCHEMA_VERSION,
+ "params": params,
+ }
+ else:
+ best_params = params
with best_params_path.open("w", encoding="utf-8") as write_file:
- json.dump(params, write_file, indent=4)
+ json.dump(best_params, write_file, indent=4)
except Exception as e:
logger.error(
f"[{pair}] Optuna {namespace} failed to save best params: {e!r}",