]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
refactor(quickadapter): consolidate pivot metrics and extrema ranking; bump version...
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 7 Jan 2026 12:46:29 +0000 (13:46 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 7 Jan 2026 12:46:29 +0000 (13:46 +0100)
- Utils.py: unify amplitude/threshold/speed in calculate_pivot_metrics, remove calculate_pivot_speed, update add_pivot to consume normalized speed; preserves edge-case guards (NaN/inf, zero duration).
- QuickAdapterRegressorV3: add _calculate_n_kept_extrema and use in ranking; mark scaler fallback path; bump version to 3.10.5.
- QuickAdapterV3: bump version() to 3.10.5; adjust docstring for t-distribution helper.

quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
quickadapter/user_data/strategies/QuickAdapterV3.py
quickadapter/user_data/strategies/Utils.py

index b87c3b5fa43b3d6559c827562bfc91eca7f4d0f9..44b2aa6bcd6140ba030047195ce37e2e20fb227f 100644 (file)
@@ -87,7 +87,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
     https://github.com/sponsors/robcaulk
     """
 
-    version = "3.10.4"
+    version = "3.10.5"
 
     _TEST_SIZE: Final[float] = 0.1
 
@@ -1371,7 +1371,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
             scaler_obj = SKLearnWrapper(StandardScaler())
         elif scaler == QuickAdapterRegressorV3._SCALER_TYPES[3]:  # "robust"
             scaler_obj = SKLearnWrapper(RobustScaler())
-        else:
+        else:  # "minmax"
             scaler_obj = SKLearnWrapper(MinMaxScaler(feature_range=feature_range))
 
         steps = [
@@ -1704,6 +1704,10 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
         )
         return minima_indices, maxima_indices
 
+    @staticmethod
+    def _calculate_n_kept_extrema(count: int, keep_fraction: float) -> int:
+        return max(1, int(round(count * keep_fraction))) if count > 0 else 0
+
     @staticmethod
     def _get_ranked_peaks(
         pred_extrema: pd.Series,
@@ -1711,15 +1715,11 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
         maxima_indices: NDArray[np.intp],
         keep_extrema_fraction: float = 1.0,
     ) -> tuple[pd.Series, pd.Series]:
-        n_kept_minima = (
-            max(1, int(round(minima_indices.size * keep_extrema_fraction)))
-            if minima_indices.size > 0
-            else 0
+        n_kept_minima = QuickAdapterRegressorV3._calculate_n_kept_extrema(
+            minima_indices.size, keep_extrema_fraction
         )
-        n_kept_maxima = (
-            max(1, int(round(maxima_indices.size * keep_extrema_fraction)))
-            if maxima_indices.size > 0
-            else 0
+        n_kept_maxima = QuickAdapterRegressorV3._calculate_n_kept_extrema(
+            maxima_indices.size, keep_extrema_fraction
         )
 
         pred_minima = (
@@ -1750,11 +1750,11 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
         n_maxima: int,
         keep_extrema_fraction: float = 1.0,
     ) -> tuple[pd.Series, pd.Series]:
-        n_kept_minima = (
-            max(1, int(round(n_minima * keep_extrema_fraction))) if n_minima > 0 else 0
+        n_kept_minima = QuickAdapterRegressorV3._calculate_n_kept_extrema(
+            n_minima, keep_extrema_fraction
         )
-        n_kept_maxima = (
-            max(1, int(round(n_maxima * keep_extrema_fraction))) if n_maxima > 0 else 0
+        n_kept_maxima = QuickAdapterRegressorV3._calculate_n_kept_extrema(
+            n_maxima, keep_extrema_fraction
         )
 
         pred_minima = (
index 183f2a31d45bc809d1635a1d224cbca75964eecc..23db9bbfc1dd3f29d99fe8255164d2d5d0e41e25 100644 (file)
@@ -107,7 +107,7 @@ class QuickAdapterV3(IStrategy):
     _PLOT_EXTREMA_MIN_EPS: Final[float] = 0.01
 
     def version(self) -> str:
-        return "3.10.4"
+        return "3.10.5"
 
     timeframe = "5m"
     timeframe_minutes = timeframe_to_minutes(timeframe)
@@ -2003,7 +2003,7 @@ class QuickAdapterV3(IStrategy):
 
         Args:
             q: Quantile in (0, 1), e.g. 0.75.
-            df: Degrees of freedom (can be fractional).
+            df: Degrees of freedom.
             default_t: Fallback value on error.
 
         Returns:
index fec90b981001bba5e04e9d05b37cd088bb2bbfca..04dec0805102a46d97c9f41680932ed89c965edc 100644 (file)
@@ -1237,24 +1237,24 @@ def zigzag(
         candidate_pivot_pos = -1
         candidate_pivot_value = np.nan
 
-    def calculate_pivot_amplitude_and_threshold_ratio(
+    def calculate_pivot_metrics(
         *,
         previous_pos: int,
         previous_value: float,
         current_pos: int,
         current_value: float,
-    ) -> tuple[float, float]:
+    ) -> tuple[float, float, float]:
         if previous_pos < 0 or current_pos < 0:
-            return np.nan, np.nan
+            return np.nan, np.nan, np.nan
         if previous_pos >= n or current_pos >= n:
-            return np.nan, np.nan
+            return np.nan, np.nan, np.nan
 
         if np.isclose(previous_value, 0.0):
-            return np.nan, np.nan
+            return np.nan, np.nan, np.nan
 
         amplitude = abs(current_value - previous_value) / abs(previous_value)
         if not (np.isfinite(amplitude) and amplitude >= 0):
-            return np.nan, np.nan
+            return np.nan, np.nan, np.nan
 
         start_pos = min(previous_pos, current_pos)
         end_pos = max(previous_pos, current_pos) + 1
@@ -1266,7 +1266,24 @@ def zigzag(
             else np.nan
         )
 
-        return amplitude / (1.0 + amplitude), amplitude_threshold_ratio
+        duration = calculate_pivot_duration(
+            previous_pos=previous_pos,
+            current_pos=current_pos,
+        )
+
+        if np.isfinite(duration) and duration > 0:
+            speed = amplitude / duration
+            normalized_speed = (
+                speed / (1.0 + speed) if np.isfinite(speed) and speed >= 0 else np.nan
+            )
+        else:
+            normalized_speed = np.nan
+
+        return (
+            amplitude / (1.0 + amplitude),
+            amplitude_threshold_ratio,
+            normalized_speed,
+        )
 
     def calculate_pivot_duration(
         *,
@@ -1310,35 +1327,6 @@ def zigzag(
             return avg_volume_per_candle / (avg_volume_per_candle + median_volume)
         return np.nan
 
-    def calculate_pivot_speed(
-        *,
-        previous_pos: int,
-        previous_value: float,
-        current_pos: int,
-        current_value: float,
-    ) -> float:
-        if previous_pos < 0 or current_pos < 0:
-            return np.nan
-        if previous_pos >= n or current_pos >= n:
-            return np.nan
-
-        if np.isclose(previous_value, 0.0):
-            return np.nan
-
-        duration = calculate_pivot_duration(
-            previous_pos=previous_pos,
-            current_pos=current_pos,
-        )
-        if not np.isfinite(duration) or duration == 0:
-            return np.nan
-
-        amplitude = abs(current_value - previous_value) / abs(previous_value)
-        if not (np.isfinite(amplitude) and amplitude >= 0):
-            return np.nan
-
-        speed = amplitude / duration
-        return speed / (1.0 + speed) if np.isfinite(speed) and speed >= 0 else np.nan
-
     def calculate_pivot_efficiency_ratio(
         *,
         previous_pos: int,
@@ -1409,23 +1397,15 @@ def zigzag(
             and last_pivot_pos >= 0
             and len(pivots_values) == len(pivots_amplitudes)
         ):
-            amplitude, amplitude_threshold_ratio = (
-                calculate_pivot_amplitude_and_threshold_ratio(
-                    previous_pos=last_pivot_pos,
-                    previous_value=pivots_values[-1],
-                    current_pos=pos,
-                    current_value=value,
-                )
-            )
-            volume_rate = calculate_pivot_volume_rate(
+            amplitude, amplitude_threshold_ratio, speed = calculate_pivot_metrics(
                 previous_pos=last_pivot_pos,
+                previous_value=pivots_values[-1],
                 current_pos=pos,
+                current_value=value,
             )
-            speed = calculate_pivot_speed(
+            volume_rate = calculate_pivot_volume_rate(
                 previous_pos=last_pivot_pos,
-                previous_value=pivots_values[-1],
                 current_pos=pos,
-                current_value=value,
             )
             efficiency_ratio = calculate_pivot_efficiency_ratio(
                 previous_pos=last_pivot_pos,