natr_period: int = 14,
natr_ratio: float = 1.0,
fractal_period: int = 2,
+ confirmation_window: int = 2,
depth: int = 12,
) -> tuple[list[int], list[float], list[int]]:
- if df.empty or len(df) < 2:
+ if (
+ df.empty
+ or len(df) < max(natr_period, 2 * fractal_period + 1) + confirmation_window
+ ):
return [], [], []
fractal_high_indices, fractal_low_indices = find_fractals(df, fractal_period)
pivots_values[-1] = value
pivots_directions[-1] = direction
+ def is_reversal_confirmed(pos: int, direction: TrendDirection) -> bool:
+ if pos + confirmation_window >= len(df):
+ return False
+ next_closes = df["close"].values[pos + 1 : pos + confirmation_window + 1]
+ if direction == TrendDirection.DOWN:
+ return all(c < highs[pos] for c in next_closes)
+ elif direction == TrendDirection.UP:
+ return all(c > lows[pos] for c in next_closes)
+ return False
+
start_pos = fractal_period
initial_high_pos = start_pos
initial_low_pos = start_pos
if (
initial_move_from_high >= thresholds[i]
and is_fractal_high[initial_high_pos]
+ and is_reversal_confirmed(initial_high_pos, TrendDirection.DOWN)
):
add_pivot(initial_high_pos, initial_high, TrendDirection.UP)
state = TrendDirection.DOWN
break
- elif initial_move_from_low >= thresholds[i] and is_fractal_low[initial_low_pos]:
+ elif (
+ initial_move_from_low >= thresholds[i]
+ and is_fractal_low[initial_low_pos]
+ and is_reversal_confirmed(initial_low_pos, TrendDirection.UP)
+ ):
add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN)
state = TrendDirection.UP
break
return [], [], []
for i in range(last_pivot_pos + 1, len(df)):
+ current_high = highs[i]
+ current_low = lows[i]
last_pivot_val = pivots_values[-1]
if state == TrendDirection.UP:
- if highs[i] > last_pivot_val and is_fractal_high[i]:
- update_last_pivot(i, highs[i], TrendDirection.UP)
+ if current_high > last_pivot_val:
+ update_last_pivot(i, current_high, TrendDirection.UP)
elif (
- (last_pivot_val - lows[i]) / last_pivot_val >= thresholds[i]
+ (last_pivot_val - current_low) / last_pivot_val >= thresholds[i]
and (i - last_pivot_pos) >= depth
and is_fractal_low[i]
+ and is_reversal_confirmed(i, TrendDirection.DOWN)
):
- add_pivot(i, lows[i], TrendDirection.DOWN)
+ add_pivot(i, current_low, TrendDirection.DOWN)
state = TrendDirection.DOWN
elif state == TrendDirection.DOWN:
- if lows[i] < last_pivot_val and is_fractal_low[i]:
- update_last_pivot(i, lows[i], TrendDirection.DOWN)
+ if current_low < last_pivot_val:
+ update_last_pivot(i, current_low, TrendDirection.DOWN)
elif (
- (highs[i] - last_pivot_val) / last_pivot_val >= thresholds[i]
+ (current_high - last_pivot_val) / last_pivot_val >= thresholds[i]
and (i - last_pivot_pos) >= depth
and is_fractal_high[i]
+ and is_reversal_confirmed(i, TrendDirection.UP)
):
- add_pivot(i, highs[i], TrendDirection.UP)
+ add_pivot(i, current_high, TrendDirection.UP)
state = TrendDirection.UP
return pivots_indices, pivots_values, pivots_directions
natr_period: int = 14,
natr_ratio: float = 1.0,
fractal_period: int = 2,
+ confirmation_window: int = 2,
depth: int = 12,
) -> tuple[list[int], list[float], list[int]]:
- if df.empty or len(df) < 2:
+ if (
+ df.empty
+ or len(df) < max(natr_period, 2 * fractal_period + 1) + confirmation_window
+ ):
return [], [], []
fractal_high_indices, fractal_low_indices = find_fractals(df, fractal_period)
pivots_values[-1] = value
pivots_directions[-1] = direction
+ def is_reversal_confirmed(pos: int, direction: TrendDirection) -> bool:
+ if pos + confirmation_window >= len(df):
+ return False
+ next_closes = df["close"].values[pos + 1 : pos + confirmation_window + 1]
+ if direction == TrendDirection.DOWN:
+ return all(c < highs[pos] for c in next_closes)
+ elif direction == TrendDirection.UP:
+ return all(c > lows[pos] for c in next_closes)
+ return False
+
start_pos = fractal_period
initial_high_pos = start_pos
initial_low_pos = start_pos
if (
initial_move_from_high >= thresholds[i]
and is_fractal_high[initial_high_pos]
+ and is_reversal_confirmed(initial_high_pos, TrendDirection.DOWN)
):
add_pivot(initial_high_pos, initial_high, TrendDirection.UP)
state = TrendDirection.DOWN
break
- elif initial_move_from_low >= thresholds[i] and is_fractal_low[initial_low_pos]:
+ elif (
+ initial_move_from_low >= thresholds[i]
+ and is_fractal_low[initial_low_pos]
+ and is_reversal_confirmed(initial_low_pos, TrendDirection.UP)
+ ):
add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN)
state = TrendDirection.UP
break
return [], [], []
for i in range(last_pivot_pos + 1, len(df)):
+ current_high = highs[i]
+ current_low = lows[i]
last_pivot_val = pivots_values[-1]
if state == TrendDirection.UP:
- if highs[i] > last_pivot_val and is_fractal_high[i]:
- update_last_pivot(i, highs[i], TrendDirection.UP)
+ if current_high > last_pivot_val:
+ update_last_pivot(i, current_high, TrendDirection.UP)
elif (
- (last_pivot_val - lows[i]) / last_pivot_val >= thresholds[i]
+ (last_pivot_val - current_low) / last_pivot_val >= thresholds[i]
and (i - last_pivot_pos) >= depth
and is_fractal_low[i]
+ and is_reversal_confirmed(i, TrendDirection.DOWN)
):
- add_pivot(i, lows[i], TrendDirection.DOWN)
+ add_pivot(i, current_low, TrendDirection.DOWN)
state = TrendDirection.DOWN
elif state == TrendDirection.DOWN:
- if lows[i] < last_pivot_val and is_fractal_low[i]:
- update_last_pivot(i, lows[i], TrendDirection.DOWN)
+ if current_low < last_pivot_val:
+ update_last_pivot(i, current_low, TrendDirection.DOWN)
elif (
- (highs[i] - last_pivot_val) / last_pivot_val >= thresholds[i]
+ (current_high - last_pivot_val) / last_pivot_val >= thresholds[i]
and (i - last_pivot_pos) >= depth
and is_fractal_high[i]
+ and is_reversal_confirmed(i, TrendDirection.UP)
):
- add_pivot(i, highs[i], TrendDirection.UP)
+ add_pivot(i, current_high, TrendDirection.UP)
state = TrendDirection.UP
return pivots_indices, pivots_values, pivots_directions