https://github.com/sponsors/robcaulk
"""
- version = "3.7.17"
+ version = "3.7.18"
@cached_property
def _optuna_config(self) -> dict:
label_period_candles: int,
) -> tuple[float, float]:
temperature = float(
- self.freqai_info.get("prediction_thresholds_temperature", 175.0)
+ self.freqai_info.get("prediction_thresholds_temperature", 225.0)
)
extrema = pred_df[EXTREMA_COLUMN].iloc[
-(
) -> dict:
study_model_parameters = {
"learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True),
- "min_child_weight": trial.suggest_int("min_child_weight", 1, 200),
+ "min_child_weight": trial.suggest_int("min_child_weight", 1, 100),
"subsample": trial.suggest_float("subsample", 0.5, 1.0),
"colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
- "reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 10.0, log=True),
- "reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 10.0, log=True),
+ "reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 100.0, log=True),
+ "reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 100.0, log=True),
}
if regressor == "xgboost":
study_model_parameters.update(
{
"max_depth": trial.suggest_int("max_depth", 3, 15),
- "gamma": trial.suggest_float("gamma", 1e-8, 1.0, log=True),
+ "gamma": trial.suggest_float("gamma", 1e-8, 10.0, log=True),
}
)
elif regressor == "lightgbm":
study_model_parameters.update(
{
- "num_leaves": trial.suggest_int("num_leaves", 2, 256),
+ "num_leaves": trial.suggest_int("num_leaves", 8, 256),
"min_split_gain": trial.suggest_float(
- "min_split_gain", 1e-8, 1.0, log=True
+ "min_split_gain", 1e-8, 10.0, log=True
),
"min_child_samples": trial.suggest_int("min_child_samples", 10, 100),
}
return error
-def dynamic_zigzag(
+def zigzag(
df: pd.DataFrame,
period: int = 14,
- natr: bool = True,
ratio: float = 1.0,
+ depth: int = 7,
) -> tuple[list[int], list[float], list[int]]:
- """
- Calculate the ZigZag indicator for a OHLCV DataFrame with dynamic threshold using ATR/NATR.
-
- Parameters:
- df (pd.DataFrame): OHLCV DataFrame.
- period (int): Period for ATR/NATR calculation (default: 14).
- natr (bool): Use NATR (True) or ATR (False) (default: True).
- ratio (float): ratio for dynamic threshold (default: 1.0).
-
- Returns:
- tuple: Lists of indices, extrema, and directions.
- """
- if df.empty:
+ if df.empty or len(df) < 2:
return [], [], []
- if natr:
- thresholds = ta.NATR(df, timeperiod=period)
- else:
- thresholds = ta.ATR(df, timeperiod=period)
- thresholds = thresholds.ffill().bfill() * ratio
-
- indices = []
- extrema = []
- directions = []
-
- first_high = df["high"].iloc[0]
- first_low = df["low"].iloc[0]
- first_threshold = thresholds.iloc[0]
-
- if natr:
- first_move = (first_high - first_low) / first_low
- else:
- first_move = first_high - first_low
- if first_move >= first_threshold:
- current_dir = 1
- current_extreme = first_high
- else:
- current_dir = -1
- current_extreme = first_low
- current_extreme_idx = df.index[0]
-
- indices.append(current_extreme_idx)
- extrema.append(current_extreme)
- directions.append(current_dir)
- last_idx = current_extreme_idx
-
+ indices = df.index.tolist()
+ thresholds = (
+ (ta.NATR(df, timeperiod=period).shift(1) * ratio).fillna(method="bfill").values
+ )
+ highs = df["high"].values
+ lows = df["low"].values
+
+ pivots_indices, pivots_values, pivots_directions = [], [], []
+ state = 0 # 0=neutral, 1=up, -1=down
+ last_pivot_pos = -depth
+
+ def add_pivot(pos: int, value: float, direction: int):
+ nonlocal last_pivot_pos
+ pivots_indices.append(indices[pos])
+ pivots_values.append(value)
+ pivots_directions.append(direction)
+ last_pivot_pos = pos
+
+ def update_last_pivot(pos: int, value: float, direction: int):
+ if pivots_indices:
+ pivots_indices[-1] = indices[pos]
+ pivots_values[-1] = value
+ pivots_directions[-1] = direction
+
+ initial_high = highs[0]
+ initial_low = lows[0]
+ initial_high_pos = 0
+ initial_low_pos = 0
for i in range(1, len(df)):
- current_idx = df.index[i]
- h = df.at[current_idx, "high"]
- l = df.at[current_idx, "low"]
- threshold = thresholds.iloc[i]
-
- if current_dir == 1: # Looking for higher high
- if h > current_extreme:
- current_extreme = h
- current_extreme_idx = current_idx
- continue
- if natr:
- reversal = (current_extreme - l) / current_extreme >= threshold
- else:
- reversal = (current_extreme - l) >= threshold
- if reversal:
- if current_extreme_idx != last_idx:
- indices.append(current_extreme_idx)
- extrema.append(current_extreme)
- directions.append(current_dir)
- last_idx = current_extreme_idx
-
- current_dir = -1
- current_extreme = l
- current_extreme_idx = current_idx
-
- elif current_dir == -1: # Looking for lower low
- if l < current_extreme:
- current_extreme = l
- current_extreme_idx = current_idx
- continue
- if natr:
- reversal = (h - current_extreme) / current_extreme >= threshold
- else:
- reversal = (h - current_extreme) >= threshold
- if reversal:
- if current_extreme_idx != last_idx:
- indices.append(current_extreme_idx)
- extrema.append(current_extreme)
- directions.append(current_dir)
- last_idx = current_extreme_idx
+ if highs[i] > initial_high:
+ initial_high, initial_high_pos = highs[i], i
+ if lows[i] < initial_low:
+ initial_low, initial_low_pos = lows[i], i
+
+ if (highs[i] - initial_low) / initial_low > thresholds[i]:
+ add_pivot(initial_low_pos, initial_low, -1)
+ state = 1
+ break
+ elif (initial_high - lows[i]) / initial_high > thresholds[i]:
+ add_pivot(initial_high_pos, initial_high, 1)
+ state = -1
+ break
+ else:
+ return [], [], []
- current_dir = 1
- current_extreme = h
- current_extreme_idx = current_idx
+ for i in range(i + 1, len(df)):
+ if state == 1:
+ if highs[i] > pivots_values[-1]:
+ update_last_pivot(i, highs[i], 1)
+ elif (pivots_values[-1] - lows[i]) / pivots_values[-1] >= thresholds[
+ i
+ ] and (i - last_pivot_pos) >= depth:
+ add_pivot(i, lows[i], -1)
+ state = -1
+ elif state == -1:
+ if lows[i] < pivots_values[-1]:
+ update_last_pivot(i, lows[i], -1)
+ elif (highs[i] - pivots_values[-1]) / pivots_values[-1] >= thresholds[
+ i
+ ] and (i - last_pivot_pos) >= depth:
+ add_pivot(i, highs[i], 1)
+ state = 1
+
+ if state != 0 and (len(df) - 1 - last_pivot_pos) >= depth:
+ final_pos = len(df) - 1
+ last_pivot_val = pivots_values[-1]
+ price_move = (
+ (highs[final_pos] - last_pivot_val) / last_pivot_val
+ if state == 1
+ else (last_pivot_val - lows[final_pos]) / last_pivot_val
+ )
- if current_extreme_idx != last_idx:
- indices.append(current_extreme_idx)
- extrema.append(current_extreme)
- directions.append(current_dir)
+ if (
+ price_move >= thresholds[final_pos]
+ and indices[final_pos] != pivots_indices[-1]
+ ):
+ add_pivot(
+ final_pos, highs[final_pos] if state == 1 else lows[final_pos], state
+ )
- return indices[1:], extrema[1:], directions[1:]
+ return pivots_indices, pivots_values, pivots_directions
def label_objective(
max_label_period_candles,
step=candles_step,
)
- label_natr_ratio = trial.suggest_float("label_natr_ratio", 0.0675, 0.175)
+ label_natr_ratio = trial.suggest_float("label_natr_ratio", 0.07, 0.3)
df = df.iloc[
-(
if df.empty:
return -float("inf"), -float("inf")
- _, peak_values, _ = dynamic_zigzag(
+ _, pivot_values, _ = zigzag(
df,
period=label_period_candles,
ratio=label_natr_ratio,
)
- if len(peak_values) < 2:
+ if len(pivot_values) < 2:
return -float("inf"), -float("inf")
scaled_natr_label_period_candles = (
ta.NATR(df, timeperiod=label_period_candles) * label_natr_ratio
)
- return scaled_natr_label_period_candles.median(), len(peak_values)
+ return scaled_natr_label_period_candles.median(), len(pivot_values)
def smoothed_max(series: pd.Series, temperature=1.0) -> float:
def zigzag(
- df: pd.DataFrame, threshold: float = 0.05
-) -> tuple[list[int], list[float], list[int]]:
- """
- Calculate the ZigZag indicator for a OHLCV DataFrame.
-
- Parameters:
- df (pd.DataFrame): OHLCV DataFrame.
- threshold (float): Percentage threshold for reversal (default 0.05 for 5%).
-
- Returns:
- tuple: Lists of indices, extrema, and directions.
- """
- if df.empty:
- return [], [], []
-
- indices = []
- extrema = []
- directions = []
-
- first_high = df["high"].iloc[0]
- first_low = df["low"].iloc[0]
-
- if (first_high - first_low) / first_low >= threshold:
- current_dir = 1
- current_extreme = first_high
- else:
- current_dir = -1
- current_extreme = first_low
- current_extreme_idx = df.index[0]
-
- indices.append(current_extreme_idx)
- extrema.append(current_extreme)
- directions.append(current_dir)
- last_idx = current_extreme_idx
-
- for i in range(1, len(df)):
- current_idx = df.index[i]
- h = df.at[current_idx, "high"]
- l = df.at[current_idx, "low"]
-
- if current_dir == 1: # Looking for higher high
- if h > current_extreme:
- current_extreme = h
- current_extreme_idx = current_idx
- continue
- if (current_extreme - l) / current_extreme >= threshold:
- if current_extreme_idx != last_idx:
- indices.append(current_extreme_idx)
- extrema.append(current_extreme)
- directions.append(current_dir)
- last_idx = current_extreme_idx
-
- current_dir = -1
- current_extreme = l
- current_extreme_idx = current_idx
-
- elif current_dir == -1: # Looking for lower low
- if l < current_extreme:
- current_extreme = l
- current_extreme_idx = current_idx
- continue
- if (h - current_extreme) / current_extreme >= threshold:
- if current_extreme_idx != last_idx:
- indices.append(current_extreme_idx)
- extrema.append(current_extreme)
- directions.append(current_dir)
- last_idx = current_extreme_idx
-
- current_dir = 1
- current_extreme = h
- current_extreme_idx = current_idx
-
- if current_extreme_idx != last_idx:
- indices.append(current_extreme_idx)
- extrema.append(current_extreme)
- directions.append(current_dir)
-
- return indices[1:], extrema[1:], directions[1:]
-
-
-def dynamic_zigzag(
df: pd.DataFrame,
period: int = 14,
- natr: bool = True,
ratio: float = 1.0,
+ depth: int = 7,
) -> tuple[list[int], list[float], list[int]]:
- """
- Calculate the ZigZag indicator for a OHLCV DataFrame with dynamic threshold using ATR/NATR.
-
- Parameters:
- df (pd.DataFrame): OHLCV DataFrame.
- period (int): Period for ATR/NATR calculation (default: 14).
- natr (bool): Use NATR (True) or ATR (False) (default: True).
- ratio (float): ratio for dynamic threshold (default: 1.0).
-
- Returns:
- tuple: Lists of indices, extrema, and directions.
- """
- if df.empty:
+ if df.empty or len(df) < 2:
return [], [], []
- if natr:
- thresholds = ta.NATR(df, timeperiod=period)
+ indices = df.index.tolist()
+ thresholds = (
+ (ta.NATR(df, timeperiod=period).shift(1) * ratio).fillna(method="bfill").values
+ )
+ highs = df["high"].values
+ lows = df["low"].values
+
+ pivots_indices, pivots_values, pivots_directions = [], [], []
+ state = 0 # 0=neutral, 1=up, -1=down
+ last_pivot_pos = -depth
+
+ def add_pivot(pos: int, value: float, direction: int):
+ nonlocal last_pivot_pos
+ pivots_indices.append(indices[pos])
+ pivots_values.append(value)
+ pivots_directions.append(direction)
+ last_pivot_pos = pos
+
+ def update_last_pivot(pos: int, value: float, direction: int):
+ if pivots_indices:
+ pivots_indices[-1] = indices[pos]
+ pivots_values[-1] = value
+ pivots_directions[-1] = direction
+
+ initial_high = highs[0]
+ initial_low = lows[0]
+ initial_high_pos = 0
+ initial_low_pos = 0
+ for i in range(1, len(df)):
+ if highs[i] > initial_high:
+ initial_high, initial_high_pos = highs[i], i
+ if lows[i] < initial_low:
+ initial_low, initial_low_pos = lows[i], i
+
+ if (highs[i] - initial_low) / initial_low > thresholds[i]:
+ add_pivot(initial_low_pos, initial_low, -1)
+ state = 1
+ break
+ elif (initial_high - lows[i]) / initial_high > thresholds[i]:
+ add_pivot(initial_high_pos, initial_high, 1)
+ state = -1
+ break
else:
- thresholds = ta.ATR(df, timeperiod=period)
- thresholds = thresholds.ffill().bfill() * ratio
-
- indices = []
- extrema = []
- directions = []
-
- first_high = df["high"].iloc[0]
- first_low = df["low"].iloc[0]
- first_threshold = thresholds.iloc[0]
+ return [], [], []
- if natr:
- first_move = (first_high - first_low) / first_low
- else:
- first_move = first_high - first_low
- if first_move >= first_threshold:
- current_dir = 1
- current_extreme = first_high
- else:
- current_dir = -1
- current_extreme = first_low
- current_extreme_idx = df.index[0]
+ for i in range(i + 1, len(df)):
+ if state == 1:
+ if highs[i] > pivots_values[-1]:
+ update_last_pivot(i, highs[i], 1)
+ elif (pivots_values[-1] - lows[i]) / pivots_values[-1] >= thresholds[
+ i
+ ] and (i - last_pivot_pos) >= depth:
+ add_pivot(i, lows[i], -1)
+ state = -1
+ elif state == -1:
+ if lows[i] < pivots_values[-1]:
+ update_last_pivot(i, lows[i], -1)
+ elif (highs[i] - pivots_values[-1]) / pivots_values[-1] >= thresholds[
+ i
+ ] and (i - last_pivot_pos) >= depth:
+ add_pivot(i, highs[i], 1)
+ state = 1
+
+ if state != 0 and (len(df) - 1 - last_pivot_pos) >= depth:
+ final_pos = len(df) - 1
+ last_pivot_val = pivots_values[-1]
+ price_move = (
+ (highs[final_pos] - last_pivot_val) / last_pivot_val
+ if state == 1
+ else (last_pivot_val - lows[final_pos]) / last_pivot_val
+ )
- indices.append(current_extreme_idx)
- extrema.append(current_extreme)
- directions.append(current_dir)
- last_idx = current_extreme_idx
+ if (
+ price_move >= thresholds[final_pos]
+ and indices[final_pos] != pivots_indices[-1]
+ ):
+ add_pivot(
+ final_pos, highs[final_pos] if state == 1 else lows[final_pos], state
+ )
- for i in range(1, len(df)):
- current_idx = df.index[i]
- h = df.at[current_idx, "high"]
- l = df.at[current_idx, "low"]
- threshold = thresholds.iloc[i]
-
- if current_dir == 1: # Looking for higher high
- if h > current_extreme:
- current_extreme = h
- current_extreme_idx = current_idx
- continue
- if natr:
- reversal = (current_extreme - l) / current_extreme >= threshold
- else:
- reversal = (current_extreme - l) >= threshold
- if reversal:
- if current_extreme_idx != last_idx:
- indices.append(current_extreme_idx)
- extrema.append(current_extreme)
- directions.append(current_dir)
- last_idx = current_extreme_idx
-
- current_dir = -1
- current_extreme = l
- current_extreme_idx = current_idx
-
- elif current_dir == -1: # Looking for lower low
- if l < current_extreme:
- current_extreme = l
- current_extreme_idx = current_idx
- continue
- if natr:
- reversal = (h - current_extreme) / current_extreme >= threshold
- else:
- reversal = (h - current_extreme) >= threshold
- if reversal:
- if current_extreme_idx != last_idx:
- indices.append(current_extreme_idx)
- extrema.append(current_extreme)
- directions.append(current_dir)
- last_idx = current_extreme_idx
-
- current_dir = 1
- current_extreme = h
- current_extreme_idx = current_idx
-
- if current_extreme_idx != last_idx:
- indices.append(current_extreme_idx)
- extrema.append(current_extreme)
- directions.append(current_dir)
-
- return indices[1:], extrema[1:], directions[1:]
+ return pivots_indices, pivots_values, pivots_directions