https://github.com/sponsors/robcaulk
"""
- version = "3.7.22"
+ version = "3.7.23"
@cached_property
def _optuna_config(self) -> dict:
DOWN = -1
+def find_fractals(df: pd.DataFrame, fractal_period: int) -> tuple[list[int], list[int]]:
+ highs = df["high"].values
+ lows = df["low"].values
+ fractal_highs = []
+ fractal_lows = []
+ for i in range(fractal_period, max(fractal_period, len(df) - fractal_period)):
+ valid_high = True
+ valid_low = True
+ for j in range(1, fractal_period + 1):
+ if highs[i] <= highs[i - j] or highs[i] <= highs[i + j]:
+ valid_high = False
+ if lows[i] >= lows[i - j] or lows[i] >= lows[i + j]:
+ valid_low = False
+ if not valid_high and not valid_low:
+ break
+ if valid_high:
+ fractal_highs.append(i)
+ if valid_low:
+ fractal_lows.append(i)
+ return fractal_highs, fractal_lows
+
+
def zigzag(
df: pd.DataFrame,
- period: int = 14,
- ratio: float = 1.0,
+ natr_period: int = 14,
+ natr_ratio: float = 1.0,
+ fractal_period: int = 2,
depth: int = 12,
) -> tuple[list[int], list[float], list[int]]:
if df.empty or len(df) < 2:
return [], [], []
+ fractal_highs, fractal_lows = find_fractals(df, fractal_period)
+ fractal_high_set = set(fractal_highs)
+ fractal_low_set = set(fractal_lows)
+ is_fractal_high = [i in fractal_high_set for i in range(len(df))]
+ is_fractal_low = [i in fractal_low_set for i in range(len(df))]
+
indices = df.index.tolist()
- thresholds = (ta.NATR(df, timeperiod=period) * ratio).fillna(method="bfill").values
+ thresholds = (
+ (ta.NATR(df, timeperiod=natr_period) * natr_ratio).fillna(method="bfill").values
+ )
highs = df["high"].values
lows = df["low"].values
pivots_values[-1] = value
pivots_directions[-1] = direction
- initial_high_pos = 0
- initial_low_pos = 0
+ start_pos = fractal_period
+ initial_high_pos = start_pos
+ initial_low_pos = start_pos
initial_high = highs[initial_high_pos]
initial_low = lows[initial_low_pos]
- for i in range(1, len(df)):
+ for i in range(start_pos + 1, len(df)):
if highs[i] > initial_high:
initial_high, initial_high_pos = highs[i], i
if lows[i] < initial_low:
initial_move_from_high = (initial_high - lows[i]) / initial_high
initial_move_from_low = (highs[i] - initial_low) / initial_low
- if initial_move_from_high >= thresholds[i]:
+ if (
+ initial_move_from_high >= thresholds[i]
+ and is_fractal_high[initial_high_pos]
+ ):
add_pivot(initial_high_pos, initial_high, TrendDirection.UP)
state = TrendDirection.DOWN
break
- elif initial_move_from_low >= thresholds[i]:
+ elif initial_move_from_low >= thresholds[i] and is_fractal_low[initial_low_pos]:
add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN)
state = TrendDirection.UP
break
for i in range(i + 1, len(df)):
last_pivot_val = pivots_values[-1]
if state == TrendDirection.UP:
- if highs[i] > last_pivot_val:
+ if highs[i] > last_pivot_val and is_fractal_high[i]:
update_last_pivot(i, highs[i], TrendDirection.UP)
- elif (last_pivot_val - lows[i]) / last_pivot_val >= thresholds[i] and (
- i - last_pivot_pos
- ) >= depth:
+ elif (
+ (last_pivot_val - lows[i]) / last_pivot_val >= thresholds[i]
+ and (i - last_pivot_pos) >= depth
+ and is_fractal_low[i]
+ ):
add_pivot(i, lows[i], TrendDirection.DOWN)
state = TrendDirection.DOWN
elif state == TrendDirection.DOWN:
- if lows[i] < last_pivot_val:
+ if lows[i] < last_pivot_val and is_fractal_low[i]:
update_last_pivot(i, lows[i], TrendDirection.DOWN)
- elif (highs[i] - last_pivot_val) / last_pivot_val >= thresholds[i] and (
- i - last_pivot_pos
- ) >= depth:
+ elif (
+ (highs[i] - last_pivot_val) / last_pivot_val >= thresholds[i]
+ and (i - last_pivot_pos) >= depth
+ and is_fractal_high[i]
+ ):
add_pivot(i, highs[i], TrendDirection.UP)
state = TrendDirection.UP
final_pos = len(df) - 1
- if state != TrendDirection.NEUTRAL and (final_pos - last_pivot_pos) >= depth:
- last_pivot_val = pivots_values[-1]
- price_move = (
- (highs[final_pos] - last_pivot_val) / last_pivot_val
- if state == TrendDirection.UP
- else (last_pivot_val - lows[final_pos]) / last_pivot_val
+ last_pivot_val = pivots_values[-1]
+ final_price_move = (
+ (highs[final_pos] - last_pivot_val) / last_pivot_val
+ if state == TrendDirection.UP
+ else (last_pivot_val - lows[final_pos]) / last_pivot_val
+ )
+ if (
+ state != TrendDirection.NEUTRAL
+ and (final_pos - last_pivot_pos) >= depth
+ and final_price_move >= thresholds[final_pos]
+ and indices[final_pos] != pivots_indices[-1]
+ and (
+ (state == TrendDirection.UP and is_fractal_high[final_pos])
+ or (state == TrendDirection.DOWN and is_fractal_low[final_pos])
+ )
+ ):
+ add_pivot(
+ final_pos,
+ highs[final_pos] if state == TrendDirection.UP else lows[final_pos],
+ state,
)
-
- if (
- price_move >= thresholds[final_pos]
- and indices[final_pos] != pivots_indices[-1]
- ):
- add_pivot(
- final_pos,
- highs[final_pos] if state == TrendDirection.UP else lows[final_pos],
- state,
- )
return pivots_indices, pivots_values, pivots_directions
_, pivots_values, _ = zigzag(
df,
- period=label_period_candles,
- ratio=label_natr_ratio,
+ natr_period=label_period_candles,
+ natr_ratio=label_natr_ratio,
)
if len(pivots_values) < 2:
INTERFACE_VERSION = 3
def version(self) -> str:
- return "3.3.16"
+ return "3.3.17"
timeframe = "5m"
pair = str(metadata.get("pair"))
pivots_indices, _, pivots_directions = zigzag(
dataframe,
- period=self.get_label_period_candles(pair),
- ratio=self.get_label_natr_ratio(pair),
+ natr_period=self.get_label_period_candles(pair),
+ natr_ratio=self.get_label_natr_ratio(pair),
)
dataframe[EXTREMA_COLUMN] = 0
for pivot_idx, pivot_dir in zip(pivots_indices, pivots_directions):
DOWN = -1
+def find_fractals(df: pd.DataFrame, fractal_period: int) -> tuple[list[int], list[int]]:
+ highs = df["high"].values
+ lows = df["low"].values
+ fractal_highs = []
+ fractal_lows = []
+ for i in range(fractal_period, max(fractal_period, len(df) - fractal_period)):
+ valid_high = True
+ valid_low = True
+ for j in range(1, fractal_period + 1):
+ if highs[i] <= highs[i - j] or highs[i] <= highs[i + j]:
+ valid_high = False
+ if lows[i] >= lows[i - j] or lows[i] >= lows[i + j]:
+ valid_low = False
+ if not valid_high and not valid_low:
+ break
+ if valid_high:
+ fractal_highs.append(i)
+ if valid_low:
+ fractal_lows.append(i)
+ return fractal_highs, fractal_lows
+
+
def zigzag(
df: pd.DataFrame,
- period: int = 14,
- ratio: float = 1.0,
+ natr_period: int = 14,
+ natr_ratio: float = 1.0,
+ fractal_period: int = 2,
depth: int = 12,
) -> tuple[list[int], list[float], list[int]]:
if df.empty or len(df) < 2:
return [], [], []
+ fractal_highs, fractal_lows = find_fractals(df, fractal_period)
+ fractal_high_set = set(fractal_highs)
+ fractal_low_set = set(fractal_lows)
+ is_fractal_high = [i in fractal_high_set for i in range(len(df))]
+ is_fractal_low = [i in fractal_low_set for i in range(len(df))]
+
indices = df.index.tolist()
- thresholds = (ta.NATR(df, timeperiod=period) * ratio).fillna(method="bfill").values
+ thresholds = (
+ (ta.NATR(df, timeperiod=natr_period) * natr_ratio).fillna(method="bfill").values
+ )
highs = df["high"].values
lows = df["low"].values
pivots_values[-1] = value
pivots_directions[-1] = direction
- initial_high_pos = 0
- initial_low_pos = 0
+ start_pos = fractal_period
+ initial_high_pos = start_pos
+ initial_low_pos = start_pos
initial_high = highs[initial_high_pos]
initial_low = lows[initial_low_pos]
- for i in range(1, len(df)):
+ for i in range(start_pos + 1, len(df)):
if highs[i] > initial_high:
initial_high, initial_high_pos = highs[i], i
if lows[i] < initial_low:
initial_move_from_high = (initial_high - lows[i]) / initial_high
initial_move_from_low = (highs[i] - initial_low) / initial_low
- if initial_move_from_high >= thresholds[i]:
+ if (
+ initial_move_from_high >= thresholds[i]
+ and is_fractal_high[initial_high_pos]
+ ):
add_pivot(initial_high_pos, initial_high, TrendDirection.UP)
state = TrendDirection.DOWN
break
- elif initial_move_from_low >= thresholds[i]:
+ elif initial_move_from_low >= thresholds[i] and is_fractal_low[initial_low_pos]:
add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN)
state = TrendDirection.UP
break
for i in range(i + 1, len(df)):
last_pivot_val = pivots_values[-1]
if state == TrendDirection.UP:
- if highs[i] > last_pivot_val:
+ if highs[i] > last_pivot_val and is_fractal_high[i]:
update_last_pivot(i, highs[i], TrendDirection.UP)
- elif (last_pivot_val - lows[i]) / last_pivot_val >= thresholds[i] and (
- i - last_pivot_pos
- ) >= depth:
+ elif (
+ (last_pivot_val - lows[i]) / last_pivot_val >= thresholds[i]
+ and (i - last_pivot_pos) >= depth
+ and is_fractal_low[i]
+ ):
add_pivot(i, lows[i], TrendDirection.DOWN)
state = TrendDirection.DOWN
elif state == TrendDirection.DOWN:
- if lows[i] < last_pivot_val:
+ if lows[i] < last_pivot_val and is_fractal_low[i]:
update_last_pivot(i, lows[i], TrendDirection.DOWN)
- elif (highs[i] - last_pivot_val) / last_pivot_val >= thresholds[i] and (
- i - last_pivot_pos
- ) >= depth:
+ elif (
+ (highs[i] - last_pivot_val) / last_pivot_val >= thresholds[i]
+ and (i - last_pivot_pos) >= depth
+ and is_fractal_high[i]
+ ):
add_pivot(i, highs[i], TrendDirection.UP)
state = TrendDirection.UP
final_pos = len(df) - 1
- if state != TrendDirection.NEUTRAL and (final_pos - last_pivot_pos) >= depth:
- last_pivot_val = pivots_values[-1]
- price_move = (
- (highs[final_pos] - last_pivot_val) / last_pivot_val
- if state == TrendDirection.UP
- else (last_pivot_val - lows[final_pos]) / last_pivot_val
+ last_pivot_val = pivots_values[-1]
+ final_price_move = (
+ (highs[final_pos] - last_pivot_val) / last_pivot_val
+ if state == TrendDirection.UP
+ else (last_pivot_val - lows[final_pos]) / last_pivot_val
+ )
+ if (
+ state != TrendDirection.NEUTRAL
+ and (final_pos - last_pivot_pos) >= depth
+ and final_price_move >= thresholds[final_pos]
+ and indices[final_pos] != pivots_indices[-1]
+ and (
+ (state == TrendDirection.UP and is_fractal_high[final_pos])
+ or (state == TrendDirection.DOWN and is_fractal_low[final_pos])
+ )
+ ):
+ add_pivot(
+ final_pos,
+ highs[final_pos] if state == TrendDirection.UP else lows[final_pos],
+ state,
)
-
- if (
- price_move >= thresholds[final_pos]
- and indices[final_pos] != pivots_indices[-1]
- ):
- add_pivot(
- final_pos,
- highs[final_pos] if state == TrendDirection.UP else lows[final_pos],
- state,
- )
return pivots_indices, pivots_values, pivots_directions