train_window = self.__optuna_period_params[dk.pair].get(
"train_period_candles"
)
- X = X.tail(train_window)
- y = y.tail(train_window)
+ X = X.iloc[-train_window:]
+ y = y.iloc[-train_window:]
train_weights = train_weights[-train_window:]
test_window = self.__optuna_period_params[dk.pair].get(
"test_period_candles"
)
- X_test = X_test.tail(test_window)
- y_test = y_test.tail(test_window)
+ X_test = X_test.iloc[-test_window:]
+ y_test = y_test.iloc[-test_window:]
test_weights = test_weights[-test_window:]
# FIXME: find a better way to propagate optuna computed params to strategy
train_window = trial.suggest_int(
"train_period_candles", min_train_window, max_train_window, step=candles_step
)
- X = X.tail(train_window)
- y = y.tail(train_window)
+ X = X.iloc[-train_window:]
+ y = y.iloc[-train_window:]
train_weights = train_weights[-train_window:]
min_test_window: int = int(min_train_window * test_size)
test_window = trial.suggest_int(
"test_period_candles", min_test_window, max_test_window, step=candles_step
)
- X_test = X_test.tail(test_window)
- y_test = y_test.tail(test_window)
+ min_label_period_candles: int = 10
+ max_label_period_candles: int = min(
+ max(fit_live_predictions_candles // 6, min_label_period_candles), test_window
+ )
+ label_period_candles = trial.suggest_int(
+ "label_period_candles",
+ min_label_period_candles,
+ max_label_period_candles,
+ step=candles_step,
+ )
+ test_window = (test_window // label_period_candles) * label_period_candles
+ if test_window < min_label_period_candles:
+ raise optuna.TrialPruned(
+ f"Adjusted test window {test_window} is too small for minimum label period {min_label_period_candles}."
+ )
+ X_test = X_test.iloc[-test_window:]
+ y_test = y_test.iloc[-test_window:]
test_weights = test_weights[-test_window:]
# Fit the model
)
y_pred = model.predict(X_test)
- min_label_period_candles: int = 10
- max_label_period_candles: int = max(
- fit_live_predictions_candles // 6, min_label_period_candles
- )
- label_period_candles = trial.suggest_int(
- "label_period_candles",
- min_label_period_candles,
- max_label_period_candles,
- step=candles_step,
- )
+ n_windows = len(y_test) // label_period_candles
y_test_windows = [
- y_test[i : i + label_period_candles]
- for i in range(0, len(y_test) - label_period_candles + 1)
+ y_test.iloc[i : i + label_period_candles].to_numpy()
+ for i in np.arange(0, label_period_candles * n_windows, label_period_candles)
+ ]
+ test_weights_windows = [
+ test_weights[i : i + label_period_candles]
+ for i in np.arange(0, label_period_candles * n_windows, label_period_candles)
]
y_pred_windows = [
y_pred[i : i + label_period_candles]
- for i in range(0, len(y_pred) - label_period_candles + 1)
+ for i in np.arange(0, label_period_candles * n_windows, label_period_candles)
]
- y_test = np.array([window.mean() for window in y_test_windows])
- y_pred = np.array([window.mean() for window in y_pred_windows])
+ y_test = np.array([window for window in y_test_windows])
+ test_weights = np.concatenate(np.array([window for window in test_weights_windows]))
+ y_pred = np.array([window for window in y_pred_windows])
- error = sklearn.metrics.root_mean_squared_error(y_test, y_pred)
+ error = sklearn.metrics.root_mean_squared_error(
+ y_test, y_pred, sample_weight=test_weights
+ )
return error
)
y_pred = model.predict(X_test)
- error = sklearn.metrics.root_mean_squared_error(y_test, y_pred)
+ error = sklearn.metrics.root_mean_squared_error(
+ y_test, y_pred, sample_weight=test_weights
+ )
return error
train_window = self.__optuna_period_params[dk.pair].get(
"train_period_candles"
)
- X = X.tail(train_window)
- y = y.tail(train_window)
+ X = X.iloc[-train_window:]
+ y = y.iloc[-train_window:]
train_weights = train_weights[-train_window:]
test_window = self.__optuna_period_params[dk.pair].get(
"test_period_candles"
)
- X_test = X_test.tail(test_window)
- y_test = y_test.tail(test_window)
+ X_test = X_test.iloc[-test_window:]
+ y_test = y_test.iloc[-test_window:]
test_weights = test_weights[-test_window:]
# FIXME: find a better way to propagate optuna computed params to strategy
train_window = trial.suggest_int(
"train_period_candles", min_train_window, max_train_window, step=candles_step
)
- X = X.tail(train_window)
- y = y.tail(train_window)
+ X = X.iloc[-train_window:]
+ y = y.iloc[-train_window:]
train_weights = train_weights[-train_window:]
min_test_window: int = int(min_train_window * test_size)
test_window = trial.suggest_int(
"test_period_candles", min_test_window, max_test_window, step=candles_step
)
- X_test = X_test.tail(test_window)
- y_test = y_test.tail(test_window)
+ min_label_period_candles: int = 10
+ max_label_period_candles: int = min(
+ max(fit_live_predictions_candles // 6, min_label_period_candles), test_window
+ )
+ label_period_candles = trial.suggest_int(
+ "label_period_candles",
+ min_label_period_candles,
+ max_label_period_candles,
+ step=candles_step,
+ )
+ test_window = (test_window // label_period_candles) * label_period_candles
+ if test_window < min_label_period_candles:
+ raise optuna.TrialPruned(
+ f"Adjusted test window {test_window} is too small for minimum label period {min_label_period_candles}."
+ )
+ X_test = X_test.iloc[-test_window:]
+ y_test = y_test.iloc[-test_window:]
test_weights = test_weights[-test_window:]
# Fit the model
)
y_pred = model.predict(X_test)
- min_label_period_candles: int = 10
- max_label_period_candles: int = max(
- fit_live_predictions_candles // 6, min_label_period_candles
- )
- label_period_candles = trial.suggest_int(
- "label_period_candles",
- min_label_period_candles,
- max_label_period_candles,
- step=candles_step,
- )
+ n_windows = len(y_test) // label_period_candles
y_test_windows = [
- y_test[i : i + label_period_candles]
- for i in range(0, len(y_test) - label_period_candles + 1)
+ y_test.iloc[i : i + label_period_candles].to_numpy()
+ for i in np.arange(0, label_period_candles * n_windows, label_period_candles)
+ ]
+ test_weights_windows = [
+ test_weights[i : i + label_period_candles]
+ for i in np.arange(0, label_period_candles * n_windows, label_period_candles)
]
y_pred_windows = [
y_pred[i : i + label_period_candles]
- for i in range(0, len(y_pred) - label_period_candles + 1)
+ for i in np.arange(0, label_period_candles * n_windows, label_period_candles)
]
- y_test = np.array([window.mean() for window in y_test_windows])
- y_pred = np.array([window.mean() for window in y_pred_windows])
+ y_test = np.array([window for window in y_test_windows])
+ test_weights = np.concatenate(np.array([window for window in test_weights_windows]))
+ y_pred = np.array([window for window in y_pred_windows])
- error = sklearn.metrics.root_mean_squared_error(y_test, y_pred)
+ error = sklearn.metrics.root_mean_squared_error(
+ y_test, y_pred, sample_weight=test_weights
+ )
return error
)
y_pred = model.predict(X_test)
- error = sklearn.metrics.root_mean_squared_error(y_test, y_pred)
+ error = sklearn.metrics.root_mean_squared_error(
+ y_test, y_pred, sample_weight=test_weights
+ )
return error