]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
refactor: factor out helper functions
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 24 Mar 2025 09:35:50 +0000 (10:35 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 24 Mar 2025 09:35:50 +0000 (10:35 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
ReforceXY/user_data/freqaimodels/ReforceXY.py
quickadapter/user_data/freqaimodels/LightGBMRegressorQuickAdapterV3.py
quickadapter/user_data/freqaimodels/XGBoostRegressorQuickAdapterV3.py
quickadapter/user_data/strategies/QuickAdapterV3.py
quickadapter/user_data/strategies/Utils.py

index 7aae376fa1eca675268ff9aead1f19f648cb978f..654d7d1e0a9968e0aa7f5b2c4de3122e810f0070 100644 (file)
@@ -501,7 +501,8 @@ class ReforceXY(BaseReinforcementLearningModel):
             )
         return storage
 
-    def study_has_best_trial_params(self, study: Study | None) -> bool:
+    @staticmethod
+    def study_has_best_trial_params(study: Study | None) -> bool:
         if study is None:
             return False
         try:
@@ -570,7 +571,7 @@ class ReforceXY(BaseReinforcementLearningModel):
             )
             hyperopt_failed = True
         time_spent = time.time() - start
-        if self.study_has_best_trial_params(study) is False:
+        if ReforceXY.study_has_best_trial_params(study) is False:
             logger.error(
                 f"Hyperopt {study_name} failed ({time_spent:.2f} secs): no study best trial params found"
             )
