model_training_parameters: dict,
) -> float:
def calculate_min_extrema(
- length: int, fit_live_predictions_candles: int, min_extrema: float = 2.0
+ length: int, fit_live_predictions_candles: int, min_extrema: int = 2
) -> int:
return int(round((length / fit_live_predictions_candles) * min_extrema))
df: pd.DataFrame,
natr_period: int = 14,
natr_ratio: float = 6.0,
+ use_local_extrema_validation=False,
+ use_slope_validation=False,
+ use_significant_move_away_validation=False,
) -> tuple[list[int], list[float], list[int], list[float]]:
min_confirmation_window: int = 3
max_confirmation_window: int = 5
candidate_pivot_pos: int,
confirmation_start_pos: int,
direction: TrendDirection,
- extrema_threshold: float = 0.85,
move_away_ratio: float = 0.25,
) -> bool:
confirmation_window = calculate_confirmation_window(candidate_pivot_pos)
previous_highs = highs[previous_slice]
previous_lows = lows[previous_slice]
- local_extrema_ok = False
- if direction == TrendDirection.DOWN:
- valid_next = (
- np.sum(next_highs < highs[candidate_pivot_pos]) / len(next_highs)
- >= extrema_threshold
- )
- valid_previous = (
- np.sum(previous_highs < highs[candidate_pivot_pos])
- / len(previous_highs)
- >= extrema_threshold
- )
- local_extrema_ok = valid_next and valid_previous
- elif direction == TrendDirection.UP:
- valid_next = (
- np.sum(next_lows > lows[candidate_pivot_pos]) / len(next_lows)
- >= extrema_threshold
- )
- valid_previous = (
- np.sum(previous_lows > lows[candidate_pivot_pos]) / len(previous_lows)
- >= extrema_threshold
- )
- local_extrema_ok = valid_next and valid_previous
- if not local_extrema_ok:
- return False
+ local_extrema_ok = True
+ if use_local_extrema_validation:
+ local_extrema_threshold = (confirmation_window - 1) / confirmation_window
- slope_ok = False
- if len(next_moving_closes) >= 2:
+ if direction == TrendDirection.DOWN:
+ valid_next = (
+ np.sum(next_highs < highs[candidate_pivot_pos]) / len(next_highs)
+ >= local_extrema_threshold
+ )
+ valid_previous = (
+ np.sum(previous_highs < highs[candidate_pivot_pos])
+ / len(previous_highs)
+ >= local_extrema_threshold
+ )
+ local_extrema_ok = valid_next and valid_previous
+ elif direction == TrendDirection.UP:
+ valid_next = (
+ np.sum(next_lows > lows[candidate_pivot_pos]) / len(next_lows)
+ >= local_extrema_threshold
+ )
+ valid_previous = (
+ np.sum(previous_lows > lows[candidate_pivot_pos])
+ / len(previous_lows)
+ >= local_extrema_threshold
+ )
+ local_extrema_ok = valid_next and valid_previous
+
+ slope_ok = True
+ if use_slope_validation and len(next_moving_closes) >= 2:
log_next_moving_closes = np.log(next_moving_closes)
log_next_moving_closes_std = np.std(log_next_moving_closes)
if np.isclose(log_next_moving_closes_std, 0):
slope_ok = next_moving_slope_strength < -min_slope_strength
elif direction == TrendDirection.UP:
slope_ok = next_moving_slope_strength > min_slope_strength
- if not slope_ok:
- return False
- significant_move_away_ok = False
- if direction == TrendDirection.DOWN:
- if np.any(
- next_moving_lows
- < highs[candidate_pivot_pos]
- * (1 - thresholds[candidate_pivot_pos] * move_away_ratio)
- ):
- significant_move_away_ok = True
- elif direction == TrendDirection.UP:
- if np.any(
- next_moving_highs
- > lows[candidate_pivot_pos]
- * (1 + thresholds[candidate_pivot_pos] * move_away_ratio)
- ):
- significant_move_away_ok = True
- return significant_move_away_ok
+ if use_significant_move_away_validation:
+ significant_move_away_ok = False
+ if direction == TrendDirection.DOWN:
+ if np.any(
+ next_moving_lows
+ < highs[candidate_pivot_pos]
+ * (1 - thresholds[candidate_pivot_pos] * move_away_ratio)
+ ):
+ significant_move_away_ok = True
+ elif direction == TrendDirection.UP:
+ if np.any(
+ next_moving_highs
+ > lows[candidate_pivot_pos]
+ * (1 + thresholds[candidate_pivot_pos] * move_away_ratio)
+ ):
+ significant_move_away_ok = True
+ else:
+ significant_move_away_ok = True
+
+ return local_extrema_ok and slope_ok and significant_move_away_ok
start_pos = 0
initial_high_pos = start_pos
extrema_smoothing_zero_phase = self.freqai_info.get(
"extrema_smoothing_zero_phase", True
)
+ std = derive_gaussian_std_from_window(window)
extrema_smoothing_beta = float(
- self.freqai_info.get("extrema_smoothing_beta", 10.0)
+ self.freqai_info.get("extrema_smoothing_beta", 8.0)
)
- std = derive_gaussian_std_from_window(window)
if debug:
logger.info(
- f"{extrema_smoothing=}, {extrema_smoothing_zero_phase=}, {extrema_smoothing_beta=}, {window=}, {std=}"
+ f"{extrema_smoothing=}, {extrema_smoothing_zero_phase=}, {window=}, {std=}, {extrema_smoothing_beta=}"
)
gaussian_window = get_gaussian_window(std, True)
odd_window = get_odd_window(window)
df: pd.DataFrame,
natr_period: int = 14,
natr_ratio: float = 6.0,
+ use_local_extrema_validation=False,
+ use_slope_validation=False,
+ use_significant_move_away_validation=False,
) -> tuple[list[int], list[float], list[int], list[float]]:
min_confirmation_window: int = 3
max_confirmation_window: int = 5
candidate_pivot_pos: int,
confirmation_start_pos: int,
direction: TrendDirection,
- extrema_threshold: float = 0.85,
move_away_ratio: float = 0.25,
) -> bool:
confirmation_window = calculate_confirmation_window(candidate_pivot_pos)
previous_highs = highs[previous_slice]
previous_lows = lows[previous_slice]
- local_extrema_ok = False
- if direction == TrendDirection.DOWN:
- valid_next = (
- np.sum(next_highs < highs[candidate_pivot_pos]) / len(next_highs)
- >= extrema_threshold
- )
- valid_previous = (
- np.sum(previous_highs < highs[candidate_pivot_pos])
- / len(previous_highs)
- >= extrema_threshold
- )
- local_extrema_ok = valid_next and valid_previous
- elif direction == TrendDirection.UP:
- valid_next = (
- np.sum(next_lows > lows[candidate_pivot_pos]) / len(next_lows)
- >= extrema_threshold
- )
- valid_previous = (
- np.sum(previous_lows > lows[candidate_pivot_pos]) / len(previous_lows)
- >= extrema_threshold
- )
- local_extrema_ok = valid_next and valid_previous
- if not local_extrema_ok:
- return False
+ local_extrema_ok = True
+ if use_local_extrema_validation:
+ local_extrema_threshold = (confirmation_window - 1) / confirmation_window
+
+ if direction == TrendDirection.DOWN:
+ valid_next = (
+ np.sum(next_highs < highs[candidate_pivot_pos]) / len(next_highs)
+ >= local_extrema_threshold
+ )
+ valid_previous = (
+ np.sum(previous_highs < highs[candidate_pivot_pos])
+ / len(previous_highs)
+ >= local_extrema_threshold
+ )
+ local_extrema_ok = valid_next and valid_previous
+ elif direction == TrendDirection.UP:
+ valid_next = (
+ np.sum(next_lows > lows[candidate_pivot_pos]) / len(next_lows)
+ >= local_extrema_threshold
+ )
+ valid_previous = (
+ np.sum(previous_lows > lows[candidate_pivot_pos])
+ / len(previous_lows)
+ >= local_extrema_threshold
+ )
+ local_extrema_ok = valid_next and valid_previous
- slope_ok = False
- if len(next_moving_closes) >= 2:
+ slope_ok = True
+ if use_slope_validation and len(next_moving_closes) >= 2:
log_next_moving_closes = np.log(next_moving_closes)
log_next_moving_closes_std = np.std(log_next_moving_closes)
if np.isclose(log_next_moving_closes_std, 0):
slope_ok = next_moving_slope_strength < -min_slope_strength
elif direction == TrendDirection.UP:
slope_ok = next_moving_slope_strength > min_slope_strength
- if not slope_ok:
- return False
- significant_move_away_ok = False
- if direction == TrendDirection.DOWN:
- if np.any(
- next_moving_lows
- < highs[candidate_pivot_pos]
- * (1 - thresholds[candidate_pivot_pos] * move_away_ratio)
- ):
- significant_move_away_ok = True
- elif direction == TrendDirection.UP:
- if np.any(
- next_moving_highs
- > lows[candidate_pivot_pos]
- * (1 + thresholds[candidate_pivot_pos] * move_away_ratio)
- ):
- significant_move_away_ok = True
- return significant_move_away_ok
+ if use_significant_move_away_validation:
+ significant_move_away_ok = False
+ if direction == TrendDirection.DOWN:
+ if np.any(
+ next_moving_lows
+ < highs[candidate_pivot_pos]
+ * (1 - thresholds[candidate_pivot_pos] * move_away_ratio)
+ ):
+ significant_move_away_ok = True
+ elif direction == TrendDirection.UP:
+ if np.any(
+ next_moving_highs
+ > lows[candidate_pivot_pos]
+ * (1 + thresholds[candidate_pivot_pos] * move_away_ratio)
+ ):
+ significant_move_away_ok = True
+ else:
+ significant_move_away_ok = True
+
+ return local_extrema_ok and slope_ok and significant_move_away_ok
start_pos = 0
initial_high_pos = start_pos