)
if exit_linear_slope < 0.0:
warnings.warn(
- "exit_linear_slope < 0; falling back to 0.0",
+ "exit_linear_slope < 0; falling back to 1.0",
RewardDiagnosticsWarning,
stacklevel=2,
)
- exit_linear_slope = 0.0
+ exit_linear_slope = 1.0
def _legacy_kernel(f: float, dr: float) -> float:
return f * (1.5 if dr <= 1.0 else 0.5)
if "risk_reward_ratio" in params:
risk_reward_ratio = _get_float_param(params, "risk_reward_ratio", float(risk_reward_ratio))
- profit_target_final = profit_target * risk_reward_ratio
- idle_factor = factor * profit_target_final / 4.0
+ pnl_target = float(profit_target * risk_reward_ratio)
+
+ idle_factor = factor * pnl_target / 4.0
pnl_factor = _get_pnl_factor(
params,
context,
- profit_target_final,
+ pnl_target,
risk_reward_ratio,
)
hold_factor = idle_factor
if pbrs_enabled and not is_neutral:
# Compute Φ(s) for the current state to preserve telescoping semantics Δ = γ·Φ(s') − Φ(s)
- current_potential = _compute_hold_potential(current_pnl, current_duration_ratio, params)
+ current_potential = _compute_hold_potential(
+ current_pnl, pnl_target, current_duration_ratio, params
+ )
if not np.isfinite(current_potential):
current_potential = 0.0
apply_potential_shaping(
base_reward=base_reward,
current_pnl=current_pnl,
+ pnl_target=pnl_target,
current_duration_ratio=current_duration_ratio,
next_pnl=next_pnl,
next_duration_ratio=next_duration_ratio,
# === PBRS IMPLEMENTATION ===
-def _compute_hold_potential(pnl: float, duration_ratio: float, params: RewardParams) -> float:
+def _compute_hold_potential(
+ pnl: float,
+ pnl_target: float,
+ duration_ratio: float,
+ params: RewardParams,
+) -> float:
"""Compute PBRS hold potential Φ(s)."""
if not _get_bool_param(
params,
return _compute_bi_component(
kind="hold_potential",
pnl=pnl,
+ pnl_target=pnl_target,
duration_ratio=duration_ratio,
params=params,
scale_key="hold_potential_scale",
)
-def _compute_entry_additive(pnl: float, duration_ratio: float, params: RewardParams) -> float:
+def _compute_entry_additive(
+ pnl: float,
+ pnl_target: float,
+ duration_ratio: float,
+ params: RewardParams,
+) -> float:
if not _get_bool_param(
params,
"entry_additive_enabled",
return _compute_bi_component(
kind="entry_additive",
pnl=pnl,
+ pnl_target=pnl_target,
duration_ratio=duration_ratio,
params=params,
scale_key="entry_additive_scale",
)
-def _compute_exit_additive(pnl: float, duration_ratio: float, params: RewardParams) -> float:
+def _compute_exit_additive(
+ pnl: float,
+ pnl_target: float,
+ duration_ratio: float,
+ params: RewardParams,
+) -> float:
if not _get_bool_param(
params,
"exit_additive_enabled",
return _compute_bi_component(
kind="exit_additive",
pnl=pnl,
+ pnl_target=pnl_target,
duration_ratio=duration_ratio,
params=params,
scale_key="exit_additive_scale",
def apply_potential_shaping(
base_reward: float,
current_pnl: float,
+ pnl_target: float,
current_duration_ratio: float,
next_pnl: float,
next_duration_ratio: float,
Notes
-----
- - Shaping Δ = γ·Φ(next) − Φ(prev) with prev = Φ(current_pnl, current_duration_ratio).
+ - Shaping Δ = γ·Φ(next) − Φ(prev).
- previous_potential:
Previously computed Φ(s) for the prior transition. When provided and finite, it
is used as Φ(prev) in Δ; otherwise Φ(prev) is derived from the current state.
prev_term = (
float(previous_potential)
if np.isfinite(previous_potential)
- else _compute_hold_potential(current_pnl, current_duration_ratio, params)
+ else _compute_hold_potential(current_pnl, pnl_target, current_duration_ratio, params)
)
if not np.isfinite(prev_term):
prev_term = 0.0
)
next_potential = _compute_exit_potential(last_potential, params)
else:
- next_potential = _compute_hold_potential(next_pnl, next_duration_ratio, params)
+ next_potential = _compute_hold_potential(next_pnl, pnl_target, next_duration_ratio, params)
# PBRS shaping Δ = γ·Φ(next) − Φ(prev)
pbrs_delta = gamma * next_potential - float(prev_term)
reward_shaping = pbrs_delta
# Non-PBRS additives
- # Pre-compute candidate additives (return 0.0 if corresponding feature disabled)
- cand_entry_add = _compute_entry_additive(next_pnl, next_duration_ratio, params)
- cand_exit_add = _compute_exit_additive(current_pnl, current_duration_ratio, params)
+ cand_entry_add = _compute_entry_additive(next_pnl, pnl_target, next_duration_ratio, params)
+ cand_exit_add = _compute_exit_additive(current_pnl, pnl_target, current_duration_ratio, params)
entry_additive = cand_entry_add if is_entry else 0.0
exit_additive = cand_exit_add if is_exit else 0.0
def _compute_bi_component(
kind: str,
pnl: float,
+ pnl_target: float,
duration_ratio: float,
params: RewardParams,
scale_key: str,
non_finite_key: str,
) -> float:
"""Generic helper for (pnl, duration) bi-component transforms."""
+ if not (np.isfinite(pnl) and np.isfinite(pnl_target) and np.isfinite(duration_ratio)):
+ return _fail_safely(non_finite_key)
+ if pnl_target <= 0.0:
+ return _fail_safely(f"{kind}_invalid_pnl_target")
+
+ pnl_ratio = float(pnl / pnl_target)
+ duration_ratio = float(np.clip(duration_ratio, 0.0, 1.0))
+
scale = _get_float_param(params, scale_key, 1.0)
gain = _get_float_param(params, gain_key, 1.0)
transform_pnl = _get_str_param(params, transform_pnl_key, "tanh")
transform_duration = _get_str_param(params, transform_dur_key, "tanh")
- t_pnl = apply_transform(transform_pnl, gain * pnl)
+
+ t_pnl = apply_transform(transform_pnl, gain * pnl_ratio)
t_dur = apply_transform(transform_duration, gain * duration_ratio)
value = scale * 0.5 * (t_pnl + t_dur)
if not np.isfinite(value):
ctx = {
"base_reward": 0.05,
"current_pnl": 0.01,
+ "pnl_target": self.TEST_PROFIT_TARGET,
"current_duration_ratio": 0.2,
"next_pnl": 0.012,
"next_duration_ratio": 0.25,
"hold_potential_transform_pnl": "tanh",
"hold_potential_transform_duration": "tanh",
}
- val = _compute_hold_potential(0.5, 0.3, params)
+ val = _compute_hold_potential(0.5, self.TEST_PROFIT_TARGET, 0.3, params)
self.assertFinite(val, name="hold_potential")
def test_hold_penalty_basic_calculation(self):
)
current_pnl = 0.02
current_dur = 0.5
- prev_potential = _compute_hold_potential(current_pnl, current_dur, params)
+ pnl_target = self.TEST_PROFIT_TARGET
+ prev_potential = _compute_hold_potential(current_pnl, pnl_target, current_dur, params)
(
_total_reward,
reward_shaping,
) = apply_potential_shaping(
base_reward=0.0,
current_pnl=current_pnl,
+ pnl_target=pnl_target,
current_duration_ratio=current_dur,
next_pnl=0.0,
next_duration_ratio=0.0,
)
current_pnl = 0.015
current_dur = 0.4
- prev_potential = _compute_hold_potential(current_pnl, current_dur, params)
+ pnl_target = self.TEST_PROFIT_TARGET
+ prev_potential = _compute_hold_potential(current_pnl, pnl_target, current_dur, params)
gamma = _get_float_param(
params, "potential_gamma", DEFAULT_MODEL_REWARD_PARAMETERS.get("potential_gamma", 0.95)
)
) = apply_potential_shaping(
base_reward=0.0,
current_pnl=current_pnl,
+ pnl_target=pnl_target,
current_duration_ratio=current_dur,
next_pnl=0.0,
next_duration_ratio=0.0,
def test_additive_components_disabled_return_zero(self):
"""Verifies entry/exit additives return zero when disabled."""
params_entry = {"entry_additive_enabled": False, "entry_additive_scale": 1.0}
- val_entry = _compute_entry_additive(0.5, 0.3, params_entry)
+ val_entry = _compute_entry_additive(0.5, self.TEST_PROFIT_TARGET, 0.3, params_entry)
self.assertEqual(float(val_entry), 0.0)
params_exit = {"exit_additive_enabled": False, "exit_additive_scale": 1.0}
- val_exit = _compute_exit_additive(0.5, 0.3, params_exit)
+ val_exit = _compute_exit_additive(0.5, self.TEST_PROFIT_TARGET, 0.3, params_exit)
self.assertEqual(float(val_exit), 0.0)
def test_exit_potential_canonical(self):
apply_potential_shaping(
base_reward=base_reward,
current_pnl=current_pnl,
+ pnl_target=self.TEST_PROFIT_TARGET,
current_duration_ratio=current_duration_ratio,
next_pnl=next_pnl,
next_duration_ratio=next_duration_ratio,
self.assertPlacesEqual(next_potential, 0.0, places=12)
current_potential = _compute_hold_potential(
current_pnl,
+ self.TEST_PROFIT_TARGET,
current_duration_ratio,
{"hold_potential_enabled": True, "hold_potential_scale": 1.0},
)
_t1, _s1, _n1, _pbrs_delta, _entry_additive, _exit_additive = apply_potential_shaping(
base_reward=0.0,
current_pnl=0.05,
+ pnl_target=self.TEST_PROFIT_TARGET,
current_duration_ratio=0.3,
next_pnl=0.0,
next_duration_ratio=0.0,
_t2, _s2, _n2, _pbrs_delta2, _entry_additive2, _exit_additive2 = apply_potential_shaping(
base_reward=0.0,
current_pnl=0.02,
+ pnl_target=self.TEST_PROFIT_TARGET,
current_duration_ratio=0.1,
next_pnl=0.0,
next_duration_ratio=0.0,
apply_potential_shaping(
base_reward=0.0,
current_pnl=0.0,
+ pnl_target=self.TEST_PROFIT_TARGET,
current_duration_ratio=0.0,
next_pnl=0.0,
next_duration_ratio=0.0,
res_nan = apply_potential_shaping(
base_reward=0.1,
current_pnl=0.03,
+ pnl_target=self.TEST_PROFIT_TARGET,
current_duration_ratio=0.2,
next_pnl=0.035,
next_duration_ratio=0.25,
res_ref = apply_potential_shaping(
base_reward=0.1,
current_pnl=0.03,
+ pnl_target=self.TEST_PROFIT_TARGET,
current_duration_ratio=0.2,
next_pnl=0.035,
next_duration_ratio=0.25,
ctx_pnl = 0.012
ctx_dur_ratio = 0.3
params_can = self.base_params(exit_potential_mode="canonical", **base_common)
- prev_phi = _compute_hold_potential(ctx_pnl, ctx_dur_ratio, params_can)
+ prev_phi = _compute_hold_potential(
+ ctx_pnl, self.TEST_PROFIT_TARGET, ctx_dur_ratio, params_can
+ )
self.assertFinite(prev_phi, name="prev_phi")
next_phi_can = _compute_exit_potential(prev_phi, params_can)
self.assertAlmostEqualFloat(
apply_potential_shaping(
base_reward=0.0,
current_pnl=0.02,
+ pnl_target=self.TEST_PROFIT_TARGET,
current_duration_ratio=0.3,
next_pnl=0.025,
next_duration_ratio=0.35,
apply_potential_shaping(
base_reward=0.0,
current_pnl=current_pnl,
+ pnl_target=self.TEST_PROFIT_TARGET,
current_duration_ratio=current_dur,
next_pnl=next_pnl,
next_duration_ratio=next_dur,
apply_potential_shaping(
base_reward=0.0,
current_pnl=float(rng.normal(0, 0.07)),
+ pnl_target=self.TEST_PROFIT_TARGET,
current_duration_ratio=float(rng.uniform(0, 1)),
next_pnl=next_pnl,
next_duration_ratio=next_dur,
comp_share = _pd.Series([], dtype=float)
action_summary = _pd.DataFrame(
- columns=["count", "mean", "std", "min", "max"],
+ columns=_pd.Index(["count", "mean", "std", "min", "max"]),
index=_pd.Index([], name="action"),
)
component_bounds = _pd.DataFrame(
- columns=["component_min", "component_mean", "component_max"],
+ columns=_pd.Index(["component_min", "component_mean", "component_max"]),
index=_pd.Index([], name="component"),
)
global_stats = _pd.Series([], dtype=float)
)
def test_negative_slope_sanitization(self):
- """Negative exit_linear_slope is sanitized to 0.0; resulting exit factors must match slope=0.0 within tolerance."""
+ """Negative exit_linear_slope is sanitized to 1.0; resulting exit factors must match slope=1.0 within tolerance."""
base_factor = 100.0
pnl = 0.03
pnl_factor = 1.0
exit_attenuation_mode="linear", exit_linear_slope=-5.0, exit_plateau=False
)
params_ref = self.base_params(
- exit_attenuation_mode="linear", exit_linear_slope=0.0, exit_plateau=False
+ exit_attenuation_mode="linear", exit_linear_slope=1.0, exit_plateau=False
)
for dr in duration_ratios:
f_bad = _get_exit_factor(base_factor, pnl, pnl_factor, dr, params_bad)
apply_potential_shaping(
base_reward=0.0,
current_pnl=current_pnl,
+ pnl_target=self.TEST_PROFIT_TARGET,
current_duration_ratio=current_dur,
next_pnl=next_pnl,
next_duration_ratio=next_dur,
_LOG_2: Final[float] = math.log(2.0)
+ DEFAULT_MAX_TRADE_DURATION_CANDLES: Final[int] = 128
DEFAULT_IDLE_DURATION_MULTIPLIER: Final[int] = 4
+
DEFAULT_BASE_FACTOR: Final[float] = 100.0
- DEFAULT_HOLD_POTENTIAL_SCALE: Final[float] = 1.0
DEFAULT_EFFICIENCY_WEIGHT: Final[float] = 1.0
- DEFAULT_MAX_TRADE_DURATION_CANDLES: Final[int] = 128
+
+ DEFAULT_EXIT_POTENTIAL_DECAY: Final[float] = 0.5
+ DEFAULT_ENTRY_ADDITIVE_ENABLED: Final[bool] = False
+ DEFAULT_ENTRY_ADDITIVE_SCALE: Final[float] = 1.0
+ DEFAULT_ENTRY_ADDITIVE_GAIN: Final[float] = 1.0
+ DEFAULT_HOLD_POTENTIAL_ENABLED: Final[bool] = True
+ DEFAULT_HOLD_POTENTIAL_SCALE: Final[float] = 1.0
+ DEFAULT_HOLD_POTENTIAL_GAIN: Final[float] = 1.0
+ DEFAULT_EXIT_ADDITIVE_ENABLED: Final[bool] = False
+ DEFAULT_EXIT_ADDITIVE_SCALE: Final[float] = 1.0
+ DEFAULT_EXIT_ADDITIVE_GAIN: Final[float] = 1.0
+
+ DEFAULT_EXIT_PLATEAU: Final[bool] = True
+ DEFAULT_EXIT_PLATEAU_GRACE: Final[float] = 1.0
+ DEFAULT_EXIT_LINEAR_SLOPE: Final[float] = 1.0
+ DEFAULT_EXIT_HALF_LIFE: Final[float] = 0.5
+
+ DEFAULT_PNL_FACTOR_BETA: Final[float] = 0.5
+ DEFAULT_WIN_REWARD_FACTOR: Final[float] = 2.0
+ DEFAULT_EFFICIENCY_CENTER: Final[float] = 0.5
+
+ DEFAULT_INVALID_ACTION: Final[float] = -2.0
+ DEFAULT_IDLE_PENALTY_SCALE: Final[float] = 0.5
+ DEFAULT_IDLE_PENALTY_POWER: Final[float] = 1.025
+ DEFAULT_HOLD_PENALTY_SCALE: Final[float] = 0.25
+ DEFAULT_HOLD_PENALTY_POWER: Final[float] = 1.025
+
+ DEFAULT_CHECK_INVARIANTS: Final[bool] = True
+ DEFAULT_EXIT_FACTOR_THRESHOLD: Final[float] = 10_000.0
_MODEL_TYPES: Final[Tuple[ModelType, ...]] = (
"PPO",
0
] # "canonical"
self._exit_potential_decay: float = float(
- model_reward_parameters.get("exit_potential_decay", 0.5)
+ model_reward_parameters.get(
+ "exit_potential_decay", ReforceXY.DEFAULT_EXIT_POTENTIAL_DECAY
+ )
)
# === ENTRY ADDITIVE (non-PBRS additive term) ===
self._entry_additive_enabled: bool = bool(
- model_reward_parameters.get("entry_additive_enabled", False)
+ model_reward_parameters.get(
+ "entry_additive_enabled", ReforceXY.DEFAULT_ENTRY_ADDITIVE_ENABLED
+ )
)
self._entry_additive_scale: float = float(
- model_reward_parameters.get("entry_additive_scale", 1.0)
+ model_reward_parameters.get(
+ "entry_additive_scale", ReforceXY.DEFAULT_ENTRY_ADDITIVE_SCALE
+ )
)
self._entry_additive_gain: float = float(
- model_reward_parameters.get("entry_additive_gain", 1.0)
+ model_reward_parameters.get(
+ "entry_additive_gain", ReforceXY.DEFAULT_ENTRY_ADDITIVE_GAIN
+ )
)
self._entry_additive_transform_pnl: TransformFunction = cast(
TransformFunction,
)
# === HOLD POTENTIAL (PBRS function Φ) ===
self._hold_potential_enabled: bool = bool(
- model_reward_parameters.get("hold_potential_enabled", True)
+ model_reward_parameters.get(
+ "hold_potential_enabled", ReforceXY.DEFAULT_HOLD_POTENTIAL_ENABLED
+ )
)
self._hold_potential_scale: float = float(
model_reward_parameters.get(
)
)
self._hold_potential_gain: float = float(
- model_reward_parameters.get("hold_potential_gain", 1.0)
+ model_reward_parameters.get(
+ "hold_potential_gain", ReforceXY.DEFAULT_HOLD_POTENTIAL_GAIN
+ )
)
self._hold_potential_transform_pnl: TransformFunction = cast(
TransformFunction,
)
# === EXIT ADDITIVE (non-PBRS additive term) ===
self._exit_additive_enabled: bool = bool(
- model_reward_parameters.get("exit_additive_enabled", False)
+ model_reward_parameters.get(
+ "exit_additive_enabled", ReforceXY.DEFAULT_EXIT_ADDITIVE_ENABLED
+ )
)
self._exit_additive_scale: float = float(
- model_reward_parameters.get("exit_additive_scale", 1.0)
+ model_reward_parameters.get(
+ "exit_additive_scale", ReforceXY.DEFAULT_EXIT_ADDITIVE_SCALE
+ )
)
self._exit_additive_gain: float = float(
- model_reward_parameters.get("exit_additive_gain", 1.0)
+ model_reward_parameters.get(
+ "exit_additive_gain", ReforceXY.DEFAULT_EXIT_ADDITIVE_GAIN
+ )
)
self._exit_additive_transform_pnl: TransformFunction = cast(
TransformFunction,
"exit_attenuation_mode", ReforceXY._EXIT_ATTENUATION_MODES[2]
) # "linear"
)
- exit_plateau = bool(model_reward_parameters.get("exit_plateau", True))
+ exit_plateau = bool(
+ model_reward_parameters.get("exit_plateau", ReforceXY.DEFAULT_EXIT_PLATEAU)
+ )
exit_plateau_grace = float(
- model_reward_parameters.get("exit_plateau_grace", 1.0)
+ model_reward_parameters.get(
+ "exit_plateau_grace", ReforceXY.DEFAULT_EXIT_PLATEAU_GRACE
+ )
)
if exit_plateau_grace < 0.0:
exit_plateau_grace = 0.0
return f / math.sqrt(1.0 + dr)
def _linear(f: float, dr: float, p: Mapping[str, Any]) -> float:
- slope = float(p.get("exit_linear_slope", 1.0))
+ slope = float(
+ p.get("exit_linear_slope", ReforceXY.DEFAULT_EXIT_LINEAR_SLOPE)
+ )
if slope < 0.0:
slope = 1.0
return f / (1.0 + slope * dr)
return f / math.pow(1.0 + dr, alpha)
def _half_life(f: float, dr: float, p: Mapping[str, Any]) -> float:
- hl = float(p.get("exit_half_life", 0.5))
+ hl = float(p.get("exit_half_life", ReforceXY.DEFAULT_EXIT_HALF_LIFE))
if np.isclose(hl, 0.0) or hl < 0.0:
return 1.0
return f * math.pow(2.0, -dr / hl)
pnl, self._pnl_target, model_reward_parameters
)
- check_invariants = model_reward_parameters.get("check_invariants", True)
+ check_invariants = model_reward_parameters.get(
+ "check_invariants", ReforceXY.DEFAULT_CHECK_INVARIANTS
+ )
check_invariants = (
check_invariants if isinstance(check_invariants, bool) else True
)
)
factor = 0.0
exit_factor_threshold = float(
- model_reward_parameters.get("exit_factor_threshold", 10_000.0)
+ model_reward_parameters.get(
+ "exit_factor_threshold", ReforceXY.DEFAULT_EXIT_FACTOR_THRESHOLD
+ )
)
if exit_factor_threshold > 0 and abs(factor) > exit_factor_threshold:
logger.warning(
pnl_target_factor = 1.0
if pnl_target > 0.0:
- pnl_factor_beta = float(model_reward_parameters.get("pnl_factor_beta", 0.5))
+ pnl_factor_beta = float(
+ model_reward_parameters.get(
+ "pnl_factor_beta", ReforceXY.DEFAULT_PNL_FACTOR_BETA
+ )
+ )
pnl_ratio = pnl / pnl_target
if abs(pnl_ratio) > 1.0:
pnl_factor_beta * (abs(pnl_ratio) - 1.0)
)
win_reward_factor = float(
- model_reward_parameters.get("win_reward_factor", 2.0)
+ model_reward_parameters.get(
+ "win_reward_factor", ReforceXY.DEFAULT_WIN_REWARD_FACTOR
+ )
)
if pnl_ratio > 1.0:
"efficiency_weight", ReforceXY.DEFAULT_EFFICIENCY_WEIGHT
)
)
- efficiency_center = float(model_reward_parameters.get("efficiency_center", 0.5))
+ efficiency_center = float(
+ model_reward_parameters.get(
+ "efficiency_center", ReforceXY.DEFAULT_EFFICIENCY_CENTER
+ )
+ )
efficiency_factor = 1.0
if efficiency_weight != 0.0 and not np.isclose(pnl, 0.0):
"""
Combine PnL target and efficiency factors (>= 0.0)
"""
- if not np.isfinite(pnl):
- return 0.0
-
pnl_target_factor = self._compute_pnl_target_factor(
pnl, pnl_target, model_reward_parameters
)
# 1. Invalid action
if not self.action_masking and not self._is_valid(action):
self.tensorboard_log("invalid", category="actions")
- base_reward = float(model_reward_parameters.get("invalid_action", -2.0))
+ base_reward = float(
+ model_reward_parameters.get(
+ "invalid_action", ReforceXY.DEFAULT_INVALID_ACTION
+ )
+ )
self._last_invalid_penalty = float(base_reward)
max_trade_duration = max(1, self.max_trade_duration_candles)
trade_duration = self.get_trade_duration()
duration_ratio = trade_duration / max_trade_duration
- base_factor = float(model_reward_parameters.get("base_factor", 100.0))
+ base_factor = float(
+ model_reward_parameters.get("base_factor", ReforceXY.DEFAULT_BASE_FACTOR)
+ )
idle_factor = base_factor * self._pnl_target / 4.0
hold_factor = idle_factor
):
max_idle_duration = max(1, self.max_idle_duration_candles)
idle_penalty_scale = float(
- model_reward_parameters.get("idle_penalty_scale", 0.5)
+ model_reward_parameters.get(
+ "idle_penalty_scale", ReforceXY.DEFAULT_IDLE_PENALTY_SCALE
+ )
)
idle_penalty_power = float(
- model_reward_parameters.get("idle_penalty_power", 1.025)
+ model_reward_parameters.get(
+ "idle_penalty_power", ReforceXY.DEFAULT_IDLE_PENALTY_POWER
+ )
)
idle_duration = self.get_idle_duration()
idle_duration_ratio = idle_duration / max(1, max_idle_duration)
and action == Actions.Neutral.value
):
hold_penalty_scale = float(
- model_reward_parameters.get("hold_penalty_scale", 0.25)
+ model_reward_parameters.get(
+ "hold_penalty_scale", ReforceXY.DEFAULT_HOLD_PENALTY_SCALE
+ )
)
hold_penalty_power = float(
- model_reward_parameters.get("hold_penalty_power", 1.025)
+ model_reward_parameters.get(
+ "hold_penalty_power", ReforceXY.DEFAULT_HOLD_PENALTY_POWER
+ )
)
if duration_ratio < 1.0:
base_reward = 0.0