index 535052235e30034f33b0c3c2089acf7d0e0580c1..107521c22e3f29e318a9e447068f6fe0a93e0225 100644 (file)
@@ -272,8 +272,8 @@ class LightGBMRegressorQuickAdapterV3(BaseRegressionModel):
         )
         smoothing_methods: dict = {
             "quantile": self.quantile_min_max_pred,
-            "mean": mean_min_max_pred,
-            "median": median_min_max_pred,
+            "mean": LightGBMRegressorQuickAdapterV3.mean_min_max_pred,
+            "median": LightGBMRegressorQuickAdapterV3.median_min_max_pred,
         }
         return smoothing_methods.get(
             prediction_thresholds_smoothing, smoothing_methods["mean"]
@@ -306,7 +306,7 @@ class LightGBMRegressorQuickAdapterV3(BaseRegressionModel):
         storage = self.optuna_storage(pair)
         pruner = optuna.pruners.HyperbandPruner()
         if self.__optuna_config.get("continuous", True):
-            self.optuna_study_delete(study_name, storage)
+            LightGBMRegressorQuickAdapterV3.optuna_study_delete(study_name, storage)
         study = optuna.create_study(
             study_name=study_name,
             sampler=optuna.samplers.TPESampler(
@@ -350,7 +350,7 @@ class LightGBMRegressorQuickAdapterV3(BaseRegressionModel):
             )
             return None, None
         time_spent = time.time() - start
-        if self.optuna_study_has_best_params(study) is False:
+        if LightGBMRegressorQuickAdapterV3.optuna_study_has_best_params(study) is False:
             logger.error(
                 f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found"
             )
@@ -392,7 +392,7 @@ class LightGBMRegressorQuickAdapterV3(BaseRegressionModel):
         storage = self.optuna_storage(pair)
         pruner = optuna.pruners.HyperbandPruner()
         if self.__optuna_config.get("continuous", True):
-            self.optuna_study_delete(study_name, storage)
+            LightGBMRegressorQuickAdapterV3.optuna_study_delete(study_name, storage)
         study = optuna.create_study(
             study_name=study_name,
             sampler=optuna.samplers.TPESampler(
@@ -438,7 +438,7 @@ class LightGBMRegressorQuickAdapterV3(BaseRegressionModel):
             )
             return None, None
         time_spent = time.time() - start
-        if self.optuna_study_has_best_params(study) is False:
+        if LightGBMRegressorQuickAdapterV3.optuna_study_has_best_params(study) is False:
             logger.error(
                 f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found"
             )
@@ -470,16 +470,18 @@ class LightGBMRegressorQuickAdapterV3(BaseRegressionModel):
                 return json.load(read_file)
         return None
 
+    @staticmethod
     def optuna_study_delete(
-        self, study_name: str, storage: optuna.storages.BaseStorage
+        study_name: str, storage: optuna.storages.BaseStorage
     ) -> None:
         try:
             optuna.delete_study(study_name=study_name, storage=storage)
         except Exception:
             pass
 
+    @staticmethod
     def optuna_study_load(
-        self, study_name: str, storage: optuna.storages.BaseStorage
+        study_name: str, storage: optuna.storages.BaseStorage
     ) -> optuna.study.Study | None:
         try:
             study = optuna.load_study(study_name=study_name, storage=storage)
@@ -487,7 +489,8 @@ class LightGBMRegressorQuickAdapterV3(BaseRegressionModel):
             study = None
         return study
 
-    def optuna_study_has_best_params(self, study: optuna.study.Study | None) -> bool:
+    @staticmethod
+    def optuna_study_has_best_params(study: optuna.study.Study | None) -> bool:
         if study is None:
             return False
         try:
@@ -500,6 +503,44 @@ class LightGBMRegressorQuickAdapterV3(BaseRegressionModel):
         except ValueError:
             return False
 
+    @staticmethod
+    def mean_min_max_pred(
+        pred_df: pd.DataFrame,
+        fit_live_predictions_candles: int,
+        label_period_candles: int,
+    ) -> tuple[pd.Series, pd.Series]:
+        pred_df_sorted = (
+            pred_df.select_dtypes(exclude=["object"])
+            .copy()
+            .apply(lambda col: col.sort_values(ascending=False, ignore_index=True))
+        )
+
+        label_period_frequency: int = int(
+            fit_live_predictions_candles / (label_period_candles * 2)
+        )
+        min_pred = pred_df_sorted.iloc[-label_period_frequency:].mean()
+        max_pred = pred_df_sorted.iloc[:label_period_frequency].mean()
+        return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
+
+    @staticmethod
+    def median_min_max_pred(
+        pred_df: pd.DataFrame,
+        fit_live_predictions_candles: int,
+        label_period_candles: int,
+    ) -> tuple[pd.Series, pd.Series]:
+        pred_df_sorted = (
+            pred_df.select_dtypes(exclude=["object"])
+            .copy()
+            .apply(lambda col: col.sort_values(ascending=False, ignore_index=True))
+        )
+
+        label_period_frequency: int = int(
+            fit_live_predictions_candles / (label_period_candles * 2)
+        )
+        min_pred = pred_df_sorted.iloc[-label_period_frequency:].median()
+        max_pred = pred_df_sorted.iloc[:label_period_frequency].median()
+        return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
+
     def quantile_min_max_pred(
         self,
         pred_df: pd.DataFrame,
@@ -521,40 +562,6 @@ class LightGBMRegressorQuickAdapterV3(BaseRegressionModel):
         return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
 
 
-def mean_min_max_pred(
-    pred_df: pd.DataFrame, fit_live_predictions_candles: int, label_period_candles: int
-) -> tuple[pd.Series, pd.Series]:
-    pred_df_sorted = (
-        pred_df.select_dtypes(exclude=["object"])
-        .copy()
-        .apply(lambda col: col.sort_values(ascending=False, ignore_index=True))
-    )
-
-    label_period_frequency: int = int(
-        fit_live_predictions_candles / (label_period_candles * 2)
-    )
-    min_pred = pred_df_sorted.iloc[-label_period_frequency:].mean()
-    max_pred = pred_df_sorted.iloc[:label_period_frequency].mean()
-    return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
-
-
-def median_min_max_pred(
-    pred_df: pd.DataFrame, fit_live_predictions_candles: int, label_period_candles: int
-) -> tuple[pd.Series, pd.Series]:
-    pred_df_sorted = (
-        pred_df.select_dtypes(exclude=["object"])
-        .copy()
-        .apply(lambda col: col.sort_values(ascending=False, ignore_index=True))
-    )
-
-    label_period_frequency: int = int(
-        fit_live_predictions_candles / (label_period_candles * 2)
-    )
-    min_pred = pred_df_sorted.iloc[-label_period_frequency:].median()
-    max_pred = pred_df_sorted.iloc[:label_period_frequency].median()
-    return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
-
-
 def period_objective(
     trial,
     X,
index c96fcddcd6678399bbf24c5ed6bb36f3371f9574..4116bb052f19ca94debc96e406ae56c8d78d1233 100644 (file)
@@ -275,8 +275,8 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel):
         )
         smoothing_methods: dict = {
             "quantile": self.quantile_min_max_pred,
-            "mean": mean_min_max_pred,
-            "median": median_min_max_pred,
+            "mean": XGBoostRegressorQuickAdapterV3.mean_min_max_pred,
+            "median": XGBoostRegressorQuickAdapterV3.median_min_max_pred,
         }
         return smoothing_methods.get(
             prediction_thresholds_smoothing, smoothing_methods["mean"]
@@ -309,7 +309,7 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel):
         storage = self.optuna_storage(pair)
         pruner = optuna.pruners.HyperbandPruner()
         if self.__optuna_config.get("continuous", True):
-            self.optuna_study_delete(study_name, storage)
+            XGBoostRegressorQuickAdapterV3.optuna_study_delete(study_name, storage)
         study = optuna.create_study(
             study_name=study_name,
             sampler=optuna.samplers.TPESampler(
@@ -353,7 +353,7 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel):
             )
             return None, None
         time_spent = time.time() - start
-        if self.optuna_study_has_best_params(study) is False:
+        if XGBoostRegressorQuickAdapterV3.optuna_study_has_best_params(study) is False:
             logger.error(
                 f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found"
             )
@@ -395,7 +395,7 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel):
         storage = self.optuna_storage(pair)
         pruner = optuna.pruners.HyperbandPruner()
         if self.__optuna_config.get("continuous", True):
-            self.optuna_study_delete(study_name, storage)
+            XGBoostRegressorQuickAdapterV3.optuna_study_delete(study_name, storage)
         study = optuna.create_study(
             study_name=study_name,
             sampler=optuna.samplers.TPESampler(
@@ -441,7 +441,7 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel):
             )
             return None, None
         time_spent = time.time() - start
-        if self.optuna_study_has_best_params(study) is False:
+        if XGBoostRegressorQuickAdapterV3.optuna_study_has_best_params(study) is False:
             logger.error(
                 f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found"
             )
@@ -473,16 +473,18 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel):
                 return json.load(read_file)
         return None
 
+    @staticmethod
     def optuna_study_delete(
-        self, study_name: str, storage: optuna.storages.BaseStorage
+        study_name: str, storage: optuna.storages.BaseStorage
     ) -> None:
         try:
             optuna.delete_study(study_name=study_name, storage=storage)
         except Exception:
             pass
 
+    @staticmethod
     def optuna_study_load(
-        self, study_name: str, storage: optuna.storages.BaseStorage
+        study_name: str, storage: optuna.storages.BaseStorage
     ) -> optuna.study.Study | None:
         try:
             study = optuna.load_study(study_name=study_name, storage=storage)
@@ -490,7 +492,8 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel):
             study = None
         return study
 
