LabelWeightSupportError,
REGRESSORS,
Regressor,
+ _OPTUNA_NAMESPACES,
+ OptunaNamespace,
compose_sample_weights,
ensure_datetime_series,
make_test_set_and_weights,
)
OptunaSampler = Literal["tpe", "auto", "nsgaii", "nsgaiii"]
-OptunaNamespace = Literal["hp", "label"]
ScalerType = Literal["minmax", "maxabs", "standard", "robust"]
DensityAggregation = Literal["power_mean", "quantile", "min", "max"]
DistanceMethod = Literal["compromise_programming", "topsis"]
_OPTUNA_SAMPLERS[2], # "nsgaii"
_OPTUNA_SAMPLERS[3], # "nsgaiii"
)
- _OPTUNA_NAMESPACES: Final[tuple[OptunaNamespace, ...]] = ("hp", "label")
_SCALER_TYPES: Final[tuple[ScalerType, ...]] = (
"minmax",
_DISTANCE_METRICS[7], # "jensenshannon"
)
+ _PROBABILITY_DISTANCE_METRICS: Final[tuple[str, ...]] = (
+ "jensenshannon",
+ "hellinger",
+ "shellinger",
+ )
+
+ _OPTUNA_LABEL_SELECTION_SCHEMA_VERSION: Final[int] = 1
+ """Version of the label-namespace Optuna best-trial selection algorithm.
+
+ Incremented on any change to tie-break, normalization, distance-metric
+ whitelist, or selection metadata. Independent of
+ ``Utils._OPTUNA_LABEL_BEST_PARAMS_SCHEMA_VERSION``.
+ """
+
+ # Absolute tolerance (rtol=0) for constant-column detection in
+ # `_non_constant_objective_indices`; valid on the [0,1]-normalized
+ # output of `_normalize_objective_values`.
+ _NON_CONSTANT_OBJECTIVE_ATOL: Final[float] = 1e-8
+
_DENSITY_AGGREGATIONS: Final[tuple[DensityAggregation, ...]] = (
"power_mean",
"quantile",
@staticmethod
@lru_cache(maxsize=None)
def _optuna_namespaces_set() -> set[OptunaNamespace]:
- return set(QuickAdapterRegressorV3._OPTUNA_NAMESPACES)
+ return set(_OPTUNA_NAMESPACES)
@staticmethod
@lru_cache(maxsize=None)
def _unsupported_weights_metrics_set() -> set[str]:
return set(QuickAdapterRegressorV3._UNSUPPORTED_WEIGHTS_METRICS)
+ @staticmethod
+ @lru_cache(maxsize=None)
+ def _probability_distance_metrics_set() -> set[str]:
+ return set(QuickAdapterRegressorV3._PROBABILITY_DISTANCE_METRICS)
+
+ @staticmethod
+ @lru_cache(maxsize=None)
+ def _label_selection_distance_metrics_set() -> set[str]:
+ return (
+ QuickAdapterRegressorV3._distance_metrics_set()
+ - QuickAdapterRegressorV3._probability_distance_metrics_set()
+ )
+
@staticmethod
@lru_cache(maxsize=None)
def _distance_methods_set() -> set[DistanceMethod]:
logger.warning(f"{msg}, using {default!r}")
return default
+ @staticmethod
+ def _validate_label_selection_metric(
+ metric: str,
+ *,
+ ctx: str,
+ default: str,
+ mode: ValidationMode = "warn",
+ ) -> str:
+ valid_metrics = QuickAdapterRegressorV3._label_selection_distance_metrics_set()
+ valid_options = tuple(
+ candidate
+ for candidate in QuickAdapterRegressorV3._DISTANCE_METRICS
+ if candidate in valid_metrics
+ )
+ resolved_metric = QuickAdapterRegressorV3._validate_enum_value(
+ metric,
+ valid_metrics,
+ valid_options,
+ ctx=ctx,
+ mode=mode,
+ default=default,
+ )
+ return cast(str, resolved_metric)
+
@staticmethod
def _prepare_knn_kwargs(
distance_metric: str,
"label_distance_metric",
QuickAdapterRegressorV3.LABEL_DISTANCE_METRIC_DEFAULT,
)
- QuickAdapterRegressorV3._validate_enum_value(
+ distance_metric = QuickAdapterRegressorV3._validate_label_selection_metric(
distance_metric,
- QuickAdapterRegressorV3._distance_metrics_set(),
- QuickAdapterRegressorV3._DISTANCE_METRICS,
ctx="label_distance_metric",
+ mode="warn",
+ default=QuickAdapterRegressorV3.LABEL_DISTANCE_METRIC_DEFAULT,
)
config["distance_metric"] = distance_metric
elif category == "cluster":
"label_cluster_metric",
QuickAdapterRegressorV3.LABEL_CLUSTER_METRIC_DEFAULT,
)
- QuickAdapterRegressorV3._validate_enum_value(
+ distance_metric = QuickAdapterRegressorV3._validate_label_selection_metric(
distance_metric,
- QuickAdapterRegressorV3._distance_metrics_set(),
- QuickAdapterRegressorV3._DISTANCE_METRICS,
ctx="label_cluster_metric",
+ mode="warn",
+ default=QuickAdapterRegressorV3.LABEL_CLUSTER_METRIC_DEFAULT,
)
config["distance_metric"] = distance_metric
config["trial_selection_method"] = trial_selection_method
elif category == "density":
density_method = cast(DensityMethod, label_method)
- distance_metric = self.ft_params.get(
- "label_density_metric",
+ density_metric_default = (
QuickAdapterRegressorV3._get_label_density_metric_default(
density_method
- ),
+ )
)
- QuickAdapterRegressorV3._validate_enum_value(
+ distance_metric = self.ft_params.get(
+ "label_density_metric",
+ density_metric_default,
+ )
+ distance_metric = QuickAdapterRegressorV3._validate_label_selection_metric(
distance_metric,
- QuickAdapterRegressorV3._distance_metrics_set(),
- QuickAdapterRegressorV3._DISTANCE_METRICS,
ctx="label_density_metric",
+ mode="warn",
+ default=density_metric_default,
)
config["distance_metric"] = distance_metric
formatted_value = value
logger.info(f" {tunable_name}: {formatted_value}")
+ def _optuna_label_selection_metadata(self) -> dict[str, Any]:
+ """Build the label-namespace selection metadata for ``set_user_attr``.
+
+ Must return JSON-serializable values only (str/int/float/bool/None
+ and nested dicts/lists thereof); the dict-equality check at
+ ``optuna_create_study`` (idempotent ``set_user_attr`` write) breaks
+ on numpy arrays.
+ """
+ label_method = self.ft_params.get(
+ "label_method", QuickAdapterRegressorV3.LABEL_METHOD_DEFAULT
+ )
+ return {
+ "schema_version": QuickAdapterRegressorV3._OPTUNA_LABEL_SELECTION_SCHEMA_VERSION,
+ "method_config": self._resolve_label_method_config(label_method),
+ }
+
@property
def _optuna_config(self) -> dict[str, Any]:
optuna_default_config = {
return get_causal_mode(self.ft_params, logger)
def _label_horizon_candles(self, pair: str | None = None) -> int:
- label_params = self.get_optuna_params(pair, "label") if pair else {}
+ if pair is None:
+ return get_label_horizon_candles(self.ft_params, logger)
+ label_params = self.get_optuna_params(pair, _OPTUNA_NAMESPACES.label)
return get_label_horizon_candles({**self.ft_params, **label_params}, logger)
@property
-1
] * QuickAdapterRegressorV3._OPTUNA_LABEL_N_OBJECTIVES
self._optuna_hp_params[pair] = (
- self.optuna_load_best_params(
- pair, QuickAdapterRegressorV3._OPTUNA_NAMESPACES[0]
- ) # "hp"
- or {}
- )
- self._optuna_label_params[pair] = (
- self.optuna_load_best_params(
- pair, QuickAdapterRegressorV3._OPTUNA_NAMESPACES[1]
- ) # "label"
- or {
- "label_period_candles": self.ft_params.get(
- "label_period_candles",
- default_label_period_candles,
- ),
- "label_horizon_candles": get_label_horizon_candles(
- self.ft_params, logger
- ),
- "label_natr_multiplier": float(
- self.ft_params.get(
- "label_natr_multiplier",
- default_label_natr_multiplier,
- )
- ),
- }
- )
+ self.optuna_load_best_params(pair, _OPTUNA_NAMESPACES.hp) or {}
+ )
+ self._optuna_label_params[pair] = self.optuna_load_best_params(
+ pair, _OPTUNA_NAMESPACES.label
+ ) or {
+ "label_period_candles": self.ft_params.get(
+ "label_period_candles",
+ default_label_period_candles,
+ ),
+ "label_horizon_candles": get_label_horizon_candles(
+ self.ft_params, logger
+ ),
+ "label_natr_multiplier": float(
+ self.ft_params.get(
+ "label_natr_multiplier",
+ default_label_natr_multiplier,
+ )
+ ),
+ }
self.set_optuna_label_candle(pair)
self._optuna_label_candles[pair] = 0
def get_optuna_params(
self, pair: str, namespace: OptunaNamespace
) -> dict[str, Any]:
- if namespace == QuickAdapterRegressorV3._OPTUNA_NAMESPACES[0]: # "hp"
+ if namespace == _OPTUNA_NAMESPACES.hp:
params = self._optuna_hp_params.get(pair, {})
- elif namespace == QuickAdapterRegressorV3._OPTUNA_NAMESPACES[1]: # "label"
+ elif namespace == _OPTUNA_NAMESPACES.label:
params = self._optuna_label_params.get(pair, {})
else:
raise ValueError(
f"Invalid namespace value {namespace!r}: "
- f"supported values are {', '.join(QuickAdapterRegressorV3._OPTUNA_NAMESPACES)}"
+ f"supported values are {', '.join(_OPTUNA_NAMESPACES)}"
)
return params
def set_optuna_params(
self, pair: str, namespace: OptunaNamespace, params: dict[str, Any]
) -> None:
- if namespace == QuickAdapterRegressorV3._OPTUNA_NAMESPACES[0]: # "hp"
+ if namespace == _OPTUNA_NAMESPACES.hp:
self._optuna_hp_params[pair] = params
- elif namespace == QuickAdapterRegressorV3._OPTUNA_NAMESPACES[1]: # "label"
+ elif namespace == _OPTUNA_NAMESPACES.label:
self._optuna_label_params[pair] = params
else:
raise ValueError(
f"Invalid namespace value {namespace!r}: "
- f"supported values are {', '.join(QuickAdapterRegressorV3._OPTUNA_NAMESPACES)}"
+ f"supported values are {', '.join(_OPTUNA_NAMESPACES)}"
)
def get_optuna_value(self, pair: str, namespace: OptunaNamespace) -> float:
- if namespace == QuickAdapterRegressorV3._OPTUNA_NAMESPACES[0]: # "hp"
+ if namespace == _OPTUNA_NAMESPACES.hp:
value = self._optuna_hp_value.get(pair, np.nan)
else:
raise ValueError(
f"Invalid namespace value {namespace!r}: "
- f"supported values are {QuickAdapterRegressorV3._OPTUNA_NAMESPACES[0]!r}" # "hp"
+ f"supported values are {_OPTUNA_NAMESPACES.hp!r}"
)
return value
def set_optuna_value(
self, pair: str, namespace: OptunaNamespace, value: float
) -> None:
- if namespace == QuickAdapterRegressorV3._OPTUNA_NAMESPACES[0]: # "hp"
+ if namespace == _OPTUNA_NAMESPACES.hp:
self._optuna_hp_value[pair] = value
else:
raise ValueError(
f"Invalid namespace value {namespace!r}: "
- f"supported values are {QuickAdapterRegressorV3._OPTUNA_NAMESPACES[0]!r}" # "hp"
+ f"supported values are {_OPTUNA_NAMESPACES.hp!r}"
)
def get_optuna_values(
self, pair: str, namespace: OptunaNamespace
) -> list[float | int]:
- if namespace == QuickAdapterRegressorV3._OPTUNA_NAMESPACES[1]: # "label"
+ if namespace == _OPTUNA_NAMESPACES.label:
values = self._optuna_label_values.get(
pair, [np.nan] * QuickAdapterRegressorV3._OPTUNA_LABEL_N_OBJECTIVES
)
else:
raise ValueError(
f"Invalid namespace value {namespace!r}: "
- f"supported values are {QuickAdapterRegressorV3._OPTUNA_NAMESPACES[1]}" # "label"
+ f"supported values are {_OPTUNA_NAMESPACES.label}"
)
return values
def set_optuna_values(
self, pair: str, namespace: OptunaNamespace, values: list[float | int]
) -> None:
- if namespace == QuickAdapterRegressorV3._OPTUNA_NAMESPACES[1]: # "label"
+ if namespace == _OPTUNA_NAMESPACES.label:
self._optuna_label_values[pair] = values
else:
raise ValueError(
f"Invalid namespace value {namespace!r}: "
- f"supported values are {QuickAdapterRegressorV3._OPTUNA_NAMESPACES[1]}" # "label"
+ f"supported values are {_OPTUNA_NAMESPACES.label}"
)
def init_optuna_label_candle_pool(self) -> None:
elif gap == 0:
gap = self.get_optuna_params(
dk.pair,
- QuickAdapterRegressorV3._OPTUNA_NAMESPACES[1], # "label"
+ _OPTUNA_NAMESPACES.label,
).get("label_period_candles")
logger.info(
f"[{dk.pair}] TimeSeriesSplit gap auto-set from label_period_candles: {gap}"
if self._optuna_hyperopt:
self.optuna_optimize(
pair=dk.pair,
- namespace=QuickAdapterRegressorV3._OPTUNA_NAMESPACES[0], # "hp"
+ namespace=_OPTUNA_NAMESPACES.hp,
objective=lambda trial: hp_objective(
trial,
self.regressor,
self.data_split_parameters.get(
"test_size", QuickAdapterRegressorV3._TEST_SIZE
),
- self.get_optuna_params(
- dk.pair, QuickAdapterRegressorV3._OPTUNA_NAMESPACES[0]
- ), # "hp"
+ self.get_optuna_params(dk.pair, _OPTUNA_NAMESPACES.hp),
model_training_parameters,
self._optuna_config.get(
"space_reduction",
direction=optuna.study.StudyDirection.MINIMIZE,
)
- optuna_hp_params = self.get_optuna_params(
- dk.pair, QuickAdapterRegressorV3._OPTUNA_NAMESPACES[0]
- ) # "hp"
+ optuna_hp_params = self.get_optuna_params(dk.pair, _OPTUNA_NAMESPACES.hp)
if optuna_hp_params:
model_training_parameters = {
**model_training_parameters,
namespace: OptunaNamespace,
callback: Callable[[], Optional[optuna.study.Study]],
) -> None:
- if namespace not in {QuickAdapterRegressorV3._OPTUNA_NAMESPACES[1]}: # "label"
+ if namespace not in {_OPTUNA_NAMESPACES.label}:
raise ValueError(
f"Invalid namespace value {namespace!r}: "
- f"supported values are {QuickAdapterRegressorV3._OPTUNA_NAMESPACES[1]}" # "label"
+ f"supported values are {_OPTUNA_NAMESPACES.label}"
)
if not callable(callback):
raise ValueError(
if self._optuna_hyperopt:
self.optuna_throttle_callback(
pair=pair,
- namespace=QuickAdapterRegressorV3._OPTUNA_NAMESPACES[1], # "label"
+ namespace=_OPTUNA_NAMESPACES.label,
callback=lambda: self.optuna_optimize(
pair=pair,
- namespace=QuickAdapterRegressorV3._OPTUNA_NAMESPACES[1], # "label"
+ namespace=_OPTUNA_NAMESPACES.label,
objective=lambda trial: label_objective(
trial,
self.data_provider.get_pair_dataframe(
col_prediction_config,
pred_df,
fit_live_predictions_candles,
- self.get_optuna_params(
- pair, QuickAdapterRegressorV3._OPTUNA_NAMESPACES[1]
- ).get("label_period_candles"), # "label"
+ self.get_optuna_params(pair, _OPTUNA_NAMESPACES.label).get(
+ "label_period_candles"
+ ),
)
di_sample = finite_sample(
[]
)
dk.data["extra_returns_per_train"]["label_period_candles"] = (
- self.get_optuna_params(
- pair, QuickAdapterRegressorV3._OPTUNA_NAMESPACES[1]
- ).get("label_period_candles") # "label"
+ self.get_optuna_params(pair, _OPTUNA_NAMESPACES.label).get(
+ "label_period_candles"
+ )
)
dk.data["extra_returns_per_train"]["label_natr_multiplier"] = (
self.get_optuna_params(
pair,
- QuickAdapterRegressorV3._OPTUNA_NAMESPACES[1], # "label"
+ _OPTUNA_NAMESPACES.label,
).get("label_natr_multiplier")
)
hp_rmse = QuickAdapterRegressorV3.optuna_validate_value(
- self.get_optuna_value(pair, QuickAdapterRegressorV3._OPTUNA_NAMESPACES[0])
- ) # "hp"
+ self.get_optuna_value(pair, _OPTUNA_NAMESPACES.hp)
+ )
dk.data["extra_returns_per_train"]["hp_rmse"] = (
hp_rmse if hp_rmse is not None else np.inf
)
return normalized_matrix
+ @staticmethod
+ def _non_constant_objective_indices(
+ normalized_matrix: NDArray[np.floating],
+ ) -> NDArray[np.intp]:
+ if normalized_matrix.ndim != 2:
+ raise ValueError(
+ f"Invalid normalized_matrix (shape={normalized_matrix.shape}, "
+ f"ndim={normalized_matrix.ndim}): must be 2-dimensional"
+ )
+ if not np.all(np.isfinite(normalized_matrix)):
+ raise ValueError(
+ "Invalid normalized_matrix: must contain only finite values (no NaN or inf)"
+ )
+ non_constant_mask = np.array(
+ [
+ # rtol=0: pure absolute tolerance on [0,1]-normalized columns;
+ # any finite rtol would leak column magnitude into the threshold.
+ not np.allclose(
+ normalized_matrix[:, column_index],
+ normalized_matrix[0, column_index],
+ rtol=0.0,
+ atol=QuickAdapterRegressorV3._NON_CONSTANT_OBJECTIVE_ATOL,
+ )
+ for column_index in range(normalized_matrix.shape[1])
+ ],
+ dtype=bool,
+ )
+ return np.flatnonzero(non_constant_mask)
+
+ @staticmethod
+ def _select_lowest_number_trial(
+ trials: list[optuna.trial.FrozenTrial],
+ ) -> optuna.trial.FrozenTrial:
+ return min(trials, key=lambda trial: trial.number)
+
+ @staticmethod
+ def _select_best_trial_by_distance(
+ trials: list[optuna.trial.FrozenTrial],
+ distances: NDArray[np.floating],
+ ) -> optuna.trial.FrozenTrial:
+ if distances.size != len(trials):
+ raise ValueError(
+ f"Invalid trial distances length {distances.size}: "
+ f"must match trials length {len(trials)}"
+ )
+
+ candidates = [
+ (float(distance), trial.number, trial)
+ for trial, distance in zip(trials, distances)
+ if np.isfinite(distance)
+ ]
+ if not candidates:
+ logger.warning(
+ "_select_best_trial_by_distance: all %d candidate distances "
+ "are non-finite; falling back to lowest trial number",
+ len(trials),
+ )
+ return QuickAdapterRegressorV3._select_lowest_number_trial(trials)
+ return min(candidates, key=lambda candidate: (candidate[0], candidate[1]))[2]
+
@staticmethod
def _get_n_clusters(
matrix: NDArray[np.floating],
self,
normalized_matrix: NDArray[np.floating],
selection_method: SelectionMethod,
+ objective_indices: Optional[NDArray[np.intp]] = None,
+ original_n_objectives: Optional[int] = None,
) -> NDArray[np.floating]:
if normalized_matrix.ndim != 2:
raise ValueError(
label_p_order = self.ft_params.get("label_p_order")
label_weights = self.ft_params.get("label_weights")
+ if (
+ label_weights is not None
+ and objective_indices is not None
+ and original_n_objectives is not None
+ and original_n_objectives != n_objectives
+ ):
+ try:
+ label_weights_array = np.asarray(label_weights, dtype=float)
+ except (ValueError, TypeError):
+ label_weights_array = None
+ if (
+ label_weights_array is not None
+ and label_weights_array.ndim == 1
+ and label_weights_array.size == original_n_objectives
+ ):
+ label_weights = label_weights_array[objective_indices]
+ logger.debug(
+ "label_weights sliced to non-constant objectives "
+ "(indices=%s, original_size=%d, sliced_size=%d)",
+ objective_indices.tolist(),
+ label_weights_array.size,
+ label_weights.size,
+ )
weights = QuickAdapterRegressorV3._validate_label_weights(
label_weights,
n_objectives,
def _get_multi_objective_study_best_trial(
self, namespace: OptunaNamespace, study: optuna.study.Study
) -> Optional[optuna.trial.FrozenTrial]:
- if namespace not in {QuickAdapterRegressorV3._OPTUNA_NAMESPACES[1]}: # "label"
+ if namespace not in {_OPTUNA_NAMESPACES.label}:
raise ValueError(
f"Invalid namespace value {namespace!r}: "
- f"supported values are {QuickAdapterRegressorV3._OPTUNA_NAMESPACES[1]}" # "label"
+ f"supported values are {_OPTUNA_NAMESPACES.label}"
)
n_objectives = len(study.directions)
if n_objectives < 2:
normalized_matrix = QuickAdapterRegressorV3._normalize_objective_values(
objective_values_matrix, study.directions
)
+ original_n_objectives = normalized_matrix.shape[1]
+ non_constant_objective_indices = (
+ QuickAdapterRegressorV3._non_constant_objective_indices(normalized_matrix)
+ )
+ if non_constant_objective_indices.size == 0:
+ return QuickAdapterRegressorV3._select_lowest_number_trial(best_trials)
+ normalized_matrix = normalized_matrix[:, non_constant_objective_indices]
trial_distances = self._calculate_distances(
normalized_matrix,
selection_method=label_method,
+ objective_indices=non_constant_objective_indices,
+ original_n_objectives=original_n_objectives,
)
- return best_trials[np.nanargmin(trial_distances)]
+ return QuickAdapterRegressorV3._select_best_trial_by_distance(
+ best_trials, trial_distances
+ )
def optuna_optimize(
self,
def optuna_samplers_by_namespace(
self, namespace: OptunaNamespace
) -> tuple[set[OptunaSampler], OptunaSampler]:
- if namespace == QuickAdapterRegressorV3._OPTUNA_NAMESPACES[0]: # "hp"
+ if namespace == _OPTUNA_NAMESPACES.hp:
return (
QuickAdapterRegressorV3._optuna_hpo_samplers_set(),
self._optuna_config.get(
"sampler", QuickAdapterRegressorV3._OPTUNA_HPO_SAMPLERS[0]
),
)
- elif namespace == QuickAdapterRegressorV3._OPTUNA_NAMESPACES[1]: # "label"
+ elif namespace == _OPTUNA_NAMESPACES.label:
return (
QuickAdapterRegressorV3._optuna_label_samplers_set(),
self._optuna_config.get(
else:
raise ValueError(
f"Invalid namespace value {namespace!r}: "
- f"supported values are {', '.join(QuickAdapterRegressorV3._OPTUNA_NAMESPACES)}"
+ f"supported values are {', '.join(_OPTUNA_NAMESPACES)}"
)
def optuna_create_study(
QuickAdapterRegressorV3.optuna_delete_study(
pair, namespace, study_name, storage
)
+ elif namespace == _OPTUNA_NAMESPACES.label:
+ existing_study = QuickAdapterRegressorV3.optuna_load_study(
+ study_name, storage
+ )
+ if existing_study is not None:
+ existing_selection_metadata = existing_study.user_attrs.get(
+ "selection_metadata"
+ )
+ existing_schema_version = (
+ existing_selection_metadata.get("schema_version")
+ if isinstance(existing_selection_metadata, dict)
+ else None
+ )
+ target_version = (
+ QuickAdapterRegressorV3._OPTUNA_LABEL_SELECTION_SCHEMA_VERSION
+ )
+ if existing_schema_version is None:
+ logger.info(
+ f"[{pair}] Optuna {namespace} study {study_name}: "
+ f"selection schema (none -> v{target_version}); "
+ f"{len(existing_study.trials)} trial(s) preserved"
+ )
+ elif existing_schema_version != target_version:
+ logger.warning(
+ f"[{pair}] Optuna {namespace} study {study_name}: "
+ f"selection schema v{existing_schema_version!r} incompatible "
+ f"with v{target_version}; resetting study"
+ )
+ QuickAdapterRegressorV3.optuna_delete_study(
+ pair, namespace, study_name, storage
+ )
samplers, sampler = self.optuna_samplers_by_namespace(namespace)
if sampler not in samplers:
)
try:
- return optuna.create_study(
+ study = optuna.create_study(
study_name=study_name,
sampler=self.optuna_create_sampler(sampler),
pruner=self.optuna_create_pruner(is_study_single_objective),
storage=storage,
load_if_exists=not continuous,
)
+ if namespace == _OPTUNA_NAMESPACES.label:
+ new_selection_metadata = self._optuna_label_selection_metadata()
+ existing_selection_metadata = study.user_attrs.get("selection_metadata")
+ if existing_selection_metadata != new_selection_metadata:
+ if isinstance(existing_selection_metadata, dict):
+ logger.warning(
+ f"[{pair}] Optuna {namespace} study {study_name}: "
+ f"selection_metadata change detected "
+ f"(stored: {existing_selection_metadata!r}, "
+ f"current: {new_selection_metadata!r})"
+ )
+ study.set_user_attr("selection_metadata", new_selection_metadata)
+ return study
except Exception as e:
logger.error(
f"[{pair}] Optuna {namespace} study creation failed for study {study_name}: {e!r}",
namespace,
self.get_optuna_params(pair, namespace),
logger,
+ selection_metadata=self._optuna_label_selection_metadata()
+ if namespace == _OPTUNA_NAMESPACES.label
+ else None,
)
def optuna_load_best_params(