return None
n_objectives = len(study.directions)
+ if n_objectives < 2:
+ raise ValueError(
+ f"Multi-objective study must have at least 2 objectives, but got {n_objectives}"
+ )
label_metric = self.ft_params.get("label_metric", "euclidean")
metrics = {
"chebyshev",
"manhattan",
"minkowski",
- "canberra",
- "braycurtis",
"hellinger",
"geometric_mean",
+ "hypervolume",
+ "weighted_sum",
+ "tchebichev",
+ "mahalanobis",
+ "d1",
+ "d2",
}
if label_metric not in metrics:
raise ValueError(
f"Unsupported label metric: {label_metric}. Supported metrics are {', '.join(metrics)}"
)
- best_trials = []
- for trial in study.best_trials:
- if trial.values is not None and len(trial.values) == n_objectives:
- valid_trial = True
- for value in trial.values:
- if not isinstance(value, (int, float)):
- valid_trial = False
- break
- if np.isnan(value):
- valid_trial = False
- break
- if valid_trial:
- best_trials.append(trial)
+ best_trials = [
+ trial
+ for trial in study.best_trials
+ if (
+ trial.values is not None
+ and len(trial.values) == n_objectives
+ and all(
+ isinstance(value, (int, float)) and not np.isnan(value)
+ for value in trial.values
+ )
+ )
+ ]
if not best_trials:
return None
metric: str,
p_order: float = self.ft_params.get("label_p_order", 2.0),
) -> np.ndarray:
- ideal_point = np.full(normalized_matrix.shape[1], 1.0)
-
- if metric == "euclidean":
- return np.linalg.norm(normalized_matrix - ideal_point, axis=1)
- elif metric == "chebyshev":
- return np.max(np.abs(normalized_matrix - ideal_point), axis=1)
- elif metric == "manhattan":
- return np.sum(np.abs(normalized_matrix - ideal_point), axis=1)
- elif metric == "minkowski":
- if p_order < 1:
- raise ValueError("Minkowski p_order must be >= 1")
- return np.power(
- np.sum(
- np.power(np.abs(normalized_matrix - ideal_point), p_order),
- axis=1,
- ),
- 1.0 / p_order,
- )
- elif metric == "canberra":
- return np.sum(
- np.abs(normalized_matrix - ideal_point)
- / (np.abs(normalized_matrix) + np.abs(ideal_point)),
- axis=1,
- )
- elif metric == "braycurtis":
- return np.divide(
- np.sum(np.abs(normalized_matrix - ideal_point), axis=1),
- np.sum(normalized_matrix + ideal_point, axis=1),
- out=np.zeros(normalized_matrix.shape[0], dtype=float),
- where=(np.sum(normalized_matrix + ideal_point, axis=1) != 0),
+ ideal_point = np.ones(normalized_matrix.shape[1])
+
+ if metric == "minkowski" and p_order < 1:
+ raise ValueError("p_order must be >= 1 for the 'minkowski' metric")
+
+ if metric in {"euclidean", "manhattan", "chebyshev", "minkowski"}:
+ order = {
+ "euclidean": 2,
+ "manhattan": 1,
+ "chebyshev": np.inf,
+ "minkowski": p_order,
+ }[metric]
+ return np.linalg.norm(
+ normalized_matrix - ideal_point, ord=order, axis=1
)
elif metric == "hellinger":
return np.sqrt(np.sum((np.sqrt(normalized_matrix) - 1.0) ** 2, axis=1))
elif metric == "geometric_mean":
- return (
- np.array([])
- if normalized_matrix.shape[1] == 0
- else 1.0
- - (
- np.prod(normalized_matrix, axis=1)
- ** (1.0 / normalized_matrix.shape[1])
+ return 1.0 - np.prod(normalized_matrix, axis=1) ** (
+ 1.0 / normalized_matrix.shape[1]
+ )
+ elif metric == "hypervolume":
+ return 1.0 - np.prod(normalized_matrix, axis=1)
+ elif metric == "weighted_sum":
+ weights = self.ft_params.get(
+ "label_weights", [1.0] * normalized_matrix.shape[1]
+ )
+ if len(weights) != normalized_matrix.shape[1]:
+ raise ValueError(
+ "label_weights length must match number of objectives"
)
+ return np.sum(np.array(weights) * (1.0 - normalized_matrix), axis=1)
+ elif metric == "tchebichev":
+ weights = self.ft_params.get(
+ "label_weights", [1.0] * normalized_matrix.shape[1]
+ )
+ if len(weights) != normalized_matrix.shape[1]:
+ raise ValueError(
+ "label_weights length must match number of objectives"
+ )
+ return np.max(np.array(weights) * (1.0 - normalized_matrix), axis=1)
+ elif metric == "mahalanobis":
+ if normalized_matrix.shape[0] < 2:
+ return np.linalg.norm(
+ normalized_matrix - ideal_point, ord=2, axis=1
+ )
+ cov_matrix = np.cov(normalized_matrix.T)
+ inv_cov = np.linalg.pinv(cov_matrix)
+ diff = normalized_matrix - ideal_point
+ return np.sqrt(np.einsum("ij,ji->i", diff @ inv_cov, diff.T))
+ elif metric == "d1":
+ if normalized_matrix.shape[0] < 2:
+ return np.full(normalized_matrix.shape[0], np.inf)
+ nbrs = sklearn.neighbors.NearestNeighbors(n_neighbors=2).fit(
+ normalized_matrix
+ )
+ distances, _ = nbrs.kneighbors(normalized_matrix)
+ if distances.shape[1] < 2:
+ return np.zeros(normalized_matrix.shape[0])
+ return distances[:, 1]
+ elif metric == "d2":
+ if normalized_matrix.shape[0] < 2:
+ return np.full(normalized_matrix.shape[0], np.inf)
+ k = min(4, normalized_matrix.shape[0] - 1) + 1
+ nbrs = sklearn.neighbors.NearestNeighbors(n_neighbors=k).fit(
+ normalized_matrix
)
+ distances, _ = nbrs.kneighbors(normalized_matrix)
+ return np.mean(distances[:, 1:], axis=1)
else:
raise ValueError(f"Unsupported distance metric: {metric}")
**self.get_optuna_params(pair, namespace),
}
logger.info(
- f"Optuna {pair} {namespace} {objective_type} objective done ({time_spent:.2f} secs)"
+ f"Optuna {pair} {namespace} {objective_type} objective done using {self.ft_params.get('label_metric', 'euclidean')} metric ({time_spent:.2f} secs)"
)
for key, value in study_results.items():
logger.info(