-    def optuna_study_has_best_params(self, study: optuna.study.Study | None) -> bool:
+    @staticmethod
+    def optuna_study_has_best_params(study: optuna.study.Study | None) -> bool:
         if study is None:
             return False
         try:
@@ -503,6 +506,44 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel):
         except ValueError:
             return False
 
+    @staticmethod
+    def mean_min_max_pred(
+        pred_df: pd.DataFrame,
+        fit_live_predictions_candles: int,
+        label_period_candles: int,
+    ) -> tuple[pd.Series, pd.Series]:
+        pred_df_sorted = (
+            pred_df.select_dtypes(exclude=["object"])
+            .copy()
+            .apply(lambda col: col.sort_values(ascending=False, ignore_index=True))
+        )
+
+        label_period_frequency: int = int(
+            fit_live_predictions_candles / (label_period_candles * 2)
+        )
+        min_pred = pred_df_sorted.iloc[-label_period_frequency:].mean()
+        max_pred = pred_df_sorted.iloc[:label_period_frequency].mean()
+        return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
+
+    @staticmethod
+    def median_min_max_pred(
+        pred_df: pd.DataFrame,
+        fit_live_predictions_candles: int,
+        label_period_candles: int,
+    ) -> tuple[pd.Series, pd.Series]:
+        pred_df_sorted = (
+            pred_df.select_dtypes(exclude=["object"])
+            .copy()
+            .apply(lambda col: col.sort_values(ascending=False, ignore_index=True))
+        )
+
+        label_period_frequency: int = int(
+            fit_live_predictions_candles / (label_period_candles * 2)
+        )
+        min_pred = pred_df_sorted.iloc[-label_period_frequency:].median()
+        max_pred = pred_df_sorted.iloc[:label_period_frequency].median()
+        return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
+
     def quantile_min_max_pred(
         self,
         pred_df: pd.DataFrame,
@@ -524,40 +565,6 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel):
         return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
 
 
