From: Jérôme Benoit Date: Wed, 9 Apr 2025 13:44:51 +0000 (+0200) Subject: feat(qav3): peaks identification optimisation zigzag compatible X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=6d17efee6090b9108b584f6e77dd9689db2bf569;p=freqai-strategies.git feat(qav3): peaks identification optimisation zigzag compatible Signed-off-by: Jérôme Benoit --- diff --git a/quickadapter/user_data/config-template.json b/quickadapter/user_data/config-template.json index 1b68675..975095d 100644 --- a/quickadapter/user_data/config-template.json +++ b/quickadapter/user_data/config-template.json @@ -134,6 +134,8 @@ "DI_cutoff": 2, "&s-minima_threshold": -2, "&s-maxima_threshold": 2, + "label_period_candles": 100, + "label_natr_ratio": 0.075, "hp_rmse": -1, "train_rmse": -1 }, diff --git a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py index f51ac5d..154f264 100644 --- a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py +++ b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py @@ -7,6 +7,7 @@ import scipy as sp import optuna import sklearn import warnings +import talib.abstract as ta from functools import cached_property from typing import Any, Callable, Optional @@ -43,7 +44,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): https://github.com/sponsors/robcaulk """ - version = "3.6.9" + version = "3.7.0" @cached_property def _optuna_config(self) -> dict: @@ -86,17 +87,26 @@ class QuickAdapterRegressorV3(BaseRegressionModel): ) self._optuna_hp_rmse: dict[str, float] = {} self._optuna_train_rmse: dict[str, float] = {} + self._optuna_label_values: dict[str, dict] = {} self._optuna_hp_params: dict[str, dict] = {} self._optuna_train_params: dict[str, dict] = {} + self._optuna_label_params: dict[str, dict] = {} for pair in self.pairs: self._optuna_hp_rmse[pair] = -1 self._optuna_train_rmse[pair] = -1 + self._optuna_label_values[pair] = [-1, -1] self._optuna_hp_params[pair] = ( self.optuna_load_best_params(pair, "hp") or {} ) self._optuna_train_params[pair] = ( self.optuna_load_best_params(pair, "train") or {} ) + self._optuna_label_params[pair] = self.optuna_load_best_params( + pair, "label" + ) or { + "label_period_candles": self.ft_params.get("label_period_candles", 50), + "label_natr_ratio": self.ft_params.get("label_natr_ratio", 0.075), + } logger.info( f"Initialized {self.__class__.__name__} {self.freqai_info.get('regressor', 'xgboost')} regressor model version {self.version}" ) @@ -106,6 +116,8 @@ class QuickAdapterRegressorV3(BaseRegressionModel): params = self._optuna_hp_params.get(pair) elif namespace == "train": params = self._optuna_train_params.get(pair) + elif namespace == "label": + params = self._optuna_label_params.get(pair) else: raise ValueError(f"Invalid namespace: {namespace}") return params @@ -115,6 +127,8 @@ class QuickAdapterRegressorV3(BaseRegressionModel): self._optuna_hp_params[pair] = params elif namespace == "train": self._optuna_train_params[pair] = params + elif namespace == "label": + self._optuna_label_params[pair] = params else: raise ValueError(f"Invalid namespace: {namespace}") @@ -135,6 +149,19 @@ class QuickAdapterRegressorV3(BaseRegressionModel): else: raise ValueError(f"Invalid namespace: {namespace}") + def get_optuna_values(self, pair: str, namespace: str) -> list: + if namespace == "label": + values = self._optuna_label_values.get(pair) + else: + raise ValueError(f"Invalid namespace: {namespace}") + return values + + def set_optuna_values(self, pair: str, namespace: str, values: list) -> None: + if namespace == "label": + self._optuna_label_values[pair] = values + else: + raise ValueError(f"Invalid namespace: {namespace}") + def fit(self, data_dictionary: dict, dk: FreqaiDataKitchen, **kwargs) -> Any: """ User sets up the training and test data to fit their desired model here @@ -158,9 +185,9 @@ class QuickAdapterRegressorV3(BaseRegressionModel): start = time.time() if self._optuna_hyperopt: self.optuna_optimize( - dk.pair, - "hp", - lambda trial: hp_objective( + pair=dk.pair, + namespace="hp", + objective=lambda trial: hp_objective( trial, self.freqai_info.get("regressor", "xgboost"), X, @@ -171,6 +198,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): test_weights, model_training_parameters, ), + direction=optuna.study.StudyDirection.MINIMIZE, ) optuna_hp_params = self.get_optuna_params(dk.pair, "hp") @@ -181,9 +209,9 @@ class QuickAdapterRegressorV3(BaseRegressionModel): } self.optuna_optimize( - dk.pair, - "train", - lambda trial: train_objective( + pair=dk.pair, + namespace="train", + objective=lambda trial: train_objective( trial, self.freqai_info.get("regressor", "xgboost"), X, @@ -196,6 +224,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): self._optuna_config.get("candles_step"), model_training_parameters, ), + direction=optuna.study.StudyDirection.MINIMIZE, ) optuna_train_params = self.get_optuna_params(dk.pair, "train") @@ -212,7 +241,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): eval_set, eval_weights = self.eval_set_and_weights(X_test, y_test, test_weights) - model = train_regressor( + model = fit_regressor( regressor=self.freqai_info.get("regressor", "xgboost"), X=X, y=y, @@ -230,12 +259,31 @@ class QuickAdapterRegressorV3(BaseRegressionModel): def fit_live_predictions(self, dk: FreqaiDataKitchen, pair: str) -> None: warmed_up = True - num_candles = self.freqai_info.get("fit_live_predictions_candles", 100) + fit_live_predictions_candles = self.freqai_info.get( + "fit_live_predictions_candles", 100 + ) + + df = self.data_provider.get_pair_dataframe(pair) + self.optuna_optimize( + pair=pair, + namespace="label", + objective=lambda trial: label_objective( + trial, + df, + fit_live_predictions_candles, + self._optuna_config.get("candles_step"), + ), + directions=[ + optuna.study.StudyDirection.MAXIMIZE, + optuna.study.StudyDirection.MAXIMIZE, + ], + ) + if self.live: if not hasattr(self, "exchange_candles"): self.exchange_candles = len(self.dd.model_return_values[pair].index) candles_diff = len(self.dd.historic_predictions[pair].index) - ( - num_candles + self.exchange_candles + fit_live_predictions_candles + self.exchange_candles ) if candles_diff < 0: logger.warning( @@ -245,7 +293,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): pred_df_full = ( self.dd.historic_predictions[pair] - .iloc[-num_candles:] + .iloc[-fit_live_predictions_candles:] .reset_index(drop=True) ) @@ -286,6 +334,13 @@ class QuickAdapterRegressorV3(BaseRegressionModel): dk.data["extra_returns_per_train"]["DI_value_param3"] = f[2] dk.data["extra_returns_per_train"]["DI_cutoff"] = cutoff + dk.data["extra_returns_per_train"]["label_period_candles"] = ( + self.get_optuna_params(pair, "label").get("label_period_candles") + ) + dk.data["extra_returns_per_train"]["label_natr_ratio"] = self.get_optuna_params( + pair, "label" + ).get("label_natr_ratio") + dk.data["extra_returns_per_train"]["hp_rmse"] = self.get_optuna_rmse(pair, "hp") dk.data["extra_returns_per_train"]["train_rmse"] = self.get_optuna_rmse( pair, "train" @@ -317,21 +372,77 @@ class QuickAdapterRegressorV3(BaseRegressionModel): ) return min_pred, max_pred + @staticmethod + def get_multi_objective_study_best_trial( + namespace: str, study: optuna.study.Study + ) -> Optional[optuna.trial.FrozenTrial]: + if not QuickAdapterRegressorV3.optuna_study_has_best_trials(study): + return None + best_trials = study.best_trials + if namespace == "label": + range_sizes = [trial.values[1] for trial in best_trials] + mean_range_size = np.mean(range_sizes) + equal_mean_trials = [ + trial + for trial in best_trials + if np.isclose(trial.values[1], mean_range_size) + ] + if equal_mean_trials: + return max(equal_mean_trials, key=lambda trial: trial.values[0]) + nearest_above_mean = (np.inf, -np.inf, None) + nearest_below_mean = (-np.inf, -np.inf, None) + for idx, trial in enumerate(best_trials): + range_size = trial.values[1] + if range_size >= mean_range_size: + if range_size < nearest_above_mean[0] or ( + range_size == nearest_above_mean[0] + and trial.values[0] > nearest_above_mean[1] + ): + nearest_above_mean = (range_size, trial.values[0], idx) + if range_size <= mean_range_size: + if range_size > nearest_below_mean[0] or ( + range_size == nearest_below_mean[0] + and trial.values[0] > nearest_below_mean[1] + ): + nearest_below_mean = (range_size, trial.values[0], idx) + if nearest_above_mean[2] is None or nearest_below_mean[2] is None: + return None + above_mean_trial = best_trials[nearest_above_mean[2]] + below_mean_trial = best_trials[nearest_below_mean[2]] + if above_mean_trial.values[0] >= below_mean_trial.values[0]: + return above_mean_trial + else: + return below_mean_trial + else: + raise ValueError(f"Invalid namespace: {namespace}") + def optuna_optimize( self, pair: str, namespace: str, - objective: Callable[[optuna.Trial], float], + objective: Callable[[optuna.trial.Trial], float], + direction: Optional[optuna.study.StudyDirection] = None, + directions: Optional[list[optuna.study.StudyDirection]] = None, ) -> None: identifier = self.freqai_info.get("identifier") - study = self.optuna_create_study(pair, f"{identifier}-{namespace}-{pair}") + study = self.optuna_create_study( + pair=pair, + study_name=f"{identifier}-{pair}-{namespace}", + direction=direction, + directions=directions, + ) if not study: return if self._optuna_config.get("warm_start"): self.optuna_enqueue_previous_best_params(pair, namespace, study) - logger.info(f"Optuna {namespace} hyperopt started") + is_study_multi_objective = direction is None and directions is not None + if is_study_multi_objective is True: + objective_type = "multi objective" + else: + objective_type = "single objective" + logger.info(f"Optuna {pair} {namespace} {objective_type} hyperopt started") start_time = time.time() try: study.optimize( @@ -344,26 +455,47 @@ class QuickAdapterRegressorV3(BaseRegressionModel): except Exception as e: time_spent = time.time() - start_time logger.error( - f"Optuna {namespace} hyperopt failed ({time_spent:.2f} secs): {str(e)}", + f"Optuna {pair} {namespace} {objective_type} hyperopt failed ({time_spent:.2f} secs): {str(e)}", exc_info=True, ) return time_spent = time.time() - start_time - if QuickAdapterRegressorV3.optuna_study_has_best_params(study): - logger.info(f"Optuna {namespace} hyperopt done ({time_spent:.2f} secs)") - self.set_optuna_params(pair, namespace, study.best_params) + if is_study_multi_objective is False: + if not QuickAdapterRegressorV3.optuna_study_has_best_params(study): + logger.error( + f"Optuna {pair} {namespace} {objective_type} hyperopt failed ({time_spent:.2f} secs): no study best params found" + ) + return self.set_optuna_rmse(pair, namespace, study.best_value) - for key, value in { + self.set_optuna_params(pair, namespace, study.best_params) + study_results = { "rmse": self.get_optuna_rmse(pair, namespace), **self.get_optuna_params(pair, namespace), - }.items(): - logger.info(f"Optuna {namespace} hyperopt | {key:>20s} : {value}") - self.optuna_save_best_params(pair, namespace) + } else: - logger.error( - f"Optuna {namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found" + best_trial = QuickAdapterRegressorV3.get_multi_objective_study_best_trial( + "label", study + ) + if not best_trial: + logger.error( + f"Optuna {pair} {namespace} {objective_type} hyperopt failed ({time_spent:.2f} secs): no study best trial found" + ) + return + self.set_optuna_values(pair, namespace, best_trial.values) + self.set_optuna_params(pair, namespace, best_trial.params) + study_results = { + "values": self.get_optuna_values(pair, namespace), + **self.get_optuna_params(pair, namespace), + } + logger.info( + f"Optuna {pair} {namespace} {objective_type} done ({time_spent:.2f} secs)" + ) + for key, value in study_results.items(): + logger.info( + f"Optuna {pair} {namespace} {objective_type} hyperopt | {key:>20s} : {value}" ) + self.optuna_save_best_params(pair, namespace) def optuna_storage(self, pair: str) -> optuna.storages.BaseStorage: storage_dir = self.full_path @@ -384,7 +516,11 @@ class QuickAdapterRegressorV3(BaseRegressionModel): return storage def optuna_create_study( - self, pair: str, study_name: str + self, + pair: str, + study_name: str, + direction: Optional[optuna.study.StudyDirection] = None, + directions: Optional[list[optuna.study.StudyDirection]] = None, ) -> Optional[optuna.study.Study]: try: storage = self.optuna_storage(pair) @@ -405,7 +541,8 @@ class QuickAdapterRegressorV3(BaseRegressionModel): multivariate=True, group=True, seed=1 ), pruner=optuna.pruners.HyperbandPruner(), - direction=optuna.study.StudyDirection.MINIMIZE, + direction=direction, + directions=directions, storage=storage, load_if_exists=not self._optuna_config.get("continuous"), ) @@ -482,8 +619,22 @@ class QuickAdapterRegressorV3(BaseRegressionModel): except ValueError: return False + @staticmethod + def optuna_study_has_best_trials(study: Optional[optuna.study.Study]) -> bool: + if study is None: + return False + try: + _ = study.best_trials + return True + # file backend storage raises KeyError + except KeyError: + return False + # sqlite backend storage raises ValueError + except ValueError: + return False + -def get_callbacks(trial: optuna.Trial, regressor: str) -> list[Callable]: +def get_callbacks(trial: optuna.trial.Trial, regressor: str) -> list[Callable]: if regressor == "xgboost": callbacks = [ optuna.integration.XGBoostPruningCallback(trial, "validation_0-rmse") @@ -495,7 +646,7 @@ def get_callbacks(trial: optuna.Trial, regressor: str) -> list[Callable]: return callbacks -def train_regressor( +def fit_regressor( regressor: str, X: pd.DataFrame, y: pd.DataFrame, @@ -542,21 +693,8 @@ def train_regressor( return model -def round_to_nearest(value: float, step: int) -> int: - """ - Round a value to the nearest multiple of a given step. - :param value: The value to round. - :param step: The step size to round to (must be non-zero). - :return: The rounded value. - :raises ValueError: If step is zero. - """ - if step == 0: - raise ValueError("step must be non-zero") - return int(round(value / step) * step) - - def train_objective( - trial: optuna.Trial, + trial: optuna.trial.Trial, regressor: str, X: pd.DataFrame, y: pd.DataFrame, @@ -590,7 +728,7 @@ def train_objective( y_test = y_test.iloc[-test_window:] test_weights = test_weights[-test_window:] - model = train_regressor( + model = fit_regressor( regressor=regressor, X=X, y=y, @@ -609,7 +747,9 @@ def train_objective( return error -def get_optuna_study_model_parameters(trial: optuna.Trial, regressor: str) -> dict: +def get_optuna_study_model_parameters( + trial: optuna.trial.Trial, regressor: str +) -> dict: study_model_parameters = { "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True), "min_child_weight": trial.suggest_int("min_child_weight", 1, 200), @@ -639,7 +779,7 @@ def get_optuna_study_model_parameters(trial: optuna.Trial, regressor: str) -> di def hp_objective( - trial: optuna.Trial, + trial: optuna.trial.Trial, regressor: str, X: pd.DataFrame, y: pd.DataFrame, @@ -652,7 +792,7 @@ def hp_objective( study_model_parameters = get_optuna_study_model_parameters(trial, regressor) model_training_parameters = {**model_training_parameters, **study_model_parameters} - model = train_regressor( + model = fit_regressor( regressor=regressor, X=X, y=y, @@ -671,13 +811,158 @@ def hp_objective( return error +def dynamic_zigzag( + df: pd.DataFrame, + period: int = 14, + natr: bool = True, + ratio: float = 1.0, +) -> tuple[list[int], list[float], list[int]]: + """ + Calculate the ZigZag indicator for a OHLCV DataFrame with dynamic threshold using ATR/NATR. + + Parameters: + df (pd.DataFrame): OHLCV DataFrame. + period (int): Period for ATR/NATR calculation (default: 14). + natr (bool): Use NATR (True) or ATR (False) (default: True). + ratio (float): ratio for dynamic threshold (default: 1.0). + + Returns: + tuple: Lists of indices, extrema, and directions. + """ + if df.empty: + return [], [], [] + + if natr: + thresholds = ta.NATR(df, timeperiod=period) + else: + thresholds = ta.ATR(df, timeperiod=period) + thresholds = thresholds.ffill().bfill() * ratio + + indices = [] + extrema = [] + directions = [] + + first_high = df["high"].iloc[0] + first_low = df["low"].iloc[0] + first_threshold = thresholds.iloc[0] + + if natr: + first_move = (first_high - first_low) / first_low + else: + first_move = first_high - first_low + if first_move >= first_threshold: + current_dir = 1 + current_extreme = first_high + else: + current_dir = -1 + current_extreme = first_low + current_extreme_idx = df.index[0] + + indices.append(current_extreme_idx) + extrema.append(current_extreme) + directions.append(current_dir) + last_idx = current_extreme_idx + + for i in range(1, len(df)): + current_idx = df.index[i] + h = df.at[current_idx, "high"] + l = df.at[current_idx, "low"] + threshold = thresholds.iloc[i] + + if current_dir == 1: # Looking for higher high + if h > current_extreme: + current_extreme = h + current_extreme_idx = current_idx + continue + if natr: + reversal = (current_extreme - l) / current_extreme >= threshold + else: + reversal = (current_extreme - l) >= threshold + if reversal: + if current_extreme_idx != last_idx: + indices.append(current_extreme_idx) + extrema.append(current_extreme) + directions.append(current_dir) + last_idx = current_extreme_idx + + current_dir = -1 + current_extreme = l + current_extreme_idx = current_idx + + elif current_dir == -1: # Looking for lower low + if l < current_extreme: + current_extreme = l + current_extreme_idx = current_idx + continue + if natr: + reversal = (h - current_extreme) / current_extreme >= threshold + else: + reversal = (h - current_extreme) >= threshold + if reversal: + if current_extreme_idx != last_idx: + indices.append(current_extreme_idx) + extrema.append(current_extreme) + directions.append(current_dir) + last_idx = current_extreme_idx + + current_dir = 1 + current_extreme = h + current_extreme_idx = current_idx + + if current_extreme_idx != last_idx: + indices.append(current_extreme_idx) + extrema.append(current_extreme) + directions.append(current_dir) + + return indices[1:], extrema[1:], directions[1:] + + +def label_objective( + trial: optuna.trial.Trial, + df: pd.DataFrame, + fit_live_predictions_candles: int, + candles_step: int, +) -> tuple[float, float]: + min_label_period_candles: int = round_to_nearest( + max(fit_live_predictions_candles // 16, 20), candles_step + ) + max_label_period_candles: int = round_to_nearest( + max(fit_live_predictions_candles // 4, min_label_period_candles), + candles_step, + ) + label_period_candles = trial.suggest_int( + "label_period_candles", + min_label_period_candles, + max_label_period_candles, + step=candles_step, + ) + label_natr_ratio = trial.suggest_float("label_natr_ratio", 0.05, 0.1) + + _, peak_values, _ = dynamic_zigzag( + df, + period=label_period_candles, + ratio=label_natr_ratio, + ) + + if len(peak_values) < 2: + return -float("inf"), -float("inf") + + previous_value = peak_values[0] + peak_ranges = [] + for peak_value in peak_values[1:]: + peak_ranges.append(abs(peak_value - previous_value)) + previous_value = peak_value + + return np.mean(peak_ranges), len(peak_ranges) + + def smoothed_max(series: pd.Series, temperature=1.0) -> float: data_array = series.to_numpy() if data_array.size == 0: return np.nan if temperature < 0: raise ValueError("temperature must be non-negative.") - if temperature == 0: + if np.close(temperature, 0): return data_array.max() return sp.special.logsumexp(temperature * data_array) / temperature @@ -688,6 +973,19 @@ def smoothed_min(series: pd.Series, temperature=1.0) -> float: return np.nan if temperature < 0: raise ValueError("temperature must be non-negative.") - if temperature == 0: + if np.close(temperature, 0): return data_array.min() return -sp.special.logsumexp(-temperature * data_array) / temperature + + +def round_to_nearest(value: float, step: int) -> int: + """ + Round a value to the nearest multiple of a given step. + :param value: The value to round. + :param step: The step size to round to (must be non-zero). + :return: The rounded value. + :raises ValueError: If step is zero. + """ + if step == 0: + raise ValueError("step must be non-zero") + return int(round(value / step) * step) diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py index 0547d93..98fff8c 100644 --- a/quickadapter/user_data/strategies/QuickAdapterV3.py +++ b/quickadapter/user_data/strategies/QuickAdapterV3.py @@ -1,9 +1,9 @@ +import json import logging from functools import reduce, cached_property import datetime import math from pathlib import Path -from statistics import geometric_mean import talib.abstract as ta from pandas import DataFrame, Series, isna from typing import Optional @@ -58,27 +58,19 @@ class QuickAdapterV3(IStrategy): INTERFACE_VERSION = 3 def version(self) -> str: - return "3.2.16" + return "3.3.0" timeframe = "5m" stoploss = -0.02 use_custom_stoploss = True - @cached_property - def trailing_stoploss_natr_ratio(self) -> float: - return self.config.get("trailing_stoploss_natr_ratio", 0.025) - # Trailing stop: trailing_stop = False trailing_stop_positive = 0.01 trailing_stop_positive_offset = 0.011 trailing_only_offset_is_reached = True - @cached_property - def label_natr_ratio(self) -> float: - return self.freqai_info["feature_parameters"].get("label_natr_ratio", 0.075) - @cached_property def entry_natr_ratio(self) -> float: return self.config.get("entry_pricing", {}).get("entry_natr_ratio", 0.00025) @@ -181,6 +173,16 @@ class QuickAdapterV3(IStrategy): / "models" / f"{self.freqai_info.get('identifier', 'no_id_provided')}" ) + self._label_params: dict[str, dict] = {} + for pair in self.pairs: + self._label_params[pair] = self.optuna_load_best_params(pair, "label") or { + "label_period_candles": self.freqai_info["feature_parameters"].get( + "label_period_candles", 50 + ), + "label_natr_ratio": self.freqai_info["feature_parameters"].get( + "label_natr_ratio", 0.075 + ), + } def feature_engineering_expand_all( self, dataframe: DataFrame, period: int, metadata: dict, **kwargs @@ -352,12 +354,34 @@ class QuickAdapterV3(IStrategy): return dataframe def get_label_period_candles(self, pair: str) -> int: + label_period_candles = self._label_params.get(pair).get("label_period_candles") + if label_period_candles: + return label_period_candles return self.freqai_info["feature_parameters"].get("label_period_candles", 50) + def set_label_period_candles(self, pair: str, label_period_candles: int): + if label_period_candles: + self._label_params[pair]["label_period_candles"] = label_period_candles + + def get_label_natr_ratio(self, pair: str) -> float: + label_natr_ratio = self._label_params.get(pair).get("label_natr_ratio") + if label_natr_ratio: + return label_natr_ratio + return self.freqai_info["feature_parameters"].get("label_natr_ratio", 0.075) + + def set_label_natr_ratio(self, pair: str, label_natr_ratio: float): + if label_natr_ratio: + self._label_params[pair]["label_natr_ratio"] = label_natr_ratio + + def get_trailing_stoploss_natr_ratio(self, pair: str) -> float: + return self.get_label_natr_ratio(pair) * 0.025 + def set_freqai_targets(self, dataframe: DataFrame, metadata: dict, **kwargs): - label_period_candles = self.get_label_period_candles(str(metadata.get("pair"))) + pair = str(metadata.get("pair")) peak_indices, _, peak_directions = dynamic_zigzag( - dataframe, period=label_period_candles, ratio=self.label_natr_ratio + dataframe, + period=self.get_label_period_candles(pair), + ratio=self.get_label_natr_ratio(pair), ) dataframe[EXTREMA_COLUMN] = 0 for peak_idx, peak_dir in zip(peak_indices, peak_directions): @@ -381,9 +405,11 @@ class QuickAdapterV3(IStrategy): pair = str(metadata.get("pair")) - label_period_candles = self.get_label_period_candles(pair) + self.set_label_period_candles(pair, dataframe["label_period_candles"].iloc[-1]) + self.set_label_natr_ratio(pair, dataframe["label_natr_ratio"].iloc[-1]) + dataframe["natr_label_period_candles"] = ta.NATR( - dataframe, timeperiod=label_period_candles + dataframe, timeperiod=self.get_label_period_candles(pair) ) dataframe["minima_threshold"] = dataframe[MINIMA_THRESHOLD_COLUMN] @@ -471,7 +497,7 @@ class QuickAdapterV3(IStrategy): return ( current_rate * current_natr - * self.trailing_stoploss_natr_ratio + * self.get_trailing_stoploss_natr_ratio(trade.pair) * (1 / math.log10(1 + 0.25 * trade_duration_candles)) ) @@ -488,14 +514,18 @@ class QuickAdapterV3(IStrategy): if isna(current_natr): return None trade_take_profit_distance = ( - trade.open_rate * entry_natr * self.trailing_stoploss_natr_ratio + trade.open_rate + * entry_natr + * self.get_trailing_stoploss_natr_ratio(trade.pair) ) return max( trade_take_profit_distance, - geometric_mean( + np.mean( [ trade_take_profit_distance, - current_rate * current_natr * self.trailing_stoploss_natr_ratio, + current_rate + * current_natr + * self.get_trailing_stoploss_natr_ratio(trade.pair), ] ) * math.log10(9 + trade_duration_candles) @@ -519,7 +549,7 @@ class QuickAdapterV3(IStrategy): stoploss_distance = self.get_stoploss_distance(df, trade, current_rate) if isna(stoploss_distance): return None - if stoploss_distance == 0: + if np.close(stoploss_distance, 0): return None sign = 1 if trade.is_short else -1 return stoploss_from_absolute( @@ -567,7 +597,7 @@ class QuickAdapterV3(IStrategy): take_profit_distance = self.get_take_profit_distance(df, trade, current_rate) if isna(take_profit_distance): return None - if take_profit_distance == 0: + if np.close(take_profit_distance, 0): return None if trade.is_short: take_profit_price = trade.open_rate - take_profit_distance @@ -682,3 +712,13 @@ class QuickAdapterV3(IStrategy): extrema_smoothing, smoothing_methods["gaussian"], ) + + def optuna_load_best_params(self, pair: str, namespace: str) -> Optional[dict]: + best_params_path = Path( + self.models_full_path + / f"optuna-{namespace}-best-params-{pair.split('/')[0]}.json" + ) + if best_params_path.is_file(): + with best_params_path.open("r", encoding="utf-8") as read_file: + return json.load(read_file) + return None diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py index 82c16fd..160b57f 100644 --- a/quickadapter/user_data/strategies/Utils.py +++ b/quickadapter/user_data/strategies/Utils.py @@ -318,10 +318,6 @@ def zigzag( extrema = [] directions = [] - current_dir = 0 - current_extreme = None - current_extreme_idx = None - first_high = df["high"].iloc[0] first_low = df["low"].iloc[0] @@ -406,19 +402,15 @@ def dynamic_zigzag( thresholds = ta.NATR(df, timeperiod=period) else: thresholds = ta.ATR(df, timeperiod=period) - thresholds = thresholds.ffill().bfill() + thresholds = thresholds.ffill().bfill() * ratio indices = [] extrema = [] directions = [] - current_dir = 0 - current_extreme = None - current_extreme_idx = None - first_high = df["high"].iloc[0] first_low = df["low"].iloc[0] - first_threshold = thresholds.iloc[0] * ratio + first_threshold = thresholds.iloc[0] if natr: first_move = (first_high - first_low) / first_low @@ -441,47 +433,47 @@ def dynamic_zigzag( current_idx = df.index[i] h = df.at[current_idx, "high"] l = df.at[current_idx, "low"] - threshold = thresholds.iloc[i] * ratio + threshold = thresholds.iloc[i] if current_dir == 1: # Looking for higher high if h > current_extreme: current_extreme = h current_extreme_idx = current_idx + continue + if natr: + reversal = (current_extreme - l) / current_extreme >= threshold else: - if natr: - reversal = (current_extreme - l) / current_extreme >= threshold - else: - reversal = (current_extreme - l) >= threshold - if reversal: - if current_extreme_idx != last_idx: - indices.append(current_extreme_idx) - extrema.append(current_extreme) - directions.append(current_dir) - last_idx = current_extreme_idx - - current_dir = -1 - current_extreme = l - current_extreme_idx = current_idx + reversal = (current_extreme - l) >= threshold + if reversal: + if current_extreme_idx != last_idx: + indices.append(current_extreme_idx) + extrema.append(current_extreme) + directions.append(current_dir) + last_idx = current_extreme_idx + + current_dir = -1 + current_extreme = l + current_extreme_idx = current_idx elif current_dir == -1: # Looking for lower low if l < current_extreme: current_extreme = l current_extreme_idx = current_idx + continue + if natr: + reversal = (h - current_extreme) / current_extreme >= threshold else: - if natr: - reversal = (h - current_extreme) / current_extreme >= threshold - else: - reversal = (h - current_extreme) >= threshold - if reversal: - if current_extreme_idx != last_idx: - indices.append(current_extreme_idx) - extrema.append(current_extreme) - directions.append(current_dir) - last_idx = current_extreme_idx - - current_dir = 1 - current_extreme = h - current_extreme_idx = current_idx + reversal = (h - current_extreme) >= threshold + if reversal: + if current_extreme_idx != last_idx: + indices.append(current_extreme_idx) + extrema.append(current_extreme) + directions.append(current_dir) + last_idx = current_extreme_idx + + current_dir = 1 + current_extreme = h + current_extreme_idx = current_idx if current_extreme_idx != last_idx: indices.append(current_extreme_idx)