]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
perf(qav3): switch to state of the art MO trials selection
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 26 May 2025 12:44:38 +0000 (14:44 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 26 May 2025 12:44:38 +0000 (14:44 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
quickadapter/user_data/strategies/QuickAdapterV3.py

index bf311d6f9d604e791e0d071790aaf1f20722ba7d..0e595a5eeb48f3e111c22748dc29d93d5dfbdc28 100644 (file)
@@ -45,7 +45,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
     https://github.com/sponsors/robcaulk
     """
 
-    version = "3.7.69"
+    version = "3.7.70"
 
     @cached_property
     def _optuna_config(self) -> dict:
@@ -406,203 +406,107 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
         n_objectives = len(study.directions)
 
         label_trials_selection = self.ft_params.get(
-            "label_trials_selection", "quantile"
+            "label_trials_selection", "euclidean"
         )
-        if label_trials_selection not in ["quantile", "chebyshev", "euclidean"]:
+        metrics = {"euclidean", "chebyshev", "manhattan", "minkowski"}
+        if label_trials_selection not in metrics:
             raise ValueError(
-                f"Unsupported label trials selection method: {label_trials_selection}. Supported methods are 'quantile', 'chebyshev', and 'euclidean'"
-            )
-        if label_trials_selection in ["quantile"] and n_objectives != 2:
-            raise ValueError(
-                f"{label_trials_selection} requires exactly 2 objectives, got {n_objectives}"
+                f"Unsupported label trials selection method: {label_trials_selection}. Supported methods are {', '.join(metrics)}"
             )
 
-        best_trials = [
-            trial
-            for trial in study.best_trials
-            if trial.values is not None and len(trial.values) == n_objectives
-        ]
+        best_trials = []
+        for trial in study.best_trials:
+            if trial.values is not None and len(trial.values) == n_objectives:
+                valid_trial = True
+                for value in trial.values:
+                    if not isinstance(value, (int, float)):
+                        valid_trial = False
+                        break
+                    if np.isnan(value):
+                        valid_trial = False
+                        break
+                if valid_trial:
+                    best_trials.append(trial)
         if not best_trials:
             return None
 
-        if label_trials_selection == "quantile":
-            if len(best_trials) == 1:
-                return best_trials[0]
-
-            def compare_primary_objective(
-                trial_a: optuna.trial.FrozenTrial,
-                trial_b: optuna.trial.FrozenTrial,
-                direction: optuna.study.StudyDirection,
-            ) -> bool:
-                if direction == optuna.study.StudyDirection.MAXIMIZE:
-                    return trial_a.values[0] > trial_b.values[0]
-                return trial_a.values[0] < trial_b.values[0]
-
-            def is_better_above_candidate(
-                candidate_trial: optuna.trial.FrozenTrial,
-                previous_trial: Optional[optuna.trial.FrozenTrial],
-                direction: optuna.study.StudyDirection,
-                target_pivot_size: float,
-            ) -> bool:
-                if not previous_trial:
-                    return True
-
-                candidate_distance = candidate_trial.values[1] - target_pivot_size
-                previous_distance = previous_trial.values[1] - target_pivot_size
-
-                if not np.isclose(candidate_distance, previous_distance):
-                    return candidate_distance < previous_distance
-
-                return compare_primary_objective(
-                    candidate_trial, previous_trial, direction
-                )
-
-            def is_better_below_candidate(
-                candidate_trial: optuna.trial.FrozenTrial,
-                previous_trial: Optional[optuna.trial.FrozenTrial],
-                direction: optuna.study.StudyDirection,
-                target_pivot_size: float,
-            ) -> bool:
-                if not previous_trial:
-                    return True
-
-                candidate_distance = target_pivot_size - candidate_trial.values[1]
-                previous_distance = target_pivot_size - previous_trial.values[1]
-
-                if not np.isclose(candidate_distance, previous_distance):
-                    return candidate_distance < previous_distance
-
-                return compare_primary_objective(
-                    candidate_trial, previous_trial, direction
-                )
-
-            pivots_sizes = [trial.values[1] for trial in best_trials]
-            label_quantile = float(self.ft_params.get("label_quantile", 0.5))
-            if not (0.0 <= label_quantile <= 1.0):
-                raise ValueError("label_quantile must be between 0.0 and 1.0")
-            quantile_pivots_size = np.quantile(pivots_sizes, label_quantile)
-
-            direction0 = study.directions[0]
-
-            equal_quantile_pivots_size_trials = [
-                trial
-                for trial in best_trials
-                if np.isclose(trial.values[1], quantile_pivots_size)
-            ]
-            if equal_quantile_pivots_size_trials:
-                if direction0 == optuna.study.StudyDirection.MAXIMIZE:
-                    return max(
-                        equal_quantile_pivots_size_trials,
-                        key=lambda trial: trial.values[0],
-                    )
-                else:
-                    return min(
-                        equal_quantile_pivots_size_trials,
-                        key=lambda trial: trial.values[0],
-                    )
-
-            nearest_above_quantile = None
-            nearest_below_quantile = None
-            for trial in best_trials:
-                pivots_size = trial.values[1]
-                if pivots_size >= quantile_pivots_size:
-                    if is_better_above_candidate(
-                        trial, nearest_above_quantile, direction0, quantile_pivots_size
-                    ):
-                        nearest_above_quantile = trial
-
-                if pivots_size <= quantile_pivots_size:
-                    if is_better_below_candidate(
-                        trial, nearest_below_quantile, direction0, quantile_pivots_size
-                    ):
-                        nearest_below_quantile = trial
-
-            if not nearest_above_quantile and not nearest_below_quantile:
-                return None
-            if not nearest_above_quantile:
-                return nearest_below_quantile
-            if not nearest_below_quantile:
-                return nearest_above_quantile
-
-            def tie_break_selection(
-                above: optuna.trial.FrozenTrial,
-                below: optuna.trial.FrozenTrial,
-                direction: optuna.study.StudyDirection,
-            ) -> optuna.trial.FrozenTrial:
-                above_quantile_distance = abs(above.values[1] - quantile_pivots_size)
-                below_quantile_distance = abs(quantile_pivots_size - below.values[1])
-
-                if above_quantile_distance < below_quantile_distance:
-                    return above
-                elif above_quantile_distance > below_quantile_distance:
-                    return below
-                else:
-                    if direction == optuna.study.StudyDirection.MAXIMIZE:
-                        return max([above, below], key=lambda trial: trial.values[1])
-                    else:
-                        return min([above, below], key=lambda trial: trial.values[1])
-
-            def final_selection(
-                above: optuna.trial.FrozenTrial,
-                below: optuna.trial.FrozenTrial,
-                direction: optuna.study.StudyDirection,
-            ) -> optuna.trial.FrozenTrial:
-                if direction == optuna.study.StudyDirection.MAXIMIZE:
-                    return above if above.values[0] > below.values[0] else below
-                else:
-                    return above if above.values[0] < below.values[0] else below
-
-            if np.isclose(
-                nearest_above_quantile.values[0], nearest_below_quantile.values[0]
-            ):
-                return tie_break_selection(
-                    nearest_above_quantile,
-                    nearest_below_quantile,
-                    study.directions[1],
+        def calculate_distances(
+            normalized: np.ndarray,
+            metric: str,
+            p_order: float = self.ft_params.get("label_p_order", 2.0),
+        ) -> np.ndarray:
+            ideal_point = np.full(normalized.shape[1], 1.0)
+
+            if metric == "euclidean":
+                return np.linalg.norm(normalized - ideal_point, axis=1)
+            elif metric == "chebyshev":
+                return np.max(np.abs(normalized - ideal_point), axis=1)
+            elif metric == "manhattan":
+                return np.sum(np.abs(normalized - ideal_point), axis=1)
+            elif metric == "minkowski":
+                if p_order < 1:
+                    raise ValueError("Minkowski p_order must be >= 1")
+                return np.power(
+                    np.sum(np.power(np.abs(normalized - ideal_point), p_order), axis=1),
+                    1.0 / p_order,
                 )
             else:
-                return final_selection(
-                    nearest_above_quantile, nearest_below_quantile, direction0
-                )
-        elif label_trials_selection in ["chebyshev", "euclidean"]:
-
-            def calculate_distances(normalized: np.ndarray, metric: str) -> np.ndarray:
-                ideal_point = np.full(normalized.shape[1], 1.0)
+                raise ValueError(f"Unsupported distance metric: {metric}")
 
-                if metric == "chebyshev":
-                    return np.max(np.abs(normalized - ideal_point), axis=1)
-                elif metric == "euclidean":
-                    return np.linalg.norm(normalized - ideal_point, axis=1)
-                # elif metric == "manhattan":
-                #     return np.sum(np.abs(normalized - ideal_point), axis=1)
-                else:
-                    raise ValueError(f"Unsupported distance metric: {metric}")
-
-            objective_values_matrix = np.array([trial.values for trial in best_trials])
-
-            normalized_matrix = np.zeros_like(objective_values_matrix, dtype=float)
+        objective_values_matrix = np.array([trial.values for trial in best_trials])
+        normalized_matrix = np.zeros_like(objective_values_matrix, dtype=float)
 
-            for i in range(objective_values_matrix.shape[1]):
-                col = objective_values_matrix[:, i]
-                min_val = np.min(col)
-                max_val = np.max(col)
-                range_val = max_val - min_val
+        for i in range(objective_values_matrix.shape[1]):
+            current_column = objective_values_matrix[:, i]
+            current_direction = study.directions[i]
 
-                if np.isclose(range_val, 0):
-                    normalized_matrix[:, i] = 0.5
+            is_neg_inf_mask = np.isneginf(current_column)
+            is_pos_inf_mask = np.isposinf(current_column)
+            if current_direction == optuna.study.StudyDirection.MAXIMIZE:
+                normalized_matrix[is_neg_inf_mask, i] = 0.0
+                normalized_matrix[is_pos_inf_mask, i] = 1.0
+            else:
+                normalized_matrix[is_neg_inf_mask, i] = 1.0
+                normalized_matrix[is_pos_inf_mask, i] = 0.0
+
+            is_finite_mask = np.isfinite(current_column)
+
+            if is_finite_mask.any():
+                finite_col = current_column[is_finite_mask]
+                finite_min_val = np.min(finite_col)
+                finite_max_val = np.max(finite_col)
+                finite_range_val = finite_max_val - finite_min_val
+
+                if np.isclose(finite_range_val, 0):
+                    if is_pos_inf_mask.any() and is_neg_inf_mask.any():
+                        normalized_matrix[is_finite_mask, i] = 0.5
+                    elif is_pos_inf_mask.any():
+                        normalized_matrix[is_finite_mask, i] = (
+                            0.0
+                            if current_direction == optuna.study.StudyDirection.MAXIMIZE
+                            else 1.0
+                        )
+                    elif is_neg_inf_mask.any():
+                        normalized_matrix[is_finite_mask, i] = (
+                            1.0
+                            if current_direction == optuna.study.StudyDirection.MAXIMIZE
+                            else 0.0
+                        )
+                    else:
+                        normalized_matrix[is_finite_mask, i] = 0.5
                 else:
-                    if study.directions[i] == optuna.study.StudyDirection.MAXIMIZE:
-                        normalized_matrix[:, i] = (col - min_val) / range_val
+                    if current_direction == optuna.study.StudyDirection.MAXIMIZE:
+                        normalized_matrix[is_finite_mask, i] = (
+                            finite_col - finite_min_val
+                        ) / finite_range_val
                     else:
-                        normalized_matrix[:, i] = (max_val - col) / range_val
+                        normalized_matrix[is_finite_mask, i] = (
+                            finite_max_val - finite_col
+                        ) / finite_range_val
 
-            trial_distances = calculate_distances(
-                normalized_matrix, label_trials_selection
-            )
-            if trial_distances.size == 0:
-                return None
+        trial_distances = calculate_distances(normalized_matrix, label_trials_selection)
 
-            return best_trials[np.argmin(trial_distances)]
+        return best_trials[np.argmin(trial_distances)]
 
     def optuna_optimize(
         self,
index 5ace30f8164a4b87be608d090a5e3962c0f9340a..df7bb114e30b7847f1d764018b0ed8e77b461a06 100644 (file)
@@ -60,7 +60,7 @@ class QuickAdapterV3(IStrategy):
     INTERFACE_VERSION = 3
 
     def version(self) -> str:
-        return "3.3.71"
+        return "3.3.72"
 
     timeframe = "5m"