"""
_LOG_2: Final[float] = math.log(2.0)
+
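+ # Fallback defaults for model_reward_parameters lookups.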
DEFAULT_IDLE_DURATION_MULTIPLIER: Final[int] = 4
+ DEFAULT_BASE_FACTOR: Final[float] = 100.0
+ DEFAULT_HOLD_POTENTIAL_SCALE: Final[float] = 1.0
+ DEFAULT_EFFICIENCY_WEIGHT: Final[float] = 1.0
+ DEFAULT_MAX_TRADE_DURATION_CANDLES: Final[int] = 128
_MODEL_TYPES: Final[Tuple[ModelType, ...]] = (
"PPO",
self._total_entry_additive: float = 0.0
self._last_exit_additive: float = 0.0
self._total_exit_additive: float = 0.0
- model_reward_parameters: Dict[str, Any] = self.rl_config.get(
+ model_reward_parameters: Mapping[str, Any] = self.rl_config.get(
"model_reward_parameters", {}
)
self.max_trade_duration_candles: int = int(
model_reward_parameters.get(
"max_trade_duration_candles",
- 128,
+ ReforceXY.DEFAULT_MAX_TRADE_DURATION_CANDLES,
)
)
self.max_idle_duration_candles: int = int(
model_reward_parameters.get("max_idle_duration_candles", ReforceXY.DEFAULT_IDLE_DURATION_MULTIPLIER * self.max_trade_duration_candles)
)
self._hold_potential_enabled: bool = bool(
model_reward_parameters.get("hold_potential_enabled", True)
)
self._hold_potential_scale: float = float(
- model_reward_parameters.get("hold_potential_scale", 1.0)
+ model_reward_parameters.get(
+ "hold_potential_scale", ReforceXY.DEFAULT_HOLD_POTENTIAL_SCALE
+ )
)
self._hold_potential_gain: float = float(
model_reward_parameters.get("hold_potential_gain", 1.0)
self._last_exit_reward = 0.0
return observation, history
- def _get_exit_factor(
+ def _compute_time_attenuation_factor(
self,
factor: float,
- pnl: float,
duration_ratio: float,
+ model_reward_parameters: Mapping[str, Any],
) -> float:
"""
- Compute the reward factor at trade exit
+ Apply time-based decay to the reward factor using a configurable strategy
+ (legacy/sqrt/linear/power/half_life), optionally after a plateau grace period.
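+
+ Illustrative: in half_life mode with exit_half_life=0.5 (the code default),
+ a duration_ratio of 0.5 halves the factor and 1.0 quarters it.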
"""
- if not (
- np.isfinite(factor) and np.isfinite(pnl) and np.isfinite(duration_ratio)
- ):
- return 0.0
if duration_ratio < 0.0:
duration_ratio = 0.0
- model_reward_parameters = self.rl_config.get("model_reward_parameters", {})
exit_attenuation_mode = str(
model_reward_parameters.get(
"exit_attenuation_mode", ReforceXY._EXIT_ATTENUATION_MODES[2]
)
)
if exit_plateau_grace < 0.0:
exit_plateau_grace = 0.0
- exit_linear_slope = float(model_reward_parameters.get("exit_linear_slope", 1.0))
- if exit_linear_slope < 0.0:
- exit_linear_slope = 0.0
- def _legacy(f: float, dr: float, p: Mapping) -> float:
+ def _legacy(f: float, dr: float, p: Mapping[str, Any]) -> float:
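+ # Step attenuation: 1.5x boost up to the max trade duration, 0.5x damping beyond.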
return f * (1.5 if dr <= 1.0 else 0.5)
- def _sqrt(f: float, dr: float, p: Mapping) -> float:
+ def _sqrt(f: float, dr: float, p: Mapping[str, Any]) -> float:
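+ # Attenuation grows with the square root of the duration ratio.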
return f / math.sqrt(1.0 + dr)
- def _linear(f: float, dr: float, p: Mapping) -> float:
+ def _linear(f: float, dr: float, p: Mapping[str, Any]) -> float:
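+ # Hyperbolic attenuation; a negative slope falls back to the default of 1.0.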
slope = float(p.get("exit_linear_slope", 1.0))
if slope < 0.0:
slope = 1.0
return f / (1.0 + slope * dr)
- def _power(f: float, dr: float, p: Mapping) -> float:
+ def _power(f: float, dr: float, p: Mapping[str, Any]) -> float:
tau = p.get("exit_power_tau")
alpha = 1.0
if isinstance(tau, (int, float)):
tau = float(tau)
if 0.0 < tau <= 1.0:
# Map tau in (0, 1] to alpha so that tau is the attenuation applied at dr == 1.
alpha = -math.log(tau) / _LOG_2
return f / math.pow(1.0 + dr, alpha)
- def _half_life(f: float, dr: float, p: Mapping) -> float:
+ def _half_life(f: float, dr: float, p: Mapping[str, Any]) -> float:
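+ # Exponential decay: the factor halves every exit_half_life units of duration ratio.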
hl = float(p.get("exit_half_life", 0.5))
if np.isclose(hl, 0.0) or hl < 0.0:
return f
return f * math.pow(2.0, -dr / hl)
- strategies: Dict[str, Callable[[float, float, Mapping], float]] = {
+ strategies: Dict[str, Callable[[float, float, Mapping[str, Any]], float]] = {
ReforceXY._EXIT_ATTENUATION_MODES[0]: _legacy,
ReforceXY._EXIT_ATTENUATION_MODES[1]: _sqrt,
ReforceXY._EXIT_ATTENUATION_MODES[2]: _linear,
ReforceXY._EXIT_ATTENUATION_MODES[3]: _power,
ReforceXY._EXIT_ATTENUATION_MODES[4]: _half_life,
}
factor = _linear(factor, effective_dr, model_reward_parameters)
- factor *= self._get_pnl_factor(pnl, self._pnl_target)
+ return factor
+
+ def _get_exit_factor(
+ self,
+ factor: float,
+ pnl: float,
+ duration_ratio: float,
+ model_reward_parameters: Mapping[str, Any],
+ ) -> float:
+ """
+ Compute the exit reward factor by combining time attenuation and PnL factors.
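+
+ Illustrative composition: a time-attenuated factor of 25.0 and a PnL factor
+ of 1.5 yield an exit factor of 25.0 * 1.5 = 37.5.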
+ """
+ if not (
+ np.isfinite(factor) and np.isfinite(pnl) and np.isfinite(duration_ratio)
+ ):
+ return 0.0
+ factor = self._compute_time_attenuation_factor(
+ factor,
+ duration_ratio,
+ model_reward_parameters,
+ )
+
+ factor *= self._get_pnl_factor(
+ pnl, self._pnl_target, model_reward_parameters
+ )
check_invariants = model_reward_parameters.get("check_invariants", True)
check_invariants = (
check_invariants if isinstance(check_invariants, bool) else True
)
return factor
- def _get_pnl_factor(self, pnl: float, pnl_target: float) -> float:
- if not np.isfinite(pnl):
- return 0.0
-
- model_reward_parameters = self.rl_config.get("model_reward_parameters", {})
-
+ def _compute_pnl_target_factor(
+ self, pnl: float, pnl_target: float, model_reward_parameters: Mapping[str, Any]
+ ) -> float:
+ """
+ Scale reward based on PnL/target ratio using tanh (≥ 1.0 for good trades).
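+
+ Illustrative, with the code defaults pnl_factor_beta=0.5 and
+ win_reward_factor=2.0: at pnl_ratio=2.0 the factor is
+ 1.0 + 2.0 * tanh(0.5 * (2.0 - 1.0)) ≈ 1.92.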
+ """
pnl_target_factor = 1.0
+
if pnl_target > 0.0:
pnl_factor_beta = float(model_reward_parameters.get("pnl_factor_beta", 0.5))
pnl_ratio = pnl / pnl_target
+
if abs(pnl_ratio) > 1.0:
base_pnl_target_factor = math.tanh(
pnl_factor_beta * (abs(pnl_ratio) - 1.0)
)
win_reward_factor = float(
model_reward_parameters.get("win_reward_factor", 2.0)
)
+
if pnl_ratio > 1.0:
pnl_target_factor = 1.0 + win_reward_factor * base_pnl_target_factor
elif pnl_ratio < -(1.0 / self.rr):
pnl_target_factor = (
1.0 + loss_penalty_factor * base_pnl_target_factor
)
- efficiency_factor = 1.0
- efficiency_weight = float(model_reward_parameters.get("efficiency_weight", 1.0))
+ return pnl_target_factor
+
+ def _compute_efficiency_factor(
+ self, pnl: float, model_reward_parameters: Mapping[str, Any]
+ ) -> float:
+ """
+ Scale reward based on exit efficiency, i.e. where the realized PnL sits
+ within the trade's unrealized [min, max] PnL range.
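+
+ Illustrative, with the code defaults efficiency_weight=1.0 and
+ efficiency_center=0.5: an exit at the top of the observed PnL range scales
+ the factor by 1.5, one at the bottom by 0.5.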
+ """
+ efficiency_weight = float(
+ model_reward_parameters.get(
+ "efficiency_weight", ReforceXY.DEFAULT_EFFICIENCY_WEIGHT
+ )
+ )
efficiency_center = float(model_reward_parameters.get("efficiency_center", 0.5))
+
+ efficiency_factor = 1.0
if efficiency_weight != 0.0 and not np.isclose(pnl, 0.0):
max_pnl = max(self.get_max_unrealized_profit(), pnl)
min_pnl = min(self.get_min_unrealized_profit(), pnl)
if max_pnl > min_pnl:
# Position of the realized PnL within the observed PnL range, in [0, 1].
efficiency_ratio = (pnl - min_pnl) / (max_pnl - min_pnl)
efficiency_factor = 1.0 - efficiency_weight * (
efficiency_center - efficiency_ratio
)
+ return efficiency_factor
+
+ def _get_pnl_factor(
+ self, pnl: float, pnl_target: float, model_reward_parameters: Mapping[str, Any]
+ ) -> float:
+ """
+ Combine PnL target and efficiency factors (>= 0.0)
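+
+ Illustrative: pnl_target_factor=1.92 and efficiency_factor=1.5 combine to
+ 1.92 * 1.5 = 2.88; a negative product is clamped to 0.0.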
+ """
+ if not np.isfinite(pnl):
+ return 0.0
+
+ pnl_target_factor = self._compute_pnl_target_factor(
+ pnl, pnl_target, model_reward_parameters
+ )
+ efficiency_factor = self._compute_efficiency_factor(
+ pnl, model_reward_parameters
+ )
+
return max(0.0, pnl_target_factor * efficiency_factor)
def calculate_reward(self, action: int) -> float:
and action == Actions.Long_exit.value
and self._position == Positions.Long
):
- base_reward = pnl * self._get_exit_factor(base_factor, pnl, duration_ratio)
+ base_reward = pnl * self._get_exit_factor(
+ base_factor, pnl, duration_ratio, model_reward_parameters
+ )
self._last_exit_reward = float(base_reward)
if (
base_reward is None
and action == Actions.Short_exit.value
and self._position == Positions.Short
):
- base_reward = pnl * self._get_exit_factor(base_factor, pnl, duration_ratio)
+ base_reward = pnl * self._get_exit_factor(
+ base_factor, pnl, duration_ratio, model_reward_parameters
+ )
self._last_exit_reward = float(base_reward)
# 5. Default