From 809d7f637f53433c4d9d78c25666861cc98711bf Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 24 Mar 2025 10:35:50 +0100 Subject: [PATCH] refactor: factor out helper functions MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- ReforceXY/user_data/freqaimodels/ReforceXY.py | 5 +- .../LightGBMRegressorQuickAdapterV3.py | 93 +++--- .../XGBoostRegressorQuickAdapterV3.py | 93 +++--- .../user_data/strategies/QuickAdapterV3.py | 142 +++------ quickadapter/user_data/strategies/Utils.py | 274 +++++++++++++----- 5 files changed, 331 insertions(+), 276 deletions(-) diff --git a/ReforceXY/user_data/freqaimodels/ReforceXY.py b/ReforceXY/user_data/freqaimodels/ReforceXY.py index 7aae376..654d7d1 100644 --- a/ReforceXY/user_data/freqaimodels/ReforceXY.py +++ b/ReforceXY/user_data/freqaimodels/ReforceXY.py @@ -501,7 +501,8 @@ class ReforceXY(BaseReinforcementLearningModel): ) return storage - def study_has_best_trial_params(self, study: Study | None) -> bool: + @staticmethod + def study_has_best_trial_params(study: Study | None) -> bool: if study is None: return False try: @@ -570,7 +571,7 @@ class ReforceXY(BaseReinforcementLearningModel): ) hyperopt_failed = True time_spent = time.time() - start - if self.study_has_best_trial_params(study) is False: + if ReforceXY.study_has_best_trial_params(study) is False: logger.error( f"Hyperopt {study_name} failed ({time_spent:.2f} secs): no study best trial params found" ) diff --git a/quickadapter/user_data/freqaimodels/LightGBMRegressorQuickAdapterV3.py b/quickadapter/user_data/freqaimodels/LightGBMRegressorQuickAdapterV3.py index 5350522..107521c 100644 --- a/quickadapter/user_data/freqaimodels/LightGBMRegressorQuickAdapterV3.py +++ b/quickadapter/user_data/freqaimodels/LightGBMRegressorQuickAdapterV3.py @@ -272,8 +272,8 @@ class LightGBMRegressorQuickAdapterV3(BaseRegressionModel): ) smoothing_methods: dict = { "quantile": self.quantile_min_max_pred, - "mean": mean_min_max_pred, - "median": median_min_max_pred, + "mean": LightGBMRegressorQuickAdapterV3.mean_min_max_pred, + "median": LightGBMRegressorQuickAdapterV3.median_min_max_pred, } return smoothing_methods.get( prediction_thresholds_smoothing, smoothing_methods["mean"] @@ -306,7 +306,7 @@ class LightGBMRegressorQuickAdapterV3(BaseRegressionModel): storage = self.optuna_storage(pair) pruner = optuna.pruners.HyperbandPruner() if self.__optuna_config.get("continuous", True): - self.optuna_study_delete(study_name, storage) + LightGBMRegressorQuickAdapterV3.optuna_study_delete(study_name, storage) study = optuna.create_study( study_name=study_name, sampler=optuna.samplers.TPESampler( @@ -350,7 +350,7 @@ class LightGBMRegressorQuickAdapterV3(BaseRegressionModel): ) return None, None time_spent = time.time() - start - if self.optuna_study_has_best_params(study) is False: + if LightGBMRegressorQuickAdapterV3.optuna_study_has_best_params(study) is False: logger.error( f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found" ) @@ -392,7 +392,7 @@ class LightGBMRegressorQuickAdapterV3(BaseRegressionModel): storage = self.optuna_storage(pair) pruner = optuna.pruners.HyperbandPruner() if self.__optuna_config.get("continuous", True): - self.optuna_study_delete(study_name, storage) + LightGBMRegressorQuickAdapterV3.optuna_study_delete(study_name, storage) study = optuna.create_study( study_name=study_name, sampler=optuna.samplers.TPESampler( @@ -438,7 +438,7 @@ class LightGBMRegressorQuickAdapterV3(BaseRegressionModel): ) return None, None time_spent = time.time() - start - if self.optuna_study_has_best_params(study) is False: + if LightGBMRegressorQuickAdapterV3.optuna_study_has_best_params(study) is False: logger.error( f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found" ) @@ -470,16 +470,18 @@ class LightGBMRegressorQuickAdapterV3(BaseRegressionModel): return json.load(read_file) return None + @staticmethod def optuna_study_delete( - self, study_name: str, storage: optuna.storages.BaseStorage + study_name: str, storage: optuna.storages.BaseStorage ) -> None: try: optuna.delete_study(study_name=study_name, storage=storage) except Exception: pass + @staticmethod def optuna_study_load( - self, study_name: str, storage: optuna.storages.BaseStorage + study_name: str, storage: optuna.storages.BaseStorage ) -> optuna.study.Study | None: try: study = optuna.load_study(study_name=study_name, storage=storage) @@ -487,7 +489,8 @@ class LightGBMRegressorQuickAdapterV3(BaseRegressionModel): study = None return study - def optuna_study_has_best_params(self, study: optuna.study.Study | None) -> bool: + @staticmethod + def optuna_study_has_best_params(study: optuna.study.Study | None) -> bool: if study is None: return False try: @@ -500,6 +503,44 @@ class LightGBMRegressorQuickAdapterV3(BaseRegressionModel): except ValueError: return False + @staticmethod + def mean_min_max_pred( + pred_df: pd.DataFrame, + fit_live_predictions_candles: int, + label_period_candles: int, + ) -> tuple[pd.Series, pd.Series]: + pred_df_sorted = ( + pred_df.select_dtypes(exclude=["object"]) + .copy() + .apply(lambda col: col.sort_values(ascending=False, ignore_index=True)) + ) + + label_period_frequency: int = int( + fit_live_predictions_candles / (label_period_candles * 2) + ) + min_pred = pred_df_sorted.iloc[-label_period_frequency:].mean() + max_pred = pred_df_sorted.iloc[:label_period_frequency].mean() + return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN] + + @staticmethod + def median_min_max_pred( + pred_df: pd.DataFrame, + fit_live_predictions_candles: int, + label_period_candles: int, + ) -> tuple[pd.Series, pd.Series]: + pred_df_sorted = ( + pred_df.select_dtypes(exclude=["object"]) + .copy() + .apply(lambda col: col.sort_values(ascending=False, ignore_index=True)) + ) + + label_period_frequency: int = int( + fit_live_predictions_candles / (label_period_candles * 2) + ) + min_pred = pred_df_sorted.iloc[-label_period_frequency:].median() + max_pred = pred_df_sorted.iloc[:label_period_frequency].median() + return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN] + def quantile_min_max_pred( self, pred_df: pd.DataFrame, @@ -521,40 +562,6 @@ class LightGBMRegressorQuickAdapterV3(BaseRegressionModel): return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN] -def mean_min_max_pred( - pred_df: pd.DataFrame, fit_live_predictions_candles: int, label_period_candles: int -) -> tuple[pd.Series, pd.Series]: - pred_df_sorted = ( - pred_df.select_dtypes(exclude=["object"]) - .copy() - .apply(lambda col: col.sort_values(ascending=False, ignore_index=True)) - ) - - label_period_frequency: int = int( - fit_live_predictions_candles / (label_period_candles * 2) - ) - min_pred = pred_df_sorted.iloc[-label_period_frequency:].mean() - max_pred = pred_df_sorted.iloc[:label_period_frequency].mean() - return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN] - - -def median_min_max_pred( - pred_df: pd.DataFrame, fit_live_predictions_candles: int, label_period_candles: int -) -> tuple[pd.Series, pd.Series]: - pred_df_sorted = ( - pred_df.select_dtypes(exclude=["object"]) - .copy() - .apply(lambda col: col.sort_values(ascending=False, ignore_index=True)) - ) - - label_period_frequency: int = int( - fit_live_predictions_candles / (label_period_candles * 2) - ) - min_pred = pred_df_sorted.iloc[-label_period_frequency:].median() - max_pred = pred_df_sorted.iloc[:label_period_frequency].median() - return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN] - - def period_objective( trial, X, diff --git a/quickadapter/user_data/freqaimodels/XGBoostRegressorQuickAdapterV3.py b/quickadapter/user_data/freqaimodels/XGBoostRegressorQuickAdapterV3.py index c96fcdd..4116bb0 100644 --- a/quickadapter/user_data/freqaimodels/XGBoostRegressorQuickAdapterV3.py +++ b/quickadapter/user_data/freqaimodels/XGBoostRegressorQuickAdapterV3.py @@ -275,8 +275,8 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): ) smoothing_methods: dict = { "quantile": self.quantile_min_max_pred, - "mean": mean_min_max_pred, - "median": median_min_max_pred, + "mean": XGBoostRegressorQuickAdapterV3.mean_min_max_pred, + "median": XGBoostRegressorQuickAdapterV3.median_min_max_pred, } return smoothing_methods.get( prediction_thresholds_smoothing, smoothing_methods["mean"] @@ -309,7 +309,7 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): storage = self.optuna_storage(pair) pruner = optuna.pruners.HyperbandPruner() if self.__optuna_config.get("continuous", True): - self.optuna_study_delete(study_name, storage) + XGBoostRegressorQuickAdapterV3.optuna_study_delete(study_name, storage) study = optuna.create_study( study_name=study_name, sampler=optuna.samplers.TPESampler( @@ -353,7 +353,7 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): ) return None, None time_spent = time.time() - start - if self.optuna_study_has_best_params(study) is False: + if XGBoostRegressorQuickAdapterV3.optuna_study_has_best_params(study) is False: logger.error( f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found" ) @@ -395,7 +395,7 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): storage = self.optuna_storage(pair) pruner = optuna.pruners.HyperbandPruner() if self.__optuna_config.get("continuous", True): - self.optuna_study_delete(study_name, storage) + XGBoostRegressorQuickAdapterV3.optuna_study_delete(study_name, storage) study = optuna.create_study( study_name=study_name, sampler=optuna.samplers.TPESampler( @@ -441,7 +441,7 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): ) return None, None time_spent = time.time() - start - if self.optuna_study_has_best_params(study) is False: + if XGBoostRegressorQuickAdapterV3.optuna_study_has_best_params(study) is False: logger.error( f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found" ) @@ -473,16 +473,18 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): return json.load(read_file) return None + @staticmethod def optuna_study_delete( - self, study_name: str, storage: optuna.storages.BaseStorage + study_name: str, storage: optuna.storages.BaseStorage ) -> None: try: optuna.delete_study(study_name=study_name, storage=storage) except Exception: pass + @staticmethod def optuna_study_load( - self, study_name: str, storage: optuna.storages.BaseStorage + study_name: str, storage: optuna.storages.BaseStorage ) -> optuna.study.Study | None: try: study = optuna.load_study(study_name=study_name, storage=storage) @@ -490,7 +492,8 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): study = None return study - def optuna_study_has_best_params(self, study: optuna.study.Study | None) -> bool: + @staticmethod + def optuna_study_has_best_params(study: optuna.study.Study | None) -> bool: if study is None: return False try: @@ -503,6 +506,44 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): except ValueError: return False + @staticmethod + def mean_min_max_pred( + pred_df: pd.DataFrame, + fit_live_predictions_candles: int, + label_period_candles: int, + ) -> tuple[pd.Series, pd.Series]: + pred_df_sorted = ( + pred_df.select_dtypes(exclude=["object"]) + .copy() + .apply(lambda col: col.sort_values(ascending=False, ignore_index=True)) + ) + + label_period_frequency: int = int( + fit_live_predictions_candles / (label_period_candles * 2) + ) + min_pred = pred_df_sorted.iloc[-label_period_frequency:].mean() + max_pred = pred_df_sorted.iloc[:label_period_frequency].mean() + return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN] + + @staticmethod + def median_min_max_pred( + pred_df: pd.DataFrame, + fit_live_predictions_candles: int, + label_period_candles: int, + ) -> tuple[pd.Series, pd.Series]: + pred_df_sorted = ( + pred_df.select_dtypes(exclude=["object"]) + .copy() + .apply(lambda col: col.sort_values(ascending=False, ignore_index=True)) + ) + + label_period_frequency: int = int( + fit_live_predictions_candles / (label_period_candles * 2) + ) + min_pred = pred_df_sorted.iloc[-label_period_frequency:].median() + max_pred = pred_df_sorted.iloc[:label_period_frequency].median() + return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN] + def quantile_min_max_pred( self, pred_df: pd.DataFrame, @@ -524,40 +565,6 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN] -def mean_min_max_pred( - pred_df: pd.DataFrame, fit_live_predictions_candles: int, label_period_candles: int -) -> tuple[pd.Series, pd.Series]: - pred_df_sorted = ( - pred_df.select_dtypes(exclude=["object"]) - .copy() - .apply(lambda col: col.sort_values(ascending=False, ignore_index=True)) - ) - - label_period_frequency: int = int( - fit_live_predictions_candles / (label_period_candles * 2) - ) - min_pred = pred_df_sorted.iloc[-label_period_frequency:].mean() - max_pred = pred_df_sorted.iloc[:label_period_frequency].mean() - return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN] - - -def median_min_max_pred( - pred_df: pd.DataFrame, fit_live_predictions_candles: int, label_period_candles: int -) -> tuple[pd.Series, pd.Series]: - pred_df_sorted = ( - pred_df.select_dtypes(exclude=["object"]) - .copy() - .apply(lambda col: col.sort_values(ascending=False, ignore_index=True)) - ) - - label_period_frequency: int = int( - fit_live_predictions_candles / (label_period_candles * 2) - ) - min_pred = pred_df_sorted.iloc[-label_period_frequency:].median() - max_pred = pred_df_sorted.iloc[:label_period_frequency].median() - return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN] - - def period_objective( trial, X, diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py index b20a312..1910b18 100644 --- a/quickadapter/user_data/strategies/QuickAdapterV3.py +++ b/quickadapter/user_data/strategies/QuickAdapterV3.py @@ -13,12 +13,20 @@ from freqtrade.strategy.interface import IStrategy from freqtrade.strategy import stoploss_from_absolute from technical.pivots_points import pivots_points from freqtrade.persistence import Trade -from scipy.signal import argrelmin, argrelmax, convolve -from scipy.signal.windows import gaussian +from scipy.signal import argrelmin, argrelmax import numpy as np import pandas_ta as pta -from Utils import ewo, vwapb +from Utils import ( + ewo, + vwapb, + top_change_percent, + get_distance, + get_gaussian_window, + get_odd_window, + derive_gaussian_std_from_window, + zero_phase_gaussian, +) logger = logging.getLogger(__name__) @@ -56,7 +64,7 @@ class QuickAdapterV3(IStrategy): @property def trailing_stoploss_positive_offset(self) -> float: - return self.config.get("trailing_stoploss_positive_offset", 0.01) + return self.config.get("trailing_stoploss_positive_offset", 0.0075) @property def trailing_stoploss_only_offset_is_reached(self) -> bool: @@ -210,7 +218,11 @@ class QuickAdapterV3(IStrategy): dataframe["%-raw_volume"] = dataframe["volume"] dataframe["%-obv"] = ta.OBV(dataframe) dataframe["%-ewo"] = ewo( - dataframe=dataframe, mamode="ema", zero_lag=True, normalize=True + dataframe=dataframe, + mamode="ema", + pricemode="close", + zero_lag=True, + normalize=True, ) psar = ta.SAR(dataframe, acceleration=0.02, maximum=0.2) dataframe["%-diff_to_psar"] = dataframe["close"] - psar @@ -244,6 +256,18 @@ class QuickAdapterV3(IStrategy): (dataframe["close"] - dataframe["low"]) / (dataframe["high"] - dataframe["low"]) ).fillna(0.0) + # dataframe["jaw"], dataframe["teeth"], dataframe["lips"] = alligator( + # dataframe, zero_lag=True + # ) + # dataframe["%-dist_to_jaw"] = get_distance(dataframe["close"], dataframe["jaw"]) + # dataframe["%-dist_to_teeth"] = get_distance( + # dataframe["close"], dataframe["teeth"] + # ) + # dataframe["%-dist_to_lips"] = get_distance( + # dataframe["close"], dataframe["lips"] + # ) + # dataframe["%-spread_jaw_teeth"] = dataframe["jaw"] - dataframe["teeth"] + # dataframe["%-spread_teeth_lips"] = dataframe["teeth"] - dataframe["lips"] dataframe["zlema_50"] = pta.zlma(dataframe["close"], length=50, mamode="ema") dataframe["zlema_12"] = pta.zlma(dataframe["close"], length=12, mamode="ema") dataframe["zlema_26"] = pta.zlma(dataframe["close"], length=26, mamode="ema") @@ -435,7 +459,8 @@ class QuickAdapterV3(IStrategy): ).total_seconds() / 60.0 return trade_duration_minutes // timeframe_to_minutes(self.timeframe) - def is_trade_duration_valid(self, trade_duration_candles: int) -> bool: + @staticmethod + def is_trade_duration_valid(trade_duration_candles: int) -> bool: if isna(trade_duration_candles): return False if trade_duration_candles == 0: @@ -446,7 +471,7 @@ class QuickAdapterV3(IStrategy): self, df: DataFrame, trade: Trade, current_rate: float ) -> float | None: trade_duration_candles = self.get_trade_duration_candles(df, trade) - if self.is_trade_duration_valid(trade_duration_candles) is False: + if QuickAdapterV3.is_trade_duration_valid(trade_duration_candles) is False: return None current_natr = df["natr_ratio_labeling_window"].iloc[-1] if isna(current_natr): @@ -455,12 +480,12 @@ class QuickAdapterV3(IStrategy): current_rate * current_natr * self.trailing_stoploss_natr_ratio - * (1 / math.log10(1 + trade_duration_candles)) + * (1 / math.log10(1 + 0.25 * trade_duration_candles)) ) def get_take_profit_distance(self, df: DataFrame, trade: Trade) -> float | None: trade_duration_candles = self.get_trade_duration_candles(df, trade) - if self.is_trade_duration_valid(trade_duration_candles) is False: + if QuickAdapterV3.is_trade_duration_valid(trade_duration_candles) is False: return None entry_natr = self.get_trade_entry_natr(df, trade) if isna(entry_natr): @@ -671,102 +696,3 @@ class QuickAdapterV3(IStrategy): with best_params_path.open("r", encoding="utf-8") as read_file: return json.load(read_file) return None - - -def top_change_percent(dataframe: DataFrame, period: int) -> Series: - """ - Percentage change of the current close relative to the top close price in the previous `period` bars. - - :param dataframe: OHLCV DataFrame - :param period: The previous period window size to look back (>=1) - :return: The top change percentage series - """ - if period < 1: - raise ValueError("period must be greater than or equal to 1") - - previous_close_top = ( - dataframe["close"].rolling(period, min_periods=period).max().shift(1) - ) - - return (dataframe["close"] - previous_close_top) / previous_close_top - - -def bottom_change_percent(dataframe: DataFrame, period: int) -> Series: - """ - Percentage change of the current close relative to the bottom close price in the previous `period` bars. - - :param dataframe: OHLCV DataFrame - :param period: The previous period window size to look back (>=1) - :return: The bottom change percentage series - """ - if period < 1: - raise ValueError("period must be greater than or equal to 1") - - previous_close_bottom = ( - dataframe["close"].rolling(period, min_periods=period).min().shift(1) - ) - - return (dataframe["close"] - previous_close_bottom) / previous_close_bottom - - -def price_retracement_percent(dataframe: DataFrame, period: int) -> Series: - """ - Calculate the percentage retracement of the current close within the high/low close price range - of the previous `period` bars. - - :param dataframe: OHLCV DataFrame - :param period: Window size for calculating historical closes high/low (>=1) - :return: Retracement percentage series - """ - if period < 1: - raise ValueError("period must be greater than or equal to 1") - - previous_close_low = ( - dataframe["close"].rolling(period, min_periods=period).min().shift(1) - ) - previous_close_high = ( - dataframe["close"].rolling(period, min_periods=period).max().shift(1) - ) - - return (dataframe["close"] - previous_close_low) / ( - previous_close_high - previous_close_low - ).fillna(0.0) - - -def zero_phase_gaussian(series: Series, window: int, std: float): - kernel = gaussian(window, std=std) - kernel /= kernel.sum() - - padding_length = window - 1 - padded_series = np.pad(series.values, (padding_length, padding_length), mode="edge") - - forward = convolve(padded_series, kernel, mode="valid") - backward = convolve(forward[::-1], kernel, mode="valid")[::-1] - - return Series(backward, index=series.index) - - -def get_gaussian_window(std: float, center: bool) -> int: - if std is None: - raise ValueError("Standard deviation cannot be None") - if std <= 0: - raise ValueError("Standard deviation must be greater than 0") - window = int(6 * std + 1) - if center and window % 2 == 0: - window += 1 - return max(3, window) - - -def derive_gaussian_std_from_window(window: int) -> float: - # Assuming window = 6 * std + 1 => std = (window - 1) / 6 - return (window - 1) / 6.0 if window > 1 else 0.5 - - -def get_odd_window(window: int) -> int: - if window < 1: - raise ValueError("Window size must be greater than 0") - return window if window % 2 == 1 else window + 1 - - -def get_distance(p1: Series | float, p2: Series | float) -> Series | float: - return abs(p1 - p2) diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py index 7f282df..79940e4 100644 --- a/quickadapter/user_data/strategies/Utils.py +++ b/quickadapter/user_data/strategies/Utils.py @@ -2,9 +2,110 @@ import numpy as np import pandas as pd import pandas_ta as pta import talib.abstract as ta +from scipy.signal import convolve +from scipy.signal.windows import gaussian from technical import qtpylib +def get_distance(p1: pd.Series | float, p2: pd.Series | float) -> pd.Series | float: + return abs(p1 - p2) + + +def get_gaussian_window(std: float, center: bool) -> int: + if std is None: + raise ValueError("Standard deviation cannot be None") + if std <= 0: + raise ValueError("Standard deviation must be greater than 0") + window = int(6 * std + 1) + if center and window % 2 == 0: + window += 1 + return max(3, window) + + +def get_odd_window(window: int) -> int: + if window < 1: + raise ValueError("Window size must be greater than 0") + return window if window % 2 == 1 else window + 1 + + +def derive_gaussian_std_from_window(window: int) -> float: + # Assuming window = 6 * std + 1 => std = (window - 1) / 6 + return (window - 1) / 6.0 if window > 1 else 0.5 + + +def zero_phase_gaussian(series: pd.Series, window: int, std: float): + kernel = gaussian(window, std=std) + kernel /= kernel.sum() + + padding_length = window - 1 + padded_series = np.pad(series.values, (padding_length, padding_length), mode="edge") + + forward = convolve(padded_series, kernel, mode="valid") + backward = convolve(forward[::-1], kernel, mode="valid")[::-1] + + return pd.Series(backward, index=series.index) + + +def top_change_percent(dataframe: pd.DataFrame, period: int) -> pd.Series: + """ + Percentage change of the current close relative to the top close price in the previous `period` bars. + + :param dataframe: OHLCV DataFrame + :param period: The previous period window size to look back (>=1) + :return: The top change percentage series + """ + if period < 1: + raise ValueError("period must be greater than or equal to 1") + + previous_close_top = ( + dataframe["close"].rolling(period, min_periods=period).max().shift(1) + ) + + return (dataframe["close"] - previous_close_top) / previous_close_top + + +def bottom_change_percent(dataframe: pd.DataFrame, period: int) -> pd.Series: + """ + Percentage change of the current close relative to the bottom close price in the previous `period` bars. + + :param dataframe: OHLCV DataFrame + :param period: The previous period window size to look back (>=1) + :return: The bottom change percentage series + """ + if period < 1: + raise ValueError("period must be greater than or equal to 1") + + previous_close_bottom = ( + dataframe["close"].rolling(period, min_periods=period).min().shift(1) + ) + + return (dataframe["close"] - previous_close_bottom) / previous_close_bottom + + +def price_retracement_percent(dataframe: pd.DataFrame, period: int) -> pd.Series: + """ + Calculate the percentage retracement of the current close within the high/low close price range + of the previous `period` bars. + + :param dataframe: OHLCV DataFrame + :param period: Window size for calculating historical closes high/low (>=1) + :return: Retracement percentage series + """ + if period < 1: + raise ValueError("period must be greater than or equal to 1") + + previous_close_low = ( + dataframe["close"].rolling(period, min_periods=period).min().shift(1) + ) + previous_close_high = ( + dataframe["close"].rolling(period, min_periods=period).max().shift(1) + ) + + return (dataframe["close"] - previous_close_low) / ( + previous_close_high - previous_close_low + ).fillna(0.0) + + # VWAP bands def vwapb(dataframe: pd.DataFrame, window=20, num_of_std=1) -> tuple: vwap = qtpylib.rolling_vwap(dataframe, window=window) @@ -26,86 +127,14 @@ def get_ma_fn(mamode: str, zero_lag=False) -> callable: "t3": ta.T3, } if zero_lag: - ma_fn = lambda df, timeperiod: pta.zlma( - df["close"], length=timeperiod, mamode=mamode + ma_fn = lambda series, timeperiod: pta.zlma( + series, length=timeperiod, mamode=mamode ) else: ma_fn = mamodes.get(mamode, mamodes["sma"]) return ma_fn -def ewo( - dataframe: pd.DataFrame, - ma1_length=5, - ma2_length=34, - mamode="sma", - zero_lag=False, - normalize=False, -) -> pd.Series: - ma_fn = get_ma_fn(mamode, zero_lag=zero_lag) - ma1 = ma_fn(dataframe, timeperiod=ma1_length) - ma2 = ma_fn(dataframe, timeperiod=ma2_length) - madiff = ma1 - ma2 - if normalize: - madiff = ( - madiff / dataframe["close"] - ) * 100 # Optional normalization with close price - return madiff - - -def smma( - df: pd.DataFrame, period: int, mamode="sma", zero_lag=False, offset=0 -) -> pd.Series: - """ - SMoothed Moving Average (SMMA). - """ - close = df["close"] - if len(close) < period: - return pd.Series(index=close.index, dtype=float) - - smma = close.copy() - smma[: period - 1] = np.nan - ma_fn = get_ma_fn(mamode, zero_lag=zero_lag) - smma.iloc[period - 1] = ma_fn(close[:period], timeperiod=period).iloc[-1] - - for i in range(period, len(close)): - smma.iat[i] = ((period - 1) * smma.iat[i - 1] + smma.iat[i]) / period - - if offset != 0: - smma = smma.shift(offset) - - return smma - - -def alligator( - df: pd.DataFrame, - jaw_period=13, - teeth_period=8, - lips_period=5, - jaw_shift=8, - teeth_shift=5, - lips_shift=3, - mamode="sma", - zero_lag=False, -) -> tuple[pd.Series, pd.Series, pd.Series]: - """ - Calculate Bill Williams' Alligator indicator lines. - """ - median_price = (df["high"] + df["low"]) / 2 - - jaw = smma(median_price, period=jaw_period, mamode=mamode, zero_lag=zero_lag).shift( - jaw_shift - ) - teeth = smma( - median_price, period=teeth_period, mamode=mamode, zero_lag=zero_lag - ).shift(teeth_shift) - lips = smma( - median_price, period=lips_period, mamode=mamode, zero_lag=zero_lag - ).shift(lips_shift) - - return jaw, teeth, lips - - def fractal_dimension( prices_array: np.ndarray, period: int, normalize: bool = False ) -> float: @@ -145,12 +174,12 @@ def fractal_dimension( return np.clip(D, 1.0, 2.0) -def frama(prices: pd.Series, period: int = 16, normalize: bool = False) -> pd.Series: +def frama(series: pd.Series, period: int = 16, normalize: bool = False) -> pd.Series: """ Calculate FRAMA with optional normalization. Args: - prices: Pandas Series of closing prices. + series: Pandas Series of prices. period: Lookback window (default=16). normalize: Enable range normalization (default=False). @@ -160,10 +189,10 @@ def frama(prices: pd.Series, period: int = 16, normalize: bool = False) -> pd.Se if period % 2 != 0: raise ValueError("FRAMA period must be even") - frama = np.full(len(prices), np.nan) + frama = np.full(len(series), np.nan) - for i in range(period - 1, len(prices)): - prices_array = prices.iloc[i - period + 1 : i + 1].to_numpy() + for i in range(period - 1, len(series)): + prices_array = series.iloc[i - period + 1 : i + 1].to_numpy() D = fractal_dimension(prices_array, period, normalize) alpha = np.exp(-4.6 * (D - 1)) @@ -172,4 +201,89 @@ def frama(prices: pd.Series, period: int = 16, normalize: bool = False) -> pd.Se else: frama[i] = alpha * prices_array[-1] + (1 - alpha) * frama[i - 1] - return pd.Series(frama, index=prices.index) + return pd.Series(frama, index=series.index) + + +def smma( + series: pd.Series, period: int, mamode="sma", zero_lag=False, offset=0 +) -> pd.Series: + """ + SMoothed Moving Average (SMMA). + + https://www.sierrachart.com/index.php?page=doc/StudiesReference.php&ID=173&Name=Moving_Average_-_Smoothed + """ + if len(series) < period: + return pd.Series(index=series.index, dtype=float) + + smma = series.copy() + smma[: period - 1] = np.nan + ma_fn = get_ma_fn(mamode, zero_lag=zero_lag) + smma.iloc[period - 1] = pd.Series( + ma_fn(series.iloc[:period], timeperiod=period) + ).iloc[-1] + + for i in range(period, len(series)): + smma.iat[i] = ((period - 1) * smma.iat[i - 1] + smma.iat[i]) / period + + if offset != 0: + smma = smma.shift(offset) + + return smma + + +def get_price_fn(pricemode: str) -> callable: + pricemodes = { + "typical": lambda df: (df["high"] + df["low"] + df["close"]) / 3, + "median": lambda df: (df["high"] + df["low"]) / 2, + "close": lambda df: df["close"], + } + return pricemodes.get(pricemode, pricemodes["close"]) + + +def ewo( + dataframe: pd.DataFrame, + ma1_length=5, + ma2_length=34, + pricemode="close", + mamode="sma", + zero_lag=False, + normalize=False, +) -> pd.Series: + price_series = get_price_fn(pricemode)(dataframe) + ma_fn = get_ma_fn(mamode, zero_lag=zero_lag) + ma1 = ma_fn(price_series, timeperiod=ma1_length) + ma2 = ma_fn(price_series, timeperiod=ma2_length) + madiff = ma1 - ma2 + if normalize: + madiff = (madiff / price_series) * 100 + return madiff + + +def alligator( + df: pd.DataFrame, + jaw_period=13, + teeth_period=8, + lips_period=5, + jaw_shift=8, + teeth_shift=5, + lips_shift=3, + pricemode="median", + mamode="sma", + zero_lag=False, +) -> tuple[pd.Series, pd.Series, pd.Series]: + """ + Calculate Bill Williams' Alligator indicator lines. + """ + price_series = get_price_fn(pricemode)(df) + + jaw = smma(price_series, period=jaw_period, mamode=mamode, zero_lag=zero_lag).shift( + jaw_shift + ) + teeth = smma( + price_series, period=teeth_period, mamode=mamode, zero_lag=zero_lag + ).shift(teeth_shift) + lips = smma( + price_series, period=lips_period, mamode=mamode, zero_lag=zero_lag + ).shift(lips_shift) + + return jaw, teeth, lips -- 2.43.0