"&s-minima_threshold": -2,
"&s-maxima_threshold": 2,
"label_period_candles": 100,
- "rmse": 0
+ "hp_rmse": 0,
+ "period_rmse": 0
},
"feature_parameters": {
"include_corr_pairlist": [
import logging
+import json
from typing import Any, Dict
+from pathlib import Path
from lightgbm import LGBMRegressor
import time
and self.__optuna_config.get("enabled", False)
and self.data_split_parameters.get("test_size", TEST_SIZE) > 0
)
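+ # per-pair cache of the best objective value (RMSE) from the latest Optuna studies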
+ self.__optuna_hp_rmse: dict[str, float | None] = {}
+ self.__optuna_period_rmse: dict[str, float | None] = {}
self.__optuna_hp_params: dict[str, dict] = {}
self.__optuna_period_params: dict[str, dict] = {}
start = time.time()
if self.__optuna_hyperopt:
- optuna_hp_params = self.optuna_hp_optimize(
+ optuna_hp_params, optuna_hp_rmse = self.optuna_hp_optimize(
dk, X, y, train_weights, X_test, y_test, test_weights
)
if optuna_hp_params:
**model_training_parameters,
**self.__optuna_hp_params[dk.pair],
}
+ if optuna_hp_rmse is not None:
+ self.__optuna_hp_rmse[dk.pair] = optuna_hp_rmse
- optuna_period_params = self.optuna_period_optimize(
+ optuna_period_params, optuna_period_rmse = self.optuna_period_optimize(
dk,
X,
y,
if dk.pair not in self.__optuna_period_params:
self.__optuna_period_params[dk.pair] = {}
self.__optuna_period_params[dk.pair] = optuna_period_params
+ if optuna_period_rmse is not None:
+ self.__optuna_period_rmse[dk.pair] = optuna_period_rmse
if self.__optuna_period_params.get(dk.pair):
train_window = self.__optuna_period_params[dk.pair].get(
"label_period_candles", self.ft_params["label_period_candles"]
)
)
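+ # report the best RMSE of each Optuna study back to the strategy via extra_returns_per_train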
- dk.data["extra_returns_per_train"]["rmse"] = self.__optuna_period_params.get(
+ dk.data["extra_returns_per_train"]["hp_rmse"] = self.__optuna_hp_rmse.get(
pair, {}
- ).get("rmse", 0)
+ )
+ dk.data["extra_returns_per_train"]["period_rmse"] = (
+ self.__optuna_period_rmse.get(pair, {})
+ )
def eval_set_and_weights(self, X_test, y_test, test_weights):
if self.data_split_parameters.get("test_size", TEST_SIZE) == 0:
X_test,
y_test,
test_weights,
- ) -> dict | None:
+ ) -> tuple[dict | None, float | None]:
study_name = f"hp-{dk.pair}"
storage = self.optuna_storage(dk)
pruner = optuna.pruners.HyperbandPruner()
)
except Exception as e:
logger.error(f"Optuna hp hyperopt failed: {e}", exc_info=True)
- return None
+ return None, None
time_spent = time.time() - start
logger.info(f"Optuna hp hyperopt done ({time_spent:.2f} secs)")
+ hp_best_params_path = Path(
+ dk.full_path / f"{dk.pair.split('/')[0]}_optuna_hp_best_params.json"
+ )
+ with hp_best_params_path.open("w", encoding="utf-8") as write_file:
+ json.dump(study.best_params, write_file, indent=4)
params = study.best_params
# log params
for key, value in {"rmse": study.best_value, **params}.items():
logger.info(f"Optuna hp hyperopt | {key:>20s} : {value}")
- return params
+ return params, study.best_value
def optuna_period_optimize(
self,
y_test,
test_weights,
model_training_parameters,
- ) -> dict | None:
+ ) -> tuple[dict | None, float | None]:
study_name = f"period-{dk.pair}"
storage = self.optuna_storage(dk)
pruner = optuna.pruners.HyperbandPruner()
if self.optuna_study_has_best_params(previous_study):
study.enqueue_trial(previous_study.best_params)
elif self.__optuna_period_params.get(dk.pair):
- previous_best_params = self.__optuna_period_params[dk.pair].copy()
- del previous_best_params["rmse"]
- study.enqueue_trial(previous_best_params)
+ study.enqueue_trial(self.__optuna_period_params[dk.pair])
start = time.time()
try:
study.optimize(
)
except Exception as e:
logger.error(f"Optuna period hyperopt failed: {e}", exc_info=True)
- return None
+ return None, None
time_spent = time.time() - start
logger.info(f"Optuna period hyperopt done ({time_spent:.2f} secs)")
- params = {"rmse": study.best_value, **study.best_params}
+ period_best_params_path = Path(
+ dk.full_path / f"{dk.pair.split('/')[0]}_optuna_period_best_params.json"
+ )
+ with period_best_params_path.open("w", encoding="utf-8") as write_file:
+ json.dump(study.best_params, write_file, indent=4)
+ params = study.best_params
# log params
- for key, value in params.items():
+ for key, value in {"rmse": study.best_value, **params}.items():
logger.info(f"Optuna period hyperopt | {key:>20s} : {value}")
- return params
+ return params, study.best_value
def optuna_study_load_and_cleanup(
self, study_name: str, storage
import logging
+import json
from typing import Any, Dict
+from pathlib import Path
from xgboost import XGBRegressor
import time
and self.__optuna_config.get("enabled", False)
and self.data_split_parameters.get("test_size", TEST_SIZE) > 0
)
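+ # per-pair cache of the best objective value (RMSE) from the latest Optuna studies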
+ self.__optuna_hp_rmse: dict[str, float | None] = {}
+ self.__optuna_period_rmse: dict[str, float | None] = {}
self.__optuna_hp_params: dict[str, dict] = {}
self.__optuna_period_params: dict[str, dict] = {}
start = time.time()
if self.__optuna_hyperopt:
- optuna_hp_params = self.optuna_hp_optimize(
+ optuna_hp_params, optuna_hp_rmse = self.optuna_hp_optimize(
dk, X, y, train_weights, X_test, y_test, test_weights
)
if optuna_hp_params:
**model_training_parameters,
**self.__optuna_hp_params[dk.pair],
}
+ if optuna_hp_rmse is not None:
+ self.__optuna_hp_rmse[dk.pair] = optuna_hp_rmse
- optuna_period_params = self.optuna_period_optimize(
+ optuna_period_params, optuna_period_rmse = self.optuna_period_optimize(
dk,
X,
y,
if dk.pair not in self.__optuna_period_params:
self.__optuna_period_params[dk.pair] = {}
self.__optuna_period_params[dk.pair] = optuna_period_params
+ if optuna_period_rmse is not None:
+ self.__optuna_period_rmse[dk.pair] = optuna_period_rmse
if self.__optuna_period_params.get(dk.pair):
train_window = self.__optuna_period_params[dk.pair].get(
"label_period_candles", self.ft_params["label_period_candles"]
)
)
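+ # report the best RMSE of each Optuna study back to the strategy via extra_returns_per_train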
- dk.data["extra_returns_per_train"]["rmse"] = self.__optuna_period_params.get(
+ dk.data["extra_returns_per_train"]["hp_rmse"] = self.__optuna_hp_rmse.get(
pair, {}
- ).get("rmse", 0)
+ )
+ dk.data["extra_returns_per_train"]["period_rmse"] = (
+ self.__optuna_period_rmse.get(pair, {})
+ )
def eval_set_and_weights(self, X_test, y_test, test_weights):
if self.data_split_parameters.get("test_size", TEST_SIZE) == 0:
X_test,
y_test,
test_weights,
- ) -> dict | None:
+ ) -> tuple[dict | None, float | None]:
study_name = f"hp-{dk.pair}"
storage = self.optuna_storage(dk)
pruner = optuna.pruners.HyperbandPruner()
)
except Exception as e:
logger.error(f"Optuna hp hyperopt failed: {e}", exc_info=True)
- return None
+ return None, None
time_spent = time.time() - start
logger.info(f"Optuna hp hyperopt done ({time_spent:.2f} secs)")
+ hp_best_params_path = Path(
+ dk.full_path / f"{dk.pair.split('/')[0]}_optuna_hp_best_params.json"
+ )
+ with hp_best_params_path.open("w", encoding="utf-8") as write_file:
+ json.dump(study.best_params, write_file, indent=4)
params = study.best_params
# log params
for key, value in {"rmse": study.best_value, **params}.items():
logger.info(f"Optuna hp hyperopt | {key:>20s} : {value}")
- return params
+ return params, study.best_value
def optuna_period_optimize(
self,
y_test,
test_weights,
model_training_parameters,
- ) -> dict | None:
+ ) -> tuple[dict | None, float | None]:
study_name = f"period-{dk.pair}"
storage = self.optuna_storage(dk)
pruner = optuna.pruners.HyperbandPruner()
if self.optuna_study_has_best_params(previous_study):
study.enqueue_trial(previous_study.best_params)
elif self.__optuna_period_params.get(dk.pair):
- previous_best_params = self.__optuna_period_params[dk.pair].copy()
- del previous_best_params["rmse"]
- study.enqueue_trial(previous_best_params)
+ study.enqueue_trial(self.__optuna_period_params[dk.pair])
start = time.time()
try:
study.optimize(
)
except Exception as e:
logger.error(f"Optuna period hyperopt failed: {e}", exc_info=True)
- return None
+ return None, None
time_spent = time.time() - start
logger.info(f"Optuna period hyperopt done ({time_spent:.2f} secs)")
- params = {"rmse": study.best_value, **study.best_params}
+ period_best_params_path = Path(
+ dk.full_path / f"{dk.pair.split('/')[0]}_optuna_period_best_params.json"
+ )
+ with period_best_params_path.open("w", encoding="utf-8") as write_file:
+ json.dump(study.best_params, write_file, indent=4)
+ params = study.best_params
# log params
- for key, value in params.items():
+ for key, value in {"rmse": study.best_value, **params}.items():
logger.info(f"Optuna period hyperopt | {key:>20s} : {value}")
- return params
+ return params, study.best_value
def optuna_study_load_and_cleanup(
self, study_name: str, storage
return {
"main_plot": {},
"subplots": {
- "accuracy": {"rmse": {"color": "#c28ce3", "type": "line"}},
+ "accuracy": {
+ "hp_rmse": {"color": "#c28ce3", "type": "line"},
+ "period_rmse": {"color": "#a3087a", "type": "line"},
+ },
"extrema": {
EXTREMA_COLUMN: {"color": "#f53580", "type": "line"},
MINIMA_THRESHOLD_COLUMN: {"color": "#4ae747", "type": "line"},