def calculate_min_extrema(
size: int, fit_live_predictions_candles: int, min_extrema: int = 4
) -> int:
- return int(round((size / fit_live_predictions_candles) * min_extrema))
+ return int(round(size / fit_live_predictions_candles) * min_extrema)
def train_objective(
test_ok = True
test_length = len(X_test)
if debug:
- n_test_minima: int = sp.signal.find_peaks(-y_test[EXTREMA_COLUMN])[0].size
- n_test_maxima: int = sp.signal.find_peaks(y_test[EXTREMA_COLUMN])[0].size
+ test_extrema = y_test.get(EXTREMA_COLUMN)
+ n_test_minima: int = sp.signal.find_peaks(-test_extrema)[0].size
+ n_test_maxima: int = sp.signal.find_peaks(test_extrema)[0].size
n_test_extrema: int = n_test_minima + n_test_maxima
min_test_extrema: int = calculate_min_extrema(
test_length, fit_live_predictions_candles
)
X_test = X_test.iloc[-test_window:]
y_test = y_test.iloc[-test_window:]
- n_test_minima = sp.signal.find_peaks(-y_test[EXTREMA_COLUMN])[0].size
- n_test_maxima = sp.signal.find_peaks(y_test[EXTREMA_COLUMN])[0].size
+ test_extrema = y_test.get(EXTREMA_COLUMN)
+ n_test_minima = sp.signal.find_peaks(-test_extrema)[0].size
+ n_test_maxima = sp.signal.find_peaks(test_extrema)[0].size
n_test_extrema = n_test_minima + n_test_maxima
min_test_extrema: int = calculate_min_extrema(
test_window, fit_live_predictions_candles
train_ok = True
train_length = len(X)
if debug:
- n_train_minima: int = sp.signal.find_peaks(-y[EXTREMA_COLUMN])[0].size
- n_train_maxima: int = sp.signal.find_peaks(y[EXTREMA_COLUMN])[0].size
+ train_extrema = y.get(EXTREMA_COLUMN)
+ n_train_minima: int = sp.signal.find_peaks(-train_extrema)[0].size
+ n_train_maxima: int = sp.signal.find_peaks(train_extrema)[0].size
n_train_extrema: int = n_train_minima + n_train_maxima
min_train_extrema: int = calculate_min_extrema(
train_length, fit_live_predictions_candles
)
X = X.iloc[-train_window:]
y = y.iloc[-train_window:]
- n_train_minima = sp.signal.find_peaks(-y[EXTREMA_COLUMN])[0].size
- n_train_maxima = sp.signal.find_peaks(y[EXTREMA_COLUMN])[0].size
+ train_extrema = y.get(EXTREMA_COLUMN)
+ n_train_minima = sp.signal.find_peaks(-train_extrema)[0].size
+ n_train_maxima = sp.signal.find_peaks(train_extrema)[0].size
n_train_extrema = n_train_minima + n_train_maxima
min_train_extrema: int = calculate_min_extrema(
train_window, fit_live_predictions_candles
df: pd.DataFrame,
natr_period: int = 14,
natr_ratio: float = 6.0,
-) -> tuple[list[int], list[float], list[int], list[float]]:
+) -> tuple[list[int], list[float], list[TrendDirection], list[float]]:
n = len(df)
if df.empty or n < natr_period:
return [], [], [], []
natr_values = (ta.NATR(df, timeperiod=natr_period).bfill() / 100.0).to_numpy()
- indices = df.index.tolist()
- thresholds = natr_values * natr_ratio
+ indices: list[int] = df.index.tolist()
+ thresholds: np.ndarray = natr_values * natr_ratio
closes = df.get("close").to_numpy()
highs = df.get("high").to_numpy()
lows = df.get("low").to_numpy()
pivots_indices: list[int] = []
pivots_values: list[float] = []
- pivots_directions: list[int] = []
+ pivots_directions: list[TrendDirection] = []
pivots_scaled_natrs: list[float] = []
last_pivot_pos: int = -1
last_pivot_pos = pos
reset_candidate_pivot()
- slope_ok_cache: dict[tuple[int, int, int, float], bool] = {}
+ slope_ok_cache: dict[tuple[int, int, TrendDirection, float], bool] = {}
def get_slope_ok(
pos: int,
cache_key = (
pos,
candidate_pivot_pos,
- direction.value,
+ direction,
min_slope,
)
min_slope: float = np.finfo(float).eps,
alpha: float = 0.05,
) -> bool:
- start_pos = candidate_pivot_pos + 1
+ start_pos = min(candidate_pivot_pos + 1, n)
end_pos = min(pos + 1, n)
n_slopes = max(0, end_pos - start_pos)
df: pd.DataFrame,
natr_period: int = 14,
natr_ratio: float = 6.0,
-) -> tuple[list[int], list[float], list[int], list[float]]:
+) -> tuple[list[int], list[float], list[TrendDirection], list[float]]:
n = len(df)
if df.empty or n < natr_period:
return [], [], [], []
natr_values = (ta.NATR(df, timeperiod=natr_period).bfill() / 100.0).to_numpy()
- indices = df.index.tolist()
- thresholds = natr_values * natr_ratio
+ indices: list[int] = df.index.tolist()
+ thresholds: np.ndarray = natr_values * natr_ratio
closes = df.get("close").to_numpy()
highs = df.get("high").to_numpy()
lows = df.get("low").to_numpy()
pivots_indices: list[int] = []
pivots_values: list[float] = []
- pivots_directions: list[int] = []
+ pivots_directions: list[TrendDirection] = []
pivots_scaled_natrs: list[float] = []
last_pivot_pos: int = -1
last_pivot_pos = pos
reset_candidate_pivot()
- slope_ok_cache: dict[tuple[int, int, int, float], bool] = {}
+ slope_ok_cache: dict[tuple[int, int, TrendDirection, float], bool] = {}
def get_slope_ok(
pos: int,
cache_key = (
pos,
candidate_pivot_pos,
- direction.value,
+ direction,
min_slope,
)
min_slope: float = np.finfo(float).eps,
alpha: float = 0.05,
) -> bool:
- start_pos = candidate_pivot_pos + 1
+ start_pos = min(candidate_pivot_pos + 1, n)
end_pos = min(pos + 1, n)
n_slopes = max(0, end_pos - start_pos)