import warnings
from functools import lru_cache
from pathlib import Path
-from typing import Any, Callable, Final, Literal, Optional, Union, cast
+from typing import AbstractSet, Any, Callable, Final, Literal, Optional, Union, cast
import numpy as np
import optuna
ClusterMethod = Literal["kmeans", "kmeans2", "kmedoids"]
DensityMethod = Literal["knn", "medoid"]
SelectionMethod = Union[DistanceMethod, ClusterMethod, DensityMethod]
+ValidationMode = Literal["warn", "raise", "none"]
warnings.simplefilter(action="ignore", category=FutureWarning)
logger = logging.getLogger(__name__)
"weighted_sum",
)
- _UNSUPPORTED_CLUSTER_METRICS: Final[tuple[str, ...]] = (
+ _UNSUPPORTED_WEIGHTS_METRICS: Final[tuple[str, ...]] = (
_DISTANCE_METRICS[6], # "mahalanobis"
_DISTANCE_METRICS[5], # "seuclidean"
_DISTANCE_METRICS[7], # "jensenshannon"
@staticmethod
@lru_cache(maxsize=None)
- def _unsupported_cluster_metrics_set() -> set[str]:
- return set(QuickAdapterRegressorV3._UNSUPPORTED_CLUSTER_METRICS)
+ def _unsupported_weights_metrics_set() -> set[str]:
+ return set(QuickAdapterRegressorV3._UNSUPPORTED_WEIGHTS_METRICS)
@staticmethod
@lru_cache(maxsize=None)
return None
@staticmethod
- def _validate_minkowski_p(p: Optional[float], *, ctx: str) -> Optional[float]:
+ def _validate_minkowski_p(
+ p: Optional[float],
+ *,
+ ctx: str,
+ mode: ValidationMode = "raise",
+ ) -> Optional[float]:
if p is None:
return None
+ if mode == "none":
+ return float(p) if (np.isfinite(p) and p > 0) else None
+
if not np.isfinite(p):
- raise ValueError(f"Invalid {ctx} p {p!r}: must be finite")
+ msg = f"Invalid {ctx} {p!r}: must be finite"
+ if mode == "raise":
+ raise ValueError(msg)
+ logger.warning(f"{msg}, using default")
+ return None
+
if p <= 0:
- raise ValueError(f"Invalid {ctx} p {p!r}: must be > 0")
+ msg = f"Invalid {ctx} {p!r}: must be > 0"
+ if mode == "raise":
+ raise ValueError(msg)
+ logger.warning(f"{msg}, using default")
+ return None
+
return float(p)
@staticmethod
distance_metric: str,
weights: Optional[NDArray[np.floating]] = None,
p: Optional[float] = None,
- validate_p: bool = True,
- check_unsupported_metrics: bool = False,
- validation_context: str = "distance calculation",
+ mode: ValidationMode = "none",
+ metric_ctx: str = "distance_metric",
+ p_ctx: str = "p",
) -> dict[str, Any]:
kwargs: dict[str, Any] = {}
if weights is not None:
- if check_unsupported_metrics:
- if (
- distance_metric
- not in QuickAdapterRegressorV3._unsupported_cluster_metrics_set()
- ):
- kwargs["w"] = weights
- else:
+ validated_metric = QuickAdapterRegressorV3._validate_metric_weights_support(
+ distance_metric, ctx=metric_ctx, mode=mode
+ )
+ if validated_metric is not None:
kwargs["w"] = weights
- if (
- distance_metric == QuickAdapterRegressorV3._DISTANCE_METRICS[1]
- ): # "minkowski"
- if p is not None and np.isfinite(p):
- if validate_p:
- kwargs["p"] = QuickAdapterRegressorV3._validate_minkowski_p(
- p, ctx=validation_context
- )
- else:
- kwargs["p"] = p
+ if distance_metric == QuickAdapterRegressorV3._DISTANCE_METRICS[1]:
+ validated_p = QuickAdapterRegressorV3._validate_minkowski_p(
+ p, ctx=p_ctx, mode=mode
+ )
+ if validated_p is not None:
+ kwargs["p"] = validated_p
return kwargs
@staticmethod
- def _validate_quantile_q(q: Optional[float], *, ctx: str) -> Optional[float]:
+ def _validate_quantile_q(
+ q: Optional[float], *, ctx: str, mode: ValidationMode = "raise"
+ ) -> Optional[float]:
if q is None:
return None
+ if mode == "none":
+ return float(q) if (np.isfinite(q) and 0.0 <= q <= 1.0) else None
+
if not np.isfinite(q):
- raise ValueError(f"Invalid {ctx} q {q!r}: must be finite")
+ msg = f"Invalid {ctx} {q!r}: must be finite"
+ if mode == "raise":
+ raise ValueError(msg)
+ logger.warning(f"{msg}, using default")
+ return None
+
if q < 0.0 or q > 1.0:
- raise ValueError(f"Invalid {ctx} q {q!r}: must be in [0, 1]")
+ msg = f"Invalid {ctx} {q!r}: must be in [0, 1]"
+ if mode == "raise":
+ raise ValueError(msg)
+ logger.warning(f"{msg}, using default")
+ return None
+
return float(q)
@staticmethod
- def _validate_metric_supported(metric: str, *, category: str) -> None:
- if metric in QuickAdapterRegressorV3._unsupported_cluster_metrics_set():
- supported_metrics = [
- m
- for m in QuickAdapterRegressorV3._DISTANCE_METRICS
- if m not in QuickAdapterRegressorV3._UNSUPPORTED_CLUSTER_METRICS
- ]
- raise ValueError(
- f"Invalid label_{category}_metric {metric!r}. "
- f"This metric does not support weighted distance calculations. "
- f"Supported: {', '.join(supported_metrics)}"
+ def _validate_power_mean_p(
+ p: Optional[float], *, ctx: str, mode: ValidationMode = "raise"
+ ) -> Optional[float]:
+ if p is None:
+ return None
+ if mode == "none":
+ return float(p) if np.isfinite(p) else None
+
+ if not np.isfinite(p):
+ msg = f"Invalid {ctx} {p!r}: must be finite"
+ if mode == "raise":
+ raise ValueError(msg)
+ logger.warning(f"{msg}, using default")
+ return None
+
+ return float(p)
+
+ @staticmethod
+ def _validate_metric_weights_support(
+ metric: str, *, ctx: str, mode: ValidationMode = "warn"
+ ) -> Optional[str]:
+ if metric not in QuickAdapterRegressorV3._unsupported_weights_metrics_set():
+ return metric
+
+ if mode == "none":
+ return None
+
+ msg = f"Invalid {ctx} {metric!r}: does not support custom weights"
+ if mode == "raise":
+ raise ValueError(msg)
+ logger.warning(f"{msg}, using uniform weights")
+ return None
+
+ @staticmethod
+ def _validate_label_weights(
+ weights: Any,
+ n_objectives: int,
+ *,
+ ctx: str,
+ mode: ValidationMode = "raise",
+ ) -> Optional[NDArray[np.floating]]:
+ uniform_weights = np.full(n_objectives, 1.0 / n_objectives)
+
+ if weights is None:
+ return uniform_weights
+
+ if mode == "none":
+ if not isinstance(weights, (list, tuple, np.ndarray)):
+ return uniform_weights
+ try:
+ np_weights = np.asarray(weights, dtype=float)
+ except (ValueError, TypeError):
+ return uniform_weights
+ if np_weights.size != n_objectives:
+ return uniform_weights
+ if not np.all(np.isfinite(np_weights)):
+ return uniform_weights
+ if np.any(np_weights < 0):
+ return uniform_weights
+ weights_sum = np.nansum(np_weights)
+ if np.isclose(weights_sum, 0.0):
+ return uniform_weights
+ return np_weights / weights_sum
+
+ if not isinstance(weights, (list, tuple, np.ndarray)):
+ msg = (
+ f"Invalid {ctx} {type(weights).__name__!r}: "
+ f"must be a list, tuple, or array"
+ )
+ if mode == "raise":
+ raise ValueError(msg)
+ logger.warning(f"{msg}, using uniform weights")
+ return uniform_weights
+
+ np_weights = np.asarray(weights, dtype=float)
+
+ if np_weights.size != n_objectives:
+ msg = (
+ f"Invalid {ctx} (length={np_weights.size}): "
+ f"must match number of objectives ({n_objectives})"
)
+ if mode == "raise":
+ raise ValueError(msg)
+ logger.warning(f"{msg}, using uniform weights")
+ return uniform_weights
+
+ if not np.all(np.isfinite(np_weights)):
+ msg = f"Invalid {ctx}: contains non-finite values"
+ if mode == "raise":
+ raise ValueError(msg)
+ logger.warning(f"{msg}, using uniform weights")
+ return uniform_weights
+
+ if np.any(np_weights < 0):
+ msg = f"Invalid {ctx}: contains negative values"
+ if mode == "raise":
+ raise ValueError(msg)
+ logger.warning(f"{msg}, using uniform weights")
+ return uniform_weights
+
+ weights_sum = np.nansum(np_weights)
+ if np.isclose(weights_sum, 0.0):
+ msg = f"Invalid {ctx}: sum is zero"
+ if mode == "raise":
+ raise ValueError(msg)
+ logger.warning(f"{msg}, using uniform weights")
+ return uniform_weights
+
+ return np_weights / weights_sum
+
+ @staticmethod
+ def _validate_enum_value(
+ value: str,
+ valid_set: AbstractSet[str],
+ valid_options: tuple[str, ...],
+ *,
+ ctx: str,
+ mode: ValidationMode = "raise",
+ default: Optional[str] = None,
+ ) -> Optional[str]:
+ if value in valid_set:
+ return value
+
+ if mode == "none":
+ return default
+
+ msg = f"Invalid {ctx} {value!r}. Supported: {', '.join(valid_options)}"
+ if mode == "raise":
+ raise ValueError(msg)
+ logger.warning(f"{msg}, using {default!r}")
+ return default
+
+ @staticmethod
+ def _prepare_knn_kwargs(
+ distance_metric: str,
+ *,
+ weights: Optional[NDArray[np.floating]] = None,
+ p: Optional[float] = None,
+ mode: ValidationMode = "warn",
+ p_ctx: str = "label_density_p",
+ ) -> dict[str, Any]:
+ knn_kwargs: dict[str, Any] = {}
+
+ if distance_metric == QuickAdapterRegressorV3._DISTANCE_METRICS[1]:
+ validated_p = QuickAdapterRegressorV3._validate_minkowski_p(
+ p, ctx=p_ctx, mode=mode
+ )
+ if validated_p is not None:
+ knn_kwargs["p"] = validated_p
+ if weights is not None:
+ knn_kwargs["metric_params"] = {"w": weights}
+
+ return knn_kwargs
@staticmethod
def _resolve_p_order(
label_p_order: Optional[float],
*,
ctx: str,
+ mode: ValidationMode = "raise",
) -> Optional[float]:
p = (
label_p_order
if (
distance_metric == QuickAdapterRegressorV3._DISTANCE_METRICS[1]
): # "minkowski"
- p = QuickAdapterRegressorV3._validate_minkowski_p(p, ctx=ctx)
+ p = QuickAdapterRegressorV3._validate_minkowski_p(p, ctx=ctx, mode=mode)
return p
def _resolve_label_method_config(self, label_method: str) -> dict[str, Any]:
- if label_method not in self._selection_methods_set():
- raise ValueError(
- f"Invalid label_method {label_method!r}. "
- f"Supported: {', '.join(QuickAdapterRegressorV3._SELECTION_METHODS)}"
- )
+ QuickAdapterRegressorV3._validate_enum_value(
+ label_method,
+ self._selection_methods_set(),
+ QuickAdapterRegressorV3._SELECTION_METHODS,
+ ctx="label_method",
+ )
category = QuickAdapterRegressorV3._get_selection_category(label_method)
config: dict[str, Any] = {
"label_distance_metric",
QuickAdapterRegressorV3.LABEL_DISTANCE_METRIC_DEFAULT,
)
- if distance_metric not in QuickAdapterRegressorV3._distance_metrics_set():
- raise ValueError(
- f"Invalid label_distance_metric {distance_metric!r}. "
- f"Supported: {', '.join(QuickAdapterRegressorV3._DISTANCE_METRICS)}"
- )
+ QuickAdapterRegressorV3._validate_enum_value(
+ distance_metric,
+ QuickAdapterRegressorV3._distance_metrics_set(),
+ QuickAdapterRegressorV3._DISTANCE_METRICS,
+ ctx="label_distance_metric",
+ )
config["distance_metric"] = distance_metric
elif category == "cluster":
distance_metric = self.ft_params.get(
"label_cluster_metric",
QuickAdapterRegressorV3.LABEL_CLUSTER_METRIC_DEFAULT,
)
- if distance_metric not in QuickAdapterRegressorV3._distance_metrics_set():
- raise ValueError(
- f"Invalid label_cluster_metric {distance_metric!r}. "
- f"Supported: {', '.join(QuickAdapterRegressorV3._DISTANCE_METRICS)}"
- )
+ QuickAdapterRegressorV3._validate_enum_value(
+ distance_metric,
+ QuickAdapterRegressorV3._distance_metrics_set(),
+ QuickAdapterRegressorV3._DISTANCE_METRICS,
+ ctx="label_cluster_metric",
+ )
config["distance_metric"] = distance_metric
+
selection_method = self.ft_params.get(
"label_cluster_selection_method",
QuickAdapterRegressorV3.LABEL_CLUSTER_SELECTION_METHOD_DEFAULT,
)
- if selection_method not in QuickAdapterRegressorV3._distance_methods_set():
- raise ValueError(
- f"Invalid label_cluster_selection_method {selection_method!r}. "
- f"Supported: {', '.join(QuickAdapterRegressorV3._DISTANCE_METHODS)}"
- )
+ QuickAdapterRegressorV3._validate_enum_value(
+ selection_method,
+ QuickAdapterRegressorV3._distance_methods_set(),
+ QuickAdapterRegressorV3._DISTANCE_METHODS,
+ ctx="label_cluster_selection_method",
+ )
config["selection_method"] = selection_method
trial_selection_method = self.ft_params.get(
"label_cluster_trial_selection_method",
QuickAdapterRegressorV3.LABEL_CLUSTER_TRIAL_SELECTION_METHOD_DEFAULT,
)
- if (
- trial_selection_method
- not in QuickAdapterRegressorV3._distance_methods_set()
- ):
- raise ValueError(
- f"Invalid label_cluster_trial_selection_method {trial_selection_method!r}. "
- f"Supported: {', '.join(QuickAdapterRegressorV3._DISTANCE_METHODS)}"
- )
+ QuickAdapterRegressorV3._validate_enum_value(
+ trial_selection_method,
+ QuickAdapterRegressorV3._distance_methods_set(),
+ QuickAdapterRegressorV3._DISTANCE_METHODS,
+ ctx="label_cluster_trial_selection_method",
+ )
config["trial_selection_method"] = trial_selection_method
elif category == "density":
density_method = cast(DensityMethod, label_method)
density_method
),
)
- if distance_metric not in QuickAdapterRegressorV3._distance_metrics_set():
- raise ValueError(
- f"Invalid label_density_metric {distance_metric!r}. "
- f"Supported: {', '.join(QuickAdapterRegressorV3._DISTANCE_METRICS)}"
- )
+ QuickAdapterRegressorV3._validate_enum_value(
+ distance_metric,
+ QuickAdapterRegressorV3._distance_metrics_set(),
+ QuickAdapterRegressorV3._DISTANCE_METRICS,
+ ctx="label_density_metric",
+ )
config["distance_metric"] = distance_metric
if density_method == QuickAdapterRegressorV3._DENSITY_METHODS[0]: # "knn"
QuickAdapterRegressorV3.LABEL_DENSITY_AGGREGATION_DEFAULT,
),
)
- if (
- aggregation
- not in QuickAdapterRegressorV3._density_aggregations_set()
- ):
- raise ValueError(
- f"Invalid label_density_aggregation {aggregation!r}. "
- f"Supported: {', '.join(QuickAdapterRegressorV3._DENSITY_AGGREGATIONS)}"
- )
+ QuickAdapterRegressorV3._validate_enum_value(
+ aggregation,
+ QuickAdapterRegressorV3._density_aggregations_set(),
+ QuickAdapterRegressorV3._DENSITY_AGGREGATIONS,
+ ctx="label_density_aggregation",
+ )
config["aggregation"] = aggregation
n_neighbors = self.ft_params.get(
)
if not isinstance(n_neighbors, int) or n_neighbors < 1:
raise ValueError(
- f"Invalid label_density_n_neighbors: must be >= 1, got {n_neighbors!r}"
+ f"Invalid label_density_n_neighbors {n_neighbors!r}: must be int >= 1"
)
config["n_neighbors"] = n_neighbors
ctx="label_density_aggregation_param",
)
elif aggregation == "power_mean":
- if not np.isfinite(aggregation_param):
- raise ValueError(
- f"Invalid label_density_aggregation_param p {aggregation_param!r}: must be finite"
- )
+ QuickAdapterRegressorV3._validate_power_mean_p(
+ aggregation_param,
+ ctx="label_density_aggregation_param",
+ )
config["aggregation_param"] = aggregation_param
)
return np.nanmedian(values)
- @staticmethod
- def _normalize_weights(
- weights: Optional[NDArray[np.floating]],
- n_objectives: int,
- ) -> NDArray[np.floating]:
- if weights is None:
- return np.full(n_objectives, 1.0 / n_objectives)
-
- np_weights = np.asarray(weights, dtype=float)
- if np_weights.size != n_objectives:
- raise ValueError(
- "Invalid label_weights: length must match number of objectives"
- )
- if not np.all(np.isfinite(np_weights)):
- raise ValueError(
- f"Invalid label_weights (shape={np_weights.shape}, dtype={np_weights.dtype}): "
- f"must contain only finite values"
- )
- if np.any(np_weights < 0):
- raise ValueError(
- f"Invalid label_weights (shape={np_weights.shape}, dtype={np_weights.dtype}): "
- f"values must be non-negative"
- )
-
- weights_sum = np.nansum(np_weights)
- if np.isclose(weights_sum, 0.0):
- raise ValueError(
- f"Invalid label_weights (shape={np_weights.shape}, sum={weights_sum}): "
- f"sum cannot be zero"
- )
-
- return np_weights / weights_sum
-
@staticmethod
def _compromise_programming_scores(
normalized_matrix: NDArray[np.floating],
if distance_metric in QuickAdapterRegressorV3._scipy_metrics_set():
cdist_kwargs = QuickAdapterRegressorV3._prepare_distance_kwargs(
- distance_metric=distance_metric,
+ distance_metric,
weights=weights,
p=p,
- validate_p=True,
- check_unsupported_metrics=True,
- validation_context="compromise_programming minkowski p",
+ mode="warn",
+ metric_ctx="label_distance_metric",
+ p_ctx="label_distance_p",
)
return sp.spatial.distance.cdist(
normalized_matrix,
"Invalid matrix: must contain only finite values (no NaN or inf)"
)
- if (
- weights is not None
- and distance_metric
- in QuickAdapterRegressorV3._unsupported_cluster_metrics_set()
- ):
- raise ValueError(
- f"Invalid weights: unsupported for distance_metric {distance_metric!r}"
- )
-
n = matrix.shape[0]
if n == 0:
return np.array([])
return np.array([0.0])
pdist_kwargs = QuickAdapterRegressorV3._prepare_distance_kwargs(
- distance_metric=distance_metric,
+ distance_metric,
weights=weights,
p=p,
- validate_p=True,
- check_unsupported_metrics=False,
- validation_context="pairwise_distance_sums minkowski p",
+ mode="warn",
+ metric_ctx="label_density_metric",
+ p_ctx="label_density_p",
)
pairwise_distances_vector = sp.spatial.distance.pdist(
distance_metric=distance_metric,
weights=weights,
p=p,
- validate_p=True,
- check_unsupported_metrics=True,
- validation_context="topsis minkowski p",
+ mode="warn",
+ metric_ctx="label_distance_metric",
+ p_ctx="label_distance_p",
)
dist_to_ideal = sp.spatial.distance.cdist(
distance_metric=distance_metric,
weights=weights,
p=p,
- validate_p=True,
- check_unsupported_metrics=False,
- validation_context="calculate_trial_distance_to_ideal minkowski p",
+ mode="warn",
+ metric_ctx="label_cluster_metric",
+ p_ctx="label_cluster_p",
)
return sp.spatial.distance.cdist(
if n_samples == 1:
return np.array([0.0])
- knn_kwargs: dict[str, Any] = {}
- if (
- distance_metric == QuickAdapterRegressorV3._DISTANCE_METRICS[1]
- ): # "minkowski"
- if p is not None and np.isfinite(p):
- knn_kwargs["p"] = QuickAdapterRegressorV3._validate_minkowski_p(
- p,
- ctx="knn minkowski p",
- )
- if weights is not None:
- knn_kwargs["metric_params"] = {"w": weights}
- else:
- if weights is not None and not np.allclose(weights, weights[0]):
- raise ValueError(
- f"Invalid configuration: weights are only supported for Minkowski distance metric, "
- f"but got distance_metric={distance_metric!r}"
- )
+ knn_kwargs = QuickAdapterRegressorV3._prepare_knn_kwargs(
+ distance_metric,
+ weights=weights,
+ p=p,
+ mode="raise",
+ )
nbrs = sklearn.neighbors.NearestNeighbors(
n_neighbors=min(n_neighbors, n_samples - 1) + 1,
): # "power_mean"
power = (
aggregation_param
- if aggregation_param is not None and np.isfinite(aggregation_param)
+ if aggregation_param is not None
else QuickAdapterRegressorV3._get_label_density_aggregation_param_default(
aggregation
)
)
+ if power is None:
+ power = 1.0
+ power = QuickAdapterRegressorV3._validate_power_mean_p(
+ power,
+ ctx="label_density_aggregation_param",
+ )
if power is None:
power = 1.0
return sp.stats.pmean(neighbor_distances, p=power, axis=1)
quantile = 0.5
quantile = QuickAdapterRegressorV3._validate_quantile_q(
quantile,
- ctx="knn quantile q",
+ ctx="label_density_aggregation_param",
)
return np.nanquantile(neighbor_distances, quantile, axis=1)
elif aggregation == QuickAdapterRegressorV3._DENSITY_AGGREGATIONS[2]: # "min"
label_p_order = self.ft_params.get("label_p_order")
label_weights = self.ft_params.get("label_weights")
-
- if label_weights is not None and not isinstance(
- label_weights, (list, tuple, np.ndarray)
- ):
- raise ValueError(
- f"Invalid label_weights: must be a list, tuple, or array, got {type(label_weights).__name__}"
- )
- weights = QuickAdapterRegressorV3._normalize_weights(
- np.array(label_weights, dtype=float) if label_weights is not None else None,
+ weights = QuickAdapterRegressorV3._validate_label_weights(
+ label_weights,
n_objectives,
+ ctx="label_weights",
+ mode="raise",
)
if n_samples == 1:
distance_metric,
label_p_order,
ctx=f"label_p_order for {method}",
+ mode="none",
)
if (
cluster_selection_method = label_config["selection_method"]
trial_selection_method = label_config["trial_selection_method"]
- QuickAdapterRegressorV3._validate_metric_supported(
- cluster_metric, category="cluster"
- )
p = QuickAdapterRegressorV3._resolve_p_order(
cluster_metric,
label_p_order,
ctx=f"label_p_order for {method}",
+ mode="none",
)
return self._cluster_based_selection(
normalized_matrix,
if category == "density":
density_method = cast(DensityMethod, method)
density_metric = label_config["distance_metric"]
- QuickAdapterRegressorV3._validate_metric_supported(
- density_metric, category="density"
- )
p = QuickAdapterRegressorV3._resolve_p_order(
density_metric,
label_p_order,
ctx=f"label_p_order for {density_method}",
+ mode="none",
)
if density_method == QuickAdapterRegressorV3._DENSITY_METHODS[0]: # "knn"