)
return storage
- def study_has_best_trial_params(self, study: Study | None) -> bool:
+ @staticmethod
+ def study_has_best_trial_params(study: Study | None) -> bool:
if study is None:
return False
try:
)
hyperopt_failed = True
time_spent = time.time() - start
- if self.study_has_best_trial_params(study) is False:
+ if ReforceXY.study_has_best_trial_params(study) is False:
logger.error(
f"Hyperopt {study_name} failed ({time_spent:.2f} secs): no study best trial params found"
)
)
smoothing_methods: dict = {
"quantile": self.quantile_min_max_pred,
- "mean": mean_min_max_pred,
- "median": median_min_max_pred,
+ "mean": LightGBMRegressorQuickAdapterV3.mean_min_max_pred,
+ "median": LightGBMRegressorQuickAdapterV3.median_min_max_pred,
}
return smoothing_methods.get(
prediction_thresholds_smoothing, smoothing_methods["mean"]
storage = self.optuna_storage(pair)
pruner = optuna.pruners.HyperbandPruner()
if self.__optuna_config.get("continuous", True):
- self.optuna_study_delete(study_name, storage)
+ LightGBMRegressorQuickAdapterV3.optuna_study_delete(study_name, storage)
study = optuna.create_study(
study_name=study_name,
sampler=optuna.samplers.TPESampler(
)
return None, None
time_spent = time.time() - start
- if self.optuna_study_has_best_params(study) is False:
+ if LightGBMRegressorQuickAdapterV3.optuna_study_has_best_params(study) is False:
logger.error(
f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found"
)
storage = self.optuna_storage(pair)
pruner = optuna.pruners.HyperbandPruner()
if self.__optuna_config.get("continuous", True):
- self.optuna_study_delete(study_name, storage)
+ LightGBMRegressorQuickAdapterV3.optuna_study_delete(study_name, storage)
study = optuna.create_study(
study_name=study_name,
sampler=optuna.samplers.TPESampler(
)
return None, None
time_spent = time.time() - start
- if self.optuna_study_has_best_params(study) is False:
+ if LightGBMRegressorQuickAdapterV3.optuna_study_has_best_params(study) is False:
logger.error(
f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found"
)
return json.load(read_file)
return None
+ @staticmethod
def optuna_study_delete(
- self, study_name: str, storage: optuna.storages.BaseStorage
+ study_name: str, storage: optuna.storages.BaseStorage
) -> None:
try:
optuna.delete_study(study_name=study_name, storage=storage)
except Exception:
pass
+ @staticmethod
def optuna_study_load(
- self, study_name: str, storage: optuna.storages.BaseStorage
+ study_name: str, storage: optuna.storages.BaseStorage
) -> optuna.study.Study | None:
try:
study = optuna.load_study(study_name=study_name, storage=storage)
study = None
return study
- def optuna_study_has_best_params(self, study: optuna.study.Study | None) -> bool:
+ @staticmethod
+ def optuna_study_has_best_params(study: optuna.study.Study | None) -> bool:
if study is None:
return False
try:
except ValueError:
return False
+ @staticmethod
+ def mean_min_max_pred(
+ pred_df: pd.DataFrame,
+ fit_live_predictions_candles: int,
+ label_period_candles: int,
+ ) -> tuple[float, float]:
+ pred_df_sorted = (
+ pred_df.select_dtypes(exclude=["object"])
+ .copy()
+ .apply(lambda col: col.sort_values(ascending=False, ignore_index=True))
+ )
+
+ label_period_frequency: int = max(
+ 1, int(fit_live_predictions_candles / (label_period_candles * 2))
+ )
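+ # max(1, ...) keeps the positional slices below meaningful when
+ # fit_live_predictions_candles < 2 * label_period_candles: an unguarded 0
+ # would make iloc[-0:] span the whole frame and iloc[:0] select nothing.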
+ min_pred = pred_df_sorted.iloc[-label_period_frequency:].mean()
+ max_pred = pred_df_sorted.iloc[:label_period_frequency].mean()
+ return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
+
+ @staticmethod
+ def median_min_max_pred(
+ pred_df: pd.DataFrame,
+ fit_live_predictions_candles: int,
+ label_period_candles: int,
+ ) -> tuple[float, float]:
+ pred_df_sorted = (
+ pred_df.select_dtypes(exclude=["object"])
+ .copy()
+ .apply(lambda col: col.sort_values(ascending=False, ignore_index=True))
+ )
+
+ label_period_frequency: int = max(
+ 1, int(fit_live_predictions_candles / (label_period_candles * 2))
+ )
+ min_pred = pred_df_sorted.iloc[-label_period_frequency:].median()
+ max_pred = pred_df_sorted.iloc[:label_period_frequency].median()
+ return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
+
def quantile_min_max_pred(
self,
pred_df: pd.DataFrame,
return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
-def mean_min_max_pred(
- pred_df: pd.DataFrame, fit_live_predictions_candles: int, label_period_candles: int
-) -> tuple[pd.Series, pd.Series]:
- pred_df_sorted = (
- pred_df.select_dtypes(exclude=["object"])
- .copy()
- .apply(lambda col: col.sort_values(ascending=False, ignore_index=True))
- )
-
- label_period_frequency: int = int(
- fit_live_predictions_candles / (label_period_candles * 2)
- )
- min_pred = pred_df_sorted.iloc[-label_period_frequency:].mean()
- max_pred = pred_df_sorted.iloc[:label_period_frequency].mean()
- return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
-
-
-def median_min_max_pred(
- pred_df: pd.DataFrame, fit_live_predictions_candles: int, label_period_candles: int
-) -> tuple[pd.Series, pd.Series]:
- pred_df_sorted = (
- pred_df.select_dtypes(exclude=["object"])
- .copy()
- .apply(lambda col: col.sort_values(ascending=False, ignore_index=True))
- )
-
- label_period_frequency: int = int(
- fit_live_predictions_candles / (label_period_candles * 2)
- )
- min_pred = pred_df_sorted.iloc[-label_period_frequency:].median()
- max_pred = pred_df_sorted.iloc[:label_period_frequency].median()
- return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
-
-
def period_objective(
trial,
X,
)
smoothing_methods: dict = {
"quantile": self.quantile_min_max_pred,
- "mean": mean_min_max_pred,
- "median": median_min_max_pred,
+ "mean": XGBoostRegressorQuickAdapterV3.mean_min_max_pred,
+ "median": XGBoostRegressorQuickAdapterV3.median_min_max_pred,
}
return smoothing_methods.get(
prediction_thresholds_smoothing, smoothing_methods["mean"]
storage = self.optuna_storage(pair)
pruner = optuna.pruners.HyperbandPruner()
if self.__optuna_config.get("continuous", True):
- self.optuna_study_delete(study_name, storage)
+ XGBoostRegressorQuickAdapterV3.optuna_study_delete(study_name, storage)
study = optuna.create_study(
study_name=study_name,
sampler=optuna.samplers.TPESampler(
)
return None, None
time_spent = time.time() - start
- if self.optuna_study_has_best_params(study) is False:
+ if XGBoostRegressorQuickAdapterV3.optuna_study_has_best_params(study) is False:
logger.error(
f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found"
)
storage = self.optuna_storage(pair)
pruner = optuna.pruners.HyperbandPruner()
if self.__optuna_config.get("continuous", True):
- self.optuna_study_delete(study_name, storage)
+ XGBoostRegressorQuickAdapterV3.optuna_study_delete(study_name, storage)
study = optuna.create_study(
study_name=study_name,
sampler=optuna.samplers.TPESampler(
)
return None, None
time_spent = time.time() - start
- if self.optuna_study_has_best_params(study) is False:
+ if XGBoostRegressorQuickAdapterV3.optuna_study_has_best_params(study) is False:
logger.error(
f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found"
)
return json.load(read_file)
return None
+ @staticmethod
def optuna_study_delete(
- self, study_name: str, storage: optuna.storages.BaseStorage
+ study_name: str, storage: optuna.storages.BaseStorage
) -> None:
try:
optuna.delete_study(study_name=study_name, storage=storage)
except Exception:
pass
+ @staticmethod
def optuna_study_load(
- self, study_name: str, storage: optuna.storages.BaseStorage
+ study_name: str, storage: optuna.storages.BaseStorage
) -> optuna.study.Study | None:
try:
study = optuna.load_study(study_name=study_name, storage=storage)
study = None
return study
- def optuna_study_has_best_params(self, study: optuna.study.Study | None) -> bool:
+ @staticmethod
+ def optuna_study_has_best_params(study: optuna.study.Study | None) -> bool:
if study is None:
return False
try:
except ValueError:
return False
+ @staticmethod
+ def mean_min_max_pred(
+ pred_df: pd.DataFrame,
+ fit_live_predictions_candles: int,
+ label_period_candles: int,
+ ) -> tuple[float, float]:
+ pred_df_sorted = (
+ pred_df.select_dtypes(exclude=["object"])
+ .copy()
+ .apply(lambda col: col.sort_values(ascending=False, ignore_index=True))
+ )
+
+ label_period_frequency: int = max(
+ 1, int(fit_live_predictions_candles / (label_period_candles * 2))
+ )
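+ # max(1, ...) keeps the positional slices below meaningful when
+ # fit_live_predictions_candles < 2 * label_period_candles: an unguarded 0
+ # would make iloc[-0:] span the whole frame and iloc[:0] select nothing.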
+ min_pred = pred_df_sorted.iloc[-label_period_frequency:].mean()
+ max_pred = pred_df_sorted.iloc[:label_period_frequency].mean()
+ return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
+
+ @staticmethod
+ def median_min_max_pred(
+ pred_df: pd.DataFrame,
+ fit_live_predictions_candles: int,
+ label_period_candles: int,
+ ) -> tuple[float, float]:
+ pred_df_sorted = (
+ pred_df.select_dtypes(exclude=["object"])
+ .copy()
+ .apply(lambda col: col.sort_values(ascending=False, ignore_index=True))
+ )
+
+ label_period_frequency: int = max(
+ 1, int(fit_live_predictions_candles / (label_period_candles * 2))
+ )
+ min_pred = pred_df_sorted.iloc[-label_period_frequency:].median()
+ max_pred = pred_df_sorted.iloc[:label_period_frequency].median()
+ return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
+
def quantile_min_max_pred(
self,
pred_df: pd.DataFrame,
return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
-def mean_min_max_pred(
- pred_df: pd.DataFrame, fit_live_predictions_candles: int, label_period_candles: int
-) -> tuple[pd.Series, pd.Series]:
- pred_df_sorted = (
- pred_df.select_dtypes(exclude=["object"])
- .copy()
- .apply(lambda col: col.sort_values(ascending=False, ignore_index=True))
- )
-
- label_period_frequency: int = int(
- fit_live_predictions_candles / (label_period_candles * 2)
- )
- min_pred = pred_df_sorted.iloc[-label_period_frequency:].mean()
- max_pred = pred_df_sorted.iloc[:label_period_frequency].mean()
- return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
-
-
-def median_min_max_pred(
- pred_df: pd.DataFrame, fit_live_predictions_candles: int, label_period_candles: int
-) -> tuple[pd.Series, pd.Series]:
- pred_df_sorted = (
- pred_df.select_dtypes(exclude=["object"])
- .copy()
- .apply(lambda col: col.sort_values(ascending=False, ignore_index=True))
- )
-
- label_period_frequency: int = int(
- fit_live_predictions_candles / (label_period_candles * 2)
- )
- min_pred = pred_df_sorted.iloc[-label_period_frequency:].median()
- max_pred = pred_df_sorted.iloc[:label_period_frequency].median()
- return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
-
-
def period_objective(
trial,
X,
from freqtrade.strategy import stoploss_from_absolute
from technical.pivots_points import pivots_points
from freqtrade.persistence import Trade
-from scipy.signal import argrelmin, argrelmax, convolve
-from scipy.signal.windows import gaussian
+from scipy.signal import argrelmin, argrelmax
import numpy as np
import pandas_ta as pta
-from Utils import ewo, vwapb
+from Utils import (
+ ewo,
+ vwapb,
+ top_change_percent,
+ get_distance,
+ get_gaussian_window,
+ get_odd_window,
+ derive_gaussian_std_from_window,
+ zero_phase_gaussian,
+)
logger = logging.getLogger(__name__)
@property
def trailing_stoploss_positive_offset(self) -> float:
- return self.config.get("trailing_stoploss_positive_offset", 0.01)
+ return self.config.get("trailing_stoploss_positive_offset", 0.0075)
@property
def trailing_stoploss_only_offset_is_reached(self) -> bool:
dataframe["%-raw_volume"] = dataframe["volume"]
dataframe["%-obv"] = ta.OBV(dataframe)
dataframe["%-ewo"] = ewo(
- dataframe=dataframe, mamode="ema", zero_lag=True, normalize=True
+ dataframe=dataframe,
+ mamode="ema",
+ pricemode="close",
+ zero_lag=True,
+ normalize=True,
)
psar = ta.SAR(dataframe, acceleration=0.02, maximum=0.2)
dataframe["%-diff_to_psar"] = dataframe["close"] - psar
(dataframe["close"] - dataframe["low"])
/ (dataframe["high"] - dataframe["low"])
).fillna(0.0)
+ # dataframe["jaw"], dataframe["teeth"], dataframe["lips"] = alligator(
+ # dataframe, zero_lag=True
+ # )
+ # dataframe["%-dist_to_jaw"] = get_distance(dataframe["close"], dataframe["jaw"])
+ # dataframe["%-dist_to_teeth"] = get_distance(
+ # dataframe["close"], dataframe["teeth"]
+ # )
+ # dataframe["%-dist_to_lips"] = get_distance(
+ # dataframe["close"], dataframe["lips"]
+ # )
+ # dataframe["%-spread_jaw_teeth"] = dataframe["jaw"] - dataframe["teeth"]
+ # dataframe["%-spread_teeth_lips"] = dataframe["teeth"] - dataframe["lips"]
dataframe["zlema_50"] = pta.zlma(dataframe["close"], length=50, mamode="ema")
dataframe["zlema_12"] = pta.zlma(dataframe["close"], length=12, mamode="ema")
dataframe["zlema_26"] = pta.zlma(dataframe["close"], length=26, mamode="ema")
).total_seconds() / 60.0
return trade_duration_minutes // timeframe_to_minutes(self.timeframe)
- def is_trade_duration_valid(self, trade_duration_candles: int) -> bool:
+ @staticmethod
+ def is_trade_duration_valid(trade_duration_candles: int) -> bool:
if isna(trade_duration_candles):
return False
if trade_duration_candles == 0:
self, df: DataFrame, trade: Trade, current_rate: float
) -> float | None:
trade_duration_candles = self.get_trade_duration_candles(df, trade)
- if self.is_trade_duration_valid(trade_duration_candles) is False:
+ if QuickAdapterV3.is_trade_duration_valid(trade_duration_candles) is False:
return None
current_natr = df["natr_ratio_labeling_window"].iloc[-1]
if isna(current_natr):
current_rate
* current_natr
* self.trailing_stoploss_natr_ratio
- * (1 / math.log10(1 + trade_duration_candles))
+ * (1 / math.log10(1 + 0.25 * trade_duration_candles))
)
def get_take_profit_distance(self, df: DataFrame, trade: Trade) -> float | None:
trade_duration_candles = self.get_trade_duration_candles(df, trade)
- if self.is_trade_duration_valid(trade_duration_candles) is False:
+ if QuickAdapterV3.is_trade_duration_valid(trade_duration_candles) is False:
return None
entry_natr = self.get_trade_entry_natr(df, trade)
if isna(entry_natr):
with best_params_path.open("r", encoding="utf-8") as read_file:
return json.load(read_file)
return None
-
-
-def top_change_percent(dataframe: DataFrame, period: int) -> Series:
- """
- Percentage change of the current close relative to the top close price in the previous `period` bars.
-
- :param dataframe: OHLCV DataFrame
- :param period: The previous period window size to look back (>=1)
- :return: The top change percentage series
- """
- if period < 1:
- raise ValueError("period must be greater than or equal to 1")
-
- previous_close_top = (
- dataframe["close"].rolling(period, min_periods=period).max().shift(1)
- )
-
- return (dataframe["close"] - previous_close_top) / previous_close_top
-
-
-def bottom_change_percent(dataframe: DataFrame, period: int) -> Series:
- """
- Percentage change of the current close relative to the bottom close price in the previous `period` bars.
-
- :param dataframe: OHLCV DataFrame
- :param period: The previous period window size to look back (>=1)
- :return: The bottom change percentage series
- """
- if period < 1:
- raise ValueError("period must be greater than or equal to 1")
-
- previous_close_bottom = (
- dataframe["close"].rolling(period, min_periods=period).min().shift(1)
- )
-
- return (dataframe["close"] - previous_close_bottom) / previous_close_bottom
-
-
-def price_retracement_percent(dataframe: DataFrame, period: int) -> Series:
- """
- Calculate the percentage retracement of the current close within the high/low close price range
- of the previous `period` bars.
-
- :param dataframe: OHLCV DataFrame
- :param period: Window size for calculating historical closes high/low (>=1)
- :return: Retracement percentage series
- """
- if period < 1:
- raise ValueError("period must be greater than or equal to 1")
-
- previous_close_low = (
- dataframe["close"].rolling(period, min_periods=period).min().shift(1)
- )
- previous_close_high = (
- dataframe["close"].rolling(period, min_periods=period).max().shift(1)
- )
-
- return (dataframe["close"] - previous_close_low) / (
- previous_close_high - previous_close_low
- ).fillna(0.0)
-
-
-def zero_phase_gaussian(series: Series, window: int, std: float):
- kernel = gaussian(window, std=std)
- kernel /= kernel.sum()
-
- padding_length = window - 1
- padded_series = np.pad(series.values, (padding_length, padding_length), mode="edge")
-
- forward = convolve(padded_series, kernel, mode="valid")
- backward = convolve(forward[::-1], kernel, mode="valid")[::-1]
-
- return Series(backward, index=series.index)
-
-
-def get_gaussian_window(std: float, center: bool) -> int:
- if std is None:
- raise ValueError("Standard deviation cannot be None")
- if std <= 0:
- raise ValueError("Standard deviation must be greater than 0")
- window = int(6 * std + 1)
- if center and window % 2 == 0:
- window += 1
- return max(3, window)
-
-
-def derive_gaussian_std_from_window(window: int) -> float:
- # Assuming window = 6 * std + 1 => std = (window - 1) / 6
- return (window - 1) / 6.0 if window > 1 else 0.5
-
-
-def get_odd_window(window: int) -> int:
- if window < 1:
- raise ValueError("Window size must be greater than 0")
- return window if window % 2 == 1 else window + 1
-
-
-def get_distance(p1: Series | float, p2: Series | float) -> Series | float:
- return abs(p1 - p2)
import pandas as pd
import pandas_ta as pta
import talib.abstract as ta
+from scipy.signal import convolve
+from scipy.signal.windows import gaussian
from technical import qtpylib
+def get_distance(p1: pd.Series | float, p2: pd.Series | float) -> pd.Series | float:
+ return abs(p1 - p2)
+
+
+def get_gaussian_window(std: float, center: bool) -> int:
+ if std is None:
+ raise ValueError("Standard deviation cannot be None")
+ if std <= 0:
+ raise ValueError("Standard deviation must be greater than 0")
+ window = int(6 * std + 1)
+ if center and window % 2 == 0:
+ window += 1
+ return max(3, window)
+
+
+def get_odd_window(window: int) -> int:
+ if window < 1:
+ raise ValueError("Window size must be greater than 0")
+ return window if window % 2 == 1 else window + 1
+
+
+def derive_gaussian_std_from_window(window: int) -> float:
+ # Assuming window = 6 * std + 1 => std = (window - 1) / 6
+ return (window - 1) / 6.0 if window > 1 else 0.5
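+ # Illustrative round-trip under the window = 6 * std + 1 sizing:
+ # get_gaussian_window(2.0, center=True) == 13 and
+ # derive_gaussian_std_from_window(13) == 2.0.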
+
+
+def zero_phase_gaussian(series: pd.Series, window: int, std: float) -> pd.Series:
+ kernel = gaussian(window, std=std)
+ kernel /= kernel.sum()
+
+ padding_length = window - 1
+ padded_series = np.pad(series.values, (padding_length, padding_length), mode="edge")
+
+ forward = convolve(padded_series, kernel, mode="valid")
+ backward = convolve(forward[::-1], kernel, mode="valid")[::-1]
+
+ return pd.Series(backward, index=series.index)
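+ # The forward pass delays the signal by the kernel's group delay and the
+ # reversed second pass undoes it, so smoothed extrema stay aligned with
+ # their source candles. Illustrative call (variable names assumed):
+ # smoothed = zero_phase_gaussian(df["close"], get_gaussian_window(2.0, True), 2.0)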
+
+
+def top_change_percent(dataframe: pd.DataFrame, period: int) -> pd.Series:
+ """
+ Percentage change of the current close relative to the top close price in the previous `period` bars.
+
+ :param dataframe: OHLCV DataFrame
+ :param period: The previous period window size to look back (>=1)
+ :return: The top change percentage series
+ """
+ if period < 1:
+ raise ValueError("period must be greater than or equal to 1")
+
+ previous_close_top = (
+ dataframe["close"].rolling(period, min_periods=period).max().shift(1)
+ )
+
+ return (dataframe["close"] - previous_close_top) / previous_close_top
+
+
+def bottom_change_percent(dataframe: pd.DataFrame, period: int) -> pd.Series:
+ """
+ Percentage change of the current close relative to the bottom close price in the previous `period` bars.
+
+ :param dataframe: OHLCV DataFrame
+ :param period: The previous period window size to look back (>=1)
+ :return: The bottom change percentage series
+ """
+ if period < 1:
+ raise ValueError("period must be greater than or equal to 1")
+
+ previous_close_bottom = (
+ dataframe["close"].rolling(period, min_periods=period).min().shift(1)
+ )
+
+ return (dataframe["close"] - previous_close_bottom) / previous_close_bottom
+
+
+def price_retracement_percent(dataframe: pd.DataFrame, period: int) -> pd.Series:
+ """
+ Calculate the percentage retracement of the current close within the high/low close price range
+ of the previous `period` bars.
+
+ :param dataframe: OHLCV DataFrame
+ :param period: Window size for calculating historical closes high/low (>=1)
+ :return: Retracement percentage series
+ """
+ if period < 1:
+ raise ValueError("period must be greater than or equal to 1")
+
+ previous_close_low = (
+ dataframe["close"].rolling(period, min_periods=period).min().shift(1)
+ )
+ previous_close_high = (
+ dataframe["close"].rolling(period, min_periods=period).max().shift(1)
+ )
+
+ return (dataframe["close"] - previous_close_low) / (
+ previous_close_high - previous_close_low
+ ).fillna(0.0)
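+ # Worked example: a previous-window close range of [100, 110] and a current
+ # close of 105 gives (105 - 100) / (110 - 100) = 0.5.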
+
+
# VWAP bands
def vwapb(dataframe: pd.DataFrame, window=20, num_of_std=1) -> tuple:
vwap = qtpylib.rolling_vwap(dataframe, window=window)
"t3": ta.T3,
}
if zero_lag:
- ma_fn = lambda df, timeperiod: pta.zlma(
- df["close"], length=timeperiod, mamode=mamode
+ ma_fn = lambda series, timeperiod: pta.zlma(
+ series, length=timeperiod, mamode=mamode
)
else:
ma_fn = mamodes.get(mamode, mamodes["sma"])
return ma_fn
-def ewo(
- dataframe: pd.DataFrame,
- ma1_length=5,
- ma2_length=34,
- mamode="sma",
- zero_lag=False,
- normalize=False,
-) -> pd.Series:
- ma_fn = get_ma_fn(mamode, zero_lag=zero_lag)
- ma1 = ma_fn(dataframe, timeperiod=ma1_length)
- ma2 = ma_fn(dataframe, timeperiod=ma2_length)
- madiff = ma1 - ma2
- if normalize:
- madiff = (
- madiff / dataframe["close"]
- ) * 100 # Optional normalization with close price
- return madiff
-
-
-def smma(
- df: pd.DataFrame, period: int, mamode="sma", zero_lag=False, offset=0
-) -> pd.Series:
- """
- SMoothed Moving Average (SMMA).
- """
- close = df["close"]
- if len(close) < period:
- return pd.Series(index=close.index, dtype=float)
-
- smma = close.copy()
- smma[: period - 1] = np.nan
- ma_fn = get_ma_fn(mamode, zero_lag=zero_lag)
- smma.iloc[period - 1] = ma_fn(close[:period], timeperiod=period).iloc[-1]
-
- for i in range(period, len(close)):
- smma.iat[i] = ((period - 1) * smma.iat[i - 1] + smma.iat[i]) / period
-
- if offset != 0:
- smma = smma.shift(offset)
-
- return smma
-
-
-def alligator(
- df: pd.DataFrame,
- jaw_period=13,
- teeth_period=8,
- lips_period=5,
- jaw_shift=8,
- teeth_shift=5,
- lips_shift=3,
- mamode="sma",
- zero_lag=False,
-) -> tuple[pd.Series, pd.Series, pd.Series]:
- """
- Calculate Bill Williams' Alligator indicator lines.
- """
- median_price = (df["high"] + df["low"]) / 2
-
- jaw = smma(median_price, period=jaw_period, mamode=mamode, zero_lag=zero_lag).shift(
- jaw_shift
- )
- teeth = smma(
- median_price, period=teeth_period, mamode=mamode, zero_lag=zero_lag
- ).shift(teeth_shift)
- lips = smma(
- median_price, period=lips_period, mamode=mamode, zero_lag=zero_lag
- ).shift(lips_shift)
-
- return jaw, teeth, lips
-
-
def fractal_dimension(
prices_array: np.ndarray, period: int, normalize: bool = False
) -> float:
return np.clip(D, 1.0, 2.0)
-def frama(prices: pd.Series, period: int = 16, normalize: bool = False) -> pd.Series:
+def frama(series: pd.Series, period: int = 16, normalize: bool = False) -> pd.Series:
"""
Calculate FRAMA with optional normalization.
Args:
- prices: Pandas Series of closing prices.
+ series: Pandas Series of prices.
period: Lookback window (default=16).
normalize: Enable range normalization (default=False).
if period % 2 != 0:
raise ValueError("FRAMA period must be even")
- frama = np.full(len(prices), np.nan)
+ frama = np.full(len(series), np.nan)
- for i in range(period - 1, len(prices)):
- prices_array = prices.iloc[i - period + 1 : i + 1].to_numpy()
+ for i in range(period - 1, len(series)):
+ prices_array = series.iloc[i - period + 1 : i + 1].to_numpy()
D = fractal_dimension(prices_array, period, normalize)
alpha = np.exp(-4.6 * (D - 1))
else:
frama[i] = alpha * prices_array[-1] + (1 - alpha) * frama[i - 1]
- return pd.Series(frama, index=prices.index)
+ return pd.Series(frama, index=series.index)
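+ # Alpha mapping sketch: D = 1 (persistent trend) gives alpha = exp(0) = 1,
+ # tracking price closely; D = 2 (noise-like) gives alpha = exp(-4.6) ~ 0.01,
+ # smoothing heavily.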
+
+
+def smma(
+ series: pd.Series, period: int, mamode="sma", zero_lag=False, offset=0
+) -> pd.Series:
+ """
+ SMoothed Moving Average (SMMA).
+
+ https://www.sierrachart.com/index.php?page=doc/StudiesReference.php&ID=173&Name=Moving_Average_-_Smoothed
+ """
+ if len(series) < period:
+ return pd.Series(index=series.index, dtype=float)
+
+ smma = series.copy()
+ smma[: period - 1] = np.nan
+ ma_fn = get_ma_fn(mamode, zero_lag=zero_lag)
+ smma.iloc[period - 1] = pd.Series(
+ ma_fn(series.iloc[:period], timeperiod=period)
+ ).iloc[-1]
+
+ for i in range(period, len(series)):
+ smma.iat[i] = ((period - 1) * smma.iat[i - 1] + smma.iat[i]) / period
+
+ if offset != 0:
+ smma = smma.shift(offset)
+
+ return smma
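+ # Recursion example with period=3: given smma[i - 1] = 10 and price[i] = 13,
+ # the next value is (2 * 10 + 13) / 3 = 11.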
+
+
+def get_price_fn(pricemode: str) -> callable:
+ pricemodes = {
+ "typical": lambda df: (df["high"] + df["low"] + df["close"]) / 3,
+ "median": lambda df: (df["high"] + df["low"]) / 2,
+ "close": lambda df: df["close"],
+ }
+ return pricemodes.get(pricemode, pricemodes["close"])
+
+
+def ewo(
+ dataframe: pd.DataFrame,
+ ma1_length=5,
+ ma2_length=34,
+ pricemode="close",
+ mamode="sma",
+ zero_lag=False,
+ normalize=False,
+) -> pd.Series:
+ price_series = get_price_fn(pricemode)(dataframe)
+ ma_fn = get_ma_fn(mamode, zero_lag=zero_lag)
+ ma1 = ma_fn(price_series, timeperiod=ma1_length)
+ ma2 = ma_fn(price_series, timeperiod=ma2_length)
+ madiff = ma1 - ma2
+ if normalize:
+ madiff = (madiff / price_series) * 100
+ return madiff
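+ # Usage matching the strategy feature engineering call:
+ # ewo(dataframe, mamode="ema", pricemode="close", zero_lag=True, normalize=True)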
+
+
+def alligator(
+ df: pd.DataFrame,
+ jaw_period=13,
+ teeth_period=8,
+ lips_period=5,
+ jaw_shift=8,
+ teeth_shift=5,
+ lips_shift=3,
+ pricemode="median",
+ mamode="sma",
+ zero_lag=False,
+) -> tuple[pd.Series, pd.Series, pd.Series]:
+ """
+ Calculate Bill Williams' Alligator indicator lines.
+ """
+ price_series = get_price_fn(pricemode)(df)
+
+ jaw = smma(price_series, period=jaw_period, mamode=mamode, zero_lag=zero_lag).shift(
+ jaw_shift
+ )
+ teeth = smma(
+ price_series, period=teeth_period, mamode=mamode, zero_lag=zero_lag
+ ).shift(teeth_shift)
+ lips = smma(
+ price_series, period=lips_period, mamode=mamode, zero_lag=zero_lag
+ ).shift(lips_shift)
+
+ return jaw, teeth, lips
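+ # Usage matching the (currently commented-out) strategy feature block, with
+ # the "median" pricemode default:
+ # jaw, teeth, lips = alligator(dataframe, zero_lag=True)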