https://github.com/sponsors/robcaulk
"""
- version = "3.6.8"
+ version = "3.6.7"
@cached_property
def _optuna_config(self) -> dict:
and self.data_split_parameters.get("test_size", TEST_SIZE) > 0
)
self._optuna_hp_rmse: dict[str, float] = {}
- self._optuna_period_rmse: dict[str, float] = {}
+ self._optuna_train_rmse: dict[str, float] = {}
self._optuna_hp_params: dict[str, dict] = {}
- self._optuna_period_params: dict[str, dict] = {}
+ self._optuna_train_params: dict[str, dict] = {}
for pair in self.pairs:
self._optuna_hp_rmse[pair] = -1
- self._optuna_period_rmse[pair] = -1
+ self._optuna_train_rmse[pair] = -1
self._optuna_hp_params[pair] = (
self.optuna_load_best_params(pair, "hp") or {}
)
- self._optuna_period_params[pair] = (
- self.optuna_load_best_params(pair, "period") or {}
+ self._optuna_train_params[pair] = (
+ self.optuna_load_best_params(pair, "train") or {}
)
logger.info(
f"Initialized {self.__class__.__name__} {self.freqai_info.get('regressor', 'xgboost')} regressor model version {self.version}"
def get_optuna_params(self, pair: str, namespace: str) -> dict[str, Any]:
if namespace == "hp":
params = self._optuna_hp_params.get(pair)
- elif namespace == "period":
- params = self._optuna_period_params.get(pair)
+ elif namespace == "train":
+ params = self._optuna_train_params.get(pair)
else:
raise ValueError(f"Invalid namespace: {namespace}")
return params
def set_optuna_params(self, pair: str, namespace: str, params: dict) -> None:
if namespace == "hp":
self._optuna_hp_params[pair] = params
- elif namespace == "period":
- self._optuna_period_params[pair] = params
+ elif namespace == "train":
+ self._optuna_train_params[pair] = params
else:
raise ValueError(f"Invalid namespace: {namespace}")
def get_optuna_rmse(self, pair: str, namespace: str) -> float:
if namespace == "hp":
rmse = self._optuna_hp_rmse.get(pair)
- elif namespace == "period":
- rmse = self._optuna_period_rmse.get(pair)
+ elif namespace == "train":
+ rmse = self._optuna_train_rmse.get(pair)
else:
raise ValueError(f"Invalid namespace: {namespace}")
return rmse
def set_optuna_rmse(self, pair: str, namespace: str, rmse: float) -> None:
if namespace == "hp":
self._optuna_hp_rmse[pair] = rmse
- elif namespace == "period":
- self._optuna_period_rmse[pair] = rmse
+ elif namespace == "train":
+ self._optuna_train_rmse[pair] = rmse
else:
raise ValueError(f"Invalid namespace: {namespace}")
- def get_label_period_candles(self, pair: str) -> int:
- label_period_candles = self.get_optuna_params(pair, "period").get(
- "label_period_candles"
- )
- if label_period_candles:
- return label_period_candles
- return self.ft_params.get("label_period_candles", 50)
-
def fit(self, data_dictionary: dict, dk: FreqaiDataKitchen, **kwargs) -> Any:
"""
User sets up the training and test data to fit their desired model here
self.optuna_optimize(
dk.pair,
- "period",
- lambda trial: period_objective(
+ "train",
+ lambda trial: train_objective(
trial,
self.freqai_info.get("regressor", "xgboost"),
X,
y_test,
test_weights,
self.freqai_info.get("fit_live_predictions_candles", 100),
- self.ft_params.get("label_period_candles", 50),
self._optuna_config.get("candles_step"),
model_training_parameters,
),
)
- optuna_period_params = self.get_optuna_params(dk.pair, "period")
- if optuna_period_params:
- train_window = optuna_period_params.get("train_period_candles")
+ optuna_train_params = self.get_optuna_params(dk.pair, "train")
+ if optuna_train_params:
+ train_window = optuna_train_params.get("train_period_candles")
X = X.iloc[-train_window:]
y = y.iloc[-train_window:]
train_weights = train_weights[-train_window:]
- test_window = optuna_period_params.get("test_period_candles")
+ test_window = optuna_train_params.get("test_period_candles")
X_test = X_test.iloc[-test_window:]
y_test = y_test.iloc[-test_window:]
test_weights = test_weights[-test_window:]
dk.data["extra_returns_per_train"]["DI_value_param3"] = f[2]
dk.data["extra_returns_per_train"]["DI_cutoff"] = cutoff
- dk.data["extra_returns_per_train"]["label_period_candles"] = (
- self.get_label_period_candles(pair)
- )
dk.data["extra_returns_per_train"]["hp_rmse"] = self.get_optuna_rmse(pair, "hp")
- dk.data["extra_returns_per_train"]["period_rmse"] = self.get_optuna_rmse(
- pair, "period"
+ dk.data["extra_returns_per_train"]["train_rmse"] = self.get_optuna_rmse(
+ pair, "train"
)
def eval_set_and_weights(
def min_max_pred(self, pred_df: pd.DataFrame) -> tuple[float, float]:
temperature = float(
- self.freqai_info.get("prediction_thresholds_temperature", 140.0)
+ self.freqai_info.get("prediction_thresholds_temperature", 120.0)
)
min_pred = smoothed_min(
pred_df[EXTREMA_COLUMN],
return int(round(value / step) * step)
-def period_objective(
+def train_objective(
trial: optuna.Trial,
regressor: str,
X: pd.DataFrame,
y_test: pd.DataFrame,
test_weights: np.ndarray,
fit_live_predictions_candles: int,
- label_period_candles: int,
candles_step: int,
model_training_parameters: dict,
) -> float:
)
y_pred = model.predict(X_test)
- # TODO: implement a label_period_candles optimization compatible with ZigZag
- label_period_candles: int = trial.suggest_int(
- "label_period_candles",
- label_period_candles,
- label_period_candles,
- step=candles_step,
- )
-
- # min_label_period_candles: int = round_to_nearest(
- # max(fit_live_predictions_candles // 12, 20), candles_step
- # )
- # max_label_period_candles: int = round_to_nearest(
- # max(fit_live_predictions_candles // 4, min_label_period_candles),
- # candles_step,
- # )
- # label_period_candles: int = trial.suggest_int(
- # "label_period_candles",
- # min_label_period_candles,
- # max_label_period_candles,
- # step=candles_step,
- # )
- # if label_period_candles > test_window:
- # return float("inf")
- # label_periods_candles: int = (
- # test_window // label_period_candles
- # ) * label_period_candles
- # if label_periods_candles == 0:
- # return float("inf")
- # y_test = y_test.iloc[-label_periods_candles:]
- # test_weights = test_weights[-label_periods_candles:]
- # y_pred = y_pred[-label_periods_candles:]
-
error = sklearn.metrics.root_mean_squared_error(
y_test, y_pred, sample_weight=test_weights
)
-import json
import logging
from functools import reduce, cached_property
import datetime
INTERFACE_VERSION = 3
def version(self) -> str:
- return "3.2.15"
+ return "3.2.16"
timeframe = "5m"
"subplots": {
"accuracy": {
"hp_rmse": {"color": "#c28ce3", "type": "line"},
- "period_rmse": {"color": "#a3087a", "type": "line"},
+ "train_rmse": {"color": "#a3087a", "type": "line"},
},
"extrema": {
EXTREMA_COLUMN: {"color": "#f53580", "type": "line"},
/ "models"
/ f"{self.freqai_info.get('identifier', 'no_id_provided')}"
)
- self._period_params: dict[str, dict] = {}
- for pair in self.pairs:
- self._period_params[pair] = self.load_period_best_params(pair) or {}
- def feature_engineering_expand_all(self, dataframe, period, **kwargs):
+ def feature_engineering_expand_all(
+ self, dataframe: DataFrame, period: int, metadata: dict, **kwargs
+ ):
dataframe["%-rsi-period"] = ta.RSI(dataframe, timeperiod=period)
dataframe["%-aroonosc-period"] = ta.AROONOSC(dataframe, timeperiod=period)
dataframe["%-mfi-period"] = ta.MFI(dataframe, timeperiod=period)
dataframe["%-natr-period"] = ta.NATR(dataframe, timeperiod=period)
return dataframe
- def feature_engineering_expand_basic(self, dataframe, **kwargs):
+ def feature_engineering_expand_basic(
+ self, dataframe: DataFrame, metadata: dict, **kwargs
+ ):
dataframe["%-close_pct_change"] = dataframe["close"].pct_change()
dataframe["%-raw_volume"] = dataframe["volume"]
dataframe["%-obv"] = ta.OBV(dataframe)
+ pair = str(metadata.get("pair"))
+ label_period_candles = self.get_label_period_candles(pair)
+ dataframe["%-natr_label_period_candles"] = ta.NATR(
+ dataframe, timeperiod=label_period_candles
+ )
+ dataframe["%-atr_label_period_candles"] = ta.ATR(
+ dataframe, timeperiod=label_period_candles
+ )
dataframe["%-ewo"] = ewo(
dataframe=dataframe,
pricemode="close",
dataframe["%-raw_high"] = dataframe["high"]
return dataframe
- def feature_engineering_standard(self, dataframe, **kwargs):
+ def feature_engineering_standard(self, dataframe: DataFrame, **kwargs):
dataframe["%-day_of_week"] = (dataframe["date"].dt.dayofweek + 1) / 7
dataframe["%-hour_of_day"] = (dataframe["date"].dt.hour + 1) / 25
return dataframe
def get_label_period_candles(self, pair: str) -> int:
- label_period_candles = self._period_params.get(pair).get("label_period_candles")
- if label_period_candles:
- return label_period_candles
return self.freqai_info["feature_parameters"].get("label_period_candles", 50)
- def set_freqai_targets(self, dataframe, metadata, **kwargs):
+ def set_freqai_targets(self, dataframe: DataFrame, metadata: dict, **kwargs):
label_period_candles = self.get_label_period_candles(str(metadata.get("pair")))
peak_indices, _, peak_directions = dynamic_zigzag(
dataframe, period=label_period_candles, ratio=self.label_natr_ratio
pair = str(metadata.get("pair"))
- self._period_params[pair]["label_period_candles"] = dataframe[
- "label_period_candles"
- ].iloc[-1]
-
label_period_candles = self.get_label_period_candles(pair)
dataframe["natr_label_period_candles"] = ta.NATR(
dataframe, timeperiod=label_period_candles
extrema_smoothing,
smoothing_methods["gaussian"],
)
-
- def load_period_best_params(self, pair: str) -> Optional[dict]:
- best_params_path = Path(
- self.models_full_path
- / f"optuna-period-best-params-{pair.split('/')[0]}.json"
- )
- if best_params_path.is_file():
- with best_params_path.open("r", encoding="utf-8") as read_file:
- return json.load(read_file)
- return None