N_TRIALS = 36
TEST_SIZE = 0.1
+EXTREMA_COLUMN = "&s-extrema"
+MINIMA_THRESHOLD_COLUMN = "&s-minima_threshold"
+MAXIMA_THRESHOLD_COLUMN = "&s-maxima_threshold"
+
warnings.simplefilter(action="ignore", category=FutureWarning)
logger = logging.getLogger(__name__)
start = time.time()
if self.__optuna_hyperopt:
- study_name = dk.pair
- storage = self.get_optuna_storage(dk)
- pruner = optuna.pruners.HyperbandPruner()
- study = optuna.create_study(
- study_name=study_name,
- sampler=optuna.samplers.TPESampler(
- multivariate=True,
- group=True,
- ),
- pruner=pruner,
- direction=optuna.study.StudyDirection.MINIMIZE,
- storage=storage,
- load_if_exists=True,
+ params = self.optuna_optimize(
+ dk, X, y, train_weights, X_test, y_test, test_weights
)
- hyperopt_failed = False
- try:
- study.optimize(
- lambda trial: objective(
- trial,
- X,
- y,
- train_weights,
- X_test,
- y_test,
- test_weights,
- self.data_split_parameters.get("test_size", TEST_SIZE),
- self.freqai_info.get("fit_live_predictions_candles", 100),
- self.__optuna_config.get("candles_step", 100),
- self.model_training_parameters,
- ),
- n_trials=self.__optuna_config.get("n_trials", N_TRIALS),
- n_jobs=self.__optuna_config.get("n_jobs", 1),
- timeout=self.__optuna_config.get("timeout", 3600),
- gc_after_trial=True,
- )
- except Exception as e:
- logger.error(
- f"Optuna hyperopt failed: {e}. Please consider using a concurrency friendly storage backend like 'file' or lower the number of jobs."
- )
- hyperopt_failed = True
- if not hyperopt_failed:
+ if params:
if dk.pair not in self.__optuna_hp:
self.__optuna_hp[dk.pair] = {}
-
- self.__optuna_hp[dk.pair]["rmse"] = study.best_value
- self.__optuna_hp[dk.pair].update(study.best_params)
- # log params
- for key, value in self.__optuna_hp[dk.pair].items():
- logger.info(f"Optuna hyperopt | {key:>20s} : {value}")
+ self.__optuna_hp[dk.pair] = params
train_window = self.__optuna_hp[dk.pair].get("train_period_candles")
X = X.tail(train_window)
)
if not warmed_up:
- dk.data["extra_returns_per_train"]["&s-maxima_threshold"] = 2
- dk.data["extra_returns_per_train"]["&s-minima_threshold"] = -2
+ dk.data["extra_returns_per_train"][MAXIMA_THRESHOLD_COLUMN] = 2
+ dk.data["extra_returns_per_train"][MINIMA_THRESHOLD_COLUMN] = -2
else:
if self.__optuna_hyperopt:
label_period_candles = self.__optuna_hp.get(pair, {}).get(
num_candles,
label_period_candles,
)
- dk.data["extra_returns_per_train"]["&s-minima_threshold"] = min_pred
- dk.data["extra_returns_per_train"]["&s-maxima_threshold"] = max_pred
+ dk.data["extra_returns_per_train"][MINIMA_THRESHOLD_COLUMN] = min_pred
+ dk.data["extra_returns_per_train"][MAXIMA_THRESHOLD_COLUMN] = max_pred
dk.data["labels_mean"], dk.data["labels_std"] = {}, {}
for label in dk.label_list + dk.unique_class_list:
pred_df, fit_live_predictions_candles, label_period_candles
)
+ def optuna_optimize(
+ self,
+ dk: FreqaiDataKitchen,
+ X,
+ y,
+ train_weights,
+ X_test,
+ y_test,
+ test_weights,
+ ) -> dict | None:
+ study_name = dk.pair
+ storage = self.get_optuna_storage(dk)
+ pruner = optuna.pruners.HyperbandPruner()
+ study = optuna.create_study(
+ study_name=study_name,
+ sampler=optuna.samplers.TPESampler(
+ multivariate=True,
+ group=True,
+ ),
+ pruner=pruner,
+ direction=optuna.study.StudyDirection.MINIMIZE,
+ storage=storage,
+ load_if_exists=True,
+ )
+ hyperopt_failed = False
+ try:
+ study.optimize(
+ lambda trial: objective(
+ trial,
+ X,
+ y,
+ train_weights,
+ X_test,
+ y_test,
+ test_weights,
+ self.data_split_parameters.get("test_size", TEST_SIZE),
+ self.freqai_info.get("fit_live_predictions_candles", 100),
+ self.__optuna_config.get("candles_step", 100),
+ self.model_training_parameters,
+ ),
+ n_trials=self.__optuna_config.get("n_trials", N_TRIALS),
+ n_jobs=self.__optuna_config.get("n_jobs", 1),
+ timeout=self.__optuna_config.get("timeout", 3600),
+ gc_after_trial=True,
+ )
+ except Exception as e:
+ logger.error(
+ f"Optuna hyperopt failed: {e}. Please consider using a concurrency friendly storage backend like 'file' or lower the number of jobs."
+ )
+ hyperopt_failed = True
+
+ if not hyperopt_failed:
+ params = {"rmse": study.best_value, **study.best_params}
+ # log params
+ for key, value in params.items():
+ logger.info(f"Optuna hyperopt | {key:>20s} : {value}")
+ return params
+
+ return None
+
def log_sum_exp_min_max_pred(
pred_df: pd.DataFrame, fit_live_predictions_candles: int, label_period_candles: int
label_period_frequency: int = int(
fit_live_predictions_candles / label_period_candles
)
- extrema = pred_df.tail(label_period_candles * label_period_frequency)["&s-extrema"]
+ extrema = pred_df.tail(label_period_candles * label_period_frequency)[
+ EXTREMA_COLUMN
+ ]
beta = 10.0
min_pred = smooth_min(extrema, beta=beta)
max_pred = smooth_max(extrema, beta=beta)
)
min_pred = pred_df_sorted.iloc[-label_period_frequency:].mean()
max_pred = pred_df_sorted.iloc[:label_period_frequency].mean()
- return min_pred["&s-extrema"], max_pred["&s-extrema"]
+ return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
def median_min_max_pred(
)
min_pred = pred_df_sorted.iloc[-label_period_frequency:].median()
max_pred = pred_df_sorted.iloc[:label_period_frequency].median()
- return min_pred["&s-extrema"], max_pred["&s-extrema"]
+ return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
def objective(
N_TRIALS = 36
TEST_SIZE = 0.1
+EXTREMA_COLUMN = "&s-extrema"
+MINIMA_THRESHOLD_COLUMN = "&s-minima_threshold"
+MAXIMA_THRESHOLD_COLUMN = "&s-maxima_threshold"
+
warnings.simplefilter(action="ignore", category=FutureWarning)
logger = logging.getLogger(__name__)
start = time.time()
if self.__optuna_hyperopt:
- study_name = dk.pair
- storage = self.get_optuna_storage(dk)
- pruner = optuna.pruners.HyperbandPruner()
- study = optuna.create_study(
- study_name=study_name,
- sampler=optuna.samplers.TPESampler(
- multivariate=True,
- group=True,
- ),
- pruner=pruner,
- direction=optuna.study.StudyDirection.MINIMIZE,
- storage=storage,
- load_if_exists=True,
+ params = self.optuna_optimize(
+ dk, X, y, train_weights, X_test, y_test, test_weights
)
- hyperopt_failed = False
- try:
- study.optimize(
- lambda trial: objective(
- trial,
- X,
- y,
- train_weights,
- X_test,
- y_test,
- test_weights,
- self.data_split_parameters.get("test_size", TEST_SIZE),
- self.freqai_info.get("fit_live_predictions_candles", 100),
- self.__optuna_config.get("candles_step", 100),
- self.model_training_parameters,
- ),
- n_trials=self.__optuna_config.get("n_trials", N_TRIALS),
- n_jobs=self.__optuna_config.get("n_jobs", 1),
- timeout=self.__optuna_config.get("timeout", 3600),
- gc_after_trial=True,
- )
- except Exception as e:
- logger.error(
- f"Optuna hyperopt failed: {e}. Please consider using a concurrency friendly storage backend like 'file' or lower the number of jobs."
- )
- hyperopt_failed = True
- if not hyperopt_failed:
+ if params:
if dk.pair not in self.__optuna_hp:
self.__optuna_hp[dk.pair] = {}
-
- self.__optuna_hp[dk.pair]["rmse"] = study.best_value
- self.__optuna_hp[dk.pair].update(study.best_params)
- # log params
- for key, value in self.__optuna_hp[dk.pair].items():
- logger.info(f"Optuna hyperopt | {key:>20s} : {value}")
+ self.__optuna_hp[dk.pair] = params
train_window = self.__optuna_hp[dk.pair].get("train_period_candles")
X = X.tail(train_window)
)
if not warmed_up:
- dk.data["extra_returns_per_train"]["&s-maxima_threshold"] = 2
- dk.data["extra_returns_per_train"]["&s-minima_threshold"] = -2
+ dk.data["extra_returns_per_train"][MAXIMA_THRESHOLD_COLUMN] = 2
+ dk.data["extra_returns_per_train"][MINIMA_THRESHOLD_COLUMN] = -2
else:
if self.__optuna_hyperopt:
label_period_candles = self.__optuna_hp.get(pair, {}).get(
num_candles,
label_period_candles,
)
- dk.data["extra_returns_per_train"]["&s-minima_threshold"] = min_pred
- dk.data["extra_returns_per_train"]["&s-maxima_threshold"] = max_pred
+ dk.data["extra_returns_per_train"][MINIMA_THRESHOLD_COLUMN] = min_pred
+ dk.data["extra_returns_per_train"][MAXIMA_THRESHOLD_COLUMN] = max_pred
dk.data["labels_mean"], dk.data["labels_std"] = {}, {}
for label in dk.label_list + dk.unique_class_list:
pred_df, fit_live_predictions_candles, label_period_candles
)
+ def optuna_optimize(
+ self,
+ dk: FreqaiDataKitchen,
+ X,
+ y,
+ train_weights,
+ X_test,
+ y_test,
+ test_weights,
+ ) -> dict | None:
+ study_name = dk.pair
+ storage = self.get_optuna_storage(dk)
+ pruner = optuna.pruners.HyperbandPruner()
+ study = optuna.create_study(
+ study_name=study_name,
+ sampler=optuna.samplers.TPESampler(
+ multivariate=True,
+ group=True,
+ ),
+ pruner=pruner,
+ direction=optuna.study.StudyDirection.MINIMIZE,
+ storage=storage,
+ load_if_exists=True,
+ )
+ hyperopt_failed = False
+ try:
+ study.optimize(
+ lambda trial: objective(
+ trial,
+ X,
+ y,
+ train_weights,
+ X_test,
+ y_test,
+ test_weights,
+ self.data_split_parameters.get("test_size", TEST_SIZE),
+ self.freqai_info.get("fit_live_predictions_candles", 100),
+ self.__optuna_config.get("candles_step", 100),
+ self.model_training_parameters,
+ ),
+ n_trials=self.__optuna_config.get("n_trials", N_TRIALS),
+ n_jobs=self.__optuna_config.get("n_jobs", 1),
+ timeout=self.__optuna_config.get("timeout", 3600),
+ gc_after_trial=True,
+ )
+ except Exception as e:
+ logger.error(
+ f"Optuna hyperopt failed: {e}. Please consider using a concurrency friendly storage backend like 'file' or lower the number of jobs."
+ )
+ hyperopt_failed = True
+
+ if not hyperopt_failed:
+ params = {"rmse": study.best_value, **study.best_params}
+ # log params
+ for key, value in params.items():
+ logger.info(f"Optuna hyperopt | {key:>20s} : {value}")
+ return params
+
+ return None
+
def log_sum_exp_min_max_pred(
pred_df: pd.DataFrame, fit_live_predictions_candles: int, label_period_candles: int
label_period_frequency: int = int(
fit_live_predictions_candles / label_period_candles
)
- extrema = pred_df.tail(label_period_candles * label_period_frequency)["&s-extrema"]
+ extrema = pred_df.tail(label_period_candles * label_period_frequency)[
+ EXTREMA_COLUMN
+ ]
beta = 10.0
min_pred = smooth_min(extrema, beta=beta)
max_pred = smooth_max(extrema, beta=beta)
)
min_pred = pred_df_sorted.iloc[-label_period_frequency:].mean()
max_pred = pred_df_sorted.iloc[:label_period_frequency].mean()
- return min_pred["&s-extrema"], max_pred["&s-extrema"]
+ return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
def median_min_max_pred(
)
min_pred = pred_df_sorted.iloc[-label_period_frequency:].median()
max_pred = pred_df_sorted.iloc[:label_period_frequency].median()
- return min_pred["&s-extrema"], max_pred["&s-extrema"]
+ return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
def objective(
logger = logging.getLogger(__name__)
+EXTREMA_COLUMN = "&s-extrema"
+MINIMA_THRESHOLD_COLUMN = "&s-minima_threshold"
+MAXIMA_THRESHOLD_COLUMN = "&s-maxima_threshold"
+
class QuickAdapterV3(IStrategy):
"""
"subplots": {
"accuracy": {"rmse": {"color": "#c28ce3", "type": "line"}},
"extrema": {
- "&s-extrema": {"color": "#f53580", "type": "line"},
- "&s-minima_threshold": {"color": "#4ae747", "type": "line"},
- "&s-maxima_threshold": {"color": "#e6be0b", "type": "line"},
+ EXTREMA_COLUMN: {"color": "#f53580", "type": "line"},
+ MINIMA_THRESHOLD_COLUMN: {"color": "#4ae747", "type": "line"},
+ MAXIMA_THRESHOLD_COLUMN: {"color": "#e6be0b", "type": "line"},
},
"min_max": {
"maxima": {"color": "#0dd6de", "type": "bar"},
self.freqai_info["feature_parameters"]["label_period_candles"],
)
)
- dataframe["&s-extrema"] = 0
+ dataframe[EXTREMA_COLUMN] = 0
min_peaks, _ = find_peaks(
-dataframe["low"].values,
distance=label_period_candles,
distance=label_period_candles,
)
for mp in min_peaks:
- dataframe.at[mp, "&s-extrema"] = -1
+ dataframe.at[mp, EXTREMA_COLUMN] = -1
for mp in max_peaks:
- dataframe.at[mp, "&s-extrema"] = 1
- dataframe["minima"] = np.where(dataframe["&s-extrema"] == -1, -1, 0)
- dataframe["maxima"] = np.where(dataframe["&s-extrema"] == 1, 1, 0)
- dataframe["&s-extrema"] = (
- dataframe["&s-extrema"]
+ dataframe.at[mp, EXTREMA_COLUMN] = 1
+ dataframe["minima"] = np.where(dataframe[EXTREMA_COLUMN] == -1, -1, 0)
+ dataframe["maxima"] = np.where(dataframe[EXTREMA_COLUMN] == 1, 1, 0)
+ dataframe[EXTREMA_COLUMN] = (
+ dataframe[EXTREMA_COLUMN]
.rolling(window=6, win_type="gaussian", center=True)
.mean(std=0.5)
)
1,
)
- dataframe["minima_threshold"] = dataframe["&s-minima_threshold"]
- dataframe["maxima_threshold"] = dataframe["&s-maxima_threshold"]
+ dataframe["minima_threshold"] = dataframe[MINIMA_THRESHOLD_COLUMN]
+ dataframe["maxima_threshold"] = dataframe[MAXIMA_THRESHOLD_COLUMN]
return dataframe
def populate_entry_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
enter_long_conditions = [
df["do_predict"] == 1,
df["DI_catch"] == 1,
- df["&s-extrema"] < df["minima_threshold"],
+ df[EXTREMA_COLUMN] < df["minima_threshold"],
]
if enter_long_conditions:
enter_short_conditions = [
df["do_predict"] == 1,
df["DI_catch"] == 1,
- df["&s-extrema"] > df["maxima_threshold"],
+ df[EXTREMA_COLUMN] > df["maxima_threshold"],
]
if enter_short_conditions:
return "outlier_detected"
if (
- last_candle["&s-extrema"] < last_candle["minima_threshold"]
+ last_candle[EXTREMA_COLUMN] < last_candle["minima_threshold"]
and entry_tag == "short"
):
return "minima_detected_short"
if (
- last_candle["&s-extrema"] > last_candle["maxima_threshold"]
+ last_candle[EXTREMA_COLUMN] > last_candle["maxima_threshold"]
and entry_tag == "long"
):
return "maxima_detected_long"