https://github.com/sponsors/robcaulk
"""
- version = "3.7.88"
+ version = "3.7.89"
@cached_property
def _optuna_config(self) -> dict:
df: pd.DataFrame,
natr_period: int = 14,
natr_ratio: float = 6.0,
- use_local_extrema_validation=False,
- use_slope_validation=False,
- use_significant_move_away_validation=False,
) -> tuple[list[int], list[float], list[int], list[float]]:
- min_confirmation_window: int = 3
- max_confirmation_window: int = 5
n = len(df)
- if df.empty or n < max(natr_period, 2 * max_confirmation_window + 1):
+ if df.empty or n < natr_period:
return [], [], [], []
natr_values = (ta.NATR(df, timeperiod=natr_period).bfill() / 100.0).to_numpy()
lows = df.get("low").to_numpy()
state: TrendDirection = TrendDirection.NEUTRAL
- depth = -1
- pivots_indices, pivots_values, pivots_directions, pivots_scaled_natrs = (
- [],
- [],
- [],
- [],
- )
+ pivots_indices: list[int] = []
+ pivots_values: list[float] = []
+ pivots_directions: list[int] = []
+ pivots_scaled_natrs: list[float] = []
last_pivot_pos = -1
candidate_pivot_pos: int = -1
candidate_pivot_value: float = np.nan
- candidate_pivot_direction: TrendDirection = TrendDirection.NEUTRAL
- candidate_pivot_scaled_natr: float = np.nan
volatility_quantile_cache: dict[int, float] = {}
return volatility_quantile_cache[pos]
- def calculate_confirmation_window(
+ def calculate_slope_confirmation_window(
pos: int,
- min_window: int = min_confirmation_window,
- max_window: int = max_confirmation_window,
+ min_window: int = 3,
+ max_window: int = 5,
) -> int:
volatility_quantile = calculate_volatility_quantile(pos)
if np.isnan(volatility_quantile):
max_window,
).astype(int)
- def calculate_depth(
- pos: int,
- min_depth: int = 6,
- max_depth: int = 24,
- ) -> int:
- volatility_quantile = calculate_volatility_quantile(pos)
- if np.isnan(volatility_quantile):
- return int(round(median([min_depth, max_depth])))
-
- return np.clip(
- round(max_depth - (max_depth - min_depth) * volatility_quantile),
- min_depth,
- max_depth,
- ).astype(int)
-
- def calculate_min_slope_strength(
- pos: int,
- min_strength: float = 0.5,
- max_strength: float = 1.5,
- ) -> float:
- volatility_quantile = calculate_volatility_quantile(pos)
- if np.isnan(volatility_quantile):
- return median([min_strength, max_strength])
-
- return min_strength + (max_strength - min_strength) * volatility_quantile
-
- def update_candidate_pivot(pos: int, value: float, direction: TrendDirection):
- nonlocal \
- candidate_pivot_pos, \
- candidate_pivot_value, \
- candidate_pivot_direction, \
- candidate_pivot_scaled_natr
+ def update_candidate_pivot(pos: int, value: float):
+ nonlocal candidate_pivot_pos, candidate_pivot_value
if 0 <= pos < n:
candidate_pivot_pos = pos
candidate_pivot_value = value
- candidate_pivot_direction = direction
- candidate_pivot_scaled_natr = thresholds[pos]
def reset_candidate_pivot():
- nonlocal \
- candidate_pivot_pos, \
- candidate_pivot_value, \
- candidate_pivot_direction, \
- candidate_pivot_scaled_natr
+ nonlocal candidate_pivot_pos, candidate_pivot_value
candidate_pivot_pos = -1
candidate_pivot_value = np.nan
- candidate_pivot_direction = TrendDirection.NEUTRAL
- candidate_pivot_scaled_natr = np.nan
def add_pivot(pos: int, value: float, direction: TrendDirection):
- nonlocal last_pivot_pos, depth
+ nonlocal last_pivot_pos
if pivots_indices and indices[pos] == pivots_indices[-1]:
return
pivots_indices.append(indices[pos])
pivots_directions.append(direction)
pivots_scaled_natrs.append(thresholds[pos])
last_pivot_pos = pos
- depth = calculate_depth(pos)
reset_candidate_pivot()
- def is_reversal_confirmed(
+ def is_pivot_confirmed(
candidate_pivot_pos: int,
- confirmation_start_pos: int,
direction: TrendDirection,
- move_away_ratio: float = 0.25,
+ min_slope: float = np.finfo(float).eps,
+ enable_weighting: bool = False,
+ slopes_ok_threshold: float = 0.75,
) -> bool:
- confirmation_window = calculate_confirmation_window(candidate_pivot_pos)
-
- next_start = candidate_pivot_pos + 1
- next_end = min(next_start + confirmation_window, n)
- next_moving_start = confirmation_start_pos + 1
- next_moving_end = min(next_moving_start + confirmation_window, n)
- previous_start = max(candidate_pivot_pos - confirmation_window, 0)
- previous_end = candidate_pivot_pos
- if (
- next_start >= next_end
- or next_moving_start >= next_moving_end
- or previous_start >= previous_end
- ):
- return False
+ slope_confirmation_window = calculate_slope_confirmation_window(
+ candidate_pivot_pos
+ )
- next_slice = slice(next_start, next_end)
- next_highs = highs[next_slice]
- next_lows = lows[next_slice]
- next_moving_slice = slice(next_moving_start, next_moving_end)
- next_moving_closes = closes[next_moving_slice]
- next_moving_highs = highs[next_moving_slice]
- next_moving_lows = lows[next_moving_slice]
- previous_slice = slice(previous_start, previous_end)
- previous_highs = highs[previous_slice]
- previous_lows = lows[previous_slice]
-
- local_extrema_ok = True
- if use_local_extrema_validation:
- local_extrema_threshold = (confirmation_window - 1) / confirmation_window
-
- if direction == TrendDirection.DOWN:
- valid_next = (
- np.sum(next_highs < highs[candidate_pivot_pos]) / len(next_highs)
- >= local_extrema_threshold
- )
- valid_previous = (
- np.sum(previous_highs < highs[candidate_pivot_pos])
- / len(previous_highs)
- >= local_extrema_threshold
- )
- local_extrema_ok = valid_next and valid_previous
- elif direction == TrendDirection.UP:
- valid_next = (
- np.sum(next_lows > lows[candidate_pivot_pos]) / len(next_lows)
- >= local_extrema_threshold
- )
- valid_previous = (
- np.sum(previous_lows > lows[candidate_pivot_pos])
- / len(previous_lows)
- >= local_extrema_threshold
- )
- local_extrema_ok = valid_next and valid_previous
-
- slope_ok = True
- if use_slope_validation and len(next_moving_closes) >= 2:
- log_next_moving_closes = np.log(next_moving_closes)
- log_next_moving_closes_std = np.std(log_next_moving_closes)
- if np.isclose(log_next_moving_closes_std, 0):
- next_moving_slope_strength = 0
- else:
- log_next_moving_closes_length = len(log_next_moving_closes)
- weights = np.linspace(0.5, 1.5, log_next_moving_closes_length)
- log_next_moving_slope = np.polyfit(
- range(log_next_moving_closes_length),
- log_next_moving_closes,
+ slopes_ok: list[bool] = []
+
+ for i in range(candidate_pivot_pos + 1, n):
+ next_start = i
+ next_end = min(next_start + slope_confirmation_window, n)
+
+ next_closes = closes[next_start:next_end]
+
+ if len(next_closes) >= 2:
+ log_next_closes = np.log(next_closes)
+ log_next_closes_length = len(log_next_closes)
+ polyfit_kwargs = {}
+ if enable_weighting:
+ polyfit_kwargs = {
+ "w": np.linspace(0.5, 1.5, log_next_closes_length)
+ }
+ log_next_slope = np.polyfit(
+ range(log_next_closes_length),
+ log_next_closes,
1,
- w=weights,
+ **polyfit_kwargs,
)[0]
- next_moving_slope_strength = (
- log_next_moving_slope / log_next_moving_closes_std
- )
- min_slope_strength = calculate_min_slope_strength(candidate_pivot_pos)
- if direction == TrendDirection.DOWN:
- slope_ok = next_moving_slope_strength < -min_slope_strength
- elif direction == TrendDirection.UP:
- slope_ok = next_moving_slope_strength > min_slope_strength
-
- if use_significant_move_away_validation:
- significant_move_away_ok = False
- if direction == TrendDirection.DOWN:
- if np.any(
- next_moving_lows
- < highs[candidate_pivot_pos]
- * (1 - thresholds[candidate_pivot_pos] * move_away_ratio)
- ):
- significant_move_away_ok = True
- elif direction == TrendDirection.UP:
- if np.any(
- next_moving_highs
- > lows[candidate_pivot_pos]
- * (1 + thresholds[candidate_pivot_pos] * move_away_ratio)
- ):
- significant_move_away_ok = True
- else:
- significant_move_away_ok = True
+ if direction == TrendDirection.DOWN:
+ slopes_ok.append(log_next_slope < -min_slope)
+ elif direction == TrendDirection.UP:
+ slopes_ok.append(log_next_slope > min_slope)
+ else:
+ slopes_ok.append(False)
- return local_extrema_ok and slope_ok and significant_move_away_ok
+ if sum(slopes_ok) / len(slopes_ok) >= slopes_ok_threshold:
+ return True
+
+ return False
start_pos = 0
initial_high_pos = start_pos
else:
return [], [], [], []
- if n - last_pivot_pos - 1 < depth:
- return pivots_indices, pivots_values, pivots_directions, pivots_scaled_natrs
-
for i in range(last_pivot_pos + 1, n):
current_high = highs[i]
current_low = lows[i]
if state == TrendDirection.UP:
if np.isnan(candidate_pivot_value) or current_high > candidate_pivot_value:
- update_candidate_pivot(i, current_high, TrendDirection.UP)
+ update_candidate_pivot(i, current_high)
if (
- (candidate_pivot_value - current_low) / candidate_pivot_value
- >= thresholds[candidate_pivot_pos]
- and (candidate_pivot_pos - last_pivot_pos) >= depth
- and is_reversal_confirmed(candidate_pivot_pos, i, TrendDirection.DOWN)
- ):
+ candidate_pivot_value - current_low
+ ) / candidate_pivot_value >= thresholds[
+ candidate_pivot_pos
+ ] and is_pivot_confirmed(candidate_pivot_pos, TrendDirection.DOWN):
add_pivot(candidate_pivot_pos, candidate_pivot_value, TrendDirection.UP)
state = TrendDirection.DOWN
+
elif state == TrendDirection.DOWN:
if np.isnan(candidate_pivot_value) or current_low < candidate_pivot_value:
- update_candidate_pivot(i, current_low, TrendDirection.DOWN)
+ update_candidate_pivot(i, current_low)
if (
- (current_high - candidate_pivot_value) / candidate_pivot_value
- >= thresholds[candidate_pivot_pos]
- and (candidate_pivot_pos - last_pivot_pos) >= depth
- and is_reversal_confirmed(candidate_pivot_pos, i, TrendDirection.UP)
- ):
+ current_high - candidate_pivot_value
+ ) / candidate_pivot_value >= thresholds[
+ candidate_pivot_pos
+ ] and is_pivot_confirmed(candidate_pivot_pos, TrendDirection.UP):
add_pivot(
candidate_pivot_pos, candidate_pivot_value, TrendDirection.DOWN
)
df: pd.DataFrame,
natr_period: int = 14,
natr_ratio: float = 6.0,
- use_local_extrema_validation=False,
- use_slope_validation=False,
- use_significant_move_away_validation=False,
) -> tuple[list[int], list[float], list[int], list[float]]:
- min_confirmation_window: int = 3
- max_confirmation_window: int = 5
n = len(df)
- if df.empty or n < max(natr_period, 2 * max_confirmation_window + 1):
+ if df.empty or n < natr_period:
return [], [], [], []
natr_values = (ta.NATR(df, timeperiod=natr_period).bfill() / 100.0).to_numpy()
lows = df.get("low").to_numpy()
state: TrendDirection = TrendDirection.NEUTRAL
- depth = -1
- pivots_indices, pivots_values, pivots_directions, pivots_scaled_natrs = (
- [],
- [],
- [],
- [],
- )
+ pivots_indices: list[int] = []
+ pivots_values: list[float] = []
+ pivots_directions: list[int] = []
+ pivots_scaled_natrs: list[float] = []
last_pivot_pos = -1
candidate_pivot_pos: int = -1
candidate_pivot_value: float = np.nan
- candidate_pivot_direction: TrendDirection = TrendDirection.NEUTRAL
- candidate_pivot_scaled_natr: float = np.nan
volatility_quantile_cache: dict[int, float] = {}
return volatility_quantile_cache[pos]
- def calculate_confirmation_window(
+ def calculate_slope_confirmation_window(
pos: int,
- min_window: int = min_confirmation_window,
- max_window: int = max_confirmation_window,
+ min_window: int = 3,
+ max_window: int = 5,
) -> int:
volatility_quantile = calculate_volatility_quantile(pos)
if np.isnan(volatility_quantile):
max_window,
).astype(int)
- def calculate_depth(
- pos: int,
- min_depth: int = 6,
- max_depth: int = 24,
- ) -> int:
- volatility_quantile = calculate_volatility_quantile(pos)
- if np.isnan(volatility_quantile):
- return int(round(median([min_depth, max_depth])))
-
- return np.clip(
- round(max_depth - (max_depth - min_depth) * volatility_quantile),
- min_depth,
- max_depth,
- ).astype(int)
-
- def calculate_min_slope_strength(
- pos: int,
- min_strength: float = 0.5,
- max_strength: float = 1.5,
- ) -> float:
- volatility_quantile = calculate_volatility_quantile(pos)
- if np.isnan(volatility_quantile):
- return median([min_strength, max_strength])
-
- return min_strength + (max_strength - min_strength) * volatility_quantile
-
- def update_candidate_pivot(pos: int, value: float, direction: TrendDirection):
- nonlocal \
- candidate_pivot_pos, \
- candidate_pivot_value, \
- candidate_pivot_direction, \
- candidate_pivot_scaled_natr
+ def update_candidate_pivot(pos: int, value: float):
+ nonlocal candidate_pivot_pos, candidate_pivot_value
if 0 <= pos < n:
candidate_pivot_pos = pos
candidate_pivot_value = value
- candidate_pivot_direction = direction
- candidate_pivot_scaled_natr = thresholds[pos]
def reset_candidate_pivot():
- nonlocal \
- candidate_pivot_pos, \
- candidate_pivot_value, \
- candidate_pivot_direction, \
- candidate_pivot_scaled_natr
+ nonlocal candidate_pivot_pos, candidate_pivot_value
candidate_pivot_pos = -1
candidate_pivot_value = np.nan
- candidate_pivot_direction = TrendDirection.NEUTRAL
- candidate_pivot_scaled_natr = np.nan
def add_pivot(pos: int, value: float, direction: TrendDirection):
- nonlocal last_pivot_pos, depth
+ nonlocal last_pivot_pos
if pivots_indices and indices[pos] == pivots_indices[-1]:
return
pivots_indices.append(indices[pos])
pivots_directions.append(direction)
pivots_scaled_natrs.append(thresholds[pos])
last_pivot_pos = pos
- depth = calculate_depth(pos)
reset_candidate_pivot()
- def is_reversal_confirmed(
+ def is_pivot_confirmed(
candidate_pivot_pos: int,
- confirmation_start_pos: int,
direction: TrendDirection,
- move_away_ratio: float = 0.25,
+ min_slope: float = np.finfo(float).eps,
+ enable_weighting: bool = False,
+ slopes_ok_threshold: float = 0.75,
) -> bool:
- confirmation_window = calculate_confirmation_window(candidate_pivot_pos)
-
- next_start = candidate_pivot_pos + 1
- next_end = min(next_start + confirmation_window, n)
- next_moving_start = confirmation_start_pos + 1
- next_moving_end = min(next_moving_start + confirmation_window, n)
- previous_start = max(candidate_pivot_pos - confirmation_window, 0)
- previous_end = candidate_pivot_pos
- if (
- next_start >= next_end
- or next_moving_start >= next_moving_end
- or previous_start >= previous_end
- ):
- return False
-
- next_slice = slice(next_start, next_end)
- next_highs = highs[next_slice]
- next_lows = lows[next_slice]
- next_moving_slice = slice(next_moving_start, next_moving_end)
- next_moving_closes = closes[next_moving_slice]
- next_moving_highs = highs[next_moving_slice]
- next_moving_lows = lows[next_moving_slice]
- previous_slice = slice(previous_start, previous_end)
- previous_highs = highs[previous_slice]
- previous_lows = lows[previous_slice]
-
- local_extrema_ok = True
- if use_local_extrema_validation:
- local_extrema_threshold = (confirmation_window - 1) / confirmation_window
-
- if direction == TrendDirection.DOWN:
- valid_next = (
- np.sum(next_highs < highs[candidate_pivot_pos]) / len(next_highs)
- >= local_extrema_threshold
- )
- valid_previous = (
- np.sum(previous_highs < highs[candidate_pivot_pos])
- / len(previous_highs)
- >= local_extrema_threshold
- )
- local_extrema_ok = valid_next and valid_previous
- elif direction == TrendDirection.UP:
- valid_next = (
- np.sum(next_lows > lows[candidate_pivot_pos]) / len(next_lows)
- >= local_extrema_threshold
- )
- valid_previous = (
- np.sum(previous_lows > lows[candidate_pivot_pos])
- / len(previous_lows)
- >= local_extrema_threshold
- )
- local_extrema_ok = valid_next and valid_previous
-
- slope_ok = True
- if use_slope_validation and len(next_moving_closes) >= 2:
- log_next_moving_closes = np.log(next_moving_closes)
- log_next_moving_closes_std = np.std(log_next_moving_closes)
- if np.isclose(log_next_moving_closes_std, 0):
- next_moving_slope_strength = 0
- else:
- log_next_moving_closes_length = len(log_next_moving_closes)
- weights = np.linspace(0.5, 1.5, log_next_moving_closes_length)
- log_next_moving_slope = np.polyfit(
- range(log_next_moving_closes_length),
- log_next_moving_closes,
+ slope_confirmation_window = calculate_slope_confirmation_window(
+ candidate_pivot_pos
+ )
+
+ slopes_ok: list[bool] = []
+
+ for i in range(candidate_pivot_pos + 1, n):
+ next_start = i
+ next_end = min(next_start + slope_confirmation_window, n)
+
+ next_closes = closes[next_start:next_end]
+
+ if len(next_closes) >= 2:
+ log_next_closes = np.log(next_closes)
+ log_next_closes_length = len(log_next_closes)
+ polyfit_kwargs = {}
+ if enable_weighting:
+ polyfit_kwargs = {
+ "w": np.linspace(0.5, 1.5, log_next_closes_length)
+ }
+ log_next_slope = np.polyfit(
+ range(log_next_closes_length),
+ log_next_closes,
1,
- w=weights,
+ **polyfit_kwargs,
)[0]
- next_moving_slope_strength = (
- log_next_moving_slope / log_next_moving_closes_std
- )
- min_slope_strength = calculate_min_slope_strength(candidate_pivot_pos)
- if direction == TrendDirection.DOWN:
- slope_ok = next_moving_slope_strength < -min_slope_strength
- elif direction == TrendDirection.UP:
- slope_ok = next_moving_slope_strength > min_slope_strength
-
- if use_significant_move_away_validation:
- significant_move_away_ok = False
- if direction == TrendDirection.DOWN:
- if np.any(
- next_moving_lows
- < highs[candidate_pivot_pos]
- * (1 - thresholds[candidate_pivot_pos] * move_away_ratio)
- ):
- significant_move_away_ok = True
- elif direction == TrendDirection.UP:
- if np.any(
- next_moving_highs
- > lows[candidate_pivot_pos]
- * (1 + thresholds[candidate_pivot_pos] * move_away_ratio)
- ):
- significant_move_away_ok = True
- else:
- significant_move_away_ok = True
+ if direction == TrendDirection.DOWN:
+ slopes_ok.append(log_next_slope < -min_slope)
+ elif direction == TrendDirection.UP:
+ slopes_ok.append(log_next_slope > min_slope)
+ else:
+ slopes_ok.append(False)
- return local_extrema_ok and slope_ok and significant_move_away_ok
+ if sum(slopes_ok) / len(slopes_ok) >= slopes_ok_threshold:
+ return True
+
+ return False
start_pos = 0
initial_high_pos = start_pos
else:
return [], [], [], []
- if n - last_pivot_pos - 1 < depth:
- return pivots_indices, pivots_values, pivots_directions, pivots_scaled_natrs
-
for i in range(last_pivot_pos + 1, n):
current_high = highs[i]
current_low = lows[i]
if state == TrendDirection.UP:
if np.isnan(candidate_pivot_value) or current_high > candidate_pivot_value:
- update_candidate_pivot(i, current_high, TrendDirection.UP)
+ update_candidate_pivot(i, current_high)
if (
- (candidate_pivot_value - current_low) / candidate_pivot_value
- >= thresholds[candidate_pivot_pos]
- and (candidate_pivot_pos - last_pivot_pos) >= depth
- and is_reversal_confirmed(candidate_pivot_pos, i, TrendDirection.DOWN)
- ):
+ candidate_pivot_value - current_low
+ ) / candidate_pivot_value >= thresholds[
+ candidate_pivot_pos
+ ] and is_pivot_confirmed(candidate_pivot_pos, TrendDirection.DOWN):
add_pivot(candidate_pivot_pos, candidate_pivot_value, TrendDirection.UP)
state = TrendDirection.DOWN
+
elif state == TrendDirection.DOWN:
if np.isnan(candidate_pivot_value) or current_low < candidate_pivot_value:
- update_candidate_pivot(i, current_low, TrendDirection.DOWN)
+ update_candidate_pivot(i, current_low)
if (
- (current_high - candidate_pivot_value) / candidate_pivot_value
- >= thresholds[candidate_pivot_pos]
- and (candidate_pivot_pos - last_pivot_pos) >= depth
- and is_reversal_confirmed(candidate_pivot_pos, i, TrendDirection.UP)
- ):
+ current_high - candidate_pivot_value
+ ) / candidate_pivot_value >= thresholds[
+ candidate_pivot_pos
+ ] and is_pivot_confirmed(candidate_pivot_pos, TrendDirection.UP):
add_pivot(
candidate_pivot_pos, candidate_pivot_value, TrendDirection.DOWN
)