https://github.com/sponsors/robcaulk
"""
- version = "3.7.33"
+ version = "3.7.34"
@cached_property
def _optuna_config(self) -> dict:
candidate_pivot_pos: int,
next_confirmation_pos: int,
direction: TrendDirection,
+ extrema_threshold: float = 0.95,
+ min_slope_strength: float = 0.5,
move_away_ratio: float = 0.25,
- min_price_change_ratio: float = 0.125,
) -> bool:
next_start = next_confirmation_pos + 1
- next_end = min(next_confirmation_pos + confirmation_window + 1, n)
+ next_end = min(next_start + confirmation_window, n)
previous_start = max(candidate_pivot_pos - confirmation_window, 0)
previous_end = candidate_pivot_pos
if next_start >= next_end or previous_start >= previous_end:
next_lows = lows[next_slice]
previous_slice = slice(previous_start, previous_end)
previous_closes = closes[previous_slice]
- previous_highs = highs[previous_slice]
- previous_lows = lows[previous_slice]
local_extrema_ok = False
if direction == TrendDirection.DOWN:
- if (
- np.all(next_closes < highs[candidate_pivot_pos])
- and np.all(previous_closes < highs[candidate_pivot_pos])
- and np.max(next_highs) <= highs[candidate_pivot_pos]
- and np.max(previous_highs) <= highs[candidate_pivot_pos]
- ):
- local_extrema_ok = True
+ valid_next = (
+ np.sum(next_closes < highs[candidate_pivot_pos]) / len(next_closes)
+ >= extrema_threshold
+ )
+ valid_previous = (
+ np.sum(previous_closes < highs[candidate_pivot_pos])
+ / len(previous_closes)
+ >= extrema_threshold
+ )
+ local_extrema_ok = valid_next and valid_previous
elif direction == TrendDirection.UP:
- if (
- np.all(next_closes > lows[candidate_pivot_pos])
- and np.all(previous_closes > lows[candidate_pivot_pos])
- and np.min(next_lows) >= lows[candidate_pivot_pos]
- and np.min(previous_lows) >= lows[candidate_pivot_pos]
- ):
- local_extrema_ok = True
-
- slope_ok = True
- if len(next_closes) >= 2:
- next_slope = (next_closes[-1] - next_closes[0]) / (len(next_closes) - 1)
+ valid_next = (
+ np.sum(next_closes > lows[candidate_pivot_pos]) / len(next_closes)
+ >= extrema_threshold
+ )
+ valid_previous = (
+ np.sum(previous_closes > lows[candidate_pivot_pos])
+ / len(previous_closes)
+ >= extrema_threshold
+ )
+ local_extrema_ok = valid_next and valid_previous
+
+ slope_ok = False
+ next_closes_std = np.std(next_closes)
+ if len(next_closes) >= 2 and next_closes_std > np.finfo(float).eps:
+ weights = np.linspace(0.5, 1.5, len(next_closes))
+ next_slope = np.polyfit(range(len(next_closes)), next_closes, 1, w=weights)[
+ 0
+ ]
+ next_slope_strength = next_slope / next_closes_std
if direction == TrendDirection.DOWN:
- slope_ok = next_slope < 0
+ slope_ok = next_slope_strength < -min_slope_strength
elif direction == TrendDirection.UP:
- slope_ok = next_slope > 0
+ slope_ok = next_slope_strength > min_slope_strength
significant_move_away_ok = False
if direction == TrendDirection.DOWN:
):
significant_move_away_ok = True
- min_price_change_ok = False
- required_price_change = (
- thresholds[next_confirmation_pos]
- * min_price_change_ratio
- * closes[next_confirmation_pos]
- )
- if len(next_closes) > 0:
- actual_price_change = abs(next_closes[-1] - next_closes[0])
- if actual_price_change >= required_price_change:
- min_price_change_ok = True
-
- return (
- local_extrema_ok
- and slope_ok
- and significant_move_away_ok
- and min_price_change_ok
- )
+ return local_extrema_ok and slope_ok and significant_move_away_ok
start_pos = 0
initial_high_pos = start_pos
candidate_pivot_pos: int,
next_confirmation_pos: int,
direction: TrendDirection,
+ extrema_threshold: float = 0.95,
+ min_slope_strength: float = 0.5,
move_away_ratio: float = 0.25,
- min_price_change_ratio: float = 0.125,
) -> bool:
next_start = next_confirmation_pos + 1
- next_end = min(next_confirmation_pos + confirmation_window + 1, n)
+ next_end = min(next_start + confirmation_window, n)
previous_start = max(candidate_pivot_pos - confirmation_window, 0)
previous_end = candidate_pivot_pos
if next_start >= next_end or previous_start >= previous_end:
next_lows = lows[next_slice]
previous_slice = slice(previous_start, previous_end)
previous_closes = closes[previous_slice]
- previous_highs = highs[previous_slice]
- previous_lows = lows[previous_slice]
local_extrema_ok = False
if direction == TrendDirection.DOWN:
- if (
- np.all(next_closes < highs[candidate_pivot_pos])
- and np.all(previous_closes < highs[candidate_pivot_pos])
- and np.max(next_highs) <= highs[candidate_pivot_pos]
- and np.max(previous_highs) <= highs[candidate_pivot_pos]
- ):
- local_extrema_ok = True
+ valid_next = (
+ np.sum(next_closes < highs[candidate_pivot_pos]) / len(next_closes)
+ >= extrema_threshold
+ )
+ valid_previous = (
+ np.sum(previous_closes < highs[candidate_pivot_pos])
+ / len(previous_closes)
+ >= extrema_threshold
+ )
+ local_extrema_ok = valid_next and valid_previous
elif direction == TrendDirection.UP:
- if (
- np.all(next_closes > lows[candidate_pivot_pos])
- and np.all(previous_closes > lows[candidate_pivot_pos])
- and np.min(next_lows) >= lows[candidate_pivot_pos]
- and np.min(previous_lows) >= lows[candidate_pivot_pos]
- ):
- local_extrema_ok = True
-
- slope_ok = True
- if len(next_closes) >= 2:
- next_slope = (next_closes[-1] - next_closes[0]) / (len(next_closes) - 1)
+ valid_next = (
+ np.sum(next_closes > lows[candidate_pivot_pos]) / len(next_closes)
+ >= extrema_threshold
+ )
+ valid_previous = (
+ np.sum(previous_closes > lows[candidate_pivot_pos])
+ / len(previous_closes)
+ >= extrema_threshold
+ )
+ local_extrema_ok = valid_next and valid_previous
+
+ slope_ok = False
+ next_closes_std = np.std(next_closes)
+ if len(next_closes) >= 2 and next_closes_std > np.finfo(float).eps:
+ weights = np.linspace(0.5, 1.5, len(next_closes))
+ next_slope = np.polyfit(range(len(next_closes)), next_closes, 1, w=weights)[
+ 0
+ ]
+ next_slope_strength = next_slope / next_closes_std
if direction == TrendDirection.DOWN:
- slope_ok = next_slope < 0
+ slope_ok = next_slope_strength < -min_slope_strength
elif direction == TrendDirection.UP:
- slope_ok = next_slope > 0
+ slope_ok = next_slope_strength > min_slope_strength
significant_move_away_ok = False
if direction == TrendDirection.DOWN:
):
significant_move_away_ok = True
- min_price_change_ok = False
- required_price_change = (
- thresholds[next_confirmation_pos]
- * min_price_change_ratio
- * closes[next_confirmation_pos]
- )
- if len(next_closes) > 0:
- actual_price_change = abs(next_closes[-1] - next_closes[0])
- if actual_price_change >= required_price_change:
- min_price_change_ok = True
-
- return (
- local_extrema_ok
- and slope_ok
- and significant_move_away_ok
- and min_price_change_ok
- )
+ return local_extrema_ok and slope_ok and significant_move_away_ok
start_pos = 0
initial_high_pos = start_pos