    def _get_exit_reward_factor(
        self,
        factor: float,
        pnl: float,
-        trade_duration: int,
-        max_trade_duration: int,
+        duration_ratio: float,
    ) -> float:
"""
Compute the reward factor at trade exit
"""
model_reward_parameters = self.rl_config.get("model_reward_parameters", {})
exit_factor_mode = model_reward_parameters.get("exit_factor_mode", "linear")
- duration_ratio = trade_duration / max_trade_duration
        if exit_factor_mode == "legacy":
-            if trade_duration <= max_trade_duration:
+            if duration_ratio <= 1.0:
                factor *= 1.5
            else:
                factor *= 0.5
            factor /= math.sqrt(1.0 + duration_ratio)
        elif exit_factor_mode == "linear":
            factor /= 1.0 + duration_ratio
-        elif exit_factor_mode == "hybrid":
-            if duration_ratio <= 1.0:
-                factor *= 1.2 - 0.2 * duration_ratio
-            else:
-                factor /= math.sqrt(1.0 + (duration_ratio - 1.0))
-        if pnl > self.profit_aim * self.rr:
-            factor *= float(model_reward_parameters.get("win_reward_factor", 2.0))
+        factor *= self._get_pnl_factor(self.profit_aim * self.rr, pnl)
-        eff_weight = float(model_reward_parameters.get("exit_efficiency_weight", 0.0))
+        eff_weight = float(model_reward_parameters.get("exit_efficiency_weight", 0.75))
        eff_center = float(model_reward_parameters.get("exit_efficiency_center", 0.5))
        if eff_weight != 0.0:
            max_pnl = self.get_max_unrealized_profit()
            # assumed completion (the excerpt truncates here): reward capturing the pnl peak
            if max_pnl > 0.0:
                factor *= 1.0 + eff_weight * (pnl / max_pnl - eff_center)
        return factor
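+        # Worked example, assuming factor=100: the linear mode yields
+        # 100 / 1.5 ≈ 66.7 at duration_ratio=0.5 and 100 / 3.0 ≈ 33.3 at 2.0;
+        # the legacy mode yields 100 * 1.5 / sqrt(1.5) ≈ 122.5 and
+        # 100 * 0.5 / sqrt(3.0) ≈ 28.9 at the same duration ratios.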
-    def _progressive_duration_factor(self, duration_ratio: float) -> float:
-        if not isinstance(duration_ratio, (int, float)) or duration_ratio <= 0.0:
-            return 0.0
-        duration_overage_ratio = max(0.0, duration_ratio - 1.0)
-        dynamic_k = 2.5 + 3.0 * duration_overage_ratio
-        gaussian_decay = math.exp(-dynamic_k * duration_overage_ratio**2.0)
-        blend_weight = 1.0 - math.exp(-8.0 * duration_overage_ratio)
-        return (1.0 - blend_weight) * duration_ratio + blend_weight * gaussian_decay
+    def _get_pnl_factor(self, profit_target: float, pnl: float) -> float:
+        """
+        Compute the bonus factor applied when pnl exceeds the profit target
+        """
+        if not np.isfinite(profit_target) or profit_target <= 0.0:
+            return 1.0
+        if not np.isfinite(pnl) or pnl <= profit_target:
+            return 1.0
+
+        model_reward_parameters = self.rl_config.get("model_reward_parameters", {})
+        win_reward_factor = float(model_reward_parameters.get("win_reward_factor", 2.0))
+        profit_factor_beta = float(
+            model_reward_parameters.get("profit_factor_beta", 0.5)
+        )
+
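+        # Worked example with the defaults above: pnl at twice the profit target
+        # gives profit_ratio = 2.0, so the factor is 1.0 + 2.0 * tanh(0.5) ≈ 1.92;
+        # tanh saturates, so the bonus never exceeds 1.0 + win_reward_factor.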
+        profit_ratio = pnl / profit_target
+        return 1.0 + win_reward_factor * math.tanh(
+            profit_factor_beta * (profit_ratio - 1.0)
+        )
    def calculate_reward(self, action: int) -> float:
        """
        :return: float = the reward to give to the agent for the current step
            (used for optimization of weights in the NN)
        """
+        model_reward_parameters = self.rl_config.get("model_reward_parameters", {})
+
        # first, penalize if the action is not valid
        if not self.action_masking and not self._is_valid(action):
            self.tensorboard_log("invalid", category="actions")
-            return self.rl_config.get("model_reward_parameters", {}).get(
-                "invalid_action", -2.0
-            )
+            return float(model_reward_parameters.get("invalid_action", -2.0))
        pnl = self.get_unrealized_profit()
        # mrr = self.get_most_recent_return()
        max_trade_duration = max(1, self.timeout)
        trade_duration = self.get_trade_duration()
+        duration_ratio = trade_duration / max_trade_duration
        factor = 100.0
-        holding_factor = factor * (self.profit_aim * self.rr) / 3.0
-        idle_factor = holding_factor
+        profit_target = self.profit_aim * self.rr
+        idle_factor = factor * profit_target / 3.0
+        holding_factor = idle_factor * self._get_pnl_factor(profit_target, pnl)
        # Force exits
        if self._force_action in (
            ForceActions.Stop_loss,
            ForceActions.Timeout,
        ):
-            return pnl * self._get_exit_reward_factor(
-                factor, pnl, trade_duration, max_trade_duration
-            )
+            return pnl * self._get_exit_reward_factor(factor, pnl, duration_ratio)
        # # you can use feature values from dataframe
        # rsi_now = self.get_feature_value(
        # discourage agent from sitting idle
        if action == Actions.Neutral.value and self._position == Positions.Neutral:
+            max_idle_duration = int(
+                model_reward_parameters.get(
+                    "max_idle_duration_candles", max_trade_duration
+                )
+            )
+            idle_penalty_scale = float(
+                model_reward_parameters.get("idle_penalty_scale", 1.0)
+            )
+            idle_penalty_power = float(
+                model_reward_parameters.get("idle_penalty_power", 1.0)
+            )
+            idle_duration = self.get_idle_duration()
+            idle_duration_ratio = idle_duration / max(1, max_idle_duration)
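+            # e.g. assuming profit_aim * rr = 0.06: idle_factor = 100 * 0.06 / 3 = 2.0,
+            # so sitting idle for half the idle budget with the default scale and
+            # power of 1.0 costs -2.0 * 0.5**1.0 = -1.0.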
            return (
-                -idle_factor * (self.get_idle_duration() / max_trade_duration) ** 1.05
+                -idle_factor
+                * idle_penalty_scale
+                * idle_duration_ratio**idle_penalty_power
            )
        # discourage agent from sitting in position
        if (
            self._position in (Positions.Short, Positions.Long)
            and action == Actions.Neutral.value
        ):
-            duration_ratio = trade_duration / max_trade_duration
-
-            max_pnl = self.get_max_unrealized_profit()
-            min_pnl = self.get_min_unrealized_profit()
-            range_pnl = max_pnl - min_pnl
-            if np.isclose(range_pnl, 0.0):
-                return -holding_factor * duration_ratio
-
-            pnl_from_max = pnl - max_pnl
-            pnl_from_min = pnl - min_pnl
-
-            threshold_ratio = 0.3
-
-            ratio_up = max(0.0, pnl_from_max) / range_pnl
-            ratio_down = max(0.0, -pnl_from_min) / range_pnl
-
-            if ratio_down > threshold_ratio:
-                scale_down = (ratio_down - threshold_ratio) / (1.0 - threshold_ratio)
-                return -holding_factor * duration_ratio * (1.0 + scale_down**1.25)
-
-            if ratio_up > threshold_ratio:
-                scale_up = (ratio_up - threshold_ratio) / (1.0 - threshold_ratio)
-                duration_factor = self._progressive_duration_factor(duration_ratio)
-                return holding_factor * duration_factor * scale_up**1.05
-
-            corr_min = min_pnl - threshold_ratio * range_pnl
-            corr_max = max_pnl + threshold_ratio * range_pnl
-            corr_range = corr_max - corr_min
-            if np.isclose(corr_range, 0.0):
-                return -holding_factor * duration_ratio
-            return (
-                -holding_factor
-                * duration_ratio
-                * (1.0 - (pnl - corr_min) / corr_range) ** 0.65
+            holding_duration_grace = float(
+                model_reward_parameters.get("holding_duration_grace", 1.0)
+            )
+            holding_overage_scale = float(
+                model_reward_parameters.get("holding_overage_scale", 1.0)
+            )
+            holding_overage_power = float(
+                model_reward_parameters.get("holding_overage_power", 1.1)
            )
+            duration_overage_ratio = max(0.0, duration_ratio - holding_duration_grace)
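+            # e.g. duration_ratio = 1.5 with the default grace of 1.0 leaves an
+            # overage of 0.5; scale=1.0 and power=1.1 then give a penalty of
+            # -holding_factor * 0.5**1.1 ≈ -0.47 * holding_factor.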
+            # with a positive power the pnl > profit_target branch contributes
+            # zero until there is duration overage
+            if duration_overage_ratio > 0.0 or pnl > profit_target:
+                return (
+                    -holding_factor
+                    * holding_overage_scale
+                    * duration_overage_ratio**holding_overage_power
+                )
+            return 0.0
        # close long
        if action == Actions.Long_exit.value and self._position == Positions.Long:
-            return pnl * self._get_exit_reward_factor(
-                factor, pnl, trade_duration, max_trade_duration
-            )
+            return pnl * self._get_exit_reward_factor(factor, pnl, duration_ratio)
        # close short
        if action == Actions.Short_exit.value and self._position == Positions.Short:
-            return pnl * self._get_exit_reward_factor(
-                factor, pnl, trade_duration, max_trade_duration
-            )
+            return pnl * self._get_exit_reward_factor(factor, pnl, duration_ratio)
        return 0.0
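
For reference, a minimal sketch of the `model_reward_parameters` config block that
this diff reads, populated with the fallback defaults from the code above (the
`max_idle_duration_candles` value shown is hypothetical; when the key is omitted the
code falls back to max_trade_duration):

rl_config = {
    "model_reward_parameters": {
        "invalid_action": -2.0,
        "exit_factor_mode": "linear",     # "legacy" also supported; "hybrid" removed
        "win_reward_factor": 2.0,
        "profit_factor_beta": 0.5,
        "exit_efficiency_weight": 0.75,   # 0.0 disables the efficiency adjustment
        "exit_efficiency_center": 0.5,
        "max_idle_duration_candles": 96,  # hypothetical value
        "idle_penalty_scale": 1.0,
        "idle_penalty_power": 1.0,
        "holding_duration_grace": 1.0,
        "holding_overage_scale": 1.0,
        "holding_overage_power": 1.1,
    }
}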