import optuna
import sklearn
import warnings
+import talib.abstract as ta
from functools import cached_property
from typing import Any, Callable, Optional
https://github.com/sponsors/robcaulk
"""
- version = "3.6.9"
+ version = "3.7.0"
@cached_property
def _optuna_config(self) -> dict:
)
self._optuna_hp_rmse: dict[str, float] = {}
self._optuna_train_rmse: dict[str, float] = {}
+ self._optuna_label_values: dict[str, dict] = {}
self._optuna_hp_params: dict[str, dict] = {}
self._optuna_train_params: dict[str, dict] = {}
+ self._optuna_label_params: dict[str, dict] = {}
for pair in self.pairs:
self._optuna_hp_rmse[pair] = -1
self._optuna_train_rmse[pair] = -1
+ self._optuna_label_values[pair] = [-1, -1]
self._optuna_hp_params[pair] = (
self.optuna_load_best_params(pair, "hp") or {}
)
self._optuna_train_params[pair] = (
self.optuna_load_best_params(pair, "train") or {}
)
+ self._optuna_label_params[pair] = self.optuna_load_best_params(
+ pair, "label"
+ ) or {
+ "label_period_candles": self.ft_params.get("label_period_candles", 50),
+ "label_natr_ratio": self.ft_params.get("label_natr_ratio", 0.075),
+ }
logger.info(
f"Initialized {self.__class__.__name__} {self.freqai_info.get('regressor', 'xgboost')} regressor model version {self.version}"
)
params = self._optuna_hp_params.get(pair)
elif namespace == "train":
params = self._optuna_train_params.get(pair)
+ elif namespace == "label":
+ params = self._optuna_label_params.get(pair)
else:
raise ValueError(f"Invalid namespace: {namespace}")
return params
self._optuna_hp_params[pair] = params
elif namespace == "train":
self._optuna_train_params[pair] = params
+ elif namespace == "label":
+ self._optuna_label_params[pair] = params
else:
raise ValueError(f"Invalid namespace: {namespace}")
else:
raise ValueError(f"Invalid namespace: {namespace}")
+ def get_optuna_values(self, pair: str, namespace: str) -> list:
+ if namespace == "label":
+ values = self._optuna_label_values.get(pair)
+ else:
+ raise ValueError(f"Invalid namespace: {namespace}")
+ return values
+
+ def set_optuna_values(self, pair: str, namespace: str, values: list) -> None:
+ if namespace == "label":
+ self._optuna_label_values[pair] = values
+ else:
+ raise ValueError(f"Invalid namespace: {namespace}")
+
def fit(self, data_dictionary: dict, dk: FreqaiDataKitchen, **kwargs) -> Any:
"""
User sets up the training and test data to fit their desired model here
start = time.time()
if self._optuna_hyperopt:
self.optuna_optimize(
- dk.pair,
- "hp",
- lambda trial: hp_objective(
+ pair=dk.pair,
+ namespace="hp",
+ objective=lambda trial: hp_objective(
trial,
self.freqai_info.get("regressor", "xgboost"),
X,
test_weights,
model_training_parameters,
),
+ direction=optuna.study.StudyDirection.MINIMIZE,
)
optuna_hp_params = self.get_optuna_params(dk.pair, "hp")
}
self.optuna_optimize(
- dk.pair,
- "train",
- lambda trial: train_objective(
+ pair=dk.pair,
+ namespace="train",
+ objective=lambda trial: train_objective(
trial,
self.freqai_info.get("regressor", "xgboost"),
X,
self._optuna_config.get("candles_step"),
model_training_parameters,
),
+ direction=optuna.study.StudyDirection.MINIMIZE,
)
optuna_train_params = self.get_optuna_params(dk.pair, "train")
eval_set, eval_weights = self.eval_set_and_weights(X_test, y_test, test_weights)
- model = train_regressor(
+ model = fit_regressor(
regressor=self.freqai_info.get("regressor", "xgboost"),
X=X,
y=y,
def fit_live_predictions(self, dk: FreqaiDataKitchen, pair: str) -> None:
warmed_up = True
- num_candles = self.freqai_info.get("fit_live_predictions_candles", 100)
+ fit_live_predictions_candles = self.freqai_info.get(
+ "fit_live_predictions_candles", 100
+ )
+
+ df = self.data_provider.get_pair_dataframe(pair)
+ self.optuna_optimize(
+ pair=pair,
+ namespace="label",
+ objective=lambda trial: label_objective(
+ trial,
+ df,
+ fit_live_predictions_candles,
+ self._optuna_config.get("candles_step"),
+ ),
+ directions=[
+ optuna.study.StudyDirection.MAXIMIZE,
+ optuna.study.StudyDirection.MAXIMIZE,
+ ],
+ )
+
if self.live:
if not hasattr(self, "exchange_candles"):
self.exchange_candles = len(self.dd.model_return_values[pair].index)
candles_diff = len(self.dd.historic_predictions[pair].index) - (
- num_candles + self.exchange_candles
+ fit_live_predictions_candles + self.exchange_candles
)
if candles_diff < 0:
logger.warning(
pred_df_full = (
self.dd.historic_predictions[pair]
- .iloc[-num_candles:]
+ .iloc[-fit_live_predictions_candles:]
.reset_index(drop=True)
)
dk.data["extra_returns_per_train"]["DI_value_param3"] = f[2]
dk.data["extra_returns_per_train"]["DI_cutoff"] = cutoff
+ dk.data["extra_returns_per_train"]["label_period_candles"] = (
+ self.get_optuna_params(pair, "label").get("label_period_candles")
+ )
+ dk.data["extra_returns_per_train"]["label_natr_ratio"] = self.get_optuna_params(
+ pair, "label"
+ ).get("label_natr_ratio")
+
dk.data["extra_returns_per_train"]["hp_rmse"] = self.get_optuna_rmse(pair, "hp")
dk.data["extra_returns_per_train"]["train_rmse"] = self.get_optuna_rmse(
pair, "train"
)
return min_pred, max_pred
+ @staticmethod
+ def get_multi_objective_study_best_trial(
+ namespace: str, study: optuna.study.Study
+ ) -> Optional[optuna.trial.FrozenTrial]:
+ if not QuickAdapterRegressorV3.optuna_study_has_best_trials(study):
+ return None
+ best_trials = study.best_trials
+ if namespace == "label":
+ range_sizes = [trial.values[1] for trial in best_trials]
+ mean_range_size = np.mean(range_sizes)
+ equal_mean_trials = [
+ trial
+ for trial in best_trials
+ if np.isclose(trial.values[1], mean_range_size)
+ ]
+ if equal_mean_trials:
+ return max(equal_mean_trials, key=lambda trial: trial.values[0])
+ nearest_above_mean = (np.inf, -np.inf, None)
+ nearest_below_mean = (-np.inf, -np.inf, None)
+ for idx, trial in enumerate(best_trials):
+ range_size = trial.values[1]
+ if range_size >= mean_range_size:
+ if range_size < nearest_above_mean[0] or (
+ range_size == nearest_above_mean[0]
+ and trial.values[0] > nearest_above_mean[1]
+ ):
+ nearest_above_mean = (range_size, trial.values[0], idx)
+ if range_size <= mean_range_size:
+ if range_size > nearest_below_mean[0] or (
+ range_size == nearest_below_mean[0]
+ and trial.values[0] > nearest_below_mean[1]
+ ):
+ nearest_below_mean = (range_size, trial.values[0], idx)
+ if nearest_above_mean[2] is None or nearest_below_mean[2] is None:
+ return None
+ above_mean_trial = best_trials[nearest_above_mean[2]]
+ below_mean_trial = best_trials[nearest_below_mean[2]]
+ if above_mean_trial.values[0] >= below_mean_trial.values[0]:
+ return above_mean_trial
+ else:
+ return below_mean_trial
+ else:
+ raise ValueError(f"Invalid namespace: {namespace}")
+
def optuna_optimize(
self,
pair: str,
namespace: str,
- objective: Callable[[optuna.Trial], float],
+ objective: Callable[[optuna.trial.Trial], float],
+ direction: Optional[optuna.study.StudyDirection] = None,
+ directions: Optional[list[optuna.study.StudyDirection]] = None,
) -> None:
identifier = self.freqai_info.get("identifier")
- study = self.optuna_create_study(pair, f"{identifier}-{namespace}-{pair}")
+ study = self.optuna_create_study(
+ pair=pair,
+ study_name=f"{identifier}-{pair}-{namespace}",
+ direction=direction,
+ directions=directions,
+ )
if not study:
return
if self._optuna_config.get("warm_start"):
self.optuna_enqueue_previous_best_params(pair, namespace, study)
- logger.info(f"Optuna {namespace} hyperopt started")
+ is_study_multi_objective = direction is None and directions is not None
+ if is_study_multi_objective is True:
+ objective_type = "multi objective"
+ else:
+ objective_type = "single objective"
+ logger.info(f"Optuna {pair} {namespace} {objective_type} hyperopt started")
start_time = time.time()
try:
study.optimize(
except Exception as e:
time_spent = time.time() - start_time
logger.error(
- f"Optuna {namespace} hyperopt failed ({time_spent:.2f} secs): {str(e)}",
+ f"Optuna {pair} {namespace} {objective_type} hyperopt failed ({time_spent:.2f} secs): {str(e)}",
exc_info=True,
)
return
time_spent = time.time() - start_time
- if QuickAdapterRegressorV3.optuna_study_has_best_params(study):
- logger.info(f"Optuna {namespace} hyperopt done ({time_spent:.2f} secs)")
- self.set_optuna_params(pair, namespace, study.best_params)
+ if is_study_multi_objective is False:
+ if not QuickAdapterRegressorV3.optuna_study_has_best_params(study):
+ logger.error(
+ f"Optuna {pair} {namespace} {objective_type} hyperopt failed ({time_spent:.2f} secs): no study best params found"
+ )
+ return
self.set_optuna_rmse(pair, namespace, study.best_value)
- for key, value in {
+ self.set_optuna_params(pair, namespace, study.best_params)
+ study_results = {
"rmse": self.get_optuna_rmse(pair, namespace),
**self.get_optuna_params(pair, namespace),
- }.items():
- logger.info(f"Optuna {namespace} hyperopt | {key:>20s} : {value}")
- self.optuna_save_best_params(pair, namespace)
+ }
else:
- logger.error(
- f"Optuna {namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found"
+ best_trial = QuickAdapterRegressorV3.get_multi_objective_study_best_trial(
+ "label", study
+ )
+ if not best_trial:
+ logger.error(
+ f"Optuna {pair} {namespace} {objective_type} hyperopt failed ({time_spent:.2f} secs): no study best trial found"
+ )
+ return
+ self.set_optuna_values(pair, namespace, best_trial.values)
+ self.set_optuna_params(pair, namespace, best_trial.params)
+ study_results = {
+ "values": self.get_optuna_values(pair, namespace),
+ **self.get_optuna_params(pair, namespace),
+ }
+ logger.info(
+ f"Optuna {pair} {namespace} {objective_type} done ({time_spent:.2f} secs)"
+ )
+ for key, value in study_results.items():
+ logger.info(
+ f"Optuna {pair} {namespace} {objective_type} hyperopt | {key:>20s} : {value}"
)
+ self.optuna_save_best_params(pair, namespace)
def optuna_storage(self, pair: str) -> optuna.storages.BaseStorage:
storage_dir = self.full_path
return storage
def optuna_create_study(
- self, pair: str, study_name: str
+ self,
+ pair: str,
+ study_name: str,
+ direction: Optional[optuna.study.StudyDirection] = None,
+ directions: Optional[list[optuna.study.StudyDirection]] = None,
) -> Optional[optuna.study.Study]:
try:
storage = self.optuna_storage(pair)
multivariate=True, group=True, seed=1
),
pruner=optuna.pruners.HyperbandPruner(),
- direction=optuna.study.StudyDirection.MINIMIZE,
+ direction=direction,
+ directions=directions,
storage=storage,
load_if_exists=not self._optuna_config.get("continuous"),
)
except ValueError:
return False
+ @staticmethod
+ def optuna_study_has_best_trials(study: Optional[optuna.study.Study]) -> bool:
+ if study is None:
+ return False
+ try:
+ _ = study.best_trials
+ return True
+ # file backend storage raises KeyError
+ except KeyError:
+ return False
+ # sqlite backend storage raises ValueError
+ except ValueError:
+ return False
+
-def get_callbacks(trial: optuna.Trial, regressor: str) -> list[Callable]:
+def get_callbacks(trial: optuna.trial.Trial, regressor: str) -> list[Callable]:
if regressor == "xgboost":
callbacks = [
optuna.integration.XGBoostPruningCallback(trial, "validation_0-rmse")
return callbacks
-def train_regressor(
+def fit_regressor(
regressor: str,
X: pd.DataFrame,
y: pd.DataFrame,
return model
-def round_to_nearest(value: float, step: int) -> int:
- """
- Round a value to the nearest multiple of a given step.
- :param value: The value to round.
- :param step: The step size to round to (must be non-zero).
- :return: The rounded value.
- :raises ValueError: If step is zero.
- """
- if step == 0:
- raise ValueError("step must be non-zero")
- return int(round(value / step) * step)
-
-
def train_objective(
- trial: optuna.Trial,
+ trial: optuna.trial.Trial,
regressor: str,
X: pd.DataFrame,
y: pd.DataFrame,
y_test = y_test.iloc[-test_window:]
test_weights = test_weights[-test_window:]
- model = train_regressor(
+ model = fit_regressor(
regressor=regressor,
X=X,
y=y,
return error
-def get_optuna_study_model_parameters(trial: optuna.Trial, regressor: str) -> dict:
+def get_optuna_study_model_parameters(
+ trial: optuna.trial.Trial, regressor: str
+) -> dict:
study_model_parameters = {
"learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True),
"min_child_weight": trial.suggest_int("min_child_weight", 1, 200),
def hp_objective(
- trial: optuna.Trial,
+ trial: optuna.trial.Trial,
regressor: str,
X: pd.DataFrame,
y: pd.DataFrame,
study_model_parameters = get_optuna_study_model_parameters(trial, regressor)
model_training_parameters = {**model_training_parameters, **study_model_parameters}
- model = train_regressor(
+ model = fit_regressor(
regressor=regressor,
X=X,
y=y,
return error
+def dynamic_zigzag(
+ df: pd.DataFrame,
+ period: int = 14,
+ natr: bool = True,
+ ratio: float = 1.0,
+) -> tuple[list[int], list[float], list[int]]:
+ """
+ Calculate the ZigZag indicator for a OHLCV DataFrame with dynamic threshold using ATR/NATR.
+
+ Parameters:
+ df (pd.DataFrame): OHLCV DataFrame.
+ period (int): Period for ATR/NATR calculation (default: 14).
+ natr (bool): Use NATR (True) or ATR (False) (default: True).
+ ratio (float): ratio for dynamic threshold (default: 1.0).
+
+ Returns:
+ tuple: Lists of indices, extrema, and directions.
+ """
+ if df.empty:
+ return [], [], []
+
+ if natr:
+ thresholds = ta.NATR(df, timeperiod=period)
+ else:
+ thresholds = ta.ATR(df, timeperiod=period)
+ thresholds = thresholds.ffill().bfill() * ratio
+
+ indices = []
+ extrema = []
+ directions = []
+
+ first_high = df["high"].iloc[0]
+ first_low = df["low"].iloc[0]
+ first_threshold = thresholds.iloc[0]
+
+ if natr:
+ first_move = (first_high - first_low) / first_low
+ else:
+ first_move = first_high - first_low
+ if first_move >= first_threshold:
+ current_dir = 1
+ current_extreme = first_high
+ else:
+ current_dir = -1
+ current_extreme = first_low
+ current_extreme_idx = df.index[0]
+
+ indices.append(current_extreme_idx)
+ extrema.append(current_extreme)
+ directions.append(current_dir)
+ last_idx = current_extreme_idx
+
+ for i in range(1, len(df)):
+ current_idx = df.index[i]
+ h = df.at[current_idx, "high"]
+ l = df.at[current_idx, "low"]
+ threshold = thresholds.iloc[i]
+
+ if current_dir == 1: # Looking for higher high
+ if h > current_extreme:
+ current_extreme = h
+ current_extreme_idx = current_idx
+ continue
+ if natr:
+ reversal = (current_extreme - l) / current_extreme >= threshold
+ else:
+ reversal = (current_extreme - l) >= threshold
+ if reversal:
+ if current_extreme_idx != last_idx:
+ indices.append(current_extreme_idx)
+ extrema.append(current_extreme)
+ directions.append(current_dir)
+ last_idx = current_extreme_idx
+
+ current_dir = -1
+ current_extreme = l
+ current_extreme_idx = current_idx
+
+ elif current_dir == -1: # Looking for lower low
+ if l < current_extreme:
+ current_extreme = l
+ current_extreme_idx = current_idx
+ continue
+ if natr:
+ reversal = (h - current_extreme) / current_extreme >= threshold
+ else:
+ reversal = (h - current_extreme) >= threshold
+ if reversal:
+ if current_extreme_idx != last_idx:
+ indices.append(current_extreme_idx)
+ extrema.append(current_extreme)
+ directions.append(current_dir)
+ last_idx = current_extreme_idx
+
+ current_dir = 1
+ current_extreme = h
+ current_extreme_idx = current_idx
+
+ if current_extreme_idx != last_idx:
+ indices.append(current_extreme_idx)
+ extrema.append(current_extreme)
+ directions.append(current_dir)
+
+ return indices[1:], extrema[1:], directions[1:]
+
+
+def label_objective(
+ trial: optuna.trial.Trial,
+ df: pd.DataFrame,
+ fit_live_predictions_candles: int,
+ candles_step: int,
+) -> tuple[float, float]:
+ min_label_period_candles: int = round_to_nearest(
+ max(fit_live_predictions_candles // 16, 20), candles_step
+ )
+ max_label_period_candles: int = round_to_nearest(
+ max(fit_live_predictions_candles // 4, min_label_period_candles),
+ candles_step,
+ )
+ label_period_candles = trial.suggest_int(
+ "label_period_candles",
+ min_label_period_candles,
+ max_label_period_candles,
+ step=candles_step,
+ )
+ label_natr_ratio = trial.suggest_float("label_natr_ratio", 0.05, 0.1)
+
+ _, peak_values, _ = dynamic_zigzag(
+ df,
+ period=label_period_candles,
+ ratio=label_natr_ratio,
+ )
+
+ if len(peak_values) < 2:
+ return -float("inf"), -float("inf")
+
+ previous_value = peak_values[0]
+ peak_ranges = []
+ for peak_value in peak_values[1:]:
+ peak_ranges.append(abs(peak_value - previous_value))
+ previous_value = peak_value
+
+ return np.mean(peak_ranges), len(peak_ranges)
+
+
def smoothed_max(series: pd.Series, temperature=1.0) -> float:
data_array = series.to_numpy()
if data_array.size == 0:
return np.nan
if temperature < 0:
raise ValueError("temperature must be non-negative.")
- if temperature == 0:
+ if np.close(temperature, 0):
return data_array.max()
return sp.special.logsumexp(temperature * data_array) / temperature
return np.nan
if temperature < 0:
raise ValueError("temperature must be non-negative.")
- if temperature == 0:
+ if np.close(temperature, 0):
return data_array.min()
return -sp.special.logsumexp(-temperature * data_array) / temperature
+
+
+def round_to_nearest(value: float, step: int) -> int:
+ """
+ Round a value to the nearest multiple of a given step.
+ :param value: The value to round.
+ :param step: The step size to round to (must be non-zero).
+ :return: The rounded value.
+ :raises ValueError: If step is zero.
+ """
+ if step == 0:
+ raise ValueError("step must be non-zero")
+ return int(round(value / step) * step)
+import json
import logging
from functools import reduce, cached_property
import datetime
import math
from pathlib import Path
-from statistics import geometric_mean
import talib.abstract as ta
from pandas import DataFrame, Series, isna
from typing import Optional
INTERFACE_VERSION = 3
def version(self) -> str:
- return "3.2.16"
+ return "3.3.0"
timeframe = "5m"
stoploss = -0.02
use_custom_stoploss = True
- @cached_property
- def trailing_stoploss_natr_ratio(self) -> float:
- return self.config.get("trailing_stoploss_natr_ratio", 0.025)
-
# Trailing stop:
trailing_stop = False
trailing_stop_positive = 0.01
trailing_stop_positive_offset = 0.011
trailing_only_offset_is_reached = True
- @cached_property
- def label_natr_ratio(self) -> float:
- return self.freqai_info["feature_parameters"].get("label_natr_ratio", 0.075)
-
@cached_property
def entry_natr_ratio(self) -> float:
return self.config.get("entry_pricing", {}).get("entry_natr_ratio", 0.00025)
/ "models"
/ f"{self.freqai_info.get('identifier', 'no_id_provided')}"
)
+ self._label_params: dict[str, dict] = {}
+ for pair in self.pairs:
+ self._label_params[pair] = self.optuna_load_best_params(pair, "label") or {
+ "label_period_candles": self.freqai_info["feature_parameters"].get(
+ "label_period_candles", 50
+ ),
+ "label_natr_ratio": self.freqai_info["feature_parameters"].get(
+ "label_natr_ratio", 0.075
+ ),
+ }
def feature_engineering_expand_all(
self, dataframe: DataFrame, period: int, metadata: dict, **kwargs
return dataframe
def get_label_period_candles(self, pair: str) -> int:
+ label_period_candles = self._label_params.get(pair).get("label_period_candles")
+ if label_period_candles:
+ return label_period_candles
return self.freqai_info["feature_parameters"].get("label_period_candles", 50)
+ def set_label_period_candles(self, pair: str, label_period_candles: int):
+ if label_period_candles:
+ self._label_params[pair]["label_period_candles"] = label_period_candles
+
+ def get_label_natr_ratio(self, pair: str) -> float:
+ label_natr_ratio = self._label_params.get(pair).get("label_natr_ratio")
+ if label_natr_ratio:
+ return label_natr_ratio
+ return self.freqai_info["feature_parameters"].get("label_natr_ratio", 0.075)
+
+ def set_label_natr_ratio(self, pair: str, label_natr_ratio: float):
+ if label_natr_ratio:
+ self._label_params[pair]["label_natr_ratio"] = label_natr_ratio
+
+ def get_trailing_stoploss_natr_ratio(self, pair: str) -> float:
+ return self.get_label_natr_ratio(pair) * 0.025
+
def set_freqai_targets(self, dataframe: DataFrame, metadata: dict, **kwargs):
- label_period_candles = self.get_label_period_candles(str(metadata.get("pair")))
+ pair = str(metadata.get("pair"))
peak_indices, _, peak_directions = dynamic_zigzag(
- dataframe, period=label_period_candles, ratio=self.label_natr_ratio
+ dataframe,
+ period=self.get_label_period_candles(pair),
+ ratio=self.get_label_natr_ratio(pair),
)
dataframe[EXTREMA_COLUMN] = 0
for peak_idx, peak_dir in zip(peak_indices, peak_directions):
pair = str(metadata.get("pair"))
- label_period_candles = self.get_label_period_candles(pair)
+ self.set_label_period_candles(pair, dataframe["label_period_candles"].iloc[-1])
+ self.set_label_natr_ratio(pair, dataframe["label_natr_ratio"].iloc[-1])
+
dataframe["natr_label_period_candles"] = ta.NATR(
- dataframe, timeperiod=label_period_candles
+ dataframe, timeperiod=self.get_label_period_candles(pair)
)
dataframe["minima_threshold"] = dataframe[MINIMA_THRESHOLD_COLUMN]
return (
current_rate
* current_natr
- * self.trailing_stoploss_natr_ratio
+ * self.get_trailing_stoploss_natr_ratio(trade.pair)
* (1 / math.log10(1 + 0.25 * trade_duration_candles))
)
if isna(current_natr):
return None
trade_take_profit_distance = (
- trade.open_rate * entry_natr * self.trailing_stoploss_natr_ratio
+ trade.open_rate
+ * entry_natr
+ * self.get_trailing_stoploss_natr_ratio(trade.pair)
)
return max(
trade_take_profit_distance,
- geometric_mean(
+ np.mean(
[
trade_take_profit_distance,
- current_rate * current_natr * self.trailing_stoploss_natr_ratio,
+ current_rate
+ * current_natr
+ * self.get_trailing_stoploss_natr_ratio(trade.pair),
]
)
* math.log10(9 + trade_duration_candles)
stoploss_distance = self.get_stoploss_distance(df, trade, current_rate)
if isna(stoploss_distance):
return None
- if stoploss_distance == 0:
+ if np.close(stoploss_distance, 0):
return None
sign = 1 if trade.is_short else -1
return stoploss_from_absolute(
take_profit_distance = self.get_take_profit_distance(df, trade, current_rate)
if isna(take_profit_distance):
return None
- if take_profit_distance == 0:
+ if np.close(take_profit_distance, 0):
return None
if trade.is_short:
take_profit_price = trade.open_rate - take_profit_distance
extrema_smoothing,
smoothing_methods["gaussian"],
)
+
+ def optuna_load_best_params(self, pair: str, namespace: str) -> Optional[dict]:
+ best_params_path = Path(
+ self.models_full_path
+ / f"optuna-{namespace}-best-params-{pair.split('/')[0]}.json"
+ )
+ if best_params_path.is_file():
+ with best_params_path.open("r", encoding="utf-8") as read_file:
+ return json.load(read_file)
+ return None