-def mean_min_max_pred(
-    pred_df: pd.DataFrame, fit_live_predictions_candles: int, label_period_candles: int
-) -> tuple[pd.Series, pd.Series]:
-    pred_df_sorted = (
-        pred_df.select_dtypes(exclude=["object"])
-        .copy()
-        .apply(lambda col: col.sort_values(ascending=False, ignore_index=True))
-    )
-
-    label_period_frequency: int = int(
-        fit_live_predictions_candles / (label_period_candles * 2)
-    )
-    min_pred = pred_df_sorted.iloc[-label_period_frequency:].mean()
-    max_pred = pred_df_sorted.iloc[:label_period_frequency].mean()
-    return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
-
-
-def median_min_max_pred(
-    pred_df: pd.DataFrame, fit_live_predictions_candles: int, label_period_candles: int
-) -> tuple[pd.Series, pd.Series]:
-    pred_df_sorted = (
-        pred_df.select_dtypes(exclude=["object"])
-        .copy()
-        .apply(lambda col: col.sort_values(ascending=False, ignore_index=True))
-    )
-
-    label_period_frequency: int = int(
-        fit_live_predictions_candles / (label_period_candles * 2)
-    )
-    min_pred = pred_df_sorted.iloc[-label_period_frequency:].median()
-    max_pred = pred_df_sorted.iloc[:label_period_frequency].median()
-    return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
-
-
 def period_objective(
     trial,
     X,
index b20a3129830a272ba882cfa4d0ac0ab30eb023f0..1910b18b8980526cbd57568d6074c05f27ad0ed0 100644 (file)
@@ -13,12 +13,20 @@ from freqtrade.strategy.interface import IStrategy
 from freqtrade.strategy import stoploss_from_absolute
 from technical.pivots_points import pivots_points
 from freqtrade.persistence import Trade
-from scipy.signal import argrelmin, argrelmax, convolve
-from scipy.signal.windows import gaussian
+from scipy.signal import argrelmin, argrelmax
 import numpy as np
 import pandas_ta as pta
 
-from Utils import ewo, vwapb
+from Utils import (
+    ewo,
+    vwapb,
+    top_change_percent,
+    get_distance,
+    get_gaussian_window,
+    get_odd_window,
+    derive_gaussian_std_from_window,
+    zero_phase_gaussian,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -56,7 +64,7 @@ class QuickAdapterV3(IStrategy):
 
     @property
     def trailing_stoploss_positive_offset(self) -> float:
-        return self.config.get("trailing_stoploss_positive_offset", 0.01)
+        return self.config.get("trailing_stoploss_positive_offset", 0.0075)
 
     @property
     def trailing_stoploss_only_offset_is_reached(self) -> bool:
@@ -210,7 +218,11 @@ class QuickAdapterV3(IStrategy):
         dataframe["%-raw_volume"] = dataframe["volume"]
         dataframe["%-obv"] = ta.OBV(dataframe)
         dataframe["%-ewo"] = ewo(
-            dataframe=dataframe, mamode="ema", zero_lag=True, normalize=True
+            dataframe=dataframe,
+            mamode="ema",
+            pricemode="close",
+            zero_lag=True,
+            normalize=True,
         )
         psar = ta.SAR(dataframe, acceleration=0.02, maximum=0.2)
         dataframe["%-diff_to_psar"] = dataframe["close"] - psar
@@ -244,6 +256,18 @@ class QuickAdapterV3(IStrategy):
             (dataframe["close"] - dataframe["low"])
             / (dataframe["high"] - dataframe["low"])
         ).fillna(0.0)
+        # dataframe["jaw"], dataframe["teeth"], dataframe["lips"] = alligator(
+        #     dataframe, zero_lag=True
+        # )
+        # dataframe["%-dist_to_jaw"] = get_distance(dataframe["close"], dataframe["jaw"])
+        # dataframe["%-dist_to_teeth"] = get_distance(
+        #     dataframe["close"], dataframe["teeth"]
+        # )
+        # dataframe["%-dist_to_lips"] = get_distance(
+        #     dataframe["close"], dataframe["lips"]
+        # )
+        # dataframe["%-spread_jaw_teeth"] = dataframe["jaw"] - dataframe["teeth"]
+        # dataframe["%-spread_teeth_lips"] = dataframe["teeth"] - dataframe["lips"]
         dataframe["zlema_50"] = pta.zlma(dataframe["close"], length=50, mamode="ema")
         dataframe["zlema_12"] = pta.zlma(dataframe["close"], length=12, mamode="ema")
         dataframe["zlema_26"] = pta.zlma(dataframe["close"], length=26, mamode="ema")
@@ -435,7 +459,8 @@ class QuickAdapterV3(IStrategy):
         ).total_seconds() / 60.0
         return trade_duration_minutes // timeframe_to_minutes(self.timeframe)
 
-    def is_trade_duration_valid(self, trade_duration_candles: int) -> bool:
+    @staticmethod
+    def is_trade_duration_valid(trade_duration_candles: int) -> bool:
         if isna(trade_duration_candles):
             return False
         if trade_duration_candles == 0:
@@ -446,7 +471,7 @@ class QuickAdapterV3(IStrategy):
         self, df: DataFrame, trade: Trade, current_rate: float
     ) -> float | None:
         trade_duration_candles = self.get_trade_duration_candles(df, trade)
-        if self.is_trade_duration_valid(trade_duration_candles) is False:
+        if QuickAdapterV3.is_trade_duration_valid(trade_duration_candles) is False:
             return None
         current_natr = df["natr_ratio_labeling_window"].iloc[-1]
         if isna(current_natr):
@@ -455,12 +480,12 @@ class QuickAdapterV3(IStrategy):
             current_rate
             * current_natr
             * self.trailing_stoploss_natr_ratio
-            * (1 / math.log10(1 + trade_duration_candles))
+            * (1 / math.log10(1 + 0.25 * trade_duration_candles))
         )
 
     def get_take_profit_distance(self, df: DataFrame, trade: Trade) -> float | None:
         trade_duration_candles = self.get_trade_duration_candles(df, trade)
-        if self.is_trade_duration_valid(trade_duration_candles) is False:
+        if QuickAdapterV3.is_trade_duration_valid(trade_duration_candles) is False:
             return None
         entry_natr = self.get_trade_entry_natr(df, trade)
         if isna(entry_natr):
@@ -671,102 +696,3 @@ class QuickAdapterV3(IStrategy):
             with best_params_path.open("r", encoding="utf-8") as read_file:
                 return json.load(read_file)
         return None
-
-
-def top_change_percent(dataframe: DataFrame, period: int) -> Series:
-    """
-    Percentage change of the current close relative to the top close price in the previous `period` bars.
-
-    :param dataframe: OHLCV DataFrame
-    :param period: The previous period window size to look back (>=1)
-    :return: The top change percentage series
-    """
-    if period < 1:
-        raise ValueError("period must be greater than or equal to 1")
-
-    previous_close_top = (
-        dataframe["close"].rolling(period, min_periods=period).max().shift(1)
-    )
-
-    return (dataframe["close"] - previous_close_top) / previous_close_top
-
-
-def bottom_change_percent(dataframe: DataFrame, period: int) -> Series:
-    """
-    Percentage change of the current close relative to the bottom close price in the previous `period` bars.
-
-    :param dataframe: OHLCV DataFrame
-    :param period: The previous period window size to look back (>=1)
-    :return: The bottom change percentage series
-    """
-    if period < 1:
-        raise ValueError("period must be greater than or equal to 1")
-
-    previous_close_bottom = (
-        dataframe["close"].rolling(period, min_periods=period).min().shift(1)
-    )
-
-    return (dataframe["close"] - previous_close_bottom) / previous_close_bottom
-
-
-def price_retracement_percent(dataframe: DataFrame, period: int) -> Series:
-    """
-    Calculate the percentage retracement of the current close within the high/low close price range
-    of the previous `period` bars.
-
-    :param dataframe: OHLCV DataFrame
-    :param period: Window size for calculating historical closes high/low (>=1)
-    :return: Retracement percentage series
-    """
-    if period < 1:
-        raise ValueError("period must be greater than or equal to 1")
-
-    previous_close_low = (
-        dataframe["close"].rolling(period, min_periods=period).min().shift(1)
-    )
-    previous_close_high = (
-        dataframe["close"].rolling(period, min_periods=period).max().shift(1)
-    )
-
-    return (dataframe["close"] - previous_close_low) / (
-        previous_close_high - previous_close_low
-    ).fillna(0.0)
-
-
-def zero_phase_gaussian(series: Series, window: int, std: float):
-    kernel = gaussian(window, std=std)
-    kernel /= kernel.sum()
-
-    padding_length = window - 1
-    padded_series = np.pad(series.values, (padding_length, padding_length), mode="edge")
-
-    forward = convolve(padded_series, kernel, mode="valid")
-    backward = convolve(forward[::-1], kernel, mode="valid")[::-1]
-
-    return Series(backward, index=series.index)
-
-
-def get_gaussian_window(std: float, center: bool) -> int:
-    if std is None:
-        raise ValueError("Standard deviation cannot be None")
-    if std <= 0:
-        raise ValueError("Standard deviation must be greater than 0")
-    window = int(6 * std + 1)
-    if center and window % 2 == 0:
-        window += 1
-    return max(3, window)
-
-
-def derive_gaussian_std_from_window(window: int) -> float:
-    # Assuming window = 6 * std + 1 => std = (window - 1) / 6
-    return (window - 1) / 6.0 if window > 1 else 0.5
-
-
-def get_odd_window(window: int) -> int:
-    if window < 1:
-        raise ValueError("Window size must be greater than 0")
-    return window if window % 2 == 1 else window + 1
-
-
-def get_distance(p1: Series | float, p2: Series | float) -> Series | float:
-    return abs(p1 - p2)
index 7f282dff82e1c54da29f9dd7a818041804c42bcb..79940e4dbb40f541e3775efaa9fbb5cc7bfd21da 100644 (file)
@@ -2,9 +2,110 @@ import numpy as np
 import pandas as pd
 import pandas_ta as pta
 import talib.abstract as ta
+from scipy.signal import convolve
+from scipy.signal.windows import gaussian
 from technical import qtpylib
 
 
+def get_distance(p1: pd.Series | float, p2: pd.Series | float) -> pd.Series | float:
+    return abs(p1 - p2)
+
+
+def get_gaussian_window(std: float, center: bool) -> int:
+    if std is None:
+        raise ValueError("Standard deviation cannot be None")
+    if std <= 0:
+        raise ValueError("Standard deviation must be greater than 0")
+    window = int(6 * std + 1)
+    if center and window % 2 == 0:
+        window += 1
+    return max(3, window)
+
+
+def get_odd_window(window: int) -> int:
+    if window < 1:
+        raise ValueError("Window size must be greater than 0")
+    return window if window % 2 == 1 else window + 1
+
+
+def derive_gaussian_std_from_window(window: int) -> float:
+    # Assuming window = 6 * std + 1 => std = (window - 1) / 6
+    return (window - 1) / 6.0 if window > 1 else 0.5
+
+
+def zero_phase_gaussian(series: pd.Series, window: int, std: float):
+    kernel = gaussian(window, std=std)
+    kernel /= kernel.sum()
+
+    padding_length = window - 1
+    padded_series = np.pad(series.values, (padding_length, padding_length), mode="edge")
+
+    forward = convolve(padded_series, kernel, mode="valid")
+    backward = convolve(forward[::-1], kernel, mode="valid")[::-1]
+
+    return pd.Series(backward, index=series.index)
+
+
+def top_change_percent(dataframe: pd.DataFrame, period: int) -> pd.Series:
+    """
+    Percentage change of the current close relative to the top close price in the previous `period` bars.
+
+    :param dataframe: OHLCV DataFrame
+    :param period: The previous period window size to look back (>=1)
+    :return: The top change percentage series
+    """
+    if period < 1:
+        raise ValueError("period must be greater than or equal to 1")
+
+    previous_close_top = (
+        dataframe["close"].rolling(period, min_periods=period).max().shift(1)
+    )
+
+    return (dataframe["close"] - previous_close_top) / previous_close_top
+
+
+def bottom_change_percent(dataframe: pd.DataFrame, period: int) -> pd.Series:
+    """
+    Percentage change of the current close relative to the bottom close price in the previous `period` bars.
+
+    :param dataframe: OHLCV DataFrame
+    :param period: The previous period window size to look back (>=1)
+    :return: The bottom change percentage series
+    """
+    if period < 1:
+        raise ValueError("period must be greater than or equal to 1")
+
+    previous_close_bottom = (
+        dataframe["close"].rolling(period, min_periods=period).min().shift(1)
+    )
+
+    return (dataframe["close"] - previous_close_bottom) / previous_close_bottom
+
+
+def price_retracement_percent(dataframe: pd.DataFrame, period: int) -> pd.Series:
+    """
+    Calculate the percentage retracement of the current close within the high/low close price range
+    of the previous `period` bars.
+
+    :param dataframe: OHLCV DataFrame
+    :param period: Window size for calculating historical closes high/low (>=1)
+    :return: Retracement percentage series
+    """
+    if period < 1:
+        raise ValueError("period must be greater than or equal to 1")
+
+    previous_close_low = (
+        dataframe["close"].rolling(period, min_periods=period).min().shift(1)
+    )
+    previous_close_high = (
+        dataframe["close"].rolling(period, min_periods=period).max().shift(1)
+    )
+
+    return (dataframe["close"] - previous_close_low) / (
+        previous_close_high - previous_close_low
+    ).fillna(0.0)
+
+
 # VWAP bands
 def vwapb(dataframe: pd.DataFrame, window=20, num_of_std=1) -> tuple:
     vwap = qtpylib.rolling_vwap(dataframe, window=window)
