]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
perf(qav3): refine protections default configuration
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 25 Nov 2025 16:45:13 +0000 (17:45 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 25 Nov 2025 16:45:13 +0000 (17:45 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
ReforceXY/reward_space_analysis/reward_space_analysis.py
ReforceXY/reward_space_analysis/test_reward_space_analysis_cli.py
ReforceXY/user_data/freqaimodels/ReforceXY.py
quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
quickadapter/user_data/strategies/QuickAdapterV3.py

index 371c77c0a3d0531cf898e4e2cc9738483b0cc45f..b60344f0173e2853cd1bd1b7bb75fbb740c59605 100644 (file)
@@ -1256,7 +1256,7 @@ def simulate_samples(
                 pnl -= 0.005 * duration_ratio
 
             # Clip PnL to realistic range
-            pnl = max(min(pnl, 0.15), -0.15)
+            pnl = min(max(-0.15, pnl), 0.15)
 
         if position == Positions.Neutral:
             max_unrealized_profit = 0.0
index 0fd48cc962d381eaeef160e5781356d939f1791b..071bb41f0eb5e21a0d710ce6733395764b2e8b18 100644 (file)
@@ -387,7 +387,7 @@ def main():
     # Prepare list of (conf, strict)
     scenario_pairs: List[Tuple[ConfigTuple, bool]] = [(c, False) for c in scenarios]
     indices = {conf: idx for idx, conf in enumerate(scenarios, start=1)}
-    n_duplicated = max(0, min(args.strict_sample, len(scenarios)))
+    n_duplicated = min(max(0, args.strict_sample), len(scenarios))
     if n_duplicated > 0:
         print(f"Duplicating first {n_duplicated} scenarios with --strict_diagnostics")
     for c in scenarios[:n_duplicated]:
index 6b14f0fb905a3f407856ffc4d8cf06df18cce945..ec4095c9a126d97fe1244de718e299b364170f6b 100644 (file)
@@ -1942,7 +1942,7 @@ class MyRLEnv(Base5ActionRLEnv):
             return x / math.hypot(1.0, x)
 
         if name == ReforceXY._TRANSFORM_FUNCTIONS[5]:  # "clip"
-            return max(-1.0, min(1.0, x))
+            return min(max(-1.0, x), 1.0)
 
         logger.warning(
             "Unknown potential transform '%s'; falling back to tanh. Valid transforms: %s",
index 7b0a9586b7369f2baaab41ecf0ead38d970b2c3e..03aa3321896815fda50d960c75956b87a98abfc7 100644 (file)
@@ -1155,7 +1155,383 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
 
         return normalized_matrix
 
-    def get_multi_objective_study_best_trial(
+    @staticmethod
+    def _get_n_clusters(
+        matrix: NDArray[np.floating],
+        *,
+        min_n_clusters: int = 2,
+        max_n_clusters: int = 10,
+    ) -> int:
+        n_samples = matrix.shape[0]
+        if n_samples <= 1:
+            return 1
+        n_uniques = np.unique(matrix, axis=0).shape[0]
+        upper_bound = min(max_n_clusters, n_uniques, n_samples)
+        if upper_bound < 2:
+            return 1
+        lower_bound = min(min_n_clusters, upper_bound)
+        if n_uniques <= 3:
+            return min(n_uniques, upper_bound)
+        n_clusters = int(round((np.log2(n_uniques) + np.sqrt(n_uniques)) / 2.0))
+        return max(lower_bound, min(n_clusters, upper_bound))
+
+    def _calculate_distances_to_ideal(
+        self,
+        normalized_matrix: NDArray[np.floating],
+        metric: str,
+        metrics: set[str],
+    ) -> NDArray[np.floating]:
+        if normalized_matrix.ndim != 2:
+            raise ValueError("normalized_matrix must be 2-dimensional")
+        n_objectives = normalized_matrix.shape[1]
+        n_samples = normalized_matrix.shape[0]
+        if n_samples == 0 or n_objectives == 0:
+            raise ValueError(
+                "normalized_matrix must have at least one sample and one objective"
+            )
+        if not np.all(np.isfinite(normalized_matrix)):
+            raise ValueError(
+                "normalized_matrix must contain only finite values (no NaN or inf)"
+            )
+        label_p_order = self.ft_params.get("label_p_order")
+        np_weights = np.array(self.ft_params.get("label_weights", [1.0] * n_objectives))
+        if np_weights.size != n_objectives:
+            raise ValueError("label_weights length must match number of objectives")
+        if not np.all(np.isfinite(np_weights)):
+            raise ValueError("label_weights must contain only finite values")
+        if np.any(np_weights < 0):
+            raise ValueError("label_weights values must be non-negative")
+        label_weights_sum = np.sum(np.abs(np_weights))
+        if np.isclose(label_weights_sum, 0.0):
+            raise ValueError("label_weights sum cannot be zero")
+        np_weights = np_weights / label_weights_sum
+
+        ideal_point = np.ones(n_objectives)
+        ideal_point_2d = ideal_point.reshape(1, -1)
+
+        if n_samples == 0:
+            return np.array([])
+        if n_samples == 1:
+            if metric in {
+                "medoid",
+                "kmeans",
+                "kmeans2",
+                "kmedoids",
+                "knn_power_mean",
+                "knn_percentile",
+                "knn_min",
+                "knn_max",
+            }:
+                return np.array([0.0])
+
+        if metric in {
+            # "braycurtis",
+            # "canberra",
+            "chebyshev",
+            "cityblock",
+            # "correlation",
+            # "cosine",
+            # "dice",
+            "euclidean",
+            # "hamming",
+            # "jaccard",
+            "jensenshannon",
+            # "kulczynski1",  # Deprecated in SciPy ≥ 1.15.0; do not use.
+            "mahalanobis",
+            # "matching",
+            "minkowski",
+            # "rogerstanimoto",
+            # "russellrao",
+            "seuclidean",
+            # "sokalmichener",  # Deprecated in SciPy ≥ 1.15.0; do not use.
+            # "sokalsneath",
+            "sqeuclidean",
+            # "yule",
+        }:
+            cdist_kwargs: dict[str, Any] = {}
+            if metric not in {"mahalanobis", "seuclidean", "jensenshannon"}:
+                cdist_kwargs["w"] = np_weights
+            if metric == "minkowski":
+                cdist_kwargs["p"] = (
+                    label_p_order
+                    if label_p_order is not None and np.isfinite(label_p_order)
+                    else 2.0
+                )
+            return sp.spatial.distance.cdist(
+                normalized_matrix,
+                ideal_point_2d,
+                metric=metric,
+                **cdist_kwargs,
+            ).flatten()
+        elif metric in {"hellinger", "shellinger"}:
+            np_sqrt_normalized_matrix = np.sqrt(normalized_matrix)
+            if metric == "shellinger":
+                variances = np.var(np_sqrt_normalized_matrix, axis=0, ddof=1)
+                if np.any(variances <= 0):
+                    raise ValueError(
+                        "shellinger metric requires non-zero variance for all objectives"
+                    )
+                np_weights = 1 / variances
+            return (
+                np.sqrt(
+                    np.sum(
+                        np_weights
+                        * (np_sqrt_normalized_matrix - np.sqrt(ideal_point)) ** 2,
+                        axis=1,
+                    )
+                )
+                / QuickAdapterRegressorV3._SQRT_2
+            )
+        elif metric in {
+            "harmonic_mean",
+            "geometric_mean",
+            "arithmetic_mean",
+            "quadratic_mean",
+            "cubic_mean",
+            "power_mean",
+        }:
+            p = {
+                "harmonic_mean": -1.0,
+                "geometric_mean": 0.0,
+                "arithmetic_mean": 1.0,
+                "quadratic_mean": 2.0,
+                "cubic_mean": 3.0,
+                "power_mean": label_p_order
+                if label_p_order is not None and np.isfinite(label_p_order)
+                else 1.0,
+            }[metric]
+            return sp.stats.pmean(
+                ideal_point, p=p, weights=np_weights
+            ) - sp.stats.pmean(normalized_matrix, p=p, weights=np_weights, axis=1)
+        elif metric == "weighted_sum":
+            return np.sum(np_weights * (ideal_point - normalized_matrix), axis=1)
+        elif metric == "medoid":
+            label_medoid_metric = self.ft_params.get("label_medoid_metric", "euclidean")
+            if label_medoid_metric in {
+                "mahalanobis",
+                "seuclidean",
+                "jensenshannon",
+            }:
+                raise ValueError(
+                    f"Unsupported label_medoid_metric: {label_medoid_metric}. Supported are euclidean/minkowski/cityblock/chebyshev/..."
+                )
+            p = None
+            if label_medoid_metric == "minkowski":
+                p = (
+                    label_p_order
+                    if label_p_order is not None and np.isfinite(label_p_order)
+                    else 2.0
+                )
+            return self._pairwise_distance_sums(
+                normalized_matrix,
+                label_medoid_metric,
+                weights=np_weights,
+                p=p,
+            )
+        elif metric in {"kmeans", "kmeans2"}:
+            n_clusters = QuickAdapterRegressorV3._get_n_clusters(normalized_matrix)
+            if metric == "kmeans":
+                kmeans = sklearn.cluster.KMeans(
+                    n_clusters=n_clusters, random_state=42, n_init=10
+                )
+                cluster_labels = kmeans.fit_predict(normalized_matrix)
+                cluster_centers = kmeans.cluster_centers_
+            elif metric == "kmeans2":
+                cluster_centers, cluster_labels = sp.cluster.vq.kmeans2(
+                    normalized_matrix, n_clusters, rng=42, minit="++"
+                )
+            label_kmeans_metric = self.ft_params.get("label_kmeans_metric", "euclidean")
+            if label_kmeans_metric in {
+                "mahalanobis",
+                "seuclidean",
+                "jensenshannon",
+            }:
+                raise ValueError(
+                    f"Unsupported label_kmeans_metric: {label_kmeans_metric}. Supported are euclidean/minkowski/cityblock/chebyshev/..."
+                )
+            cdist_kwargs: dict[str, Any] = {}
+            if label_kmeans_metric == "minkowski":
+                cdist_kwargs["p"] = (
+                    label_p_order
+                    if label_p_order is not None and np.isfinite(label_p_order)
+                    else 2.0
+                )
+            cluster_center_distances_to_ideal = sp.spatial.distance.cdist(
+                cluster_centers,
+                ideal_point_2d,
+                metric=label_kmeans_metric,
+                **cdist_kwargs,
+            ).flatten()
+            label_kmeans_selection = self.ft_params.get("label_kmeans_selection", "min")
+            ordered_cluster_indices = np.argsort(cluster_center_distances_to_ideal)
+            best_cluster_indices = None
+            for cluster_index in ordered_cluster_indices:
+                cluster_indices = np.flatnonzero(cluster_labels == cluster_index)
+                if cluster_indices.size > 0:
+                    best_cluster_indices = cluster_indices
+                    break
+            trial_distances = np.full(n_samples, np.inf)
+            if best_cluster_indices is not None and best_cluster_indices.size > 0:
+                if label_kmeans_selection == "medoid":
+                    p = None
+                    if label_kmeans_metric == "minkowski":
+                        p = (
+                            label_p_order
+                            if label_p_order is not None and np.isfinite(label_p_order)
+                            else 2.0
+                        )
+                    best_medoid_position = np.argmin(
+                        self._pairwise_distance_sums(
+                            normalized_matrix[best_cluster_indices],
+                            label_kmeans_metric,
+                            p=p,
+                        )
+                    )
+                    best_trial_index = best_cluster_indices[best_medoid_position]
+                    best_trial_distance = sp.spatial.distance.cdist(
+                        normalized_matrix[[best_trial_index]],
+                        ideal_point_2d,
+                        metric=label_kmeans_metric,
+                        **cdist_kwargs,
+                    ).item()
+                    trial_distances[best_trial_index] = best_trial_distance
+                elif label_kmeans_selection == "min":
+                    best_cluster_distances = sp.spatial.distance.cdist(
+                        normalized_matrix[best_cluster_indices],
+                        ideal_point_2d,
+                        metric=label_kmeans_metric,
+                        **cdist_kwargs,
+                    ).flatten()
+                    min_distance_position = np.argmin(best_cluster_distances)
+                    best_trial_index = best_cluster_indices[min_distance_position]
+                    trial_distances[best_trial_index] = best_cluster_distances[
+                        min_distance_position
+                    ]
+                else:
+                    raise ValueError(
+                        f"Unsupported label_kmeans_selection: {label_kmeans_selection}. Supported are medoid/min"
+                    )
+            return trial_distances
+        elif metric == "kmedoids":
+            n_clusters = QuickAdapterRegressorV3._get_n_clusters(normalized_matrix)
+            label_kmedoids_metric = self.ft_params.get(
+                "label_kmedoids_metric", "euclidean"
+            )
+            if label_kmedoids_metric in {
+                "mahalanobis",
+                "seuclidean",
+                "jensenshannon",
+            }:
+                raise ValueError(
+                    f"Unsupported label_kmedoids_metric: {label_kmedoids_metric}. Supported are euclidean/minkowski/cityblock/chebyshev/..."
+                )
+            kmedoids_kwargs: dict[str, Any] = {
+                "metric": label_kmedoids_metric,
+                "random_state": 42,
+                "init": "k-medoids++",
+                "method": "pam",
+            }
+            kmedoids = KMedoids(n_clusters=n_clusters, **kmedoids_kwargs)
+            cluster_labels = kmedoids.fit_predict(normalized_matrix)
+            medoid_indices = kmedoids.medoid_indices_
+            cdist_kwargs: dict[str, Any] = {}
+            if label_kmedoids_metric == "minkowski":
+                cdist_kwargs["p"] = (
+                    label_p_order
+                    if label_p_order is not None and np.isfinite(label_p_order)
+                    else 2.0
+                )
+            medoid_distances_to_ideal = sp.spatial.distance.cdist(
+                normalized_matrix[medoid_indices],
+                ideal_point_2d,
+                metric=label_kmedoids_metric,
+                **cdist_kwargs,
+            ).flatten()
+            label_kmedoids_selection = self.ft_params.get(
+                "label_kmedoids_selection", "min"
+            )
+            best_medoid_distance_position = np.argmin(medoid_distances_to_ideal)
+            best_medoid_index = medoid_indices[best_medoid_distance_position]
+            cluster_index = cluster_labels[best_medoid_index]
+            best_cluster_indices = np.flatnonzero(cluster_labels == cluster_index)
+            trial_distances = np.full(n_samples, np.inf)
+            if best_cluster_indices.size > 0:
+                if label_kmedoids_selection == "medoid":
+                    trial_distances[best_medoid_index] = medoid_distances_to_ideal[
+                        best_medoid_distance_position
+                    ]
+                elif label_kmedoids_selection == "min":
+                    if best_cluster_indices.size == 1:
+                        best_trial_index = best_cluster_indices[0]
+                        trial_distances[best_trial_index] = medoid_distances_to_ideal[
+                            best_medoid_distance_position
+                        ]
+                    else:
+                        best_cluster_distances = sp.spatial.distance.cdist(
+                            normalized_matrix[best_cluster_indices],
+                            ideal_point_2d,
+                            metric=label_kmedoids_metric,
+                            **cdist_kwargs,
+                        ).flatten()
+                        min_distance_position = np.argmin(best_cluster_distances)
+                        best_trial_index = best_cluster_indices[min_distance_position]
+                        trial_distances[best_trial_index] = best_cluster_distances[
+                            min_distance_position
+                        ]
+                else:
+                    raise ValueError(
+                        f"Unsupported label_kmedoids_selection: {label_kmedoids_selection}. Supported are medoid/min"
+                    )
+            return trial_distances
+        elif metric in {"knn_power_mean", "knn_percentile", "knn_min", "knn_max"}:
+            label_knn_metric = self.ft_params.get("label_knn_metric", "minkowski")
+            knn_kwargs: dict[str, Any] = {}
+            if label_knn_metric == "minkowski":
+                knn_kwargs["p"] = (
+                    label_p_order
+                    if label_p_order is not None and np.isfinite(label_p_order)
+                    else 2.0
+                )
+                knn_kwargs["metric_params"] = {"w": np_weights}
+            label_knn_p_order = self.ft_params.get("label_knn_p_order")
+            n_neighbors = (
+                min(
+                    int(self.ft_params.get("label_knn_n_neighbors", 5)),
+                    n_samples - 1,
+                )
+                + 1
+            )
+            nbrs = sklearn.neighbors.NearestNeighbors(
+                n_neighbors=n_neighbors, metric=label_knn_metric, **knn_kwargs
+            ).fit(normalized_matrix)
+            distances, _ = nbrs.kneighbors(normalized_matrix)
+            neighbor_distances = distances[:, 1:]
+            if neighbor_distances.shape[1] < 1:
+                return np.full(n_samples, np.inf)
+            if metric == "knn_power_mean":
+                label_knn_p_order = (
+                    label_knn_p_order
+                    if label_knn_p_order is not None and np.isfinite(label_knn_p_order)
+                    else 1.0
+                )
+                return sp.stats.pmean(neighbor_distances, p=label_knn_p_order, axis=1)
+            elif metric == "knn_percentile":
+                label_knn_p_order = (
+                    label_knn_p_order
+                    if label_knn_p_order is not None and np.isfinite(label_knn_p_order)
+                    else 50.0
+                )
+                return np.percentile(neighbor_distances, label_knn_p_order, axis=1)
+            elif metric == "knn_min":
+                return np.min(neighbor_distances, axis=1)
+            elif metric == "knn_max":
+                return np.max(neighbor_distances, axis=1)
+        else:
+            raise ValueError(
+                f"Unsupported label metric: {metric}. Supported metrics are {', '.join(metrics)}"
+            )
+
+    def _get_multi_objective_study_best_trial(
         self, namespace: str, study: optuna.study.Study
     ) -> Optional[optuna.trial.FrozenTrial]:
         if namespace not in {
@@ -1236,393 +1612,6 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
         if not best_trials:
             return None
 
-        def calculate_distances(
-            normalized_matrix: NDArray[np.floating], metric: str
-        ) -> NDArray[np.floating]:
-            if normalized_matrix.ndim != 2:
-                raise ValueError("normalized_matrix must be 2-dimensional")
-            n_objectives = normalized_matrix.shape[1]
-            n_samples = normalized_matrix.shape[0]
-            if n_samples == 0 or n_objectives == 0:
-                raise ValueError(
-                    "normalized_matrix must have at least one sample and one objective"
-                )
-            if not np.all(np.isfinite(normalized_matrix)):
-                raise ValueError(
-                    "normalized_matrix must contain only finite values (no NaN or inf)"
-                )
-            label_p_order = self.ft_params.get("label_p_order")
-            np_weights = np.array(
-                self.ft_params.get("label_weights", [1.0] * n_objectives)
-            )
-            if np_weights.size != n_objectives:
-                raise ValueError("label_weights length must match number of objectives")
-            if not np.all(np.isfinite(np_weights)):
-                raise ValueError("label_weights must contain only finite values")
-            if np.any(np_weights < 0):
-                raise ValueError("label_weights values must be non-negative")
-            label_weights_sum = np.sum(np.abs(np_weights))
-            if np.isclose(label_weights_sum, 0.0):
-                raise ValueError("label_weights sum cannot be zero")
-            np_weights = np_weights / label_weights_sum
-
-            ideal_point = np.ones(n_objectives)
-            ideal_point_2d = ideal_point.reshape(1, -1)
-
-            def _get_n_clusters(
-                matrix: NDArray[np.floating],
-                *,
-                min_n_clusters: int = 2,
-                max_n_clusters: int = 10,
-            ) -> int:
-                n_samples = matrix.shape[0]
-                if n_samples <= 1:
-                    return 1
-                n_uniques = np.unique(matrix, axis=0).shape[0]
-                upper_bound = min(max_n_clusters, n_uniques, n_samples)
-                if upper_bound < 2:
-                    return 1
-                lower_bound = min(min_n_clusters, upper_bound)
-                if n_uniques <= 3:
-                    return min(n_uniques, upper_bound)
-                n_clusters = int(round((np.log2(n_uniques) + np.sqrt(n_uniques)) / 2.0))
-                return max(lower_bound, min(n_clusters, upper_bound))
-
-            if n_samples == 0:
-                return np.array([])
-            if n_samples == 1:
-                if metric in {
-                    "medoid",
-                    "kmeans",
-                    "kmeans2",
-                    "kmedoids",
-                    "knn_power_mean",
-                    "knn_percentile",
-                    "knn_min",
-                    "knn_max",
-                }:
-                    return np.array([0.0])
-
-            if metric in {
-                # "braycurtis",
-                # "canberra",
-                "chebyshev",
-                "cityblock",
-                # "correlation",
-                # "cosine",
-                # "dice",
-                "euclidean",
-                # "hamming",
-                # "jaccard",
-                "jensenshannon",
-                # "kulczynski1",  # Deprecated in SciPy ≥ 1.15.0; do not use.
-                "mahalanobis",
-                # "matching",
-                "minkowski",
-                # "rogerstanimoto",
-                # "russellrao",
-                "seuclidean",
-                # "sokalmichener",  # Deprecated in SciPy ≥ 1.15.0; do not use.
-                # "sokalsneath",
-                "sqeuclidean",
-                # "yule",
-            }:
-                cdist_kwargs: dict[str, Any] = {}
-                if metric not in {"mahalanobis", "seuclidean", "jensenshannon"}:
-                    cdist_kwargs["w"] = np_weights
-                if metric == "minkowski":
-                    cdist_kwargs["p"] = (
-                        label_p_order
-                        if label_p_order is not None and np.isfinite(label_p_order)
-                        else 2.0
-                    )
-                return sp.spatial.distance.cdist(
-                    normalized_matrix,
-                    ideal_point_2d,
-                    metric=metric,
-                    **cdist_kwargs,
-                ).flatten()
-            elif metric in {"hellinger", "shellinger"}:
-                np_sqrt_normalized_matrix = np.sqrt(normalized_matrix)
-                if metric == "shellinger":
-                    variances = np.var(np_sqrt_normalized_matrix, axis=0, ddof=1)
-                    if np.any(variances <= 0):
-                        raise ValueError(
-                            "shellinger metric requires non-zero variance for all objectives"
-                        )
-                    np_weights = 1 / variances
-                return (
-                    np.sqrt(
-                        np.sum(
-                            np_weights
-                            * (np_sqrt_normalized_matrix - np.sqrt(ideal_point)) ** 2,
-                            axis=1,
-                        )
-                    )
-                    / QuickAdapterRegressorV3._SQRT_2
-                )
-            elif metric in {
-                "harmonic_mean",
-                "geometric_mean",
-                "arithmetic_mean",
-                "quadratic_mean",
-                "cubic_mean",
-                "power_mean",
-            }:
-                p = {
-                    "harmonic_mean": -1.0,
-                    "geometric_mean": 0.0,
-                    "arithmetic_mean": 1.0,
-                    "quadratic_mean": 2.0,
-                    "cubic_mean": 3.0,
-                    "power_mean": label_p_order
-                    if label_p_order is not None and np.isfinite(label_p_order)
-                    else 1.0,
-                }[metric]
-                return sp.stats.pmean(
-                    ideal_point, p=p, weights=np_weights
-                ) - sp.stats.pmean(normalized_matrix, p=p, weights=np_weights, axis=1)
-            elif metric == "weighted_sum":
-                return np.sum(np_weights * (ideal_point - normalized_matrix), axis=1)
-            elif metric == "medoid":
-                label_medoid_metric = self.ft_params.get(
-                    "label_medoid_metric", "euclidean"
-                )
-                if label_medoid_metric in {
-                    "mahalanobis",
-                    "seuclidean",
-                    "jensenshannon",
-                }:
-                    raise ValueError(
-                        f"Unsupported label_medoid_metric: {label_medoid_metric}. Supported are euclidean/minkowski/cityblock/chebyshev/..."
-                    )
-                p = None
-                if label_medoid_metric == "minkowski":
-                    p = (
-                        label_p_order
-                        if label_p_order is not None and np.isfinite(label_p_order)
-                        else 2.0
-                    )
-                return self._pairwise_distance_sums(
-                    normalized_matrix,
-                    label_medoid_metric,
-                    weights=np_weights,
-                    p=p,
-                )
-            elif metric in {"kmeans", "kmeans2"}:
-                n_clusters = _get_n_clusters(normalized_matrix)
-                if metric == "kmeans":
-                    kmeans = sklearn.cluster.KMeans(
-                        n_clusters=n_clusters, random_state=42, n_init=10
-                    )
-                    cluster_labels = kmeans.fit_predict(normalized_matrix)
-                    cluster_centers = kmeans.cluster_centers_
-                elif metric == "kmeans2":
-                    cluster_centers, cluster_labels = sp.cluster.vq.kmeans2(
-                        normalized_matrix, n_clusters, rng=42, minit="++"
-                    )
-                label_kmeans_metric = self.ft_params.get(
-                    "label_kmeans_metric", "euclidean"
-                )
-                if label_kmeans_metric in {
-                    "mahalanobis",
-                    "seuclidean",
-                    "jensenshannon",
-                }:
-                    raise ValueError(
-                        f"Unsupported label_kmeans_metric: {label_kmeans_metric}. Supported are euclidean/minkowski/cityblock/chebyshev/..."
-                    )
-                cdist_kwargs: dict[str, Any] = {}
-                if label_kmeans_metric == "minkowski":
-                    cdist_kwargs["p"] = (
-                        label_p_order
-                        if label_p_order is not None and np.isfinite(label_p_order)
-                        else 2.0
-                    )
-                cluster_center_distances_to_ideal = sp.spatial.distance.cdist(
-                    cluster_centers,
-                    ideal_point_2d,
-                    metric=label_kmeans_metric,
-                    **cdist_kwargs,
-                ).flatten()
-                label_kmeans_selection = self.ft_params.get(
-                    "label_kmeans_selection", "min"
-                )
-                ordered_cluster_indices = np.argsort(cluster_center_distances_to_ideal)
-                best_cluster_indices = None
-                for cluster_index in ordered_cluster_indices:
-                    cluster_indices = np.flatnonzero(cluster_labels == cluster_index)
-                    if cluster_indices.size > 0:
-                        best_cluster_indices = cluster_indices
-                        break
-                trial_distances = np.full(n_samples, np.inf)
-                if best_cluster_indices is not None and best_cluster_indices.size > 0:
-                    if label_kmeans_selection == "medoid":
-                        p = None
-                        if label_kmeans_metric == "minkowski":
-                            p = (
-                                label_p_order
-                                if label_p_order is not None
-                                and np.isfinite(label_p_order)
-                                else 2.0
-                            )
-                        best_medoid_position = np.argmin(
-                            self._pairwise_distance_sums(
-                                normalized_matrix[best_cluster_indices],
-                                label_kmeans_metric,
-                                p=p,
-                            )
-                        )
-                        best_trial_index = best_cluster_indices[best_medoid_position]
-                        best_trial_distance = sp.spatial.distance.cdist(
-                            normalized_matrix[[best_trial_index]],
-                            ideal_point_2d,
-                            metric=label_kmeans_metric,
-                            **cdist_kwargs,
-                        ).item()
-                        trial_distances[best_trial_index] = best_trial_distance
-                    elif label_kmeans_selection == "min":
-                        best_cluster_distances = sp.spatial.distance.cdist(
-                            normalized_matrix[best_cluster_indices],
-                            ideal_point_2d,
-                            metric=label_kmeans_metric,
-                            **cdist_kwargs,
-                        ).flatten()
-                        min_distance_position = np.argmin(best_cluster_distances)
-                        best_trial_index = best_cluster_indices[min_distance_position]
-                        trial_distances[best_trial_index] = best_cluster_distances[
-                            min_distance_position
-                        ]
-                    else:
-                        raise ValueError(
-                            f"Unsupported label_kmeans_selection: {label_kmeans_selection}. Supported are medoid/min"
-                        )
-                return trial_distances
-            elif metric == "kmedoids":
-                n_clusters = _get_n_clusters(normalized_matrix)
-                label_kmedoids_metric = self.ft_params.get(
-                    "label_kmedoids_metric", "euclidean"
-                )
-                if label_kmedoids_metric in {
-                    "mahalanobis",
-                    "seuclidean",
-                    "jensenshannon",
-                }:
-                    raise ValueError(
-                        f"Unsupported label_kmedoids_metric: {label_kmedoids_metric}. Supported are euclidean/minkowski/cityblock/chebyshev/..."
-                    )
-                kmedoids_kwargs: dict[str, Any] = {
-                    "metric": label_kmedoids_metric,
-                    "random_state": 42,
-                    "init": "k-medoids++",
-                    "method": "pam",
-                }
-                kmedoids = KMedoids(n_clusters=n_clusters, **kmedoids_kwargs)
-                cluster_labels = kmedoids.fit_predict(normalized_matrix)
-                medoid_indices = kmedoids.medoid_indices_
-                cdist_kwargs: dict[str, Any] = {}
-                if label_kmedoids_metric == "minkowski":
-                    cdist_kwargs["p"] = (
-                        label_p_order
-                        if label_p_order is not None and np.isfinite(label_p_order)
-                        else 2.0
-                    )
-                medoid_distances_to_ideal = sp.spatial.distance.cdist(
-                    normalized_matrix[medoid_indices],
-                    ideal_point_2d,
-                    metric=label_kmedoids_metric,
-                    **cdist_kwargs,
-                ).flatten()
-                label_kmedoids_selection = self.ft_params.get(
-                    "label_kmedoids_selection", "min"
-                )
-                best_medoid_distance_position = np.argmin(medoid_distances_to_ideal)
-                best_medoid_index = medoid_indices[best_medoid_distance_position]
-                cluster_index = cluster_labels[best_medoid_index]
-                best_cluster_indices = np.flatnonzero(cluster_labels == cluster_index)
-                trial_distances = np.full(n_samples, np.inf)
-                if best_cluster_indices.size > 0:
-                    if label_kmedoids_selection == "medoid":
-                        trial_distances[best_medoid_index] = medoid_distances_to_ideal[
-                            best_medoid_distance_position
-                        ]
-                    elif label_kmedoids_selection == "min":
-                        if best_cluster_indices.size == 1:
-                            best_trial_index = best_cluster_indices[0]
-                            trial_distances[best_trial_index] = (
-                                medoid_distances_to_ideal[best_medoid_distance_position]
-                            )
-                        else:
-                            best_cluster_distances = sp.spatial.distance.cdist(
-                                normalized_matrix[best_cluster_indices],
-                                ideal_point_2d,
-                                metric=label_kmedoids_metric,
-                                **cdist_kwargs,
-                            ).flatten()
-                            min_distance_position = np.argmin(best_cluster_distances)
-                            best_trial_index = best_cluster_indices[
-                                min_distance_position
-                            ]
-                            trial_distances[best_trial_index] = best_cluster_distances[
-                                min_distance_position
-                            ]
-                    else:
-                        raise ValueError(
-                            f"Unsupported label_kmedoids_selection: {label_kmedoids_selection}. Supported are medoid/min"
-                        )
-                return trial_distances
-            elif metric in {"knn_power_mean", "knn_percentile", "knn_min", "knn_max"}:
-                label_knn_metric = self.ft_params.get("label_knn_metric", "minkowski")
-                knn_kwargs: dict[str, Any] = {}
-                if label_knn_metric == "minkowski":
-                    knn_kwargs["p"] = (
-                        label_p_order
-                        if label_p_order is not None and np.isfinite(label_p_order)
-                        else 2.0
-                    )
-                    knn_kwargs["metric_params"] = {"w": np_weights}
-                label_knn_p_order = self.ft_params.get("label_knn_p_order")
-                n_neighbors = (
-                    min(
-                        int(self.ft_params.get("label_knn_n_neighbors", 5)),
-                        n_samples - 1,
-                    )
-                    + 1
-                )
-                nbrs = sklearn.neighbors.NearestNeighbors(
-                    n_neighbors=n_neighbors, metric=label_knn_metric, **knn_kwargs
-                ).fit(normalized_matrix)
-                distances, _ = nbrs.kneighbors(normalized_matrix)
-                neighbor_distances = distances[:, 1:]
-                if neighbor_distances.shape[1] < 1:
-                    return np.full(n_samples, np.inf)
-                if metric == "knn_power_mean":
-                    label_knn_p_order = (
-                        label_knn_p_order
-                        if label_knn_p_order is not None
-                        and np.isfinite(label_knn_p_order)
-                        else 1.0
-                    )
-                    return sp.stats.pmean(
-                        neighbor_distances, p=label_knn_p_order, axis=1
-                    )
-                elif metric == "knn_percentile":
-                    label_knn_p_order = (
-                        label_knn_p_order
-                        if label_knn_p_order is not None
-                        and np.isfinite(label_knn_p_order)
-                        else 50.0
-                    )
-                    return np.percentile(neighbor_distances, label_knn_p_order, axis=1)
-                elif metric == "knn_min":
-                    return np.min(neighbor_distances, axis=1)
-                elif metric == "knn_max":
-                    return np.max(neighbor_distances, axis=1)
-            else:
-                raise ValueError(
-                    f"Unsupported label metric: {metric}. Supported metrics are {', '.join(metrics)}"
-                )
-
         objective_values_matrix = np.array(
             [trial.values for trial in best_trials], dtype=float
         )
@@ -1630,7 +1619,9 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
             objective_values_matrix, study.directions
         )
 
-        trial_distances = calculate_distances(normalized_matrix, metric=label_metric)
+        trial_distances = self._calculate_distances_to_ideal(
+            normalized_matrix, metric=label_metric, metrics=metrics
+        )
 
         return best_trials[np.argmin(trial_distances)]
 
@@ -1708,7 +1699,9 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
             metric_log_msg = ""
         else:
             try:
-                best_trial = self.get_multi_objective_study_best_trial(namespace, study)
+                best_trial = self._get_multi_objective_study_best_trial(
+                    namespace, study
+                )
             except Exception as e:
                 logger.error(
                     f"Optuna {pair} {namespace} {objective_type} objective hyperopt failed ({time_spent:.2f} secs): {repr(e)}",
index 4faa12cdf54c480d0ba8d383d976b7a5b4349d51..d0ab9324689892bb4b946d6962d8a59f6914a519 100644 (file)
@@ -195,37 +195,50 @@ class QuickAdapterV3(IStrategy):
             self.config.get("freqai", {}).get("fit_live_predictions_candles", 100)
         )
         estimated_trade_duration_candles = int(
-            self.config.get("estimated_trade_duration_candles", 48)
-        )
-        stoploss_guard_lookback_period_candles = int(
-            round(fit_live_predictions_candles * 0.5)
-        )
-        stoploss_guard_trade_limit = max(
-            1,
-            int(
-                round(
-                    (
-                        stoploss_guard_lookback_period_candles
-                        / estimated_trade_duration_candles
+            self.config.get("estimated_trade_duration_candles", 60)
+        )
+
+        lookback_period_candles = max(1, int(round(fit_live_predictions_candles * 0.5)))
+        cooldown_stop_duration_candles = 4
+        stoploss_stop_duration_candles = max(
+            cooldown_stop_duration_candles, estimated_trade_duration_candles
+        )
+        drawdown_stop_duration_candles = max(
+            fit_live_predictions_candles,
+            estimated_trade_duration_candles * 2,
+            stoploss_stop_duration_candles,
+        )
+        max_open_trades = int(self.config.get("max_open_trades", 0))
+        stoploss_trade_limit = min(
+            max(
+                1,
+                int(
+                    round(
+                        lookback_period_candles
+                        / max(1, estimated_trade_duration_candles)
                     )
-                    * 0.5
-                )
+                ),
             ),
+            max(1, int(round(max_open_trades * 0.5))),
         )
+
         return [
-            {"method": "CooldownPeriod", "stop_duration_candles": 4},
+            {
+                "method": "CooldownPeriod",
+                "stop_duration_candles": cooldown_stop_duration_candles,
+            },
             {
                 "method": "MaxDrawdown",
-                "lookback_period_candles": fit_live_predictions_candles,
-                "trade_limit": 2 * self.config.get("max_open_trades"),
-                "stop_duration_candles": fit_live_predictions_candles,
+                "lookback_period_candles": lookback_period_candles,
+                "trade_limit": int(round(1.5 * max_open_trades)),
+                "stop_duration_candles": drawdown_stop_duration_candles,
                 "max_allowed_drawdown": 0.2,
             },
             {
                 "method": "StoplossGuard",
-                "lookback_period_candles": stoploss_guard_lookback_period_candles,
-                "trade_limit": stoploss_guard_trade_limit,
-                "stop_duration_candles": stoploss_guard_lookback_period_candles,
+                "lookback_period_candles": lookback_period_candles,
+                "trade_limit": stoploss_trade_limit,
+                "stop_duration_candles": stoploss_stop_duration_candles,
                 "only_per_pair": True,
             },
         ]
@@ -239,7 +252,7 @@ class QuickAdapterV3(IStrategy):
 
     @cached_property
     def max_open_trades_per_side(self) -> int:
-        max_open_trades = self.config.get("max_open_trades")
+        max_open_trades = self.config.get("max_open_trades", 0)
         if max_open_trades < 0:
             return -1
         if self.is_short_allowed():
@@ -1420,7 +1433,7 @@ class QuickAdapterV3(IStrategy):
             return 0
         if idx < 0:
             idx = length + idx
-        return max(0, min(idx, length - 1))
+        return min(max(0, idx), length - 1)
 
     def _calculate_candle_deviation(
         self,
@@ -2101,7 +2114,7 @@ class QuickAdapterV3(IStrategy):
         ):  # "short"
             logger.info(f"User denied short entry for {pair}: shorting not allowed")
             return False
-        if Trade.get_open_trade_count() >= self.config.get("max_open_trades"):
+        if Trade.get_open_trade_count() >= self.config.get("max_open_trades", 0):
             return False
         max_open_trades_per_side = self.max_open_trades_per_side
         if max_open_trades_per_side >= 0: