import gc
import json
import logging
+import math
import time
import warnings
from collections import defaultdict
self.timeout: int = self.rl_config.get("max_trade_duration_candles", 128)
self._last_closed_position: Optional[Positions] = None
self._last_closed_trade_tick: int = 0
+ self._max_unrealized_profit: float = -np.inf
+ self._min_unrealized_profit: float = np.inf
if self.force_actions:
logger.info(
"%s - take_profit: %s, stop_loss: %s, timeout: %s candles (%s days), observation_space: %s",
"""
Compute the reward factor at trade exit
"""
- if trade_duration <= max_trade_duration:
- factor *= 1.5
- elif trade_duration > max_trade_duration:
- factor *= 0.5
+ model_reward_parameters = self.rl_config.get("model_reward_parameters", {})
+ exit_factor_mode = model_reward_parameters.get("exit_factor_mode", "linear")
+ duration_ratio = trade_duration / max_trade_duration
+
+ if exit_factor_mode == "legacy":
+ if trade_duration <= max_trade_duration:
+ factor *= 1.5
+ else:
+ factor *= 0.5
+ elif exit_factor_mode == "sqrt":
+ factor /= math.sqrt(1.0 + duration_ratio)
+ elif exit_factor_mode == "linear":
+ factor /= 1.0 + duration_ratio
+ elif exit_factor_mode == "hybrid":
+ if duration_ratio <= 1.0:
+ factor *= 1.2 - 0.2 * duration_ratio
+ else:
+ factor /= math.sqrt(1.0 + (duration_ratio - 1.0))
+
if pnl > self.profit_aim * self.rr:
- factor *= float(
- self.rl_config.get("model_reward_parameters", {}).get(
- "win_reward_factor", 2.0
- )
- )
+ factor *= float(model_reward_parameters.get("win_reward_factor", 2.0))
+
+ eff_weight = float(model_reward_parameters.get("exit_efficiency_weight", 0.0))
+ eff_center = float(model_reward_parameters.get("exit_efficiency_center", 0.5))
+ if eff_weight != 0.0:
+ max_pnl = self.get_max_unrealized_profit()
+ min_pnl = self.get_min_unrealized_profit()
+ range_pnl = max_pnl - min_pnl
+ if not np.isclose(range_pnl, 0.0):
+ eff_ratio = (pnl - min_pnl) / range_pnl
+ factor *= 1.0 + eff_weight * (eff_ratio - eff_center)
+ factor = max(0.0, factor)
+
return factor
+ def _progressive_duration_factor(self, duration_ratio: float) -> float:
+ if not isinstance(duration_ratio, (int, float)) or duration_ratio <= 0.0:
+ return 0.0
+ duration_overage_ratio = max(0.0, duration_ratio - 1.0)
+ dynamic_k = 2.5 + 3.0 * duration_overage_ratio
+ gaussian_decay = math.exp(-dynamic_k * duration_overage_ratio**2.0)
+ blend_weight = 1.0 - math.exp(-8.0 * duration_overage_ratio)
+ return (1.0 - blend_weight) * duration_ratio + blend_weight * gaussian_decay
+
def calculate_reward(self, action: int) -> float:
"""
An example reward function. This is the one function that users will likely
trade_duration = self.get_trade_duration()
factor = 100.0
+ holding_factor = factor * (self.profit_aim * self.rr) / 3.0
+ idle_factor = holding_factor
# Force exits
if self._force_action in (
# factor = 1
# return 25.0 * factor
- # discourage agent from sitting idle too long
+ # discourage agent from sitting idle
if action == Actions.Neutral.value and self._position == Positions.Neutral:
- return -0.01 * self.get_idle_duration() ** 1.05
+ return (
+ -idle_factor * (self.get_idle_duration() / max_trade_duration) ** 1.05
+ )
- # pnl and duration aware agent reward while sitting in position
+ # discourage agent from sitting in position
if (
self._position in (Positions.Short, Positions.Long)
and action == Actions.Neutral.value
):
- duration_fraction = trade_duration / max_trade_duration
- max_pnl = max(self.get_most_recent_max_pnl(), pnl)
-
- if max_pnl > 0:
- drawdown_penalty = 0.0025 * factor * (max_pnl - pnl) * duration_fraction
- else:
- drawdown_penalty = 0.0
-
- lambda1 = 0.05
- lambda2 = 0.1
- if pnl >= 0:
- if duration_fraction <= 1.0:
- duration_penalty_factor = 1.0
- else:
- duration_penalty_factor = 1.0 / (
- 1.0 + lambda1 * (duration_fraction - 1.0)
- )
- return (
- factor * pnl * duration_penalty_factor
- - lambda2 * duration_fraction
- - drawdown_penalty
- )
- else:
- return (
- factor * pnl * (1.0 + lambda1 * duration_fraction)
- - 2.0 * lambda2 * duration_fraction
- - drawdown_penalty
- )
+ duration_ratio = trade_duration / max_trade_duration
+
+ max_pnl = self.get_max_unrealized_profit()
+ min_pnl = self.get_min_unrealized_profit()
+ range_pnl = max_pnl - min_pnl
+ if np.isclose(range_pnl, 0.0):
+ return -holding_factor * duration_ratio
+
+ pnl_from_max = pnl - max_pnl
+ pnl_from_min = pnl - min_pnl
+
+ threshold_ratio = 0.3
+
+ ratio_up = max(0.0, pnl_from_max) / range_pnl
+ ratio_down = max(0.0, -pnl_from_min) / range_pnl
+
+ if ratio_down > threshold_ratio:
+ scale_down = (ratio_down - threshold_ratio) / (1.0 - threshold_ratio)
+ return -holding_factor * duration_ratio * (1.0 + scale_down**1.25)
+
+ if ratio_up > threshold_ratio:
+ scale_up = (ratio_up - threshold_ratio) / (1.0 - threshold_ratio)
+ duration_factor = self._progressive_duration_factor(duration_ratio)
+ return holding_factor * duration_factor * scale_up**1.05
+
+ corr_min = min_pnl - threshold_ratio * range_pnl
+ corr_max = max_pnl + threshold_ratio * range_pnl
+ corr_range = corr_max - corr_min
+ if np.isclose(corr_range, 0.0):
+ return -holding_factor * duration_ratio
+ return (
+ -holding_factor
+ * duration_ratio
+ * (1.0 - (pnl - corr_min) / corr_range) ** 0.65
+ )
# close long
if action == Actions.Long_exit.value and self._position == Positions.Long:
def _enter_trade(self, action: int) -> None:
self._position = self._get_position(action)
self._last_trade_tick = self._current_tick
+ self._max_unrealized_profit = -np.inf
+ self._min_unrealized_profit = np.inf
def _exit_trade(self) -> None:
self._update_total_profit()
self._position = Positions.Neutral
self._last_trade_tick = None
self._last_closed_trade_tick = self._current_tick
+ self._max_unrealized_profit = -np.inf
+ self._min_unrealized_profit = np.inf
def execute_trade(self, action: int) -> Optional[str]:
"""
"""
self._current_tick += 1
self._update_unrealized_total_profit()
- previous_pnl = self.get_unrealized_profit()
+ pre_pnl = self.get_unrealized_profit()
self._update_portfolio_log_returns()
self._force_action = self._get_force_action()
reward = self.calculate_reward(action)
self.tensorboard_log(Actions._member_names_[action], category="actions")
trade_type = self.execute_trade(action)
if trade_type is not None:
- self.append_trade_history(trade_type, self.current_price(), previous_pnl)
+ self.append_trade_history(trade_type, self.current_price(), pre_pnl)
self._position_history.append(self._position)
+ pnl = self.get_unrealized_profit()
+ self._update_max_unrealized_profit(pnl)
+ self._update_min_unrealized_profit(pnl)
+ delta_pnl = pnl - pre_pnl
info = {
"tick": self._current_tick,
"position": self._position.value,
"action": action,
"force_action": (self._force_action.name if self._force_action else None),
- "previous_pnl": round(previous_pnl, 5),
- "pnl": round(self.get_unrealized_profit(), 5),
+ "pre_pnl": round(pre_pnl, 5),
+ "pnl": round(pnl, 5),
+ "delta_pnl": round(delta_pnl, 5),
+ "max_pnl": round(self.get_max_unrealized_profit(), 5),
+ "min_pnl": round(self.get_min_unrealized_profit(), 5),
+ "most_recent_return": round(self.get_most_recent_return(), 5),
+ "most_recent_profit": round(self.get_most_recent_profit(), 5),
+ "total_profit": round(self._total_profit, 5),
"reward": round(reward, 5),
"total_reward": round(self.total_reward, 5),
- "total_profit": round(self._total_profit, 5),
"idle_duration": self.get_idle_duration(),
"trade_duration": self.get_trade_duration(),
"trade_count": int(len(self.trade_history) // 2),
return self._current_tick - self._start_tick
return self._current_tick - self._last_closed_trade_tick
- def get_most_recent_max_pnl(self) -> float:
+ def get_previous_unrealized_profit(self) -> float:
"""
- Calculate the most recent maximum unrealized profit if in a trade
+ Get the previous tick unrealized profit if the agent is in a trade
"""
if self._last_trade_tick is None:
return 0.0
if self._position == Positions.Neutral:
return 0.0
- pnl_history = self.history.get("pnl")
- if not pnl_history or len(pnl_history) == 0:
+ elif self._position == Positions.Short:
+ previous_price = self.add_entry_fee(self.previous_price())
+ last_trade_price = self.add_exit_fee(
+ self.prices.iloc[self._last_trade_tick].open
+ )
+ return (last_trade_price - previous_price) / last_trade_price
+ elif self._position == Positions.Long:
+ previous_price = self.add_exit_fee(self.previous_price())
+ last_trade_price = self.add_entry_fee(
+ self.prices.iloc[self._last_trade_tick].open
+ )
+ return (previous_price - last_trade_price) / last_trade_price
+ else:
return 0.0
- pnl_history = np.asarray(pnl_history)
- ticks = self.history.get("tick")
- if not ticks:
+ def get_max_unrealized_profit(self) -> float:
+ """
+ Get the maximum unrealized profit if the agent is in a trade
+ """
+ if self._last_trade_tick is None:
return 0.0
- ticks = np.asarray(ticks)
- start = np.searchsorted(ticks, self._last_trade_tick, side="left")
- trade_pnl_history = pnl_history[start:]
- if trade_pnl_history.size == 0:
+ if self._position == Positions.Neutral:
return 0.0
- return np.max(trade_pnl_history)
+ if not np.isfinite(self._max_unrealized_profit):
+ return self.get_unrealized_profit()
+ return self._max_unrealized_profit
+
+ def _update_max_unrealized_profit(self, pnl: float) -> None:
+ if self._position in (Positions.Long, Positions.Short):
+ if pnl > self._max_unrealized_profit:
+ self._max_unrealized_profit = pnl
+
+ def get_min_unrealized_profit(self) -> float:
+ """
+ Get the minimum unrealized profit if the agent is in a trade
+ """
+ if self._last_trade_tick is None:
+ return 0.0
+ if self._position == Positions.Neutral:
+ return 0.0
+ if not np.isfinite(self._min_unrealized_profit):
+ return self.get_unrealized_profit()
+ return self._min_unrealized_profit
+
+ def _update_min_unrealized_profit(self, pnl: float) -> None:
+ if self._position in (Positions.Long, Positions.Short):
+ if pnl < self._min_unrealized_profit:
+ self._min_unrealized_profit = pnl
def get_most_recent_return(self) -> float:
"""
- Calculate the tick to tick return if in a trade.
+ Calculate the tick to tick return if the agent is in a trade.
Return is generated from rising prices in Long and falling prices in Short positions.
The actions Sell/Buy or Hold during a Long position trigger the sell/buy-fee.
"""
elif self._position == Positions.Long:
current_price = self.current_price()
previous_price = self.previous_price()
+ previous_tick = self.previous_tick()
if (
- self._position_history[self._current_tick - 1] == Positions.Short
- or self._position_history[self._current_tick - 1] == Positions.Neutral
+ self._position_history[previous_tick] == Positions.Short
+ or self._position_history[previous_tick] == Positions.Neutral
):
previous_price = self.add_entry_fee(previous_price)
return np.log(current_price) - np.log(previous_price)
elif self._position == Positions.Short:
current_price = self.current_price()
previous_price = self.previous_price()
+ previous_tick = self.previous_tick()
if (
- self._position_history[self._current_tick - 1] == Positions.Long
- or self._position_history[self._current_tick - 1] == Positions.Neutral
+ self._position_history[previous_tick] == Positions.Long
+ or self._position_history[previous_tick] == Positions.Neutral
):
previous_price = self.add_exit_fee(previous_price)
return np.log(previous_price) - np.log(current_price)
def get_most_recent_profit(self) -> float:
"""
- Calculate the tick to tick unrealized profit if in a trade
+ Calculate the tick to tick unrealized profit if the agent is in a trade
"""
if self._last_trade_tick is None:
return 0.0
return (previous_price - current_price) / previous_price
return 0.0
+ def previous_tick(self) -> int:
+ return max(self._current_tick - 1, self._start_tick)
+
def previous_price(self) -> float:
- return self.prices.iloc[self._current_tick - 1].get("open")
+ return self.prices.iloc[self.previous_tick()].get("open")
def get_env_history(self) -> DataFrame:
"""
color="gray",
label="pnl",
)
- previous_pnl_series = history.get("previous_pnl")
- if previous_pnl_series is not None and len(previous_pnl_series) > 0:
+ pre_pnl_series = history.get("pre_pnl")
+ if pre_pnl_series is not None and len(pre_pnl_series) > 0:
axs[1].plot(
ticks,
- previous_pnl_series,
+ pre_pnl_series,
linewidth=1,
color="deepskyblue",
- label="previous_pnl",
+ label="pre_pnl",
)
if (pnl_series is not None and len(pnl_series) > 0) or (
- previous_pnl_series is not None and len(previous_pnl_series) > 0
+ pre_pnl_series is not None and len(pre_pnl_series) > 0
):
axs[1].legend(loc="upper left", fontsize=8)
axs[1].axhline(y=0, label="0", alpha=0.33, color="gray")