From a0e91ae68b2c7a64fae0307071b232ae617ed03d Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Tue, 3 Jun 2025 16:15:22 +0200 Subject: [PATCH] feat(qav3): add kmeans to ideal point clustering multi objective trial selection MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- .../freqaimodels/QuickAdapterRegressorV3.py | 78 +++++++++++++++++-- 1 file changed, 70 insertions(+), 8 deletions(-) diff --git a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py index ce3f629..27a48e6 100644 --- a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py +++ b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py @@ -442,6 +442,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): "harmonic_mean", "power_mean", "weighted_sum", + "kmeans", "knn_d1", "knn_d2_mean", "knn_d2_median", @@ -471,18 +472,20 @@ class QuickAdapterRegressorV3(BaseRegressionModel): def calculate_distances( normalized_matrix: np.ndarray, metric: str ) -> np.ndarray: + n_objectives = normalized_matrix.shape[1] + n_samples = normalized_matrix.shape[0] label_p_order = float(self.ft_params.get("label_p_order", 2.0)) np_weights = np.array( - self.ft_params.get("label_weights", [1.0] * normalized_matrix.shape[1]) + self.ft_params.get("label_weights", [1.0] * n_objectives) ) - if np_weights.size != normalized_matrix.shape[1]: + if np_weights.size != n_objectives: raise ValueError("label_weights length must match number of objectives") knn_kwargs = {} label_knn_metric = self.ft_params.get("label_knn_metric", "euclidean") if label_knn_metric == "minkowski" and isinstance(label_p_order, float): knn_kwargs["p"] = label_p_order - ideal_point = np.ones(normalized_matrix.shape[1]) + ideal_point = np.ones(n_objectives) if metric in { "braycurtis", @@ -545,21 +548,80 @@ class QuickAdapterRegressorV3(BaseRegressionModel): ) - sp.stats.pmean(normalized_matrix, p=p, weights=np_weights, axis=1) elif metric == "weighted_sum": return np.sum(np_weights * (ideal_point - normalized_matrix), axis=1) + elif metric == "kmeans": + label_kmeans_metric = self.ft_params.get( + "label_kmeans_metric", "euclidean" + ) + cdist_kwargs = {} + if label_kmeans_metric == "minkowski" and isinstance( + label_p_order, float + ): + cdist_kwargs["p"] = label_p_order + if n_samples == 0: + return np.array([]) + if n_samples == 1: + return sp.spatial.distance.cdist( + normalized_matrix, + ideal_point.reshape(1, -1), + metric=label_kmeans_metric, + **cdist_kwargs, + ).flatten() + n_clusters = min(max(2, int(np.sqrt(n_samples / 2))), 10, n_samples) + kmeans = sklearn.cluster.KMeans( + n_clusters=n_clusters, random_state=42, n_init=10 + ) + cluster_labels = kmeans.fit_predict(normalized_matrix) + cluster_centers = kmeans.cluster_centers_ + cluster_distances_to_ideal = sp.spatial.distance.cdist( + cluster_centers, + ideal_point.reshape(1, -1), + metric=label_kmeans_metric, + **cdist_kwargs, + ).flatten() + best_cluster = np.argmin(cluster_distances_to_ideal) + best_center = cluster_centers[best_cluster].reshape(1, -1) + distances_to_best_cluster = sp.spatial.distance.cdist( + normalized_matrix, + best_center, + metric=label_kmeans_metric, + **cdist_kwargs, + ).flatten() + penalty_value = ( + np.clip( + ( + np.mean(np.delete(cluster_distances_to_ideal, best_cluster)) + / cluster_distances_to_ideal[best_cluster] + if cluster_distances_to_ideal[best_cluster] > 0 + else np.std( + np.delete(cluster_distances_to_ideal, best_cluster) + ) + if len(cluster_distances_to_ideal) > 1 + else 1.0 + ) + - 1.0, + 0.5, + 3.0, + ) + if len(cluster_distances_to_ideal) > 1 + else 1.0 + ) + penalties = np.where(cluster_labels == best_cluster, 0.0, penalty_value) + return distances_to_best_cluster + penalties elif metric == "knn_d1": - if normalized_matrix.shape[0] < 2: - return np.full(normalized_matrix.shape[0], np.inf) + if n_samples < 2: + return np.full(n_samples, np.inf) nbrs = sklearn.neighbors.NearestNeighbors( n_neighbors=2, metric=label_knn_metric, **knn_kwargs ).fit(normalized_matrix) distances, _ = nbrs.kneighbors(normalized_matrix) return distances[:, 1] elif metric in {"knn_d2_mean", "knn_d2_median", "knn_d2_max"}: - if normalized_matrix.shape[0] < 2: - return np.full(normalized_matrix.shape[0], np.inf) + if n_samples < 2: + return np.full(n_samples, np.inf) n_neighbors = ( min( int(self.ft_params.get("label_knn_d2_n_neighbors", 4)), - normalized_matrix.shape[0] - 1, + n_samples - 1, ) + 1 ) -- 2.43.0