return f / (1.0 + exit_linear_slope * dr)
def _power_kernel(f: float, dr: float) -> float:
- tau = params.get("exit_power_tau")
- if isinstance(tau, (int, float)):
- tau = float(tau)
- if 0.0 < tau <= 1.0:
- alpha = -math.log(tau) / _LOG_2
- else:
- alpha = 1.0
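+ # Resolve tau via the shared float helper; DEFAULT_MODEL_REWARD_PARAMETERS supplies the default, 0.5 as a last resort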
+ tau = _get_param_float(
+ params,
+ "exit_power_tau",
+ DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_power_tau", 0.5),
+ )
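+ # tau in (0, 1] is the attenuation at dr == 1: alpha = -log(tau)/log(2) = log2(1/tau), so f is scaled by tau there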
+ if 0.0 < tau <= 1.0:
+ alpha = -math.log(tau) / _LOG_2
else:
alpha = 1.0
return f / math.pow(1.0 + dr, alpha)
if base_factor < 0.0 and pnl >= 0.0:
# Clamp: avoid negative amplification on non-negative pnl
base_factor = 0.0
- thr = params.get("exit_factor_threshold")
- if isinstance(thr, (int, float)) and thr > 0 and np.isfinite(thr):
- if abs(base_factor) > thr:
+ exit_factor_threshold = _get_param_float(
+ params,
+ "exit_factor_threshold",
+ DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_factor_threshold", 10000.0),
+ )
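+ # A non-positive or non-finite threshold disables this sanity check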
+ if exit_factor_threshold > 0 and np.isfinite(exit_factor_threshold):
+ if abs(base_factor) > exit_factor_threshold:
warnings.warn(
(
- f"_get_exit_factor |factor|={abs(base_factor):.2f} exceeds threshold {thr:.2f}"
+ f"_get_exit_factor |factor|={abs(base_factor):.2f} exceeds threshold {exit_factor_threshold:.2f}"
),
RuntimeWarning,
stacklevel=2,
profit_target_factor = 1.0
if profit_target > 0.0 and pnl > profit_target:
- win_reward_factor = float(params.get("win_reward_factor", 2.0))
- pnl_factor_beta = float(params.get("pnl_factor_beta", 0.5))
+ win_reward_factor = _get_param_float(
+ params,
+ "win_reward_factor",
+ DEFAULT_MODEL_REWARD_PARAMETERS.get("win_reward_factor", 2.0),
+ )
+ pnl_factor_beta = _get_param_float(
+ params,
+ "pnl_factor_beta",
+ DEFAULT_MODEL_REWARD_PARAMETERS.get("pnl_factor_beta", 0.5),
+ )
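+ # Smooth amplification above the target: tanh bounds the multiplier at 1 + win_reward_factor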
pnl_ratio = pnl / profit_target
profit_target_factor = 1.0 + win_reward_factor * math.tanh(
pnl_factor_beta * (pnl_ratio - 1.0)
)
efficiency_factor = 1.0
- efficiency_weight = float(params.get("efficiency_weight", 1.0))
- efficiency_center = float(params.get("efficiency_center", 0.5))
+ efficiency_weight = _get_param_float(
+ params,
+ "efficiency_weight",
+ DEFAULT_MODEL_REWARD_PARAMETERS.get("efficiency_weight", 1.0),
+ )
+ efficiency_center = _get_param_float(
+ params,
+ "efficiency_center",
+ DEFAULT_MODEL_REWARD_PARAMETERS.get("efficiency_center", 0.5),
+ )
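+ # Efficiency adjustment only applies with a non-zero weight and a pnl that is not numerically zero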
if efficiency_weight != 0.0 and not np.isclose(pnl, 0.0):
max_pnl = max(context.max_unrealized_profit, pnl)
min_pnl = min(context.min_unrealized_profit, pnl)
factor = _get_param_float(params, "base_factor", base_factor)
- profit_target_override = params.get("profit_target")
- if isinstance(profit_target_override, (int, float)):
- profit_target = float(profit_target_override)
+ if "profit_target" in params:
+ profit_target = _get_param_float(params, "profit_target", float(profit_target))
- rr_override = params.get("rr")
- if not isinstance(rr_override, (int, float)):
- rr_override = params.get("risk_reward_ratio")
- if isinstance(rr_override, (int, float)):
- risk_reward_ratio = float(rr_override)
+ if "risk_reward_ratio" in params:
+ risk_reward_ratio = _get_param_float(
+ params, "risk_reward_ratio", float(risk_reward_ratio)
+ )
# Scale profit target by risk-reward ratio (reward multiplier)
# E.g., profit_target=0.03, RR=2.0 → profit_target_final=0.06
params_validated, adjustments = validate_reward_parameters(params)
params = params_validated
- base_factor = float(params.get("base_factor", args.base_factor))
- profit_target = float(params.get("profit_target", args.profit_target))
- rr_override = params.get("rr")
- if not isinstance(rr_override, (int, float)):
- rr_override = params.get("risk_reward_ratio", args.risk_reward_ratio)
- risk_reward_ratio = float(rr_override)
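+ # Reward params, when present, override the CLI arguments used as defaults here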
+ base_factor = _get_param_float(params, "base_factor", float(args.base_factor))
+ profit_target = _get_param_float(params, "profit_target", float(args.profit_target))
+ risk_reward_ratio = _get_param_float(
+ params, "risk_reward_ratio", float(args.risk_reward_ratio)
+ )
cli_action_masking = _to_bool(args.action_masking)
if "action_masking" in params:
Positions,
RewardContext,
_get_exit_factor,
+ _get_param_float,
_get_pnl_factor,
bootstrap_confidence_intervals,
build_argument_parser,
action_masking=True,
)
self.assertLess(br_mid.idle_penalty, 0.0)
- idle_penalty_scale = float(params.get("idle_penalty_scale", 0.5))
- idle_penalty_power = float(params.get("idle_penalty_power", 1.025))
+ idle_penalty_scale = _get_param_float(params, "idle_penalty_scale", 0.5)
+ idle_penalty_power = _get_param_float(params, "idle_penalty_power", 1.025)
# Internal factor may come from params (overrides provided base_factor argument)
- factor_used = float(params.get("base_factor", base_factor))
+ factor_used = _get_param_float(params, "base_factor", float(base_factor))
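+ # Reconstruct the idle_factor scale the reward function is expected to apply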
idle_factor = factor_used * (profit_target * risk_reward_ratio) / 3.0
observed_ratio = abs(br_mid.idle_penalty) / (idle_factor * idle_penalty_scale)
if observed_ratio > 0:
params = self.DEFAULT_PARAMS.copy()
# Remove base_factor from params so that the function uses the provided argument (makes scaling observable)
params.pop("base_factor", None)
- threshold = float(params.get("exit_factor_threshold", 10_000.0))
+ exit_factor_threshold = _get_param_float(
+ params, "exit_factor_threshold", 10_000.0
+ )
context = RewardContext(
pnl=0.08, # above typical profit_target * RR
# Amplified: choose a much larger base_factor (ensure > threshold relative scale)
amplified_base_factor = max(
self.TEST_BASE_FACTOR * 50,
- threshold * self.TEST_RR_HIGH / max(context.pnl, 1e-9),
+ exit_factor_threshold * self.TEST_RR_HIGH / max(context.pnl, 1e-9),
)
amplified = calculate_reward(
context,