confirmation_window: int = 2,
depth: int = 12,
) -> tuple[list[int], list[float], list[int]]:
- if df.empty or len(df) < max(natr_period, 2 * confirmation_window + 1):
+ n = len(df)
+ if df.empty or n < max(natr_period, 2 * confirmation_window + 1):
return [], [], []
indices = df.index.tolist()
def update_candidate_pivot(pos: int, value: float, direction: TrendDirection):
nonlocal candidate_pivot_pos, candidate_pivot_value, candidate_pivot_direction
- candidate_pivot_pos = pos
- candidate_pivot_value = value
- candidate_pivot_direction = direction
+ if 0 <= pos < n:
+ candidate_pivot_pos = pos
+ candidate_pivot_value = value
+ candidate_pivot_direction = direction
def reset_candidate_pivot():
nonlocal candidate_pivot_pos, candidate_pivot_value, candidate_pivot_direction
last_pivot_pos = pos
def is_reversal_confirmed(pos: int, direction: TrendDirection) -> bool:
- if pos - confirmation_window < 0 or pos + confirmation_window >= len(df):
+ next_start = pos + 1
+ next_end = min(pos + confirmation_window + 1, n)
+ prev_start = max(pos - confirmation_window, 0)
+ prev_end = pos
+ if next_start >= next_end or prev_start >= prev_end:
return False
- next_slice = slice(pos + 1, pos + confirmation_window + 1)
+
+ next_slice = slice(next_start, next_end)
next_closes = closes[next_slice]
next_highs = highs[next_slice]
next_lows = lows[next_slice]
- previous_slice = slice(pos - confirmation_window, pos)
+ previous_slice = slice(prev_start, prev_end)
previous_closes = closes[previous_slice]
previous_highs = highs[previous_slice]
previous_lows = lows[previous_slice]
+ if len(next_closes) == 0 or len(previous_closes) == 0:
+ return False
if direction == TrendDirection.DOWN:
return (
initial_low_pos = start_pos
initial_high = highs[initial_high_pos]
initial_low = lows[initial_low_pos]
- for i in range(start_pos + 1, len(df)):
+ for i in range(start_pos + 1, n):
current_high = highs[i]
current_low = lows[i]
if current_high > initial_high:
else:
return [], [], []
- for i in range(last_pivot_pos + 1, len(df)):
+ if last_pivot_pos + depth >= n:
+ return pivots_indices, pivots_values, pivots_directions
+
+ for i in range(last_pivot_pos + 1, n):
current_high = highs[i]
current_low = lows[i]
def find_fractals(df: pd.DataFrame, fractal_period: int) -> tuple[list[int], list[int]]:
- if len(df) < 2 * fractal_period + 1:
+ n = len(df)
+ if n < 2 * fractal_period + 1:
return [], []
highs = df["high"].values
lows = df["low"].values
- fractal_candidate_indices = np.arange(fractal_period, len(df) - fractal_period)
+ fractal_candidate_indices = np.arange(fractal_period, n - fractal_period)
is_fractal_high = np.ones(len(fractal_candidate_indices), dtype=bool)
is_fractal_low = np.ones(len(fractal_candidate_indices), dtype=bool)
confirmation_window: int = 2,
depth: int = 12,
) -> tuple[list[int], list[float], list[int]]:
- if df.empty or len(df) < max(natr_period, 2 * confirmation_window + 1):
+ n = len(df)
+ if df.empty or n < max(natr_period, 2 * confirmation_window + 1):
return [], [], []
indices = df.index.tolist()
def update_candidate_pivot(pos: int, value: float, direction: TrendDirection):
nonlocal candidate_pivot_pos, candidate_pivot_value, candidate_pivot_direction
- candidate_pivot_pos = pos
- candidate_pivot_value = value
- candidate_pivot_direction = direction
+ if 0 <= pos < n:
+ candidate_pivot_pos = pos
+ candidate_pivot_value = value
+ candidate_pivot_direction = direction
def reset_candidate_pivot():
nonlocal candidate_pivot_pos, candidate_pivot_value, candidate_pivot_direction
last_pivot_pos = pos
def is_reversal_confirmed(pos: int, direction: TrendDirection) -> bool:
- if pos - confirmation_window < 0 or pos + confirmation_window >= len(df):
+ next_start = pos + 1
+ next_end = min(pos + confirmation_window + 1, n)
+ prev_start = max(pos - confirmation_window, 0)
+ prev_end = pos
+ if next_start >= next_end or prev_start >= prev_end:
return False
- next_slice = slice(pos + 1, pos + confirmation_window + 1)
+
+ next_slice = slice(next_start, next_end)
next_closes = closes[next_slice]
next_highs = highs[next_slice]
next_lows = lows[next_slice]
- previous_slice = slice(pos - confirmation_window, pos)
+ previous_slice = slice(prev_start, prev_end)
previous_closes = closes[previous_slice]
previous_highs = highs[previous_slice]
previous_lows = lows[previous_slice]
+ if len(next_closes) == 0 or len(previous_closes) == 0:
+ return False
if direction == TrendDirection.DOWN:
return (
initial_low_pos = start_pos
initial_high = highs[initial_high_pos]
initial_low = lows[initial_low_pos]
- for i in range(start_pos + 1, len(df)):
+ for i in range(start_pos + 1, n):
current_high = highs[i]
current_low = lows[i]
if current_high > initial_high:
else:
return [], [], []
- for i in range(last_pivot_pos + 1, len(df)):
+ if last_pivot_pos + depth >= n:
+ return pivots_indices, pivots_values, pivots_directions
+
+ for i in range(last_pivot_pos + 1, n):
current_high = highs[i]
current_low = lows[i]