From: Jérôme Benoit Date: Mon, 26 May 2025 12:44:38 +0000 (+0200) Subject: perf(qav3): switch to state of the art MO trials selection X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=b7cc38404ee818d7daa6b6d954528e2fdcd5574f;p=freqai-strategies.git perf(qav3): switch to state of the art MO trials selection Signed-off-by: Jérôme Benoit --- diff --git a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py index bf311d6..0e595a5 100644 --- a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py +++ b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py @@ -45,7 +45,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): https://github.com/sponsors/robcaulk """ - version = "3.7.69" + version = "3.7.70" @cached_property def _optuna_config(self) -> dict: @@ -406,203 +406,107 @@ class QuickAdapterRegressorV3(BaseRegressionModel): n_objectives = len(study.directions) label_trials_selection = self.ft_params.get( - "label_trials_selection", "quantile" + "label_trials_selection", "euclidean" ) - if label_trials_selection not in ["quantile", "chebyshev", "euclidean"]: + metrics = {"euclidean", "chebyshev", "manhattan", "minkowski"} + if label_trials_selection not in metrics: raise ValueError( - f"Unsupported label trials selection method: {label_trials_selection}. Supported methods are 'quantile', 'chebyshev', and 'euclidean'" - ) - if label_trials_selection in ["quantile"] and n_objectives != 2: - raise ValueError( - f"{label_trials_selection} requires exactly 2 objectives, got {n_objectives}" + f"Unsupported label trials selection method: {label_trials_selection}. Supported methods are {', '.join(metrics)}" ) - best_trials = [ - trial - for trial in study.best_trials - if trial.values is not None and len(trial.values) == n_objectives - ] + best_trials = [] + for trial in study.best_trials: + if trial.values is not None and len(trial.values) == n_objectives: + valid_trial = True + for value in trial.values: + if not isinstance(value, (int, float)): + valid_trial = False + break + if np.isnan(value): + valid_trial = False + break + if valid_trial: + best_trials.append(trial) if not best_trials: return None - if label_trials_selection == "quantile": - if len(best_trials) == 1: - return best_trials[0] - - def compare_primary_objective( - trial_a: optuna.trial.FrozenTrial, - trial_b: optuna.trial.FrozenTrial, - direction: optuna.study.StudyDirection, - ) -> bool: - if direction == optuna.study.StudyDirection.MAXIMIZE: - return trial_a.values[0] > trial_b.values[0] - return trial_a.values[0] < trial_b.values[0] - - def is_better_above_candidate( - candidate_trial: optuna.trial.FrozenTrial, - previous_trial: Optional[optuna.trial.FrozenTrial], - direction: optuna.study.StudyDirection, - target_pivot_size: float, - ) -> bool: - if not previous_trial: - return True - - candidate_distance = candidate_trial.values[1] - target_pivot_size - previous_distance = previous_trial.values[1] - target_pivot_size - - if not np.isclose(candidate_distance, previous_distance): - return candidate_distance < previous_distance - - return compare_primary_objective( - candidate_trial, previous_trial, direction - ) - - def is_better_below_candidate( - candidate_trial: optuna.trial.FrozenTrial, - previous_trial: Optional[optuna.trial.FrozenTrial], - direction: optuna.study.StudyDirection, - target_pivot_size: float, - ) -> bool: - if not previous_trial: - return True - - candidate_distance = target_pivot_size - candidate_trial.values[1] - previous_distance = target_pivot_size - previous_trial.values[1] - - if not np.isclose(candidate_distance, previous_distance): - return candidate_distance < previous_distance - - return compare_primary_objective( - candidate_trial, previous_trial, direction - ) - - pivots_sizes = [trial.values[1] for trial in best_trials] - label_quantile = float(self.ft_params.get("label_quantile", 0.5)) - if not (0.0 <= label_quantile <= 1.0): - raise ValueError("label_quantile must be between 0.0 and 1.0") - quantile_pivots_size = np.quantile(pivots_sizes, label_quantile) - - direction0 = study.directions[0] - - equal_quantile_pivots_size_trials = [ - trial - for trial in best_trials - if np.isclose(trial.values[1], quantile_pivots_size) - ] - if equal_quantile_pivots_size_trials: - if direction0 == optuna.study.StudyDirection.MAXIMIZE: - return max( - equal_quantile_pivots_size_trials, - key=lambda trial: trial.values[0], - ) - else: - return min( - equal_quantile_pivots_size_trials, - key=lambda trial: trial.values[0], - ) - - nearest_above_quantile = None - nearest_below_quantile = None - for trial in best_trials: - pivots_size = trial.values[1] - if pivots_size >= quantile_pivots_size: - if is_better_above_candidate( - trial, nearest_above_quantile, direction0, quantile_pivots_size - ): - nearest_above_quantile = trial - - if pivots_size <= quantile_pivots_size: - if is_better_below_candidate( - trial, nearest_below_quantile, direction0, quantile_pivots_size - ): - nearest_below_quantile = trial - - if not nearest_above_quantile and not nearest_below_quantile: - return None - if not nearest_above_quantile: - return nearest_below_quantile - if not nearest_below_quantile: - return nearest_above_quantile - - def tie_break_selection( - above: optuna.trial.FrozenTrial, - below: optuna.trial.FrozenTrial, - direction: optuna.study.StudyDirection, - ) -> optuna.trial.FrozenTrial: - above_quantile_distance = abs(above.values[1] - quantile_pivots_size) - below_quantile_distance = abs(quantile_pivots_size - below.values[1]) - - if above_quantile_distance < below_quantile_distance: - return above - elif above_quantile_distance > below_quantile_distance: - return below - else: - if direction == optuna.study.StudyDirection.MAXIMIZE: - return max([above, below], key=lambda trial: trial.values[1]) - else: - return min([above, below], key=lambda trial: trial.values[1]) - - def final_selection( - above: optuna.trial.FrozenTrial, - below: optuna.trial.FrozenTrial, - direction: optuna.study.StudyDirection, - ) -> optuna.trial.FrozenTrial: - if direction == optuna.study.StudyDirection.MAXIMIZE: - return above if above.values[0] > below.values[0] else below - else: - return above if above.values[0] < below.values[0] else below - - if np.isclose( - nearest_above_quantile.values[0], nearest_below_quantile.values[0] - ): - return tie_break_selection( - nearest_above_quantile, - nearest_below_quantile, - study.directions[1], + def calculate_distances( + normalized: np.ndarray, + metric: str, + p_order: float = self.ft_params.get("label_p_order", 2.0), + ) -> np.ndarray: + ideal_point = np.full(normalized.shape[1], 1.0) + + if metric == "euclidean": + return np.linalg.norm(normalized - ideal_point, axis=1) + elif metric == "chebyshev": + return np.max(np.abs(normalized - ideal_point), axis=1) + elif metric == "manhattan": + return np.sum(np.abs(normalized - ideal_point), axis=1) + elif metric == "minkowski": + if p_order < 1: + raise ValueError("Minkowski p_order must be >= 1") + return np.power( + np.sum(np.power(np.abs(normalized - ideal_point), p_order), axis=1), + 1.0 / p_order, ) else: - return final_selection( - nearest_above_quantile, nearest_below_quantile, direction0 - ) - elif label_trials_selection in ["chebyshev", "euclidean"]: - - def calculate_distances(normalized: np.ndarray, metric: str) -> np.ndarray: - ideal_point = np.full(normalized.shape[1], 1.0) + raise ValueError(f"Unsupported distance metric: {metric}") - if metric == "chebyshev": - return np.max(np.abs(normalized - ideal_point), axis=1) - elif metric == "euclidean": - return np.linalg.norm(normalized - ideal_point, axis=1) - # elif metric == "manhattan": - # return np.sum(np.abs(normalized - ideal_point), axis=1) - else: - raise ValueError(f"Unsupported distance metric: {metric}") - - objective_values_matrix = np.array([trial.values for trial in best_trials]) - - normalized_matrix = np.zeros_like(objective_values_matrix, dtype=float) + objective_values_matrix = np.array([trial.values for trial in best_trials]) + normalized_matrix = np.zeros_like(objective_values_matrix, dtype=float) - for i in range(objective_values_matrix.shape[1]): - col = objective_values_matrix[:, i] - min_val = np.min(col) - max_val = np.max(col) - range_val = max_val - min_val + for i in range(objective_values_matrix.shape[1]): + current_column = objective_values_matrix[:, i] + current_direction = study.directions[i] - if np.isclose(range_val, 0): - normalized_matrix[:, i] = 0.5 + is_neg_inf_mask = np.isneginf(current_column) + is_pos_inf_mask = np.isposinf(current_column) + if current_direction == optuna.study.StudyDirection.MAXIMIZE: + normalized_matrix[is_neg_inf_mask, i] = 0.0 + normalized_matrix[is_pos_inf_mask, i] = 1.0 + else: + normalized_matrix[is_neg_inf_mask, i] = 1.0 + normalized_matrix[is_pos_inf_mask, i] = 0.0 + + is_finite_mask = np.isfinite(current_column) + + if is_finite_mask.any(): + finite_col = current_column[is_finite_mask] + finite_min_val = np.min(finite_col) + finite_max_val = np.max(finite_col) + finite_range_val = finite_max_val - finite_min_val + + if np.isclose(finite_range_val, 0): + if is_pos_inf_mask.any() and is_neg_inf_mask.any(): + normalized_matrix[is_finite_mask, i] = 0.5 + elif is_pos_inf_mask.any(): + normalized_matrix[is_finite_mask, i] = ( + 0.0 + if current_direction == optuna.study.StudyDirection.MAXIMIZE + else 1.0 + ) + elif is_neg_inf_mask.any(): + normalized_matrix[is_finite_mask, i] = ( + 1.0 + if current_direction == optuna.study.StudyDirection.MAXIMIZE + else 0.0 + ) + else: + normalized_matrix[is_finite_mask, i] = 0.5 else: - if study.directions[i] == optuna.study.StudyDirection.MAXIMIZE: - normalized_matrix[:, i] = (col - min_val) / range_val + if current_direction == optuna.study.StudyDirection.MAXIMIZE: + normalized_matrix[is_finite_mask, i] = ( + finite_col - finite_min_val + ) / finite_range_val else: - normalized_matrix[:, i] = (max_val - col) / range_val + normalized_matrix[is_finite_mask, i] = ( + finite_max_val - finite_col + ) / finite_range_val - trial_distances = calculate_distances( - normalized_matrix, label_trials_selection - ) - if trial_distances.size == 0: - return None + trial_distances = calculate_distances(normalized_matrix, label_trials_selection) - return best_trials[np.argmin(trial_distances)] + return best_trials[np.argmin(trial_distances)] def optuna_optimize( self, diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py index 5ace30f..df7bb11 100644 --- a/quickadapter/user_data/strategies/QuickAdapterV3.py +++ b/quickadapter/user_data/strategies/QuickAdapterV3.py @@ -60,7 +60,7 @@ class QuickAdapterV3(IStrategy): INTERFACE_VERSION = 3 def version(self) -> str: - return "3.3.71" + return "3.3.72" timeframe = "5m"