https://github.com/sponsors/robcaulk
"""
- version = "3.7.69"
+ version = "3.7.70"
@cached_property
def _optuna_config(self) -> dict:
n_objectives = len(study.directions)
label_trials_selection = self.ft_params.get(
- "label_trials_selection", "quantile"
+ "label_trials_selection", "euclidean"
)
- if label_trials_selection not in ["quantile", "chebyshev", "euclidean"]:
+ metrics = {"euclidean", "chebyshev", "manhattan", "minkowski"}
+ if label_trials_selection not in metrics:
raise ValueError(
- f"Unsupported label trials selection method: {label_trials_selection}. Supported methods are 'quantile', 'chebyshev', and 'euclidean'"
- )
- if label_trials_selection in ["quantile"] and n_objectives != 2:
- raise ValueError(
- f"{label_trials_selection} requires exactly 2 objectives, got {n_objectives}"
+ f"Unsupported label trials selection method: {label_trials_selection}. Supported methods are {', '.join(metrics)}"
)
- best_trials = [
- trial
- for trial in study.best_trials
- if trial.values is not None and len(trial.values) == n_objectives
- ]
+ best_trials = []
+ for trial in study.best_trials:
+ if trial.values is not None and len(trial.values) == n_objectives:
+ valid_trial = True
+ for value in trial.values:
+ if not isinstance(value, (int, float)):
+ valid_trial = False
+ break
+ if np.isnan(value):
+ valid_trial = False
+ break
+ if valid_trial:
+ best_trials.append(trial)
if not best_trials:
return None
- if label_trials_selection == "quantile":
- if len(best_trials) == 1:
- return best_trials[0]
-
- def compare_primary_objective(
- trial_a: optuna.trial.FrozenTrial,
- trial_b: optuna.trial.FrozenTrial,
- direction: optuna.study.StudyDirection,
- ) -> bool:
- if direction == optuna.study.StudyDirection.MAXIMIZE:
- return trial_a.values[0] > trial_b.values[0]
- return trial_a.values[0] < trial_b.values[0]
-
- def is_better_above_candidate(
- candidate_trial: optuna.trial.FrozenTrial,
- previous_trial: Optional[optuna.trial.FrozenTrial],
- direction: optuna.study.StudyDirection,
- target_pivot_size: float,
- ) -> bool:
- if not previous_trial:
- return True
-
- candidate_distance = candidate_trial.values[1] - target_pivot_size
- previous_distance = previous_trial.values[1] - target_pivot_size
-
- if not np.isclose(candidate_distance, previous_distance):
- return candidate_distance < previous_distance
-
- return compare_primary_objective(
- candidate_trial, previous_trial, direction
- )
-
- def is_better_below_candidate(
- candidate_trial: optuna.trial.FrozenTrial,
- previous_trial: Optional[optuna.trial.FrozenTrial],
- direction: optuna.study.StudyDirection,
- target_pivot_size: float,
- ) -> bool:
- if not previous_trial:
- return True
-
- candidate_distance = target_pivot_size - candidate_trial.values[1]
- previous_distance = target_pivot_size - previous_trial.values[1]
-
- if not np.isclose(candidate_distance, previous_distance):
- return candidate_distance < previous_distance
-
- return compare_primary_objective(
- candidate_trial, previous_trial, direction
- )
-
- pivots_sizes = [trial.values[1] for trial in best_trials]
- label_quantile = float(self.ft_params.get("label_quantile", 0.5))
- if not (0.0 <= label_quantile <= 1.0):
- raise ValueError("label_quantile must be between 0.0 and 1.0")
- quantile_pivots_size = np.quantile(pivots_sizes, label_quantile)
-
- direction0 = study.directions[0]
-
- equal_quantile_pivots_size_trials = [
- trial
- for trial in best_trials
- if np.isclose(trial.values[1], quantile_pivots_size)
- ]
- if equal_quantile_pivots_size_trials:
- if direction0 == optuna.study.StudyDirection.MAXIMIZE:
- return max(
- equal_quantile_pivots_size_trials,
- key=lambda trial: trial.values[0],
- )
- else:
- return min(
- equal_quantile_pivots_size_trials,
- key=lambda trial: trial.values[0],
- )
-
- nearest_above_quantile = None
- nearest_below_quantile = None
- for trial in best_trials:
- pivots_size = trial.values[1]
- if pivots_size >= quantile_pivots_size:
- if is_better_above_candidate(
- trial, nearest_above_quantile, direction0, quantile_pivots_size
- ):
- nearest_above_quantile = trial
-
- if pivots_size <= quantile_pivots_size:
- if is_better_below_candidate(
- trial, nearest_below_quantile, direction0, quantile_pivots_size
- ):
- nearest_below_quantile = trial
-
- if not nearest_above_quantile and not nearest_below_quantile:
- return None
- if not nearest_above_quantile:
- return nearest_below_quantile
- if not nearest_below_quantile:
- return nearest_above_quantile
-
- def tie_break_selection(
- above: optuna.trial.FrozenTrial,
- below: optuna.trial.FrozenTrial,
- direction: optuna.study.StudyDirection,
- ) -> optuna.trial.FrozenTrial:
- above_quantile_distance = abs(above.values[1] - quantile_pivots_size)
- below_quantile_distance = abs(quantile_pivots_size - below.values[1])
-
- if above_quantile_distance < below_quantile_distance:
- return above
- elif above_quantile_distance > below_quantile_distance:
- return below
- else:
- if direction == optuna.study.StudyDirection.MAXIMIZE:
- return max([above, below], key=lambda trial: trial.values[1])
- else:
- return min([above, below], key=lambda trial: trial.values[1])
-
- def final_selection(
- above: optuna.trial.FrozenTrial,
- below: optuna.trial.FrozenTrial,
- direction: optuna.study.StudyDirection,
- ) -> optuna.trial.FrozenTrial:
- if direction == optuna.study.StudyDirection.MAXIMIZE:
- return above if above.values[0] > below.values[0] else below
- else:
- return above if above.values[0] < below.values[0] else below
-
- if np.isclose(
- nearest_above_quantile.values[0], nearest_below_quantile.values[0]
- ):
- return tie_break_selection(
- nearest_above_quantile,
- nearest_below_quantile,
- study.directions[1],
+ def calculate_distances(
+ normalized: np.ndarray,
+ metric: str,
+ p_order: float = self.ft_params.get("label_p_order", 2.0),
+ ) -> np.ndarray:
+ ideal_point = np.full(normalized.shape[1], 1.0)
+
+ if metric == "euclidean":
+ return np.linalg.norm(normalized - ideal_point, axis=1)
+ elif metric == "chebyshev":
+ return np.max(np.abs(normalized - ideal_point), axis=1)
+ elif metric == "manhattan":
+ return np.sum(np.abs(normalized - ideal_point), axis=1)
+ elif metric == "minkowski":
+ if p_order < 1:
+ raise ValueError("Minkowski p_order must be >= 1")
+ return np.power(
+ np.sum(np.power(np.abs(normalized - ideal_point), p_order), axis=1),
+ 1.0 / p_order,
)
else:
- return final_selection(
- nearest_above_quantile, nearest_below_quantile, direction0
- )
- elif label_trials_selection in ["chebyshev", "euclidean"]:
-
- def calculate_distances(normalized: np.ndarray, metric: str) -> np.ndarray:
- ideal_point = np.full(normalized.shape[1], 1.0)
+ raise ValueError(f"Unsupported distance metric: {metric}")
- if metric == "chebyshev":
- return np.max(np.abs(normalized - ideal_point), axis=1)
- elif metric == "euclidean":
- return np.linalg.norm(normalized - ideal_point, axis=1)
- # elif metric == "manhattan":
- # return np.sum(np.abs(normalized - ideal_point), axis=1)
- else:
- raise ValueError(f"Unsupported distance metric: {metric}")
-
- objective_values_matrix = np.array([trial.values for trial in best_trials])
-
- normalized_matrix = np.zeros_like(objective_values_matrix, dtype=float)
+ objective_values_matrix = np.array([trial.values for trial in best_trials])
+ normalized_matrix = np.zeros_like(objective_values_matrix, dtype=float)
- for i in range(objective_values_matrix.shape[1]):
- col = objective_values_matrix[:, i]
- min_val = np.min(col)
- max_val = np.max(col)
- range_val = max_val - min_val
+ for i in range(objective_values_matrix.shape[1]):
+ current_column = objective_values_matrix[:, i]
+ current_direction = study.directions[i]
- if np.isclose(range_val, 0):
- normalized_matrix[:, i] = 0.5
+ is_neg_inf_mask = np.isneginf(current_column)
+ is_pos_inf_mask = np.isposinf(current_column)
+ if current_direction == optuna.study.StudyDirection.MAXIMIZE:
+ normalized_matrix[is_neg_inf_mask, i] = 0.0
+ normalized_matrix[is_pos_inf_mask, i] = 1.0
+ else:
+ normalized_matrix[is_neg_inf_mask, i] = 1.0
+ normalized_matrix[is_pos_inf_mask, i] = 0.0
+
+ is_finite_mask = np.isfinite(current_column)
+
+ if is_finite_mask.any():
+ finite_col = current_column[is_finite_mask]
+ finite_min_val = np.min(finite_col)
+ finite_max_val = np.max(finite_col)
+ finite_range_val = finite_max_val - finite_min_val
+
+ if np.isclose(finite_range_val, 0):
+ if is_pos_inf_mask.any() and is_neg_inf_mask.any():
+ normalized_matrix[is_finite_mask, i] = 0.5
+ elif is_pos_inf_mask.any():
+ normalized_matrix[is_finite_mask, i] = (
+ 0.0
+ if current_direction == optuna.study.StudyDirection.MAXIMIZE
+ else 1.0
+ )
+ elif is_neg_inf_mask.any():
+ normalized_matrix[is_finite_mask, i] = (
+ 1.0
+ if current_direction == optuna.study.StudyDirection.MAXIMIZE
+ else 0.0
+ )
+ else:
+ normalized_matrix[is_finite_mask, i] = 0.5
else:
- if study.directions[i] == optuna.study.StudyDirection.MAXIMIZE:
- normalized_matrix[:, i] = (col - min_val) / range_val
+ if current_direction == optuna.study.StudyDirection.MAXIMIZE:
+ normalized_matrix[is_finite_mask, i] = (
+ finite_col - finite_min_val
+ ) / finite_range_val
else:
- normalized_matrix[:, i] = (max_val - col) / range_val
+ normalized_matrix[is_finite_mask, i] = (
+ finite_max_val - finite_col
+ ) / finite_range_val
- trial_distances = calculate_distances(
- normalized_matrix, label_trials_selection
- )
- if trial_distances.size == 0:
- return None
+ trial_distances = calculate_distances(normalized_matrix, label_trials_selection)
- return best_trials[np.argmin(trial_distances)]
+ return best_trials[np.argmin(trial_distances)]
def optuna_optimize(
self,