candles_step: int,
model_training_parameters: dict,
) -> float:
- min_train_window: int = fit_live_predictions_candles * int(round(1 / test_size - 1))
- max_train_window: int = len(X)
- if max_train_window < min_train_window:
- min_train_window = max_train_window
- train_window: int = trial.suggest_int(
- "train_period_candles", min_train_window, max_train_window, step=candles_step
- )
- X = X.iloc[-train_window:]
- y = y.iloc[-train_window:]
- train_weights = train_weights[-train_window:]
-
- min_test_window: int = fit_live_predictions_candles
+ min_test_window: int = fit_live_predictions_candles * 2
+ if len(X_test) < min_test_window:
+ logger.warning(f"Insufficient test data: {len(X_test)} < {min_test_window}")
+ return np.inf
max_test_window: int = len(X_test)
- if max_test_window < min_test_window:
- min_test_window = max_test_window
test_window: int = trial.suggest_int(
"test_period_candles", min_test_window, max_test_window, step=candles_step
)
y_test = y_test.iloc[-test_window:]
test_weights = test_weights[-test_window:]
+ min_train_window: int = min_test_window * int(round(1 / test_size - 1))
+ if len(X) < min_train_window:
+ logger.warning(f"Insufficient train data: {len(X)} < {min_train_window}")
+ return np.inf
+ max_train_window: int = len(X)
+ train_window: int = trial.suggest_int(
+ "train_period_candles", min_train_window, max_train_window, step=candles_step
+ )
+ X = X.iloc[-train_window:]
+ y = y.iloc[-train_window:]
+ train_weights = train_weights[-train_window:]
+
model = fit_regressor(
regressor=regressor,
X=X,
)
y_pred = model.predict(X_test)
- error = sklearn.metrics.root_mean_squared_error(
+ return sklearn.metrics.root_mean_squared_error(
y_test, y_pred, sample_weight=test_weights
)
- return error
-
def get_optuna_study_model_parameters(
trial: optuna.trial.Trial, regressor: str
)
y_pred = model.predict(X_test)
- error = sklearn.metrics.root_mean_squared_error(
+ return sklearn.metrics.root_mean_squared_error(
y_test, y_pred, sample_weight=test_weights
)
- return error
-
def calculate_quantile(values: np.ndarray, value: float) -> float:
if values.size == 0: