From 0d616c929eee36682103878fcafb18d387f5e1be Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Wed, 31 Dec 2025 14:01:44 +0100 Subject: [PATCH] refactor(quickadapter): factor out topsis and distance metric logic MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- .../freqaimodels/QuickAdapterRegressorV3.py | 367 +++++++++--------- 1 file changed, 187 insertions(+), 180 deletions(-) diff --git a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py index 949382e..790ff9c 100644 --- a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py +++ b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py @@ -253,6 +253,63 @@ class QuickAdapterRegressorV3(BaseRegressionModel): return 0.5 return None + def _get_distance_metric(self, label_metric: str) -> tuple[str, str, str]: + """Resolve distance metric for composite label metrics. + + Args: + label_metric: Label metric name. + + Returns: + Tuple (distance_metric, param_name, default_metric). + Returns (label_metric, "", "") when label_metric is not composite. + """ + # Mapping: label_metric -> (param_name, default_metric) + composite_metrics: dict[str, tuple[str, str]] = { + QuickAdapterRegressorV3._CUSTOM_METRICS[16]: ( # "medoid" + "label_medoid_metric", + QuickAdapterRegressorV3._SCIPY_METRICS[2], # "euclidean" + ), + QuickAdapterRegressorV3._CUSTOM_METRICS[9]: ( # "kmeans" + "label_kmeans_metric", + QuickAdapterRegressorV3._SCIPY_METRICS[2], # "euclidean" + ), + QuickAdapterRegressorV3._CUSTOM_METRICS[10]: ( # "kmeans2" + "label_kmeans_metric", + QuickAdapterRegressorV3._SCIPY_METRICS[2], # "euclidean" + ), + QuickAdapterRegressorV3._CUSTOM_METRICS[11]: ( # "kmedoids" + "label_kmedoids_metric", + QuickAdapterRegressorV3._SCIPY_METRICS[2], # "euclidean" + ), + QuickAdapterRegressorV3._CUSTOM_METRICS[12]: ( # "knn_power_mean" + "label_knn_metric", + QuickAdapterRegressorV3._SCIPY_METRICS[5], # "minkowski" + ), + QuickAdapterRegressorV3._CUSTOM_METRICS[13]: ( # "knn_quantile" + "label_knn_metric", + QuickAdapterRegressorV3._SCIPY_METRICS[5], # "minkowski" + ), + QuickAdapterRegressorV3._CUSTOM_METRICS[14]: ( # "knn_min" + "label_knn_metric", + QuickAdapterRegressorV3._SCIPY_METRICS[5], # "minkowski" + ), + QuickAdapterRegressorV3._CUSTOM_METRICS[15]: ( # "knn_max" + "label_knn_metric", + QuickAdapterRegressorV3._SCIPY_METRICS[5], # "minkowski" + ), + QuickAdapterRegressorV3._CUSTOM_METRICS[17]: ( # "topsis" + "label_topsis_metric", + QuickAdapterRegressorV3._SCIPY_METRICS[2], # "euclidean" + ), + } + + if label_metric not in composite_metrics: + return (label_metric, "", "") + + param_name, default_metric = composite_metrics[label_metric] + distance_metric = self.ft_params.get(param_name, default_metric) + return (distance_metric, param_name, default_metric) + @property def _optuna_config(self) -> dict[str, Any]: optuna_default_config = { @@ -632,72 +689,17 @@ class QuickAdapterRegressorV3(BaseRegressionModel): }: label_p_order_is_used = True label_p_order_reason = label_metric - elif ( - label_metric == QuickAdapterRegressorV3._CUSTOM_METRICS[16] - ): # "medoid" - label_medoid_metric = self.ft_params.get( - "label_medoid_metric", - QuickAdapterRegressorV3._SCIPY_METRICS[2], # "euclidean" default - ) - if ( - label_medoid_metric == QuickAdapterRegressorV3._SCIPY_METRICS[5] - ): # "minkowski" - label_p_order_is_used = True - label_p_order_reason = f"{label_metric} (via label_medoid_metric={label_medoid_metric})" - elif label_metric in { - QuickAdapterRegressorV3._CUSTOM_METRICS[9], # "kmeans" - QuickAdapterRegressorV3._CUSTOM_METRICS[10], # "kmeans2" - }: - label_kmeans_metric = self.ft_params.get( - "label_kmeans_metric", - QuickAdapterRegressorV3._SCIPY_METRICS[2], # "euclidean" default - ) - if ( - label_kmeans_metric == QuickAdapterRegressorV3._SCIPY_METRICS[5] - ): # "minkowski" - label_p_order_is_used = True - label_p_order_reason = f"{label_metric} (via label_kmeans_metric={label_kmeans_metric})" - elif ( - label_metric == QuickAdapterRegressorV3._CUSTOM_METRICS[11] - ): # "kmedoids" - label_kmedoids_metric = self.ft_params.get( - "label_kmedoids_metric", - QuickAdapterRegressorV3._SCIPY_METRICS[2], # "euclidean" default - ) - if ( - label_kmedoids_metric == QuickAdapterRegressorV3._SCIPY_METRICS[5] - ): # "minkowski" - label_p_order_is_used = True - label_p_order_reason = f"{label_metric} (via label_kmedoids_metric={label_kmedoids_metric})" - elif label_metric in { - QuickAdapterRegressorV3._CUSTOM_METRICS[12], # "knn_power_mean" - QuickAdapterRegressorV3._CUSTOM_METRICS[13], # "knn_quantile" - QuickAdapterRegressorV3._CUSTOM_METRICS[14], # "knn_min" - QuickAdapterRegressorV3._CUSTOM_METRICS[15], # "knn_max" - }: - label_knn_metric = self.ft_params.get( - "label_knn_metric", - QuickAdapterRegressorV3._SCIPY_METRICS[5], # "minkowski" default - ) + else: + distance_metric, param_name, _ = self._get_distance_metric(label_metric) if ( - label_knn_metric == QuickAdapterRegressorV3._SCIPY_METRICS[5] - ): # "minkowski" + param_name + and distance_metric + == QuickAdapterRegressorV3._SCIPY_METRICS[5] # "minkowski" + ): label_p_order_is_used = True label_p_order_reason = ( - f"{label_metric} (via label_knn_metric={label_knn_metric})" + f"{label_metric} (via {param_name}={distance_metric})" ) - elif ( - label_metric == QuickAdapterRegressorV3._CUSTOM_METRICS[17] - ): # "topsis" - label_topsis_metric = self.ft_params.get( - "label_topsis_metric", - QuickAdapterRegressorV3._SCIPY_METRICS[2], # "euclidean" default - ) - if ( - label_topsis_metric == QuickAdapterRegressorV3._SCIPY_METRICS[5] - ): # "minkowski" - label_p_order_is_used = True - label_p_order_reason = f"{label_metric} (via label_topsis_metric={label_topsis_metric})" if label_p_order_config is not None: logger.info( @@ -723,25 +725,15 @@ class QuickAdapterRegressorV3(BaseRegressionModel): f" label_p_order: {format_number(label_p_order_default)} (default for {label_p_order_reason})" ) - label_medoid_metric_config = self.ft_params.get("label_medoid_metric") - if label_medoid_metric_config is not None: - logger.info(f" label_medoid_metric: {label_medoid_metric_config}") - elif ( - label_metric == QuickAdapterRegressorV3._CUSTOM_METRICS[16] - ): # "medoid" - logger.info( - f" label_medoid_metric: {QuickAdapterRegressorV3._SCIPY_METRICS[2]} (default for {label_metric})" - ) - label_kmeans_metric_config = self.ft_params.get("label_kmeans_metric") - if label_kmeans_metric_config is not None: - logger.info(f" label_kmeans_metric: {label_kmeans_metric_config}") - elif label_metric in { - QuickAdapterRegressorV3._CUSTOM_METRICS[9], # "kmeans" - QuickAdapterRegressorV3._CUSTOM_METRICS[10], # "kmeans2" - }: - logger.info( - f" label_kmeans_metric: {QuickAdapterRegressorV3._SCIPY_METRICS[2]} (default for {label_metric})" - ) + _, param_name, default_metric = self._get_distance_metric(label_metric) + if param_name: + config_value = self.ft_params.get(param_name) + if config_value is not None: + logger.info(f" {param_name}: {config_value}") + else: + logger.info( + f" {param_name}: {default_metric} (default for {label_metric})" + ) label_kmeans_selection_config = self.ft_params.get("label_kmeans_selection") if label_kmeans_selection_config is not None: @@ -755,15 +747,6 @@ class QuickAdapterRegressorV3(BaseRegressionModel): logger.info( f" label_kmeans_selection: {QuickAdapterRegressorV3._CLUSTER_SELECTION_METHODS[1]} (default for {label_metric})" ) - label_kmedoids_metric_config = self.ft_params.get("label_kmedoids_metric") - if label_kmedoids_metric_config is not None: - logger.info(f" label_kmedoids_metric: {label_kmedoids_metric_config}") - elif ( - label_metric == QuickAdapterRegressorV3._CUSTOM_METRICS[11] - ): # "kmedoids" - logger.info( - f" label_kmedoids_metric: {QuickAdapterRegressorV3._SCIPY_METRICS[2]} (default for {label_metric})" - ) label_kmedoids_selection_config = self.ft_params.get( "label_kmedoids_selection" @@ -779,19 +762,6 @@ class QuickAdapterRegressorV3(BaseRegressionModel): f" label_kmedoids_selection: {QuickAdapterRegressorV3._CLUSTER_SELECTION_METHODS[1]} (default for {label_metric})" ) - label_knn_metric_config = self.ft_params.get("label_knn_metric") - if label_knn_metric_config is not None: - logger.info(f" label_knn_metric: {label_knn_metric_config}") - elif label_metric in { - QuickAdapterRegressorV3._CUSTOM_METRICS[12], # "knn_power_mean" - QuickAdapterRegressorV3._CUSTOM_METRICS[13], # "knn_quantile" - QuickAdapterRegressorV3._CUSTOM_METRICS[14], # "knn_min" - QuickAdapterRegressorV3._CUSTOM_METRICS[15], # "knn_max" - }: - logger.info( - f" label_knn_metric: {QuickAdapterRegressorV3._SCIPY_METRICS[5]} (default for {label_metric})" - ) - label_knn_n_neighbors = self.ft_params.get("label_knn_n_neighbors") if label_knn_n_neighbors is not None: logger.info(f" label_knn_n_neighbors: {label_knn_n_neighbors}") @@ -821,16 +791,6 @@ class QuickAdapterRegressorV3(BaseRegressionModel): f" label_knn_p_order: {format_number(label_knn_p_order_default)} (default for {label_metric})" ) - label_topsis_metric_config = self.ft_params.get("label_topsis_metric") - if label_topsis_metric_config is not None: - logger.info(f" label_topsis_metric: {label_topsis_metric_config}") - elif ( - label_metric == QuickAdapterRegressorV3._CUSTOM_METRICS[17] - ): # "topsis" - logger.info( - f" label_topsis_metric: {QuickAdapterRegressorV3._SCIPY_METRICS[2]} (default for {label_metric})" - ) - logger.info("Predictions Extrema Configuration:") predictions_extrema = self.predictions_extrema logger.info( @@ -1630,36 +1590,20 @@ class QuickAdapterRegressorV3(BaseRegressionModel): weights: Optional[NDArray[np.floating]] = None, p: Optional[float] = None, ) -> NDArray[np.floating]: - """ - Calculate the sum of pairwise distances for each sample in a matrix. - - Typical usage: medoid selection by taking argmin of the returned vector. + """Compute sum of pairwise distances per row. - Parameters: - - matrix: 2D array (n_samples, n_features), assumed normalized. - Must contain only finite values (no NaN or inf). - - metric: distance metric name accepted by scipy.spatial.distance.pdist. - - weights: optional weight vector per feature (passed as 'w' to pdist). - Not supported by metrics in _UNSUPPORTED_CLUSTER_METRICS. - Must have size equal to n_features and contain finite non-negative values. - - p: optional Minkowski order (default 2.0 if metric=='minkowski'). + Args: + matrix: 2D array, shape (n_samples, n_features). + Must contain only finite values (no NaN or inf). + metric: scipy.spatial.distance.pdist metric name. + weights: Optional 1D array, shape (n_features,). + Must be finite and non-negative. + p: Minkowski order, used only when metric == 'minkowski'. Returns: - - 1D array of shape (n_samples,) with sum of distances per sample. - - Notes: - - For n_samples==0, returns empty array []. - - For n_samples==1, returns [0.0]. - - Raises ValueError if matrix is not 2D, has 0 features, contains non-finite values, - or if weights are invalid or incompatible with the metric. - - Memory usage: O(n²/2) for the condensed distance vector. - - Time complexity: O(n² × d) where d is the number of features. - - Example: - >>> matrix = np.array([[0.0, 0.0], [1.0, 0.0], [0.0, 1.0]]) - >>> _pairwise_distance_sums(matrix, "euclidean") - array([2. , 2.41421356, 2.41421356]) + 1D array, shape (n_samples,). Returns [] when n_samples == 0, [0.0] when n_samples == 1. """ + if matrix.ndim != 2: raise ValueError("Invalid matrix: must be 2-dimensional") if matrix.shape[1] == 0: @@ -1714,6 +1658,87 @@ class QuickAdapterRegressorV3(BaseRegressionModel): return sums + @staticmethod + def _topsis_scores( + normalized_matrix: NDArray[np.floating], + metric: str, + *, + weights: Optional[NDArray[np.floating]] = None, + p: Optional[float] = None, + ) -> NDArray[np.floating]: + """Compute TOPSIS score S = D+ / (D+ + D-) per row. + + Args: + normalized_matrix: 2D array, shape (n_samples, n_objectives), values in [0, 1]. + Must contain only finite values (no NaN or inf). + metric: scipy.spatial.distance.cdist metric name. + weights: Optional 1D array, shape (n_objectives,). + Must be finite and non-negative. + p: Minkowski order, used only when metric == 'minkowski'. + + Returns: + 1D array, shape (n_samples,), values in [0, 1]. Lower is better. + Returns [] when n_samples == 0, [0.5] when n_samples == 1. + """ + if normalized_matrix.ndim != 2: + raise ValueError("Invalid normalized_matrix: must be 2-dimensional") + + n_samples, n_objectives = normalized_matrix.shape + if n_objectives == 0: + raise ValueError( + "Invalid normalized_matrix: must have at least one objective" + ) + + if not np.all(np.isfinite(normalized_matrix)): + raise ValueError( + "Invalid normalized_matrix: must contain only finite values (no NaN or inf)" + ) + + if weights is not None: + if weights.size != n_objectives: + raise ValueError( + f"Invalid weights: size {weights.size} must match number of objectives {n_objectives}" + ) + if not np.all(np.isfinite(weights)) or np.any(weights < 0): + raise ValueError("Invalid weights: must be finite and non-negative") + + normalized_matrix = np.asarray(normalized_matrix, dtype=np.float64) + if weights is not None: + weights = np.asarray(weights, dtype=np.float64) + + if n_samples == 0: + return np.array([]) + if n_samples == 1: + return np.array([0.5]) + + ideal_point = np.ones((1, n_objectives)) + anti_ideal_point = np.zeros((1, n_objectives)) + + cdist_kwargs: dict[str, Any] = {} + if weights is not None: + cdist_kwargs["w"] = weights + if ( + metric == QuickAdapterRegressorV3._SCIPY_METRICS[5] # "minkowski" + and p is not None + and np.isfinite(p) + ): + cdist_kwargs["p"] = p + + dist_to_ideal = sp.spatial.distance.cdist( + normalized_matrix, ideal_point, metric=metric, **cdist_kwargs + ).flatten() + dist_to_anti_ideal = sp.spatial.distance.cdist( + normalized_matrix, anti_ideal_point, metric=metric, **cdist_kwargs + ).flatten() + + denominator = dist_to_ideal + dist_to_anti_ideal + zero_mask = np.isclose(denominator, 0.0) + denominator[zero_mask] = 1.0 + scores = dist_to_ideal / denominator + scores[zero_mask] = 0.5 + + return scores + @staticmethod def _normalize_objective_values( objective_values_matrix: NDArray[np.floating], @@ -1934,10 +1959,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): elif metric == QuickAdapterRegressorV3._CUSTOM_METRICS[8]: # "weighted_sum" return (ideal_point - normalized_matrix) @ np_weights elif metric == QuickAdapterRegressorV3._CUSTOM_METRICS[16]: # "medoid" - label_medoid_metric = self.ft_params.get( - "label_medoid_metric", - QuickAdapterRegressorV3._SCIPY_METRICS[2], # "euclidean" - ) + label_medoid_metric, _, _ = self._get_distance_metric(metric) if ( label_medoid_metric in QuickAdapterRegressorV3._unsupported_cluster_metrics_set() @@ -1977,10 +1999,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): cluster_centers, cluster_labels = sp.cluster.vq.kmeans2( normalized_matrix, n_clusters, rng=42, minit="++" ) - label_kmeans_metric = self.ft_params.get( - "label_kmeans_metric", - QuickAdapterRegressorV3._SCIPY_METRICS[2], # "euclidean" - ) + label_kmeans_metric, _, _ = self._get_distance_metric(metric) if ( label_kmeans_metric in QuickAdapterRegressorV3._unsupported_cluster_metrics_set() @@ -2070,10 +2089,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): return trial_distances elif metric == QuickAdapterRegressorV3._CUSTOM_METRICS[11]: # "kmedoids" n_clusters = QuickAdapterRegressorV3._get_n_clusters(normalized_matrix) - label_kmedoids_metric = self.ft_params.get( - "label_kmedoids_metric", - QuickAdapterRegressorV3._SCIPY_METRICS[2], # "euclidean" - ) + label_kmedoids_metric, _, _ = self._get_distance_metric(metric) if ( label_kmedoids_metric in QuickAdapterRegressorV3._unsupported_cluster_metrics_set() @@ -2157,10 +2173,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): QuickAdapterRegressorV3._CUSTOM_METRICS[14], # "knn_min" QuickAdapterRegressorV3._CUSTOM_METRICS[15], # "knn_max" }: - label_knn_metric = self.ft_params.get( - "label_knn_metric", - QuickAdapterRegressorV3._SCIPY_METRICS[5], # "minkowski" - ) + label_knn_metric, _, _ = self._get_distance_metric(metric) knn_kwargs: dict[str, Any] = {} if ( label_knn_metric @@ -2215,38 +2228,23 @@ class QuickAdapterRegressorV3(BaseRegressionModel): elif metric == QuickAdapterRegressorV3._CUSTOM_METRICS[15]: # "knn_max" return np.nanmax(neighbor_distances, axis=1) elif metric == QuickAdapterRegressorV3._CUSTOM_METRICS[17]: # "topsis" - # TOPSIS (Hwang & Yoon, 1981): returns D+ / (D+ + D-) for argmin selection - # where D+ = distance to ideal [1,1,...], D- = distance to anti-ideal [0,0,...] - label_topsis_metric = self.ft_params.get( - "label_topsis_metric", - QuickAdapterRegressorV3._SCIPY_METRICS[2], # "euclidean" - ) - cdist_kwargs: dict[str, Any] = { - "metric": label_topsis_metric, - "w": np_weights, - } + label_topsis_metric, _, _ = self._get_distance_metric(metric) + p = None if ( label_topsis_metric == QuickAdapterRegressorV3._SCIPY_METRICS[5] # "minkowski" ): - cdist_kwargs["p"] = ( + p = ( label_p_order if label_p_order is not None and np.isfinite(label_p_order) else self._get_label_p_order_default(label_topsis_metric) ) - dist_to_ideal = sp.spatial.distance.cdist( - normalized_matrix, ideal_point_2d, **cdist_kwargs - ).flatten() - dist_to_anti_ideal = sp.spatial.distance.cdist( - normalized_matrix, np.zeros((1, n_objectives)), **cdist_kwargs - ).flatten() - - denominator = dist_to_ideal + dist_to_anti_ideal - zero_mask = np.isclose(denominator, 0.0) - denominator[zero_mask] = 1.0 - topsis_score = dist_to_ideal / denominator - topsis_score[zero_mask] = 0.5 - return topsis_score + return QuickAdapterRegressorV3._topsis_scores( + normalized_matrix, + label_topsis_metric, + weights=np_weights, + p=p, + ) else: raise ValueError( f"Invalid label metric {metric!r}. Supported: {', '.join(metrics)}" @@ -2399,7 +2397,16 @@ class QuickAdapterRegressorV3(BaseRegressionModel): "values": self.get_optuna_values(pair, namespace), **self.get_optuna_params(pair, namespace), } - metric_log_msg = f" using {self.ft_params.get('label_metric', QuickAdapterRegressorV3._SCIPY_METRICS[2])} metric" + label_metric = self.ft_params.get( + "label_metric", QuickAdapterRegressorV3._SCIPY_METRICS[2] + ) + distance_metric, param_name, _ = self._get_distance_metric(label_metric) + if param_name: + metric_log_msg = ( + f" using {label_metric} metric ({distance_metric} distance)" + ) + else: + metric_log_msg = f" using {label_metric} metric" logger.info( f"[{pair}] Optuna {namespace} {objective_type} objective hyperopt completed{metric_log_msg} ({time_spent:.2f} secs)" ) -- 2.43.0