"""
self.close_envs()
- train_df = data_dictionary["train_features"]
- test_df = data_dictionary["test_features"]
+ train_df = data_dictionary.get("train_features")
+ test_df = data_dictionary.get("test_features")
env_dict = self.pack_env_dict(dk.pair)
seed = self.model_training_parameters.get("seed", 42)
if not model_params.get("policy_kwargs"):
model_params["policy_kwargs"] = {}
- net_arch = model_params["policy_kwargs"].get("net_arch", [128, 128])
+ net_arch = model_params.get("policy_kwargs", {}).get("net_arch", [128, 128])
if "PPO" in self.model_type:
model_params["policy_kwargs"]["net_arch"] = {
"pi": net_arch,
model_params["policy_kwargs"]["net_arch"] = net_arch
model_params["policy_kwargs"]["activation_fn"] = get_activation_fn(
- model_params["policy_kwargs"].get("activation_fn", "relu")
+ model_params.get("policy_kwargs", {}).get("activation_fn", "relu")
)
model_params["policy_kwargs"]["optimizer_class"] = get_optimizer_class(
- model_params["policy_kwargs"].get("optimizer_class", "adam")
+ model_params.get("policy_kwargs", {}).get("optimizer_class", "adam")
)
return model_params
:return:
model Any = trained model to be used for inference in dry/live/backtesting
"""
- train_df = data_dictionary["train_features"]
+ train_df = data_dictionary.get("train_features")
train_timesteps = len(train_df)
- test_timesteps = len(data_dictionary["test_features"])
+ test_df = data_dictionary.get("test_features")
+ test_timesteps = len(test_df)
train_cycles = max(1, int(self.rl_config.get("train_cycles", 25)))
total_timesteps = train_timesteps * train_cycles * self.n_envs
- train_days = steps_to_days(train_timesteps, self.config["timeframe"])
- total_days = steps_to_days(total_timesteps, self.config["timeframe"])
+ train_days = steps_to_days(train_timesteps, self.config.get("timeframe"))
+ total_days = steps_to_days(total_timesteps, self.config.get("timeframe"))
logger.info("Action masking: %s", self.is_maskable)
logger.info(
logger.info(
"Test: %s steps (%s days)",
test_timesteps,
- steps_to_days(test_timesteps, self.config["timeframe"]),
+ steps_to_days(test_timesteps, self.config.get("timeframe")),
)
logger.info("Hyperopt: %s", self.hyperopt)
logger.info(
"Continual training activated - starting training from previously trained agent."
)
- model = self.dd.model_dictionary[dk.pair]
+ model = self.dd.model_dictionary.get(dk.pair)
model.set_env(self.train_env)
callbacks = self.get_callbacks(
if self.progressbar_callback:
self.progressbar_callback.on_training_end()
self.close_envs()
- model.env.close()
+ if hasattr(model, "env") and model.env is not None:
+ model.env.close()
time_spent = time.time() - start
self.dd.update_metric_tracker("fit_time", time_spent, dk.pair)
if self.progressbar_callback:
self.progressbar_callback.on_training_end()
self.close_envs()
- model.env.close()
+ if hasattr(model, "env") and model.env is not None:
+ model.env.close()
if nan_encountered:
return np.nan
super().__init__(**kwargs)
self.force_actions: bool = self.rl_config.get("force_actions", False)
self._force_action: Optional[ForceActions] = None
- self.take_profit: float = self.config["minimal_roi"]["0"]
- self.stop_loss: float = self.config["stoploss"]
+ self.take_profit: float = self.config.get("minimal_roi", {}).get("0", 0.03)
+ self.stop_loss: float = self.config.get("stoploss", -0.02)
self.timeout: int = self.rl_config.get("max_trade_duration_candles", 128)
- self._last_closed_position: Positions = None
+ self._last_closed_position: Optional[Positions] = None
self._last_closed_trade_tick: int = 0
if self.force_actions:
logger.info(
self.take_profit,
self.stop_loss,
self.timeout,
- steps_to_days(self.timeout, self.config["timeframe"]),
+ steps_to_days(self.timeout, self.config.get("timeframe")),
self.observation_space,
)
"""
observation, history = super().reset(seed, **kwargs)
self._force_action: Optional[ForceActions] = None
- self._last_closed_position: Positions = None
+ self._last_closed_position: Optional[Positions] = None
self._last_closed_trade_tick: int = 0
return observation, history
# name="%-rsi",
# period=8,
# pair=self.pair,
- # timeframe=self.config["timeframe"],
+ # timeframe=self.config.get("timeframe"),
# raw=True,
# )
return ForceActions.Take_profit
if pnl <= self.stop_loss:
return ForceActions.Stop_loss
+ return None
def _get_new_position(self, action: int) -> Positions:
return {
return self._current_tick - self._last_closed_trade_tick
def get_most_recent_max_pnl(self) -> float:
+ pnl_history = self.history.get("pnl")
return (
- np.max(self.history.get("pnl")) if self.history.get("pnl") else -np.inf
+ np.max(pnl_history) if pnl_history and len(pnl_history) > 0 else -np.inf
)
def get_most_recent_return(self) -> float:
if self._current_tick <= 0:
return 0.0
if self._position == Positions.Long:
- current_price = self.prices.iloc[self._current_tick].open
- previous_price = self.prices.iloc[self._current_tick - 1].open
+ current_price = self.prices.iloc[self._current_tick].get("open")
+ previous_price = self.prices.iloc[self._current_tick - 1].get("open")
if (
self._position_history[self._current_tick - 1] == Positions.Short
or self._position_history[self._current_tick - 1]
previous_price = self.add_entry_fee(previous_price)
return np.log(current_price) - np.log(previous_price)
if self._position == Positions.Short:
- current_price = self.prices.iloc[self._current_tick].open
- previous_price = self.prices.iloc[self._current_tick - 1].open
+ current_price = self.prices.iloc[self._current_tick].get("open")
+ previous_price = self.prices.iloc[self._current_tick - 1].get("open")
if (
self._position_history[self._current_tick - 1] == Positions.Long
or self._position_history[self._current_tick - 1]
return 0.0
def previous_price(self) -> float:
- return self.prices.iloc[self._current_tick - 1].open
+ return self.prices.iloc[self._current_tick - 1].get("open")
def get_env_history(self) -> DataFrame:
"""
Get environment data from the first to the last trade
"""
- # Check if history or trade_history is empty
if not self.history or not self.trade_history:
logger.warning("History or trade history is empty.")
- return DataFrame() # Return an empty DataFrame
+ return DataFrame()
_history_df = DataFrame.from_dict(self.history)
_trade_history_df = DataFrame.from_dict(self.trade_history)
- # Check if 'tick' column exists in both DataFrames
if (
"tick" not in _history_df.columns
or "tick" not in _trade_history_df.columns
logger.warning(
"'tick' column is missing from history or trade history."
)
- return DataFrame() # Return an empty DataFrame
+ return DataFrame()
- _rollout_history = _history_df.merge(
- _trade_history_df, on="tick", how="left"
+ _rollout_history = merge(
+ _history_df, _trade_history_df, on="tick", how="left"
).fillna(method="ffill")
_price_history = (
self.prices.iloc[_rollout_history.tick].copy().reset_index()
sharex=True,
)
- # Return empty fig if no trades
if len(self.trade_history) == 0:
return fig
history = self.get_env_history()
+ if len(history) == 0:
+ return fig
+
+ history_price = history.get("price")
+ if history_price is None or len(history_price) == 0:
+ return fig
+ history_type = history.get("type")
+ if history_type is None or len(history_type) == 0:
+ return fig
+ history_open = history["open"]
+ if history_open is None or len(history_open) == 0:
+ return fig
+
+ enter_long_prices = history.loc[history_type == "long_enter", "price"]
+ enter_short_prices = history.loc[history_type == "short_enter", "price"]
+ exit_long_prices = history.loc[history_type == "long_exit", "price"]
+ exit_short_prices = history.loc[history_type == "short_exit", "price"]
+ take_profit_prices = history.loc[history_type == "take_profit"]
+ stop_loss_prices = history.loc[history_type == "stop_loss", "price"]
+ timeout_prices = history.loc[history_type == "timeout", "price"]
- enter_long_prices = history.loc[history["type"] == "long_enter"]["price"]
- enter_short_prices = history.loc[history["type"] == "short_enter"]["price"]
- exit_long_prices = history.loc[history["type"] == "long_exit"]["price"]
- exit_short_prices = history.loc[history["type"] == "short_exit"]["price"]
- take_profit_prices = history.loc[history["type"] == "take_profit"]["price"]
- stop_loss_prices = history.loc[history["type"] == "stop_loss"]["price"]
- timeout_prices = history.loc[history["type"] == "timeout"]["price"]
- axs[0].plot(history["open"], linewidth=1, color="orchid")
+ axs[0].plot(history_open, linewidth=1, color="orchid")
plot_markers(axs[0], enter_long_prices, "^", "forestgreen", 5, -0.1)
plot_markers(axs[0], enter_short_prices, "v", "firebrick", 5, 0.1)
plot_markers(axs[0], exit_short_prices, ".", "thistle", 4, -0.1)
plot_markers(axs[0], take_profit_prices, "*", "lime", 8, 0.1)
plot_markers(axs[0], stop_loss_prices, "x", "red", 8, -0.1)
- plot_markers(axs[0], timeout_prices, "1", "yellow", 8, 0)
+ plot_markers(axs[0], timeout_prices, "1", "yellow", 8, 0.0)
axs[1].set_ylabel("pnl")
- axs[1].plot(history["pnl"], linewidth=1, color="gray")
+ axs[1].plot(history.get("pnl"), linewidth=1, color="gray")
axs[1].axhline(y=0, label="0", alpha=0.33, color="gray")
axs[1].axhline(y=self.take_profit, label="tp", alpha=0.33, color="green")
axs[1].axhline(y=self.stop_loss, label="sl", alpha=0.33, color="red")
axs[2].set_ylabel("reward")
- axs[2].plot(history["reward"], linewidth=1, color="gray")
+ axs[2].plot(history.get("reward"), linewidth=1, color="gray")
axs[2].axhline(y=0, label="0", alpha=0.33)
axs[3].set_ylabel("total_profit")
- axs[3].plot(history["total_profit"], linewidth=1, color="gray")
+ axs[3].plot(history.get("total_profit"), linewidth=1, color="gray")
axs[3].axhline(y=1, label="0", alpha=0.33)
axs[4].set_ylabel("total_reward")
- axs[4].plot(history["total_reward"], linewidth=1, color="gray")
+ axs[4].plot(history.get("total_reward"), linewidth=1, color="gray")
axs[4].axhline(y=0, label="0", alpha=0.33)
axs[4].set_xlabel("tick")
def close(self) -> None:
plt.close()
gc.collect()
- th.cuda.empty_cache()
+ if th.cuda.is_available():
+ th.cuda.empty_cache()
class InfoMetricsCallback(TensorboardCallback):
def _on_training_start(self) -> None:
_lr = self.model.learning_rate
_lr = _lr if isinstance(_lr, float) else "lr_schedule"
- hparam_dict: Dict = {
+ hparam_dict: Dict[str, Any] = {
"algorithm": self.model.__class__.__name__,
"learning_rate": _lr,
"gamma": self.model.gamma,
if "QRDQN" in self.model.__class__.__name__:
hparam_dict.update({"n_quantiles": self.model.n_quantiles})
metric_dict = {
- "info/total_reward": 0,
- "info/total_profit": 1,
+ "info/total_reward": 0.0,
+ "info/total_profit": 1.0,
"info/trade_count": 0,
"info/trade_duration": 0,
}
)
def _on_step(self) -> bool:
- local_info = self.locals["infos"][0]
+ local_info = self.locals.get("infos", [{}])[0]
if self.training_env is None:
return True
tensorboard_metrics = self.training_env.get_attr("tensorboard_metrics")[0]
for metric in local_info:
if metric not in ["episode", "terminal_observation", "TimeLimit.truncated"]:
- self.logger.record(f"info/{metric}", local_info[metric])
+ self.logger.record(f"info/{metric}", local_info.get(metric))
for category in tensorboard_metrics:
- for metric in tensorboard_metrics[category]:
+ for metric in tensorboard_metrics.get(category, {}):
self.logger.record(
- f"{category}/{metric}", tensorboard_metrics[category][metric]
+ f"{category}/{metric}",
+ tensorboard_metrics.get(category, {}).get(metric),
)
return True
if self.eval_freq > 0 and self.n_calls % self.eval_freq == 0:
super()._on_step()
self.eval_idx += 1
- self.trial.report(self.last_mean_reward, self.eval_idx)
+ if hasattr(self.trial, "report"):
+ self.trial.report(self.last_mean_reward, self.eval_idx)
- # Prune trial if need
- if self.trial.should_prune():
+ # Prune trial if needed
+ if hasattr(self.trial, "should_prune") and self.trial.should_prune():
self.is_pruned = True
return False
return func
-def hours_to_seconds(hours):
+def hours_to_seconds(hours: float) -> float:
"""
Converts hours to seconds
"""
def feature_engineering_expand_basic(
self, dataframe: DataFrame, metadata: dict, **kwargs
):
- dataframe["%-close_pct_change"] = dataframe["close"].pct_change()
- dataframe["%-raw_volume"] = dataframe["volume"]
+ dataframe["%-close_pct_change"] = dataframe.get("close").pct_change()
+ dataframe["%-raw_volume"] = dataframe.get("volume")
return dataframe
def feature_engineering_standard(
self, dataframe: DataFrame, metadata: dict, **kwargs
):
- dataframe["%-day_of_week"] = (dataframe["date"].dt.dayofweek + 1) / 7
- dataframe["%-hour_of_day"] = (dataframe["date"].dt.hour + 1) / 25
+ dataframe["%-day_of_week"] = (dataframe.get("date").dt.dayofweek + 1) / 7
+ dataframe["%-hour_of_day"] = (dataframe.get("date").dt.hour + 1) / 25
- dataframe["%-raw_close"] = dataframe["close"]
- dataframe["%-raw_open"] = dataframe["open"]
- dataframe["%-raw_high"] = dataframe["high"]
- dataframe["%-raw_low"] = dataframe["low"]
+ dataframe["%-raw_close"] = dataframe.get("close")
+ dataframe["%-raw_open"] = dataframe.get("open")
+ dataframe["%-raw_high"] = dataframe.get("high")
+ dataframe["%-raw_low"] = dataframe.get("low")
return dataframe
return dataframe
def populate_entry_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
- enter_long_conditions = [df["do_predict"] == 1, df[ACTION_COLUMN] == 1]
+ enter_long_conditions = [df.get("do_predict") == 1, df.get(ACTION_COLUMN) == 1]
df.loc[
reduce(lambda x, y: x & y, enter_long_conditions),
["enter_long", "enter_tag"],
] = (1, "long")
- enter_short_conditions = [df["do_predict"] == 1, df[ACTION_COLUMN] == 3]
+ enter_short_conditions = [df.get("do_predict") == 1, df.get(ACTION_COLUMN) == 3]
df.loc[
reduce(lambda x, y: x & y, enter_short_conditions),
return df
def populate_exit_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
- exit_long_conditions = [df["do_predict"] == 1, df[ACTION_COLUMN] == 2]
+ exit_long_conditions = [df.get("do_predict") == 1, df.get(ACTION_COLUMN) == 2]
df.loc[reduce(lambda x, y: x & y, exit_long_conditions), "exit_long"] = 1
- exit_short_conditions = [df["do_predict"] == 1, df[ACTION_COLUMN] == 4]
+ exit_short_conditions = [df.get("do_predict") == 1, df.get(ACTION_COLUMN) == 4]
df.loc[reduce(lambda x, y: x & y, exit_short_conditions), "exit_short"] = 1
return df
dk.data["labels_mean"], dk.data["labels_std"] = {}, {}
for label in dk.label_list + dk.unique_class_list:
- if pred_df_full[label].dtype == object:
+ pred_df_full_label = pred_df_full.get(label)
+ if pred_df_full_label is None or pred_df_full_label.dtype == object:
continue
if not warmed_up:
f = [0, 0]
else:
- f = sp.stats.norm.fit(pred_df_full[label])
+ f = sp.stats.norm.fit(pred_df_full_label)
dk.data["labels_mean"][label], dk.data["labels_std"][label] = f[0], f[1]
# fit the DI_threshold
f = [0, 0, 0]
cutoff = 2
else:
- di_values = pd.to_numeric(pred_df_full["DI_values"], errors="coerce")
+ di_values = pd.to_numeric(pred_df_full.get("DI_values"), errors="coerce")
di_values = di_values.dropna()
f = sp.stats.weibull_min.fit(di_values)
cutoff = sp.stats.weibull_min.ppf(
self.freqai_info.get("outlier_threshold", 0.999), *f
)
- dk.data["DI_value_mean"] = pred_df_full["DI_values"].mean()
- dk.data["DI_value_std"] = pred_df_full["DI_values"].std()
+ di_values_series = pred_df_full.get("DI_values")
+ dk.data["DI_value_mean"] = di_values_series.mean()
+ dk.data["DI_value_std"] = di_values_series.std()
dk.data["extra_returns_per_train"]["DI_value_param1"] = f[0]
dk.data["extra_returns_per_train"]["DI_value_param2"] = f[1]
dk.data["extra_returns_per_train"]["DI_value_param3"] = f[2]
temperature = float(
self.freqai_info.get("prediction_thresholds_temperature", 250.0)
)
- extrema = pred_df[EXTREMA_COLUMN].iloc[
+ extrema = pred_df.get(EXTREMA_COLUMN).iloc[
-(
max(2, int(fit_live_predictions_candles / label_period_candles))
* label_period_candles
indices = df.index.tolist()
thresholds = get_natr_values(natr_period) * natr_ratio
- closes = df["close"].values
- highs = df["high"].values
- lows = df["low"].values
+ closes = df.get("close").values
+ highs = df.get("high").values
+ lows = df.get("low").values
state: TrendDirection = TrendDirection.NEUTRAL
depth = -1
natr_ratio=label_natr_ratio,
)
- if len(pivots_values) < 2:
- return -np.inf, -np.inf
-
scaled_natr_label_period_candles = (
ta.NATR(df, timeperiod=label_period_candles).fillna(method="bfill") / 100.0
) * label_natr_ratio
from Utils import (
alligator,
bottom_change_percent,
- calculate_zero_lag,
- get_ma_fn,
+ get_zl_ma_fn,
zigzag,
ewo,
non_zero_diff,
get_odd_window,
derive_gaussian_std_from_window,
zero_phase_gaussian,
+ zlema,
)
logger = logging.getLogger(__name__)
INTERFACE_VERSION = 3
def version(self) -> str:
- return "3.3.76"
+ return "3.3.77"
timeframe = "5m"
dataframe["%-mfi-period"] = ta.MFI(dataframe, timeperiod=period)
dataframe["%-adx-period"] = ta.ADX(dataframe, timeperiod=period)
dataframe["%-cci-period"] = ta.CCI(dataframe, timeperiod=period)
- dataframe["%-er-period"] = pta.er(dataframe["close"], length=period)
+ dataframe["%-er-period"] = pta.er(dataframe.get("close"), length=period)
dataframe["%-rocr-period"] = ta.ROCR(dataframe, timeperiod=period)
dataframe["%-trix-period"] = ta.TRIX(dataframe, timeperiod=period)
dataframe["%-cmf-period"] = pta.cmf(
- dataframe["high"],
- dataframe["low"],
- dataframe["close"],
- dataframe["volume"],
+ dataframe.get("high"),
+ dataframe.get("low"),
+ dataframe.get("close"),
+ dataframe.get("volume"),
length=period,
)
dataframe["%-tcp-period"] = top_change_percent(dataframe, period=period)
dataframe["%-bcp-period"] = bottom_change_percent(dataframe, period=period)
dataframe["%-prp-period"] = price_retracement_percent(dataframe, period=period)
- dataframe["%-cti-period"] = pta.cti(dataframe["close"], length=period)
+ dataframe["%-cti-period"] = pta.cti(dataframe.get("close"), length=period)
dataframe["%-chop-period"] = pta.chop(
- dataframe["high"],
- dataframe["low"],
- dataframe["close"],
+ dataframe.get("high"),
+ dataframe.get("low"),
+ dataframe.get("close"),
length=period,
)
dataframe["%-linearreg_angle-period"] = ta.LINEARREG_ANGLE(
def feature_engineering_expand_basic(
self, dataframe: DataFrame, metadata: dict, **kwargs
):
- dataframe["%-close_pct_change"] = dataframe["close"].pct_change()
- dataframe["%-raw_volume"] = dataframe["volume"]
+ dataframe["%-close_pct_change"] = dataframe.get("close").pct_change()
+ dataframe["%-raw_volume"] = dataframe.get("volume")
dataframe["%-obv"] = ta.OBV(dataframe)
label_period_candles = self.get_label_period_candles(str(metadata.get("pair")))
dataframe["%-atr_label_period_candles"] = ta.ATR(
normalize=True,
)
psar = ta.SAR(dataframe, acceleration=0.02, maximum=0.2)
- dataframe["%-diff_to_psar"] = dataframe["close"] - psar
+ dataframe["%-diff_to_psar"] = dataframe.get("close") - psar
kc = pta.kc(
- dataframe["high"],
- dataframe["low"],
- dataframe["close"],
+ dataframe.get("high"),
+ dataframe.get("low"),
+ dataframe.get("close"),
length=14,
scalar=2,
)
- dataframe["kc_lowerband"] = kc["KCLe_14_2.0"]
- dataframe["kc_middleband"] = kc["KCBe_14_2.0"]
- dataframe["kc_upperband"] = kc["KCUe_14_2.0"]
+ dataframe["kc_lowerband"] = kc.get("KCLe_14_2.0")
+ dataframe["kc_middleband"] = kc.get("KCBe_14_2.0")
+ dataframe["kc_upperband"] = kc.get("KCUe_14_2.0")
dataframe["%-kc_width"] = (
- dataframe["kc_upperband"] - dataframe["kc_lowerband"]
- ) / dataframe["kc_middleband"]
+ dataframe.get("kc_upperband") - dataframe.get("kc_lowerband")
+ ) / dataframe.get("kc_middleband")
(
dataframe["bb_upperband"],
dataframe["bb_middleband"],
nbdevdn=2.2,
)
dataframe["%-bb_width"] = (
- dataframe["bb_upperband"] - dataframe["bb_lowerband"]
- ) / dataframe["bb_middleband"]
- dataframe["%-ibs"] = (dataframe["close"] - dataframe["low"]) / (
- non_zero_diff(dataframe["high"], dataframe["low"])
+ dataframe.get("bb_upperband") - dataframe.get("bb_lowerband")
+ ) / dataframe.get("bb_middleband")
+ dataframe["%-ibs"] = (dataframe.get("close") - dataframe.get("low")) / (
+ non_zero_diff(dataframe.get("high"), dataframe.get("low"))
)
dataframe["jaw"], dataframe["teeth"], dataframe["lips"] = alligator(
dataframe, pricemode="median", zero_lag=True
)
- dataframe["%-dist_to_jaw"] = get_distance(dataframe["close"], dataframe["jaw"])
+ dataframe["%-dist_to_jaw"] = get_distance(
+ dataframe.get("close"), dataframe.get("jaw")
+ )
dataframe["%-dist_to_teeth"] = get_distance(
- dataframe["close"], dataframe["teeth"]
+ dataframe.get("close"), dataframe.get("teeth")
)
dataframe["%-dist_to_lips"] = get_distance(
- dataframe["close"], dataframe["lips"]
+ dataframe.get("close"), dataframe.get("lips")
+ )
+ dataframe["%-spread_jaw_teeth"] = dataframe.get("jaw") - dataframe.get("teeth")
+ dataframe["%-spread_teeth_lips"] = dataframe.get("teeth") - dataframe.get(
+ "lips"
)
- dataframe["%-spread_jaw_teeth"] = dataframe["jaw"] - dataframe["teeth"]
- dataframe["%-spread_teeth_lips"] = dataframe["teeth"] - dataframe["lips"]
- dataframe["zlema_50"] = pta.zlma(dataframe["close"], length=50, mamode="ema")
- dataframe["zlema_12"] = pta.zlma(dataframe["close"], length=12, mamode="ema")
- dataframe["zlema_26"] = pta.zlma(dataframe["close"], length=26, mamode="ema")
+ dataframe["zlema_50"] = zlema(dataframe.get("close"), period=50)
+ dataframe["zlema_12"] = zlema(dataframe.get("close"), period=12)
+ dataframe["zlema_26"] = zlema(dataframe.get("close"), period=26)
dataframe["%-distzlema50"] = get_distance(
- dataframe["close"], dataframe["zlema_50"]
+ dataframe.get("close"), dataframe.get("zlema_50")
)
dataframe["%-distzlema12"] = get_distance(
- dataframe["close"], dataframe["zlema_12"]
+ dataframe.get("close"), dataframe.get("zlema_12")
)
dataframe["%-distzlema26"] = get_distance(
- dataframe["close"], dataframe["zlema_26"]
+ dataframe.get("close"), dataframe.get("zlema_26")
)
macd = ta.MACD(dataframe)
- dataframe["%-macd"] = macd["macd"]
- dataframe["%-macdsignal"] = macd["macdsignal"]
- dataframe["%-macdhist"] = macd["macdhist"]
+ dataframe["%-macd"] = macd.get("macd")
+ dataframe["%-macdsignal"] = macd.get("macdsignal")
+ dataframe["%-macdhist"] = macd.get("macdhist")
dataframe["%-dist_to_macdsignal"] = get_distance(
- dataframe["%-macd"], dataframe["%-macdsignal"]
+ dataframe.get("%-macd"), dataframe.get("%-macdsignal")
)
- dataframe["%-dist_to_zerohist"] = get_distance(0, dataframe["%-macdhist"])
+ dataframe["%-dist_to_zerohist"] = get_distance(0, dataframe.get("%-macdhist"))
# VWAP bands
(
dataframe["vwap_lowerband"],
dataframe["vwap_upperband"],
) = vwapb(dataframe, 20, 1.0)
dataframe["%-vwap_width"] = (
- dataframe["vwap_upperband"] - dataframe["vwap_lowerband"]
- ) / dataframe["vwap_middleband"]
+ dataframe.get("vwap_upperband") - dataframe.get("vwap_lowerband")
+ ) / dataframe.get("vwap_middleband")
dataframe["%-dist_to_vwap_upperband"] = get_distance(
- dataframe["close"], dataframe["vwap_upperband"]
+ dataframe.get("close"), dataframe.get("vwap_upperband")
)
dataframe["%-dist_to_vwap_middleband"] = get_distance(
- dataframe["close"], dataframe["vwap_middleband"]
+ dataframe.get("close"), dataframe.get("vwap_middleband")
)
dataframe["%-dist_to_vwap_lowerband"] = get_distance(
- dataframe["close"], dataframe["vwap_lowerband"]
+ dataframe.get("close"), dataframe.get("vwap_lowerband")
)
- dataframe["%-body"] = dataframe["close"] - dataframe["open"]
+ dataframe["%-body"] = dataframe.get("close") - dataframe.get("open")
dataframe["%-tail"] = (
- np.minimum(dataframe["open"], dataframe["close"]) - dataframe["low"]
+ np.minimum(dataframe.get("open"), dataframe.get("close"))
+ - dataframe.get("low")
).clip(lower=0)
dataframe["%-wick"] = (
- dataframe["high"] - np.maximum(dataframe["open"], dataframe["close"])
+ dataframe.get("high")
+ - np.maximum(dataframe.get("open"), dataframe.get("close"))
).clip(lower=0)
pp = pivots_points(dataframe)
- dataframe["r1"] = pp["r1"]
- dataframe["s1"] = pp["s1"]
- dataframe["r2"] = pp["r2"]
- dataframe["s2"] = pp["s2"]
- dataframe["r3"] = pp["r3"]
- dataframe["s3"] = pp["s3"]
- dataframe["%-dist_to_r1"] = get_distance(dataframe["close"], dataframe["r1"])
- dataframe["%-dist_to_r2"] = get_distance(dataframe["close"], dataframe["r2"])
- dataframe["%-dist_to_r3"] = get_distance(dataframe["close"], dataframe["r3"])
- dataframe["%-dist_to_s1"] = get_distance(dataframe["close"], dataframe["s1"])
- dataframe["%-dist_to_s2"] = get_distance(dataframe["close"], dataframe["s2"])
- dataframe["%-dist_to_s3"] = get_distance(dataframe["close"], dataframe["s3"])
- dataframe["%-raw_close"] = dataframe["close"]
- dataframe["%-raw_open"] = dataframe["open"]
- dataframe["%-raw_low"] = dataframe["low"]
- dataframe["%-raw_high"] = dataframe["high"]
+ dataframe["r1"] = pp.get("r1")
+ dataframe["s1"] = pp.get("s1")
+ dataframe["r2"] = pp.get("r2")
+ dataframe["s2"] = pp.get("s2")
+ dataframe["r3"] = pp.get("r3")
+ dataframe["s3"] = pp.get("s3")
+ dataframe["%-dist_to_r1"] = get_distance(
+ dataframe.get("close"), dataframe.get("r1")
+ )
+ dataframe["%-dist_to_r2"] = get_distance(
+ dataframe.get("close"), dataframe.get("r2")
+ )
+ dataframe["%-dist_to_r3"] = get_distance(
+ dataframe.get("close"), dataframe.get("r3")
+ )
+ dataframe["%-dist_to_s1"] = get_distance(
+ dataframe.get("close"), dataframe.get("s1")
+ )
+ dataframe["%-dist_to_s2"] = get_distance(
+ dataframe.get("close"), dataframe.get("s2")
+ )
+ dataframe["%-dist_to_s3"] = get_distance(
+ dataframe.get("close"), dataframe.get("s3")
+ )
+ dataframe["%-raw_close"] = dataframe.get("close")
+ dataframe["%-raw_open"] = dataframe.get("open")
+ dataframe["%-raw_low"] = dataframe.get("low")
+ dataframe["%-raw_high"] = dataframe.get("high")
return dataframe
def feature_engineering_standard(self, dataframe: DataFrame, **kwargs):
- dataframe["%-day_of_week"] = (dataframe["date"].dt.dayofweek + 1) / 7
- dataframe["%-hour_of_day"] = (dataframe["date"].dt.hour + 1) / 25
+ dataframe["%-day_of_week"] = (dataframe.get("date").dt.dayofweek + 1) / 7
+ dataframe["%-hour_of_day"] = (dataframe.get("date").dt.hour + 1) / 25
return dataframe
def get_label_period_candles(self, pair: str) -> int:
else:
for pivot_idx, pivot_dir in zip(pivots_indices, pivots_directions):
dataframe.at[pivot_idx, EXTREMA_COLUMN] = pivot_dir
- dataframe["minima"] = np.where(dataframe[EXTREMA_COLUMN] == -1, -1, 0)
- dataframe["maxima"] = np.where(dataframe[EXTREMA_COLUMN] == 1, 1, 0)
+ dataframe["minima"] = np.where(dataframe.get(EXTREMA_COLUMN) == -1, -1, 0)
+ dataframe["maxima"] = np.where(dataframe.get(EXTREMA_COLUMN) == 1, 1, 0)
dataframe[EXTREMA_COLUMN] = self.smooth_extrema(
- dataframe[EXTREMA_COLUMN],
+ dataframe.get(EXTREMA_COLUMN),
self.freqai_info.get("extrema_smoothing_window", 5),
)
return dataframe
dataframe = self.freqai.start(dataframe, metadata, self)
dataframe["DI_catch"] = np.where(
- dataframe["DI_values"] > dataframe["DI_cutoff"],
+ dataframe.get("DI_values") > dataframe.get("DI_cutoff"),
0,
1,
)
pair = str(metadata.get("pair"))
- self.set_label_period_candles(pair, dataframe["label_period_candles"].iloc[-1])
- self.set_label_natr_ratio(pair, dataframe["label_natr_ratio"].iloc[-1])
+ self.set_label_period_candles(
+ pair, dataframe.get("label_period_candles").iloc[-1]
+ )
+ self.set_label_natr_ratio(pair, dataframe.get("label_natr_ratio").iloc[-1])
dataframe["natr_label_period_candles"] = ta.NATR(
dataframe, timeperiod=self.get_label_period_candles(pair)
)
- dataframe["minima_threshold"] = dataframe[MINIMA_THRESHOLD_COLUMN]
- dataframe["maxima_threshold"] = dataframe[MAXIMA_THRESHOLD_COLUMN]
+ dataframe["minima_threshold"] = dataframe.get(MINIMA_THRESHOLD_COLUMN)
+ dataframe["maxima_threshold"] = dataframe.get(MAXIMA_THRESHOLD_COLUMN)
return dataframe
def populate_entry_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
enter_long_conditions = [
- df["do_predict"] == 1,
- df["DI_catch"] == 1,
- df[EXTREMA_COLUMN] < df["minima_threshold"],
+ df.get("do_predict") == 1,
+ df.get("DI_catch") == 1,
+ df.get(EXTREMA_COLUMN) < df.get("minima_threshold"),
]
df.loc[
] = (1, "long")
enter_short_conditions = [
- df["do_predict"] == 1,
- df["DI_catch"] == 1,
- df[EXTREMA_COLUMN] > df["maxima_threshold"],
+ df.get("do_predict") == 1,
+ df.get("DI_catch") == 1,
+ df.get(EXTREMA_COLUMN) > df.get("maxima_threshold"),
]
df.loc[
:return: Number of candles since the trade entry
"""
entry_date = QuickAdapterV3.get_trade_entry_date(trade)
- current_date = df["date"].iloc[-1]
+ date_series = df.get("date")
+ if date_series is None or date_series.empty:
+ return None
+ current_date = date_series.iloc[-1]
if isna(current_date):
return None
trade_duration_minutes = (current_date - entry_date).total_seconds() / 60.0
) -> Optional[float]:
if not QuickAdapterV3.is_trade_duration_valid(trade_duration_candles):
return None
- trade_zl_natr = calculate_zero_lag(
- df["natr_label_period_candles"], period=trade_duration_candles
- )
- if trade_zl_natr.empty:
+ label_natr = df.get("natr_label_period_candles")
+ if label_natr is None or label_natr.empty:
return None
trade_natr = np.nan
if trade_duration_candles >= 2:
- kama = get_ma_fn("kama")
+ zl_kama = get_zl_ma_fn("kama")
try:
- trade_kama_natr_values = kama(
- trade_zl_natr, timeperiod=trade_duration_candles
+ trade_kama_natr_values = zl_kama(
+ label_natr, timeperiod=trade_duration_candles
)
trade_kama_natr_values = trade_kama_natr_values[
~np.isnan(trade_kama_natr_values)
f"Failed to calculate KAMA for pair {pair}: {str(e)}", exc_info=True
)
if isna(trade_natr):
- trade_natr = trade_zl_natr.ewm(span=trade_duration_candles).mean().iloc[-1]
+ trade_natr = zlema(label_natr, period=trade_duration_candles).iloc[-1]
return trade_natr
def get_stoploss_distance(
return None
last_candle = df.iloc[-1]
- if last_candle["do_predict"] == 2:
+ if last_candle.get("do_predict") == 2:
return "model_expired"
- if last_candle["DI_catch"] == 0:
+ if last_candle.get("DI_catch") == 0:
return "outlier_detected"
entry_tag = trade.enter_tag
if (
entry_tag == "short"
- and last_candle["do_predict"] == 1
- and last_candle[EXTREMA_COLUMN] < last_candle["minima_threshold"]
+ and last_candle.get("do_predict") == 1
+ and last_candle.get(EXTREMA_COLUMN) < last_candle.get("minima_threshold")
):
return "minima_detected_short"
if (
entry_tag == "long"
- and last_candle["do_predict"] == 1
- and last_candle[EXTREMA_COLUMN] > last_candle["maxima_threshold"]
+ and last_candle.get("do_predict") == 1
+ and last_candle.get(EXTREMA_COLUMN) > last_candle.get("maxima_threshold")
):
return "maxima_detected_long"
if df.empty:
return False
last_candle = df.iloc[-1]
- last_candle_close = last_candle["close"]
- last_candle_high = last_candle["high"]
- last_candle_low = last_candle["low"]
- last_candle_natr = last_candle["natr_label_period_candles"]
+ last_candle_close = last_candle.get("close")
+ last_candle_high = last_candle.get("high")
+ last_candle_low = last_candle.get("low")
+ last_candle_natr = last_candle.get("natr_label_period_candles")
if isna(last_candle_natr) or last_candle_natr < 0:
return False
lower_bound = 0
"smm": series.rolling(window=odd_window, center=True).median(),
"sma": series.rolling(window=odd_window, center=True).mean(),
"ewma": series.ewm(span=window).mean(),
- "zlewma": pta.zlma(series, length=window, mamode="ema"),
+ "zlewma": zlema(series, period=window),
}
return smoothing_methods.get(
extrema_smoothing,
raise ValueError("period must be greater than or equal to 1")
previous_close_top = (
- dataframe["close"].rolling(period, min_periods=period).max().shift(1)
+ dataframe.get("close").rolling(period, min_periods=period).max().shift(1)
)
- return (dataframe["close"] - previous_close_top) / previous_close_top
+ return (dataframe.get("close") - previous_close_top) / previous_close_top
def bottom_change_percent(dataframe: pd.DataFrame, period: int) -> pd.Series:
raise ValueError("period must be greater than or equal to 1")
previous_close_bottom = (
- dataframe["close"].rolling(period, min_periods=period).min().shift(1)
+ dataframe.get("close").rolling(period, min_periods=period).min().shift(1)
)
- return (dataframe["close"] - previous_close_bottom) / previous_close_bottom
+ return (dataframe.get("close") - previous_close_bottom) / previous_close_bottom
def price_retracement_percent(dataframe: pd.DataFrame, period: int) -> pd.Series:
raise ValueError("period must be greater than or equal to 1")
previous_close_low = (
- dataframe["close"].rolling(period, min_periods=period).min().shift(1)
+ dataframe.get("close").rolling(period, min_periods=period).min().shift(1)
)
previous_close_high = (
- dataframe["close"].rolling(period, min_periods=period).max().shift(1)
+ dataframe.get("close").rolling(period, min_periods=period).max().shift(1)
)
- return (dataframe["close"] - previous_close_low) / (
+ return (dataframe.get("close") - previous_close_low) / (
non_zero_diff(previous_close_high, previous_close_low)
)
def calculate_zero_lag(series: pd.Series, period: int) -> pd.Series:
"""Applies a zero lag filter to reduce MA lag."""
- lag = max(int(0.5 * (period - 1)), 0)
+ lag = max((period - 1) / 2, 0)
if lag == 0:
return series
- return 2 * series - series.shift(lag)
+ return 2 * series - series.shift(int(lag))
def get_ma_fn(mamode: str) -> Callable[[pd.Series, int], np.ndarray]:
return mamodes.get(mamode, mamodes["sma"])
+def get_zl_ma_fn(mamode: str) -> Callable[[pd.Series, int], np.ndarray]:
+ ma_fn = get_ma_fn(mamode)
+ return lambda series, timeperiod: ma_fn(
+ calculate_zero_lag(series, timeperiod), timeperiod=timeperiod
+ )
+
+
+def zlema(series: pd.Series, period: int) -> pd.Series:
+ """Ehlers' Zero Lag EMA."""
+ lag = max((period - 1) / 2, 0)
+ alpha = 2 / (period + 1)
+ zl_series = 2 * series - series.shift(int(lag))
+ return zl_series.ewm(alpha=alpha).mean()
+
+
def _fractal_dimension(highs: np.ndarray, lows: np.ndarray, period: int) -> float:
"""Original fractal dimension computation implementation per Ehlers' paper."""
if period % 2 != 0:
n = len(df)
- highs = df["high"]
- lows = df["low"]
- closes = df["close"]
+ highs = df.get("high")
+ lows = df.get("low")
+ closes = df.get("close")
if zero_lag:
highs = calculate_zero_lag(highs, period=period)
"median": ta.MEDPRICE,
"typical": ta.TYPPRICE,
"weighted-close": ta.WCLPRICE,
- "close": lambda df: df["close"],
+ "close": lambda df: df.get("close"),
}
return pricemodes.get(pricemode, pricemodes["close"])
if n < 2 * period + 1:
return [], []
- highs = df["high"].values
- lows = df["low"].values
+ highs = df.get("high").values
+ lows = df.get("low").values
fractal_candidate_indices = np.arange(period, n - period)
indices = df.index.tolist()
thresholds = get_natr_values(natr_period) * natr_ratio
- closes = df["close"].values
- highs = df["high"].values
- lows = df["low"].values
+ closes = df.get("close").values
+ highs = df.get("high").values
+ lows = df.get("low").values
state: TrendDirection = TrendDirection.NEUTRAL
depth = -1