https://github.com/sponsors/robcaulk
"""
- version = "3.10.4"
+ version = "3.10.5"
_TEST_SIZE: Final[float] = 0.1
scaler_obj = SKLearnWrapper(StandardScaler())
elif scaler == QuickAdapterRegressorV3._SCALER_TYPES[3]: # "robust"
scaler_obj = SKLearnWrapper(RobustScaler())
- else:
+ else: # "minmax"
scaler_obj = SKLearnWrapper(MinMaxScaler(feature_range=feature_range))
steps = [
)
return minima_indices, maxima_indices
+ @staticmethod
+ def _calculate_n_kept_extrema(count: int, keep_fraction: float) -> int:
+ return max(1, int(round(count * keep_fraction))) if count > 0 else 0
+
@staticmethod
def _get_ranked_peaks(
pred_extrema: pd.Series,
maxima_indices: NDArray[np.intp],
keep_extrema_fraction: float = 1.0,
) -> tuple[pd.Series, pd.Series]:
- n_kept_minima = (
- max(1, int(round(minima_indices.size * keep_extrema_fraction)))
- if minima_indices.size > 0
- else 0
+ n_kept_minima = QuickAdapterRegressorV3._calculate_n_kept_extrema(
+ minima_indices.size, keep_extrema_fraction
)
- n_kept_maxima = (
- max(1, int(round(maxima_indices.size * keep_extrema_fraction)))
- if maxima_indices.size > 0
- else 0
+ n_kept_maxima = QuickAdapterRegressorV3._calculate_n_kept_extrema(
+ maxima_indices.size, keep_extrema_fraction
)
pred_minima = (
n_maxima: int,
keep_extrema_fraction: float = 1.0,
) -> tuple[pd.Series, pd.Series]:
- n_kept_minima = (
- max(1, int(round(n_minima * keep_extrema_fraction))) if n_minima > 0 else 0
+ n_kept_minima = QuickAdapterRegressorV3._calculate_n_kept_extrema(
+ n_minima, keep_extrema_fraction
)
- n_kept_maxima = (
- max(1, int(round(n_maxima * keep_extrema_fraction))) if n_maxima > 0 else 0
+ n_kept_maxima = QuickAdapterRegressorV3._calculate_n_kept_extrema(
+ n_maxima, keep_extrema_fraction
)
pred_minima = (
candidate_pivot_pos = -1
candidate_pivot_value = np.nan
- def calculate_pivot_amplitude_and_threshold_ratio(
+ def calculate_pivot_metrics(
*,
previous_pos: int,
previous_value: float,
current_pos: int,
current_value: float,
- ) -> tuple[float, float]:
+ ) -> tuple[float, float, float]:
if previous_pos < 0 or current_pos < 0:
- return np.nan, np.nan
+ return np.nan, np.nan, np.nan
if previous_pos >= n or current_pos >= n:
- return np.nan, np.nan
+ return np.nan, np.nan, np.nan
if np.isclose(previous_value, 0.0):
- return np.nan, np.nan
+ return np.nan, np.nan, np.nan
amplitude = abs(current_value - previous_value) / abs(previous_value)
if not (np.isfinite(amplitude) and amplitude >= 0):
- return np.nan, np.nan
+ return np.nan, np.nan, np.nan
start_pos = min(previous_pos, current_pos)
end_pos = max(previous_pos, current_pos) + 1
else np.nan
)
- return amplitude / (1.0 + amplitude), amplitude_threshold_ratio
+ duration = calculate_pivot_duration(
+ previous_pos=previous_pos,
+ current_pos=current_pos,
+ )
+
+ if np.isfinite(duration) and duration > 0:
+ speed = amplitude / duration
+ normalized_speed = (
+ speed / (1.0 + speed) if np.isfinite(speed) and speed >= 0 else np.nan
+ )
+ else:
+ normalized_speed = np.nan
+
+ return (
+ amplitude / (1.0 + amplitude),
+ amplitude_threshold_ratio,
+ normalized_speed,
+ )
def calculate_pivot_duration(
*,
return avg_volume_per_candle / (avg_volume_per_candle + median_volume)
return np.nan
- def calculate_pivot_speed(
- *,
- previous_pos: int,
- previous_value: float,
- current_pos: int,
- current_value: float,
- ) -> float:
- if previous_pos < 0 or current_pos < 0:
- return np.nan
- if previous_pos >= n or current_pos >= n:
- return np.nan
-
- if np.isclose(previous_value, 0.0):
- return np.nan
-
- duration = calculate_pivot_duration(
- previous_pos=previous_pos,
- current_pos=current_pos,
- )
- if not np.isfinite(duration) or duration == 0:
- return np.nan
-
- amplitude = abs(current_value - previous_value) / abs(previous_value)
- if not (np.isfinite(amplitude) and amplitude >= 0):
- return np.nan
-
- speed = amplitude / duration
- return speed / (1.0 + speed) if np.isfinite(speed) and speed >= 0 else np.nan
-
def calculate_pivot_efficiency_ratio(
*,
previous_pos: int,
and last_pivot_pos >= 0
and len(pivots_values) == len(pivots_amplitudes)
):
- amplitude, amplitude_threshold_ratio = (
- calculate_pivot_amplitude_and_threshold_ratio(
- previous_pos=last_pivot_pos,
- previous_value=pivots_values[-1],
- current_pos=pos,
- current_value=value,
- )
- )
- volume_rate = calculate_pivot_volume_rate(
+ amplitude, amplitude_threshold_ratio, speed = calculate_pivot_metrics(
previous_pos=last_pivot_pos,
+ previous_value=pivots_values[-1],
current_pos=pos,
+ current_value=value,
)
- speed = calculate_pivot_speed(
+ volume_rate = calculate_pivot_volume_rate(
previous_pos=last_pivot_pos,
- previous_value=pivots_values[-1],
current_pos=pos,
- current_value=value,
)
efficiency_ratio = calculate_pivot_efficiency_ratio(
previous_pos=last_pivot_pos,