@@ -26,86 +127,14 @@ def get_ma_fn(mamode: str, zero_lag=False) -> callable:
         "t3": ta.T3,
     }
     if zero_lag:
-        ma_fn = lambda df, timeperiod: pta.zlma(
-            df["close"], length=timeperiod, mamode=mamode
+        ma_fn = lambda series, timeperiod: pta.zlma(
+            series, length=timeperiod, mamode=mamode
         )
     else:
         ma_fn = mamodes.get(mamode, mamodes["sma"])
     return ma_fn
 
 
-def ewo(
-    dataframe: pd.DataFrame,
-    ma1_length=5,
-    ma2_length=34,
-    mamode="sma",
-    zero_lag=False,
-    normalize=False,
-) -> pd.Series:
-    ma_fn = get_ma_fn(mamode, zero_lag=zero_lag)
-    ma1 = ma_fn(dataframe, timeperiod=ma1_length)
-    ma2 = ma_fn(dataframe, timeperiod=ma2_length)
-    madiff = ma1 - ma2
-    if normalize:
-        madiff = (
-            madiff / dataframe["close"]
-        ) * 100  # Optional normalization with close price
-    return madiff
-
-
-def smma(
-    df: pd.DataFrame, period: int, mamode="sma", zero_lag=False, offset=0
-) -> pd.Series:
-    """
-    SMoothed Moving Average (SMMA).
-    """
-    close = df["close"]
-    if len(close) < period:
-        return pd.Series(index=close.index, dtype=float)
-
-    smma = close.copy()
-    smma[: period - 1] = np.nan
-    ma_fn = get_ma_fn(mamode, zero_lag=zero_lag)
-    smma.iloc[period - 1] = ma_fn(close[:period], timeperiod=period).iloc[-1]
-
-    for i in range(period, len(close)):
-        smma.iat[i] = ((period - 1) * smma.iat[i - 1] + smma.iat[i]) / period
-
-    if offset != 0:
-        smma = smma.shift(offset)
-
-    return smma
-
-
-def alligator(
-    df: pd.DataFrame,
-    jaw_period=13,
-    teeth_period=8,
-    lips_period=5,
-    jaw_shift=8,
-    teeth_shift=5,
-    lips_shift=3,
-    mamode="sma",
-    zero_lag=False,
-) -> tuple[pd.Series, pd.Series, pd.Series]:
-    """
-    Calculate Bill Williams' Alligator indicator lines.
-    """
-    median_price = (df["high"] + df["low"]) / 2
-
-    jaw = smma(median_price, period=jaw_period, mamode=mamode, zero_lag=zero_lag).shift(
-        jaw_shift
-    )
-    teeth = smma(
-        median_price, period=teeth_period, mamode=mamode, zero_lag=zero_lag
-    ).shift(teeth_shift)
-    lips = smma(
-        median_price, period=lips_period, mamode=mamode, zero_lag=zero_lag
-    ).shift(lips_shift)
-
-    return jaw, teeth, lips
-
-
 def fractal_dimension(
     prices_array: np.ndarray, period: int, normalize: bool = False
 ) -> float:
@@ -145,12 +174,12 @@ def fractal_dimension(
     return np.clip(D, 1.0, 2.0)
 
 
-def frama(prices: pd.Series, period: int = 16, normalize: bool = False) -> pd.Series:
+def frama(series: pd.Series, period: int = 16, normalize: bool = False) -> pd.Series:
     """
     Calculate FRAMA with optional normalization.
 
     Args:
-        prices: Pandas Series of closing prices.
+        series: Pandas Series of prices.
         period: Lookback window (default=16).
         normalize: Enable range normalization (default=False).
 
@@ -160,10 +189,10 @@ def frama(prices: pd.Series, period: int = 16, normalize: bool = False) -> pd.Se
     if period % 2 != 0:
         raise ValueError("FRAMA period must be even")
 
-    frama = np.full(len(prices), np.nan)
+    frama = np.full(len(series), np.nan)
 
-    for i in range(period - 1, len(prices)):
-        prices_array = prices.iloc[i - period + 1 : i + 1].to_numpy()
+    for i in range(period - 1, len(series)):
+        prices_array = series.iloc[i - period + 1 : i + 1].to_numpy()
         D = fractal_dimension(prices_array, period, normalize)
         alpha = np.exp(-4.6 * (D - 1))
 
@@ -172,4 +201,89 @@ def frama(prices: pd.Series, period: int = 16, normalize: bool = False) -> pd.Se
         else:
             frama[i] = alpha * prices_array[-1] + (1 - alpha) * frama[i - 1]
 
-    return pd.Series(frama, index=prices.index)
+    return pd.Series(frama, index=series.index)
+
+
+def smma(
+    series: pd.Series, period: int, mamode="sma", zero_lag=False, offset=0
+) -> pd.Series:
+    """
+    SMoothed Moving Average (SMMA).
+
+    https://www.sierrachart.com/index.php?page=doc/StudiesReference.php&ID=173&Name=Moving_Average_-_Smoothed
+    """
+    if len(series) < period:
+        return pd.Series(index=series.index, dtype=float)
+
+    smma = series.copy()
+    smma[: period - 1] = np.nan
+    ma_fn = get_ma_fn(mamode, zero_lag=zero_lag)
+    smma.iloc[period - 1] = pd.Series(
+        ma_fn(series.iloc[:period], timeperiod=period)
+    ).iloc[-1]
+
+    for i in range(period, len(series)):
+        smma.iat[i] = ((period - 1) * smma.iat[i - 1] + smma.iat[i]) / period
+
+    if offset != 0:
+        smma = smma.shift(offset)
+
+    return smma
+
+
+def get_price_fn(pricemode: str) -> callable:
+    pricemodes = {
+        "typical": lambda df: (df["high"] + df["low"] + df["close"]) / 3,
+        "median": lambda df: (df["high"] + df["low"]) / 2,
+        "close": lambda df: df["close"],
+    }
+    return pricemodes.get(pricemode, pricemodes["close"])
+
+
+def ewo(
+    dataframe: pd.DataFrame,
+    ma1_length=5,
+    ma2_length=34,
+    pricemode="close",
+    mamode="sma",
+    zero_lag=False,
+    normalize=False,
+) -> pd.Series:
+    price_series = get_price_fn(pricemode)(dataframe)
+    ma_fn = get_ma_fn(mamode, zero_lag=zero_lag)
+    ma1 = ma_fn(price_series, timeperiod=ma1_length)
+    ma2 = ma_fn(price_series, timeperiod=ma2_length)
+    madiff = ma1 - ma2
+    if normalize:
+        madiff = (madiff / price_series) * 100
+    return madiff
+
+
+def alligator(
+    df: pd.DataFrame,
+    jaw_period=13,
+    teeth_period=8,
+    lips_period=5,
+    jaw_shift=8,
+    teeth_shift=5,
+    lips_shift=3,
+    pricemode="median",
+    mamode="sma",
+    zero_lag=False,
+) -> tuple[pd.Series, pd.Series, pd.Series]:
+    """
+    Calculate Bill Williams' Alligator indicator lines.
+    """
+    price_series = get_price_fn(pricemode)(df)
+
+    jaw = smma(price_series, period=jaw_period, mamode=mamode, zero_lag=zero_lag).shift(
+        jaw_shift
+    )
+    teeth = smma(
+        price_series, period=teeth_period, mamode=mamode, zero_lag=zero_lag
+    ).shift(teeth_shift)
+    lips = smma(
+        price_series, period=lips_period, mamode=mamode, zero_lag=zero_lag
+    ).shift(lips_shift)
+
+    return jaw, teeth, lips