from enum import IntEnum
import logging
import json
+import math
import random
from statistics import median
import time
https://github.com/sponsors/robcaulk
"""
- version = "3.7.89"
+ version = "3.7.90"
@cached_property
def _optuna_config(self) -> dict:
return int(round(median([min_window, max_window])))
return np.clip(
- round(max_window - (max_window - min_window) * volatility_quantile),
+ round(min_window + (max_window - min_window) * volatility_quantile),
min_window,
max_window,
).astype(int)
+ def calculate_min_slopes_ok(pos: int, slopes_ok_threshold: float) -> int:
+ min_slopes_ok = max(
+ math.ceil(1 / slopes_ok_threshold),
+ math.ceil(1 / (1 - slopes_ok_threshold)),
+ 4,
+ )
+ max_slopes_ok = math.ceil(min_slopes_ok * 2)
+ volatility_quantile = calculate_volatility_quantile(pos)
+ if np.isnan(volatility_quantile):
+ return int(round(median([min_slopes_ok, max_slopes_ok])))
+
+ return np.clip(
+ round(
+ min_slopes_ok + (max_slopes_ok - min_slopes_ok) * volatility_quantile
+ ),
+ min_slopes_ok,
+ max_slopes_ok,
+ ).astype(int)
+
def update_candidate_pivot(pos: int, value: float):
nonlocal candidate_pivot_pos, candidate_pivot_value
if 0 <= pos < n:
last_pivot_pos = pos
reset_candidate_pivot()
+ slopes_ok_cache: dict[tuple[int, int, int], bool] = {}
+
+ def get_slope_ok(
+ pos: int,
+ direction: TrendDirection,
+ enable_weighting: bool,
+ slope_confirmation_window: int,
+ min_slope: float,
+ ) -> bool:
+ cache_key = (
+ pos,
+ direction.value,
+ enable_weighting,
+ slope_confirmation_window,
+ min_slope,
+ )
+
+ if cache_key in slopes_ok_cache:
+ return slopes_ok_cache[cache_key]
+
+ next_start = pos
+ next_end = min(next_start + slope_confirmation_window, n)
+ next_closes = closes[next_start:next_end]
+
+ if len(next_closes) < 2:
+ slopes_ok_cache[cache_key] = False
+ return slopes_ok_cache[cache_key]
+
+ log_next_closes = np.log(next_closes)
+ log_next_closes_length = len(log_next_closes)
+
+ polyfit_kwargs = {}
+ if enable_weighting:
+ polyfit_kwargs = {"w": np.linspace(0.5, 1.5, log_next_closes_length)}
+ log_next_slope = np.polyfit(
+ range(log_next_closes_length),
+ log_next_closes,
+ 1,
+ **polyfit_kwargs,
+ )[0]
+
+ if direction == TrendDirection.DOWN:
+ slopes_ok_cache[cache_key] = log_next_slope < -min_slope
+ elif direction == TrendDirection.UP:
+ slopes_ok_cache[cache_key] = log_next_slope > min_slope
+ else:
+ slopes_ok_cache[cache_key] = False
+
+ return slopes_ok_cache[cache_key]
+
def is_pivot_confirmed(
+ pos: int,
candidate_pivot_pos: int,
direction: TrendDirection,
- min_slope: float = np.finfo(float).eps,
enable_weighting: bool = False,
+ min_slope: float = np.finfo(float).eps,
slopes_ok_threshold: float = 0.75,
) -> bool:
slope_confirmation_window = calculate_slope_confirmation_window(
)
slopes_ok: list[bool] = []
+ for i in range(candidate_pivot_pos + 1, min(pos + 1, n)):
+ slopes_ok.append(
+ get_slope_ok(
+ pos=i,
+ direction=direction,
+ enable_weighting=enable_weighting,
+ slope_confirmation_window=slope_confirmation_window,
+ min_slope=min_slope,
+ )
+ )
- for i in range(candidate_pivot_pos + 1, n):
- next_start = i
- next_end = min(next_start + slope_confirmation_window, n)
-
- next_closes = closes[next_start:next_end]
-
- if len(next_closes) >= 2:
- log_next_closes = np.log(next_closes)
- log_next_closes_length = len(log_next_closes)
- polyfit_kwargs = {}
- if enable_weighting:
- polyfit_kwargs = {
- "w": np.linspace(0.5, 1.5, log_next_closes_length)
- }
- log_next_slope = np.polyfit(
- range(log_next_closes_length),
- log_next_closes,
- 1,
- **polyfit_kwargs,
- )[0]
- if direction == TrendDirection.DOWN:
- slopes_ok.append(log_next_slope < -min_slope)
- elif direction == TrendDirection.UP:
- slopes_ok.append(log_next_slope > min_slope)
- else:
- slopes_ok.append(False)
-
- if sum(slopes_ok) / len(slopes_ok) >= slopes_ok_threshold:
- return True
+ min_slopes_ok = calculate_min_slopes_ok(
+ candidate_pivot_pos, slopes_ok_threshold
+ )
+ n_slopes_ok = len(slopes_ok)
+ if n_slopes_ok > 0:
+ return (
+ n_slopes_ok >= min_slopes_ok
+ and sum(slopes_ok) / n_slopes_ok >= slopes_ok_threshold
+ )
return False
candidate_pivot_value - current_low
) / candidate_pivot_value >= thresholds[
candidate_pivot_pos
- ] and is_pivot_confirmed(candidate_pivot_pos, TrendDirection.DOWN):
+ ] and is_pivot_confirmed(i, candidate_pivot_pos, TrendDirection.DOWN):
add_pivot(candidate_pivot_pos, candidate_pivot_value, TrendDirection.UP)
state = TrendDirection.DOWN
current_high - candidate_pivot_value
) / candidate_pivot_value >= thresholds[
candidate_pivot_pos
- ] and is_pivot_confirmed(candidate_pivot_pos, TrendDirection.UP):
+ ] and is_pivot_confirmed(i, candidate_pivot_pos, TrendDirection.UP):
add_pivot(
candidate_pivot_pos, candidate_pivot_value, TrendDirection.DOWN
)
from enum import IntEnum
from functools import lru_cache
+import math
from statistics import median
import numpy as np
import pandas as pd
return int(round(median([min_window, max_window])))
return np.clip(
- round(max_window - (max_window - min_window) * volatility_quantile),
+ round(min_window + (max_window - min_window) * volatility_quantile),
min_window,
max_window,
).astype(int)
+ def calculate_min_slopes_ok(pos: int, slopes_ok_threshold: float) -> int:
+ min_slopes_ok = max(
+ math.ceil(1 / slopes_ok_threshold),
+ math.ceil(1 / (1 - slopes_ok_threshold)),
+ 4,
+ )
+ max_slopes_ok = math.ceil(min_slopes_ok * 2)
+ volatility_quantile = calculate_volatility_quantile(pos)
+ if np.isnan(volatility_quantile):
+ return int(round(median([min_slopes_ok, max_slopes_ok])))
+
+ return np.clip(
+ round(
+ min_slopes_ok + (max_slopes_ok - min_slopes_ok) * volatility_quantile
+ ),
+ min_slopes_ok,
+ max_slopes_ok,
+ ).astype(int)
+
def update_candidate_pivot(pos: int, value: float):
nonlocal candidate_pivot_pos, candidate_pivot_value
if 0 <= pos < n:
last_pivot_pos = pos
reset_candidate_pivot()
+ slopes_ok_cache: dict[tuple[int, int, int], bool] = {}
+
+ def get_slope_ok(
+ pos: int,
+ direction: TrendDirection,
+ enable_weighting: bool,
+ slope_confirmation_window: int,
+ min_slope: float,
+ ) -> bool:
+ cache_key = (
+ pos,
+ direction.value,
+ enable_weighting,
+ slope_confirmation_window,
+ min_slope,
+ )
+
+ if cache_key in slopes_ok_cache:
+ return slopes_ok_cache[cache_key]
+
+ next_start = pos
+ next_end = min(next_start + slope_confirmation_window, n)
+ next_closes = closes[next_start:next_end]
+
+ if len(next_closes) < 2:
+ slopes_ok_cache[cache_key] = False
+ return slopes_ok_cache[cache_key]
+
+ log_next_closes = np.log(next_closes)
+ log_next_closes_length = len(log_next_closes)
+
+ polyfit_kwargs = {}
+ if enable_weighting:
+ polyfit_kwargs = {"w": np.linspace(0.5, 1.5, log_next_closes_length)}
+ log_next_slope = np.polyfit(
+ range(log_next_closes_length),
+ log_next_closes,
+ 1,
+ **polyfit_kwargs,
+ )[0]
+
+ if direction == TrendDirection.DOWN:
+ slopes_ok_cache[cache_key] = log_next_slope < -min_slope
+ elif direction == TrendDirection.UP:
+ slopes_ok_cache[cache_key] = log_next_slope > min_slope
+ else:
+ slopes_ok_cache[cache_key] = False
+
+ return slopes_ok_cache[cache_key]
+
def is_pivot_confirmed(
+ pos: int,
candidate_pivot_pos: int,
direction: TrendDirection,
- min_slope: float = np.finfo(float).eps,
enable_weighting: bool = False,
+ min_slope: float = np.finfo(float).eps,
slopes_ok_threshold: float = 0.75,
) -> bool:
slope_confirmation_window = calculate_slope_confirmation_window(
)
slopes_ok: list[bool] = []
+ for i in range(candidate_pivot_pos + 1, min(pos + 1, n)):
+ slopes_ok.append(
+ get_slope_ok(
+ pos=i,
+ direction=direction,
+ enable_weighting=enable_weighting,
+ slope_confirmation_window=slope_confirmation_window,
+ min_slope=min_slope,
+ )
+ )
- for i in range(candidate_pivot_pos + 1, n):
- next_start = i
- next_end = min(next_start + slope_confirmation_window, n)
-
- next_closes = closes[next_start:next_end]
-
- if len(next_closes) >= 2:
- log_next_closes = np.log(next_closes)
- log_next_closes_length = len(log_next_closes)
- polyfit_kwargs = {}
- if enable_weighting:
- polyfit_kwargs = {
- "w": np.linspace(0.5, 1.5, log_next_closes_length)
- }
- log_next_slope = np.polyfit(
- range(log_next_closes_length),
- log_next_closes,
- 1,
- **polyfit_kwargs,
- )[0]
- if direction == TrendDirection.DOWN:
- slopes_ok.append(log_next_slope < -min_slope)
- elif direction == TrendDirection.UP:
- slopes_ok.append(log_next_slope > min_slope)
- else:
- slopes_ok.append(False)
-
- if sum(slopes_ok) / len(slopes_ok) >= slopes_ok_threshold:
- return True
+ min_slopes_ok = calculate_min_slopes_ok(
+ candidate_pivot_pos, slopes_ok_threshold
+ )
+ n_slopes_ok = len(slopes_ok)
+ if n_slopes_ok > 0:
+ return (
+ n_slopes_ok >= min_slopes_ok
+ and sum(slopes_ok) / n_slopes_ok >= slopes_ok_threshold
+ )
return False
candidate_pivot_value - current_low
) / candidate_pivot_value >= thresholds[
candidate_pivot_pos
- ] and is_pivot_confirmed(candidate_pivot_pos, TrendDirection.DOWN):
+ ] and is_pivot_confirmed(i, candidate_pivot_pos, TrendDirection.DOWN):
add_pivot(candidate_pivot_pos, candidate_pivot_value, TrendDirection.UP)
state = TrendDirection.DOWN
current_high - candidate_pivot_value
) / candidate_pivot_value >= thresholds[
candidate_pivot_pos
- ] and is_pivot_confirmed(candidate_pivot_pos, TrendDirection.UP):
+ ] and is_pivot_confirmed(i, candidate_pivot_pos, TrendDirection.UP):
add_pivot(
candidate_pivot_pos, candidate_pivot_value, TrendDirection.DOWN
)