From: Jérôme Benoit Date: Thu, 9 Oct 2025 22:18:01 +0000 (+0200) Subject: refactor(reforcexy): cleanup rsa code X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=d9a355c00597b45d535e00443b7fd9302424a362;p=freqai-strategies.git refactor(reforcexy): cleanup rsa code Signed-off-by: Jérôme Benoit --- diff --git a/ReforceXY/reward_space_analysis/reward_space_analysis.py b/ReforceXY/reward_space_analysis/reward_space_analysis.py index eaf9b4f..36fea29 100644 --- a/ReforceXY/reward_space_analysis/reward_space_analysis.py +++ b/ReforceXY/reward_space_analysis/reward_space_analysis.py @@ -374,13 +374,13 @@ def _get_exit_factor( return f / (1.0 + exit_linear_slope * dr) def _power_kernel(f: float, dr: float) -> float: - tau = params.get("exit_power_tau") - if isinstance(tau, (int, float)): - tau = float(tau) - if 0.0 < tau <= 1.0: - alpha = -math.log(tau) / _LOG_2 - else: - alpha = 1.0 + tau = _get_param_float( + params, + "exit_power_tau", + DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_power_tau", 0.5), + ) + if 0.0 < tau <= 1.0: + alpha = -math.log(tau) / _LOG_2 else: alpha = 1.0 return f / math.pow(1.0 + dr, alpha) @@ -430,12 +430,16 @@ def _get_exit_factor( if base_factor < 0.0 and pnl >= 0.0: # Clamp: avoid negative amplification on non-negative pnl base_factor = 0.0 - thr = params.get("exit_factor_threshold") - if isinstance(thr, (int, float)) and thr > 0 and np.isfinite(thr): - if abs(base_factor) > thr: + exit_factor_threshold = _get_param_float( + params, + "exit_factor_threshold", + DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_factor_threshold", 10000.0), + ) + if exit_factor_threshold > 0 and np.isfinite(exit_factor_threshold): + if abs(base_factor) > exit_factor_threshold: warnings.warn( ( - f"_get_exit_factor |factor|={abs(base_factor):.2f} exceeds threshold {thr:.2f}" + f"_get_exit_factor |factor|={abs(base_factor):.2f} exceeds threshold {exit_factor_threshold:.2f}" ), RuntimeWarning, stacklevel=2, @@ -455,16 +459,32 @@ def _get_pnl_factor( profit_target_factor = 1.0 if profit_target > 0.0 and pnl > profit_target: - win_reward_factor = float(params.get("win_reward_factor", 2.0)) - pnl_factor_beta = float(params.get("pnl_factor_beta", 0.5)) + win_reward_factor = _get_param_float( + params, + "win_reward_factor", + DEFAULT_MODEL_REWARD_PARAMETERS.get("win_reward_factor", 2.0), + ) + pnl_factor_beta = _get_param_float( + params, + "pnl_factor_beta", + DEFAULT_MODEL_REWARD_PARAMETERS.get("pnl_factor_beta", 0.5), + ) pnl_ratio = pnl / profit_target profit_target_factor = 1.0 + win_reward_factor * math.tanh( pnl_factor_beta * (pnl_ratio - 1.0) ) efficiency_factor = 1.0 - efficiency_weight = float(params.get("efficiency_weight", 1.0)) - efficiency_center = float(params.get("efficiency_center", 0.5)) + efficiency_weight = _get_param_float( + params, + "efficiency_weight", + DEFAULT_MODEL_REWARD_PARAMETERS.get("efficiency_weight", 1.0), + ) + efficiency_center = _get_param_float( + params, + "efficiency_center", + DEFAULT_MODEL_REWARD_PARAMETERS.get("efficiency_center", 0.5), + ) if efficiency_weight != 0.0 and not np.isclose(pnl, 0.0): max_pnl = max(context.max_unrealized_profit, pnl) min_pnl = min(context.min_unrealized_profit, pnl) @@ -615,15 +635,13 @@ def calculate_reward( factor = _get_param_float(params, "base_factor", base_factor) - profit_target_override = params.get("profit_target") - if isinstance(profit_target_override, (int, float)): - profit_target = float(profit_target_override) + if "profit_target" in params: + profit_target = _get_param_float(params, "profit_target", float(profit_target)) - rr_override = params.get("rr") - if not isinstance(rr_override, (int, float)): - rr_override = params.get("risk_reward_ratio") - if isinstance(rr_override, (int, float)): - risk_reward_ratio = float(rr_override) + if "risk_reward_ratio" in params: + risk_reward_ratio = _get_param_float( + params, "risk_reward_ratio", float(risk_reward_ratio) + ) # Scale profit target by risk-reward ratio (reward multiplier) # E.g., profit_target=0.03, RR=2.0 → profit_target_final=0.06 @@ -2396,12 +2414,11 @@ def main() -> None: params_validated, adjustments = validate_reward_parameters(params) params = params_validated - base_factor = float(params.get("base_factor", args.base_factor)) - profit_target = float(params.get("profit_target", args.profit_target)) - rr_override = params.get("rr") - if not isinstance(rr_override, (int, float)): - rr_override = params.get("risk_reward_ratio", args.risk_reward_ratio) - risk_reward_ratio = float(rr_override) + base_factor = _get_param_float(params, "base_factor", float(args.base_factor)) + profit_target = _get_param_float(params, "profit_target", float(args.profit_target)) + risk_reward_ratio = _get_param_float( + params, "risk_reward_ratio", float(args.risk_reward_ratio) + ) cli_action_masking = _to_bool(args.action_masking) if "action_masking" in params: diff --git a/ReforceXY/reward_space_analysis/test_reward_space_analysis.py b/ReforceXY/reward_space_analysis/test_reward_space_analysis.py index 3d70992..6212d92 100644 --- a/ReforceXY/reward_space_analysis/test_reward_space_analysis.py +++ b/ReforceXY/reward_space_analysis/test_reward_space_analysis.py @@ -31,6 +31,7 @@ try: Positions, RewardContext, _get_exit_factor, + _get_param_float, _get_pnl_factor, bootstrap_confidence_intervals, build_argument_parser, @@ -636,10 +637,10 @@ class TestRewardAlignment(RewardSpaceTestBase): action_masking=True, ) self.assertLess(br_mid.idle_penalty, 0.0) - idle_penalty_scale = float(params.get("idle_penalty_scale", 0.5)) - idle_penalty_power = float(params.get("idle_penalty_power", 1.025)) + idle_penalty_scale = _get_param_float(params, "idle_penalty_scale", 0.5) + idle_penalty_power = _get_param_float(params, "idle_penalty_power", 1.025) # Internal factor may come from params (overrides provided base_factor argument) - factor_used = float(params.get("base_factor", base_factor)) + factor_used = _get_param_float(params, "base_factor", float(base_factor)) idle_factor = factor_used * (profit_target * risk_reward_ratio) / 3.0 observed_ratio = abs(br_mid.idle_penalty) / (idle_factor * idle_penalty_scale) if observed_ratio > 0: @@ -660,7 +661,9 @@ class TestRewardAlignment(RewardSpaceTestBase): params = self.DEFAULT_PARAMS.copy() # Remove base_factor from params so that the function uses the provided argument (makes scaling observable) params.pop("base_factor", None) - threshold = float(params.get("exit_factor_threshold", 10_000.0)) + exit_factor_threshold = _get_param_float( + params, "exit_factor_threshold", 10_000.0 + ) context = RewardContext( pnl=0.08, # above typical profit_target * RR @@ -688,7 +691,7 @@ class TestRewardAlignment(RewardSpaceTestBase): # Amplified: choose a much larger base_factor (ensure > threshold relative scale) amplified_base_factor = max( self.TEST_BASE_FACTOR * 50, - threshold * self.TEST_RR_HIGH / max(context.pnl, 1e-9), + exit_factor_threshold * self.TEST_RR_HIGH / max(context.pnl, 1e-9), ) amplified = calculate_reward( context,