from enum import IntEnum
import logging
import json
-import math
import random
from statistics import median
import time
import warnings
import talib.abstract as ta
-from functools import cached_property, lru_cache
+from functools import cached_property
from typing import Any, Callable, Optional
from pathlib import Path
from freqtrade.freqai.base_models.BaseRegressionModel import BaseRegressionModel
https://github.com/sponsors/robcaulk
"""
- version = "3.7.92"
+ version = "3.7.93"
@cached_property
def _optuna_config(self) -> dict:
return volatility_quantile_cache[pos]
- def calculate_slope_confirmation_window(
- pos: int,
- min_window: int = 3,
- max_window: int = 5,
- ) -> int:
- volatility_quantile = calculate_volatility_quantile(pos)
- if np.isnan(volatility_quantile):
- return int(round(median([min_window, max_window])))
-
- return np.clip(
- round(min_window + (max_window - min_window) * volatility_quantile),
- min_window,
- max_window,
- ).astype(int)
-
def calculate_slopes_ok_threshold(
pos: int,
- min_threshold: float = 0.5,
- max_threshold: float = 0.75,
+ min_threshold: float = 0.65,
+ max_threshold: float = 0.85,
) -> float:
volatility_quantile = calculate_volatility_quantile(pos)
if np.isnan(volatility_quantile):
return min_threshold + (max_threshold - min_threshold) * volatility_quantile
- @lru_cache(maxsize=4096)
- def calculate_slopes_ok_min_max(slopes_ok_threshold: float) -> tuple[int, int]:
- min_slope_bound1 = math.ceil(1 / slopes_ok_threshold)
- min_slope_bound2 = math.ceil(1 / (1 - slopes_ok_threshold))
- return min(min_slope_bound1, min_slope_bound2), max(
- min_slope_bound1, min_slope_bound2
- )
-
- def calculate_min_slopes_ok(pos: int, slopes_ok_threshold: float) -> int:
- min_slopes_ok, max_slopes_ok = calculate_slopes_ok_min_max(slopes_ok_threshold)
- if min_slopes_ok == max_slopes_ok:
- return min_slopes_ok
- volatility_quantile = calculate_volatility_quantile(pos)
- if np.isnan(volatility_quantile):
- return int(round(median([min_slopes_ok, max_slopes_ok])))
-
- return np.clip(
- round(
- min_slopes_ok + (max_slopes_ok - min_slopes_ok) * volatility_quantile
- ),
- min_slopes_ok,
- max_slopes_ok,
- ).astype(int)
-
def update_candidate_pivot(pos: int, value: float):
nonlocal candidate_pivot_pos, candidate_pivot_value
if 0 <= pos < n:
last_pivot_pos = pos
reset_candidate_pivot()
- slope_ok_cache: dict[tuple[int, int, bool, int, float], bool] = {}
+ slope_ok_cache: dict[tuple[int, int, int, float]] = {}
def get_slope_ok(
pos: int,
+ candidate_pivot_pos: int,
direction: TrendDirection,
- enable_weighting: bool,
- slope_confirmation_window: int,
min_slope: float,
) -> bool:
cache_key = (
pos,
+ candidate_pivot_pos,
direction.value,
- enable_weighting,
- slope_confirmation_window,
min_slope,
)
if cache_key in slope_ok_cache:
return slope_ok_cache[cache_key]
- next_start = pos
- next_end = min(next_start + slope_confirmation_window, n)
- next_closes = closes[next_start:next_end]
-
- if len(next_closes) < 2:
+ if pos <= candidate_pivot_pos:
slope_ok_cache[cache_key] = False
return slope_ok_cache[cache_key]
- log_next_closes = np.log(next_closes)
- log_next_closes_length = len(log_next_closes)
+ log_candidate_pivot_close = np.log(closes[candidate_pivot_pos])
+ log_current_close = np.log(closes[pos])
- polyfit_kwargs = {}
- if enable_weighting:
- polyfit_kwargs["w"] = np.linspace(0.5, 1.5, log_next_closes_length)
- log_next_slope = np.polyfit(
- range(log_next_closes_length),
- log_next_closes,
- 1,
- **polyfit_kwargs,
- )[0]
+ log_slope_close = (log_current_close - log_candidate_pivot_close) / (
+ pos - candidate_pivot_pos
+ )
if direction == TrendDirection.DOWN:
- slope_ok_cache[cache_key] = log_next_slope < -min_slope
+ slope_ok_cache[cache_key] = log_slope_close < -min_slope
elif direction == TrendDirection.UP:
- slope_ok_cache[cache_key] = log_next_slope > min_slope
+ slope_ok_cache[cache_key] = log_slope_close > min_slope
else:
slope_ok_cache[cache_key] = False
pos: int,
candidate_pivot_pos: int,
direction: TrendDirection,
- enable_weighting: bool = False,
min_slope: float = np.finfo(float).eps,
+ alpha: float = 0.05,
) -> bool:
- slope_confirmation_window = calculate_slope_confirmation_window(
- candidate_pivot_pos
- )
+ start_pos = candidate_pivot_pos + 1
+ end_pos = min(pos + 1, n)
+ n_slopes = max(0, end_pos - start_pos)
+
+ if n_slopes < 1:
+ return False
slopes_ok: list[bool] = []
- for i in range(candidate_pivot_pos + 1, min(pos + 1, n)):
+ for i in range(start_pos, end_pos):
slopes_ok.append(
get_slope_ok(
pos=i,
+ candidate_pivot_pos=candidate_pivot_pos,
direction=direction,
- enable_weighting=enable_weighting,
- slope_confirmation_window=slope_confirmation_window,
min_slope=min_slope,
)
)
slopes_ok_threshold = calculate_slopes_ok_threshold(candidate_pivot_pos)
- min_slopes_ok = calculate_min_slopes_ok(
- candidate_pivot_pos, slopes_ok_threshold
+ n_slopes_ok = sum(slopes_ok)
+ binomtest = sp.stats.binomtest(
+ k=n_slopes_ok, n=n_slopes, p=0.5, alternative="greater"
)
- n_slopes_ok = len(slopes_ok)
- if n_slopes_ok > 0:
- return (
- n_slopes_ok >= min_slopes_ok
- and sum(slopes_ok) / n_slopes_ok >= slopes_ok_threshold
- )
- return False
+ return (
+ binomtest.pvalue <= alpha
+ and (n_slopes_ok / n_slopes) >= slopes_ok_threshold
+ )
start_pos = 0
initial_high_pos = start_pos
from enum import IntEnum
from functools import lru_cache
-import math
from statistics import median
import numpy as np
import pandas as pd
return volatility_quantile_cache[pos]
- def calculate_slope_confirmation_window(
- pos: int,
- min_window: int = 3,
- max_window: int = 5,
- ) -> int:
- volatility_quantile = calculate_volatility_quantile(pos)
- if np.isnan(volatility_quantile):
- return int(round(median([min_window, max_window])))
-
- return np.clip(
- round(min_window + (max_window - min_window) * volatility_quantile),
- min_window,
- max_window,
- ).astype(int)
-
def calculate_slopes_ok_threshold(
pos: int,
- min_threshold: float = 0.5,
- max_threshold: float = 0.75,
+ min_threshold: float = 0.65,
+ max_threshold: float = 0.85,
) -> float:
volatility_quantile = calculate_volatility_quantile(pos)
if np.isnan(volatility_quantile):
return min_threshold + (max_threshold - min_threshold) * volatility_quantile
- @lru_cache(maxsize=4096)
- def calculate_slopes_ok_min_max(slopes_ok_threshold: float) -> tuple[int, int]:
- min_slope_bound1 = math.ceil(1 / slopes_ok_threshold)
- min_slope_bound2 = math.ceil(1 / (1 - slopes_ok_threshold))
- return min(min_slope_bound1, min_slope_bound2), max(
- min_slope_bound1, min_slope_bound2
- )
-
- def calculate_min_slopes_ok(pos: int, slopes_ok_threshold: float) -> int:
- min_slopes_ok, max_slopes_ok = calculate_slopes_ok_min_max(slopes_ok_threshold)
- if min_slopes_ok == max_slopes_ok:
- return min_slopes_ok
- volatility_quantile = calculate_volatility_quantile(pos)
- if np.isnan(volatility_quantile):
- return int(round(median([min_slopes_ok, max_slopes_ok])))
-
- return np.clip(
- round(
- min_slopes_ok + (max_slopes_ok - min_slopes_ok) * volatility_quantile
- ),
- min_slopes_ok,
- max_slopes_ok,
- ).astype(int)
-
def update_candidate_pivot(pos: int, value: float):
nonlocal candidate_pivot_pos, candidate_pivot_value
if 0 <= pos < n:
last_pivot_pos = pos
reset_candidate_pivot()
- slope_ok_cache: dict[tuple[int, int, bool, int, float], bool] = {}
+ slope_ok_cache: dict[tuple[int, int, int, float]] = {}
def get_slope_ok(
pos: int,
+ candidate_pivot_pos: int,
direction: TrendDirection,
- enable_weighting: bool,
- slope_confirmation_window: int,
min_slope: float,
) -> bool:
cache_key = (
pos,
+ candidate_pivot_pos,
direction.value,
- enable_weighting,
- slope_confirmation_window,
min_slope,
)
if cache_key in slope_ok_cache:
return slope_ok_cache[cache_key]
- next_start = pos
- next_end = min(next_start + slope_confirmation_window, n)
- next_closes = closes[next_start:next_end]
-
- if len(next_closes) < 2:
+ if pos <= candidate_pivot_pos:
slope_ok_cache[cache_key] = False
return slope_ok_cache[cache_key]
- log_next_closes = np.log(next_closes)
- log_next_closes_length = len(log_next_closes)
+ log_candidate_pivot_close = np.log(closes[candidate_pivot_pos])
+ log_current_close = np.log(closes[pos])
- polyfit_kwargs = {}
- if enable_weighting:
- polyfit_kwargs["w"] = np.linspace(0.5, 1.5, log_next_closes_length)
- log_next_slope = np.polyfit(
- range(log_next_closes_length),
- log_next_closes,
- 1,
- **polyfit_kwargs,
- )[0]
+ log_slope_close = (log_current_close - log_candidate_pivot_close) / (
+ pos - candidate_pivot_pos
+ )
if direction == TrendDirection.DOWN:
- slope_ok_cache[cache_key] = log_next_slope < -min_slope
+ slope_ok_cache[cache_key] = log_slope_close < -min_slope
elif direction == TrendDirection.UP:
- slope_ok_cache[cache_key] = log_next_slope > min_slope
+ slope_ok_cache[cache_key] = log_slope_close > min_slope
else:
slope_ok_cache[cache_key] = False
pos: int,
candidate_pivot_pos: int,
direction: TrendDirection,
- enable_weighting: bool = False,
min_slope: float = np.finfo(float).eps,
+ alpha: float = 0.05,
) -> bool:
- slope_confirmation_window = calculate_slope_confirmation_window(
- candidate_pivot_pos
- )
+ start_pos = candidate_pivot_pos + 1
+ end_pos = min(pos + 1, n)
+ n_slopes = max(0, end_pos - start_pos)
+
+ if n_slopes < 1:
+ return False
slopes_ok: list[bool] = []
- for i in range(candidate_pivot_pos + 1, min(pos + 1, n)):
+ for i in range(start_pos, end_pos):
slopes_ok.append(
get_slope_ok(
pos=i,
+ candidate_pivot_pos=candidate_pivot_pos,
direction=direction,
- enable_weighting=enable_weighting,
- slope_confirmation_window=slope_confirmation_window,
min_slope=min_slope,
)
)
slopes_ok_threshold = calculate_slopes_ok_threshold(candidate_pivot_pos)
- min_slopes_ok = calculate_min_slopes_ok(
- candidate_pivot_pos, slopes_ok_threshold
+ n_slopes_ok = sum(slopes_ok)
+ binomtest = sp.stats.binomtest(
+ k=n_slopes_ok, n=n_slopes, p=0.5, alternative="greater"
)
- n_slopes_ok = len(slopes_ok)
- if n_slopes_ok > 0:
- return (
- n_slopes_ok >= min_slopes_ok
- and sum(slopes_ok) / n_slopes_ok >= slopes_ok_threshold
- )
- return False
+ return (
+ binomtest.pvalue <= alpha
+ and (n_slopes_ok / n_slopes) >= slopes_ok_threshold
+ )
start_pos = 0
initial_high_pos = start_pos