DOWN = -1
-zigzag_cache: dict[str, tuple[list[int], list[float], list[int]]] = {}
+zigzag_cache: dict[str, tuple[list[int], list[float], list[int]], list[float]] = {}
def zigzag_cached(
natr_period: int = 14,
natr_ratio: float = 6.0,
cache_size: int = 2048,
-) -> tuple[list[int], list[float], list[int]]:
+) -> tuple[list[int], list[float], list[int], list[float]]:
def hash_df(df: pd.DataFrame) -> str:
hasher = hashlib.sha256()
if cache_key in zigzag_cache:
return zigzag_cache[cache_key]
- pivots_indices, pivots_values, pivots_directions = zigzag(
+ pivots_indices, pivots_values, pivots_directions, pivots_scaled_natrs = zigzag(
df, natr_period=natr_period, natr_ratio=natr_ratio
)
if len(zigzag_cache) >= cache_size:
pivots_indices,
pivots_values,
pivots_directions,
+ pivots_scaled_natrs,
)
- return pivots_indices, pivots_values, pivots_directions
+ return pivots_indices, pivots_values, pivots_directions, pivots_scaled_natrs
def zigzag(
df: pd.DataFrame,
natr_period: int = 14,
natr_ratio: float = 6.0,
-) -> tuple[list[int], list[float], list[int]]:
+) -> tuple[list[int], list[float], list[int], list[float]]:
min_confirmation_window: int = 3
max_confirmation_window: int = 6
n = len(df)
state: TrendDirection = TrendDirection.NEUTRAL
depth = -1
+ pivots_indices, pivots_values, pivots_directions, pivots_scaled_natrs = (
+ [],
+ [],
+ [],
+ [],
+ )
last_pivot_pos = -1
- pivots_indices, pivots_values, pivots_directions = [], [], []
- candidate_pivot_pos = -1
- candidate_pivot_value = np.nan
+ candidate_pivot_pos: int = -1
+ candidate_pivot_value: float = np.nan
candidate_pivot_direction: TrendDirection = TrendDirection.NEUTRAL
+ candidate_pivot_scaled_natr: float = np.nan
volatility_quantile_cache: dict[int, float] = {}
return min_strength + (max_strength - min_strength) * volatility_quantile
def update_candidate_pivot(pos: int, value: float, direction: TrendDirection):
- nonlocal candidate_pivot_pos, candidate_pivot_value, candidate_pivot_direction
+ nonlocal \
+ candidate_pivot_pos, \
+ candidate_pivot_value, \
+ candidate_pivot_direction, \
+ candidate_pivot_scaled_natr
if 0 <= pos < n:
candidate_pivot_pos = pos
candidate_pivot_value = value
candidate_pivot_direction = direction
+ candidate_pivot_scaled_natr = thresholds[pos]
def reset_candidate_pivot():
- nonlocal candidate_pivot_pos, candidate_pivot_value, candidate_pivot_direction
+ nonlocal \
+ candidate_pivot_pos, \
+ candidate_pivot_value, \
+ candidate_pivot_direction, \
+ candidate_pivot_scaled_natr
candidate_pivot_pos = -1
candidate_pivot_value = np.nan
candidate_pivot_direction = TrendDirection.NEUTRAL
+ candidate_pivot_scaled_natr = np.nan
def add_pivot(pos: int, value: float, direction: TrendDirection):
nonlocal last_pivot_pos, depth
pivots_indices.append(indices[pos])
pivots_values.append(value)
pivots_directions.append(direction)
+ pivots_scaled_natrs.append(thresholds[pos])
last_pivot_pos = pos
depth = calculate_depth(pos)
reset_candidate_pivot()
state = TrendDirection.UP
break
else:
- return [], [], []
+ return [], [], [], []
if n - last_pivot_pos - 1 < depth:
- return pivots_indices, pivots_values, pivots_directions
+ return pivots_indices, pivots_values, pivots_directions, pivots_scaled_natrs
for i in range(last_pivot_pos + 1, n):
current_high = highs[i]
)
state = TrendDirection.UP
- return pivots_indices, pivots_values, pivots_directions
+ return pivots_indices, pivots_values, pivots_directions, pivots_scaled_natrs
def label_objective(
if df.empty:
return -np.inf, -np.inf
- _, pivots_values, _ = zigzag_cached(
+ _, pivots_values, _, pivots_scaled_natrs = zigzag_cached(
df,
natr_period=label_period_candles,
natr_ratio=label_natr_ratio,
)
- scaled_natr_label_period_candles = (
- ta.NATR(df, timeperiod=label_period_candles).bfill() / 100.0
- ) * label_natr_ratio
-
- return scaled_natr_label_period_candles.median(), len(pivots_values)
+ return np.median(pivots_scaled_natrs), len(pivots_values)
def smoothed_max(series: pd.Series, temperature=1.0) -> float:
DOWN = -1
-zigzag_cache: dict[str, tuple[list[int], list[float], list[int]]] = {}
+zigzag_cache: dict[str, tuple[list[int], list[float], list[int]], list[float]] = {}
def zigzag_cached(
natr_period: int = 14,
natr_ratio: float = 6.0,
cache_size: int = 2048,
-) -> tuple[list[int], list[float], list[int]]:
+) -> tuple[list[int], list[float], list[int], list[float]]:
def hash_df(df: pd.DataFrame) -> str:
hasher = hashlib.sha256()
if cache_key in zigzag_cache:
return zigzag_cache[cache_key]
- pivots_indices, pivots_values, pivots_directions = zigzag(
+ pivots_indices, pivots_values, pivots_directions, pivots_scaled_natrs = zigzag(
df, natr_period=natr_period, natr_ratio=natr_ratio
)
if len(zigzag_cache) >= cache_size:
pivots_indices,
pivots_values,
pivots_directions,
+ pivots_scaled_natrs,
)
- return pivots_indices, pivots_values, pivots_directions
+ return pivots_indices, pivots_values, pivots_directions, pivots_scaled_natrs
def zigzag(
df: pd.DataFrame,
natr_period: int = 14,
natr_ratio: float = 6.0,
-) -> tuple[list[int], list[float], list[int]]:
+) -> tuple[list[int], list[float], list[int], list[float]]:
min_confirmation_window: int = 3
max_confirmation_window: int = 6
n = len(df)
state: TrendDirection = TrendDirection.NEUTRAL
depth = -1
+ pivots_indices, pivots_values, pivots_directions, pivots_scaled_natrs = (
+ [],
+ [],
+ [],
+ [],
+ )
last_pivot_pos = -1
- pivots_indices, pivots_values, pivots_directions = [], [], []
- candidate_pivot_pos = -1
- candidate_pivot_value = np.nan
+ candidate_pivot_pos: int = -1
+ candidate_pivot_value: float = np.nan
candidate_pivot_direction: TrendDirection = TrendDirection.NEUTRAL
+ candidate_pivot_scaled_natr: float = np.nan
volatility_quantile_cache: dict[int, float] = {}
return min_strength + (max_strength - min_strength) * volatility_quantile
def update_candidate_pivot(pos: int, value: float, direction: TrendDirection):
- nonlocal candidate_pivot_pos, candidate_pivot_value, candidate_pivot_direction
+ nonlocal \
+ candidate_pivot_pos, \
+ candidate_pivot_value, \
+ candidate_pivot_direction, \
+ candidate_pivot_scaled_natr
if 0 <= pos < n:
candidate_pivot_pos = pos
candidate_pivot_value = value
candidate_pivot_direction = direction
+ candidate_pivot_scaled_natr = thresholds[pos]
def reset_candidate_pivot():
- nonlocal candidate_pivot_pos, candidate_pivot_value, candidate_pivot_direction
+ nonlocal \
+ candidate_pivot_pos, \
+ candidate_pivot_value, \
+ candidate_pivot_direction, \
+ candidate_pivot_scaled_natr
candidate_pivot_pos = -1
candidate_pivot_value = np.nan
candidate_pivot_direction = TrendDirection.NEUTRAL
+ candidate_pivot_scaled_natr = np.nan
def add_pivot(pos: int, value: float, direction: TrendDirection):
nonlocal last_pivot_pos, depth
pivots_indices.append(indices[pos])
pivots_values.append(value)
pivots_directions.append(direction)
+ pivots_scaled_natrs.append(thresholds[pos])
last_pivot_pos = pos
depth = calculate_depth(pos)
reset_candidate_pivot()
state = TrendDirection.UP
break
else:
- return [], [], []
+ return [], [], [], []
if n - last_pivot_pos - 1 < depth:
- return pivots_indices, pivots_values, pivots_directions
+ return pivots_indices, pivots_values, pivots_directions, pivots_scaled_natrs
for i in range(last_pivot_pos + 1, n):
current_high = highs[i]
)
state = TrendDirection.UP
- return pivots_indices, pivots_values, pivots_directions
+ return pivots_indices, pivots_values, pivots_directions, pivots_scaled_natrs