def find_fractals(df: pd.DataFrame, fractal_period: int) -> tuple[list[int], list[int]]:
+ if len(df) < 2 * fractal_period + 1:
+ return [], []
+
highs = df["high"].values
lows = df["low"].values
- fractal_highs = []
- fractal_lows = []
- for i in range(fractal_period, len(df) - fractal_period):
- valid_high = True
- valid_low = True
- for j in range(1, fractal_period + 1):
- if highs[i] <= highs[i - j] or highs[i] <= highs[i + j]:
- valid_high = False
- if lows[i] >= lows[i - j] or lows[i] >= lows[i + j]:
- valid_low = False
- if not valid_high and not valid_low:
- break
- if valid_high:
- fractal_highs.append(i)
- if valid_low:
- fractal_lows.append(i)
- return fractal_highs, fractal_lows
+
+ fractal_candidate_indices = np.arange(fractal_period, len(df) - fractal_period)
+
+ is_fractal_high = np.ones(len(fractal_candidate_indices), dtype=bool)
+ is_fractal_low = np.ones(len(fractal_candidate_indices), dtype=bool)
+
+ for i in range(1, fractal_period + 1):
+ is_fractal_high &= (
+ highs[fractal_candidate_indices] > highs[fractal_candidate_indices - i]
+ ) & (highs[fractal_candidate_indices] > highs[fractal_candidate_indices + i])
+
+ is_fractal_low &= (
+ lows[fractal_candidate_indices] < lows[fractal_candidate_indices - i]
+ ) & (lows[fractal_candidate_indices] < lows[fractal_candidate_indices + i])
+
+ if not np.any(is_fractal_high) and not np.any(is_fractal_low):
+ break
+
+ return (
+ fractal_candidate_indices[is_fractal_high].tolist(),
+ fractal_candidate_indices[is_fractal_low].tolist(),
+ )
def zigzag(
if df.empty or len(df) < 2:
return [], [], []
- fractal_highs, fractal_lows = find_fractals(df, fractal_period)
- fractal_highs_set = set(fractal_highs)
- fractal_lows_set = set(fractal_lows)
- is_fractal_high = [i in fractal_highs_set for i in range(len(df))]
- is_fractal_low = [i in fractal_lows_set for i in range(len(df))]
+ fractal_high_indices, fractal_low_indices = find_fractals(df, fractal_period)
+ is_fractal_high = np.zeros(len(df), dtype=bool)
+ is_fractal_high[fractal_high_indices] = True
+ is_fractal_low = np.zeros(len(df), dtype=bool)
+ is_fractal_low[fractal_low_indices] = True
indices = df.index.tolist()
thresholds = (
else:
return [], [], []
- for i in range(i + 1, len(df)):
+ for i in range(last_pivot_pos + 1, len(df)):
last_pivot_val = pivots_values[-1]
if state == TrendDirection.UP:
if highs[i] > last_pivot_val and is_fractal_high[i]:
def find_fractals(df: pd.DataFrame, fractal_period: int) -> tuple[list[int], list[int]]:
+ if len(df) < 2 * fractal_period + 1:
+ return [], []
+
highs = df["high"].values
lows = df["low"].values
- fractal_highs = []
- fractal_lows = []
- for i in range(fractal_period, len(df) - fractal_period):
- valid_high = True
- valid_low = True
- for j in range(1, fractal_period + 1):
- if highs[i] <= highs[i - j] or highs[i] <= highs[i + j]:
- valid_high = False
- if lows[i] >= lows[i - j] or lows[i] >= lows[i + j]:
- valid_low = False
- if not valid_high and not valid_low:
- break
- if valid_high:
- fractal_highs.append(i)
- if valid_low:
- fractal_lows.append(i)
- return fractal_highs, fractal_lows
+
+ fractal_candidate_indices = np.arange(fractal_period, len(df) - fractal_period)
+
+ is_fractal_high = np.ones(len(fractal_candidate_indices), dtype=bool)
+ is_fractal_low = np.ones(len(fractal_candidate_indices), dtype=bool)
+
+ for i in range(1, fractal_period + 1):
+ is_fractal_high &= (
+ highs[fractal_candidate_indices] > highs[fractal_candidate_indices - i]
+ ) & (highs[fractal_candidate_indices] > highs[fractal_candidate_indices + i])
+
+ is_fractal_low &= (
+ lows[fractal_candidate_indices] < lows[fractal_candidate_indices - i]
+ ) & (lows[fractal_candidate_indices] < lows[fractal_candidate_indices + i])
+
+ if not np.any(is_fractal_high) and not np.any(is_fractal_low):
+ break
+
+ return (
+ fractal_candidate_indices[is_fractal_high].tolist(),
+ fractal_candidate_indices[is_fractal_low].tolist(),
+ )
def zigzag(
if df.empty or len(df) < 2:
return [], [], []
- fractal_highs, fractal_lows = find_fractals(df, fractal_period)
- fractal_highs_set = set(fractal_highs)
- fractal_lows_set = set(fractal_lows)
- is_fractal_high = [i in fractal_highs_set for i in range(len(df))]
- is_fractal_low = [i in fractal_lows_set for i in range(len(df))]
+ fractal_high_indices, fractal_low_indices = find_fractals(df, fractal_period)
+ is_fractal_high = np.zeros(len(df), dtype=bool)
+ is_fractal_high[fractal_high_indices] = True
+ is_fractal_low = np.zeros(len(df), dtype=bool)
+ is_fractal_low[fractal_low_indices] = True
indices = df.index.tolist()
thresholds = (
else:
return [], [], []
- for i in range(i + 1, len(df)):
+ for i in range(last_pivot_pos + 1, len(df)):
last_pivot_val = pivots_values[-1]
if state == TrendDirection.UP:
if highs[i] > last_pivot_val and is_fractal_high[i]: