https://github.com/sponsors/robcaulk
"""
- version = "3.7.27"
+ version = "3.7.28"
@cached_property
def _optuna_config(self) -> dict:
return error
-class TrendDirection(IntEnum):
- NEUTRAL = 0
- UP = 1
- DOWN = -1
-
-
def find_fractals(df: pd.DataFrame, fractal_period: int) -> tuple[list[int], list[int]]:
if len(df) < 2 * fractal_period + 1:
return [], []
)
+class TrendDirection(IntEnum):
+ NEUTRAL = 0
+ UP = 1
+ DOWN = -1
+
+
def zigzag(
df: pd.DataFrame,
natr_period: int = 14,
natr_ratio: float = 1.0,
- fractal_period: int = 2,
confirmation_window: int = 2,
depth: int = 12,
) -> tuple[list[int], list[float], list[int]]:
- if (
- df.empty
- or len(df) < max(natr_period, 2 * fractal_period + 1) + confirmation_window
- ):
+ if df.empty or len(df) < natr_period + confirmation_window:
return [], [], []
- fractal_high_indices, fractal_low_indices = find_fractals(df, fractal_period)
- is_fractal_high = np.zeros(len(df), dtype=bool)
- is_fractal_high[fractal_high_indices] = True
- is_fractal_low = np.zeros(len(df), dtype=bool)
- is_fractal_low[fractal_low_indices] = True
-
indices = df.index.tolist()
thresholds = (
(ta.NATR(df, timeperiod=natr_period) * natr_ratio).fillna(method="bfill").values
return all(c > lows[pos] for c in next_closes)
return False
- start_pos = fractal_period
+ start_pos = 0
initial_high_pos = start_pos
initial_low_pos = start_pos
initial_high = highs[initial_high_pos]
initial_move_from_high = (initial_high - lows[i]) / initial_high
initial_move_from_low = (highs[i] - initial_low) / initial_low
- if (
- initial_move_from_high >= thresholds[i]
- and is_fractal_high[initial_high_pos]
- and is_reversal_confirmed(initial_high_pos, TrendDirection.DOWN)
+ if initial_move_from_high >= thresholds[i] and is_reversal_confirmed(
+ initial_high_pos, TrendDirection.DOWN
):
add_pivot(initial_high_pos, initial_high, TrendDirection.UP)
state = TrendDirection.DOWN
break
- elif (
- initial_move_from_low >= thresholds[i]
- and is_fractal_low[initial_low_pos]
- and is_reversal_confirmed(initial_low_pos, TrendDirection.UP)
+ elif initial_move_from_low >= thresholds[i] and is_reversal_confirmed(
+ initial_low_pos, TrendDirection.UP
):
add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN)
state = TrendDirection.UP
elif (
(last_pivot_val - current_low) / last_pivot_val >= thresholds[i]
and (i - last_pivot_pos) >= depth
- and is_fractal_low[i]
and is_reversal_confirmed(i, TrendDirection.DOWN)
):
add_pivot(i, current_low, TrendDirection.DOWN)
elif (
(current_high - last_pivot_val) / last_pivot_val >= thresholds[i]
and (i - last_pivot_pos) >= depth
- and is_fractal_high[i]
and is_reversal_confirmed(i, TrendDirection.UP)
):
add_pivot(i, current_high, TrendDirection.UP)
return jaw, teeth, lips
-class TrendDirection(IntEnum):
- NEUTRAL = 0
- UP = 1
- DOWN = -1
-
-
def find_fractals(df: pd.DataFrame, fractal_period: int) -> tuple[list[int], list[int]]:
if len(df) < 2 * fractal_period + 1:
return [], []
)
+class TrendDirection(IntEnum):
+ NEUTRAL = 0
+ UP = 1
+ DOWN = -1
+
+
def zigzag(
df: pd.DataFrame,
natr_period: int = 14,
natr_ratio: float = 1.0,
- fractal_period: int = 2,
confirmation_window: int = 2,
depth: int = 12,
) -> tuple[list[int], list[float], list[int]]:
- if (
- df.empty
- or len(df) < max(natr_period, 2 * fractal_period + 1) + confirmation_window
- ):
+ if df.empty or len(df) < natr_period + confirmation_window:
return [], [], []
- fractal_high_indices, fractal_low_indices = find_fractals(df, fractal_period)
- is_fractal_high = np.zeros(len(df), dtype=bool)
- is_fractal_high[fractal_high_indices] = True
- is_fractal_low = np.zeros(len(df), dtype=bool)
- is_fractal_low[fractal_low_indices] = True
-
indices = df.index.tolist()
thresholds = (
(ta.NATR(df, timeperiod=natr_period) * natr_ratio).fillna(method="bfill").values
return all(c > lows[pos] for c in next_closes)
return False
- start_pos = fractal_period
+ start_pos = 0
initial_high_pos = start_pos
initial_low_pos = start_pos
initial_high = highs[initial_high_pos]
initial_move_from_high = (initial_high - lows[i]) / initial_high
initial_move_from_low = (highs[i] - initial_low) / initial_low
- if (
- initial_move_from_high >= thresholds[i]
- and is_fractal_high[initial_high_pos]
- and is_reversal_confirmed(initial_high_pos, TrendDirection.DOWN)
+ if initial_move_from_high >= thresholds[i] and is_reversal_confirmed(
+ initial_high_pos, TrendDirection.DOWN
):
add_pivot(initial_high_pos, initial_high, TrendDirection.UP)
state = TrendDirection.DOWN
break
- elif (
- initial_move_from_low >= thresholds[i]
- and is_fractal_low[initial_low_pos]
- and is_reversal_confirmed(initial_low_pos, TrendDirection.UP)
+ elif initial_move_from_low >= thresholds[i] and is_reversal_confirmed(
+ initial_low_pos, TrendDirection.UP
):
add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN)
state = TrendDirection.UP
elif (
(last_pivot_val - current_low) / last_pivot_val >= thresholds[i]
and (i - last_pivot_pos) >= depth
- and is_fractal_low[i]
and is_reversal_confirmed(i, TrendDirection.DOWN)
):
add_pivot(i, current_low, TrendDirection.DOWN)
elif (
(current_high - last_pivot_val) / last_pivot_val >= thresholds[i]
and (i - last_pivot_pos) >= depth
- and is_fractal_high[i]
and is_reversal_confirmed(i, TrendDirection.UP)
):
add_pivot(i, current_high, TrendDirection.UP)