Piment Noir Git Repositories - freqai-strategies.git/commitdiff
refactor(reforcexy): clean up reward_space_analysis code
author     Jérôme Benoit <jerome.benoit@piment-noir.org>
           Thu, 9 Oct 2025 22:18:01 +0000 (00:18 +0200)
committer  Jérôme Benoit <jerome.benoit@piment-noir.org>
           Thu, 9 Oct 2025 22:18:01 +0000 (00:18 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
ReforceXY/reward_space_analysis/reward_space_analysis.py
ReforceXY/reward_space_analysis/test_reward_space_analysis.py

diff --git a/ReforceXY/reward_space_analysis/reward_space_analysis.py b/ReforceXY/reward_space_analysis/reward_space_analysis.py
index eaf9b4f759ab30fbbfe6b6a1a6b6571d96774387..36fea29ebb2392f3a25c9aa7ba8a4c893e4433a0 100644
--- a/ReforceXY/reward_space_analysis/reward_space_analysis.py
+++ b/ReforceXY/reward_space_analysis/reward_space_analysis.py
@@ -374,13 +374,13 @@ def _get_exit_factor(
         return f / (1.0 + exit_linear_slope * dr)
 
     def _power_kernel(f: float, dr: float) -> float:
-        tau = params.get("exit_power_tau")
-        if isinstance(tau, (int, float)):
-            tau = float(tau)
-            if 0.0 < tau <= 1.0:
-                alpha = -math.log(tau) / _LOG_2
-            else:
-                alpha = 1.0
+        tau = _get_param_float(
+            params,
+            "exit_power_tau",
+            DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_power_tau", 0.5),
+        )
+        if 0.0 < tau <= 1.0:
+            alpha = -math.log(tau) / _LOG_2
         else:
             alpha = 1.0
         return f / math.pow(1.0 + dr, alpha)
@@ -430,12 +430,16 @@ def _get_exit_factor(
         if base_factor < 0.0 and pnl >= 0.0:
             # Clamp: avoid negative amplification on non-negative pnl
             base_factor = 0.0
-        thr = params.get("exit_factor_threshold")
-        if isinstance(thr, (int, float)) and thr > 0 and np.isfinite(thr):
-            if abs(base_factor) > thr:
+        exit_factor_threshold = _get_param_float(
+            params,
+            "exit_factor_threshold",
+            DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_factor_threshold", 10000.0),
+        )
+        if exit_factor_threshold > 0 and np.isfinite(exit_factor_threshold):
+            if abs(base_factor) > exit_factor_threshold:
                 warnings.warn(
                     (
-                        f"_get_exit_factor |factor|={abs(base_factor):.2f} exceeds threshold {thr:.2f}"
+                        f"_get_exit_factor |factor|={abs(base_factor):.2f} exceeds threshold {exit_factor_threshold:.2f}"
                     ),
                     RuntimeWarning,
                     stacklevel=2,
@@ -455,16 +459,32 @@ def _get_pnl_factor(
 
     profit_target_factor = 1.0
     if profit_target > 0.0 and pnl > profit_target:
-        win_reward_factor = float(params.get("win_reward_factor", 2.0))
-        pnl_factor_beta = float(params.get("pnl_factor_beta", 0.5))
+        win_reward_factor = _get_param_float(
+            params,
+            "win_reward_factor",
+            DEFAULT_MODEL_REWARD_PARAMETERS.get("win_reward_factor", 2.0),
+        )
+        pnl_factor_beta = _get_param_float(
+            params,
+            "pnl_factor_beta",
+            DEFAULT_MODEL_REWARD_PARAMETERS.get("pnl_factor_beta", 0.5),
+        )
         pnl_ratio = pnl / profit_target
         profit_target_factor = 1.0 + win_reward_factor * math.tanh(
             pnl_factor_beta * (pnl_ratio - 1.0)
         )
 
     efficiency_factor = 1.0
-    efficiency_weight = float(params.get("efficiency_weight", 1.0))
-    efficiency_center = float(params.get("efficiency_center", 0.5))
+    efficiency_weight = _get_param_float(
+        params,
+        "efficiency_weight",
+        DEFAULT_MODEL_REWARD_PARAMETERS.get("efficiency_weight", 1.0),
+    )
+    efficiency_center = _get_param_float(
+        params,
+        "efficiency_center",
+        DEFAULT_MODEL_REWARD_PARAMETERS.get("efficiency_center", 0.5),
+    )
     if efficiency_weight != 0.0 and not np.isclose(pnl, 0.0):
         max_pnl = max(context.max_unrealized_profit, pnl)
         min_pnl = min(context.min_unrealized_profit, pnl)
@@ -615,15 +635,13 @@ def calculate_reward(
 
     factor = _get_param_float(params, "base_factor", base_factor)
 
-    profit_target_override = params.get("profit_target")
-    if isinstance(profit_target_override, (int, float)):
-        profit_target = float(profit_target_override)
+    if "profit_target" in params:
+        profit_target = _get_param_float(params, "profit_target", float(profit_target))
 
-    rr_override = params.get("rr")
-    if not isinstance(rr_override, (int, float)):
-        rr_override = params.get("risk_reward_ratio")
-    if isinstance(rr_override, (int, float)):
-        risk_reward_ratio = float(rr_override)
+    if "risk_reward_ratio" in params:
+        risk_reward_ratio = _get_param_float(
+            params, "risk_reward_ratio", float(risk_reward_ratio)
+        )
 
     # Scale profit target by risk-reward ratio (reward multiplier)
     # E.g., profit_target=0.03, RR=2.0 → profit_target_final=0.06
@@ -2396,12 +2414,11 @@ def main() -> None:
     params_validated, adjustments = validate_reward_parameters(params)
     params = params_validated
 
-    base_factor = float(params.get("base_factor", args.base_factor))
-    profit_target = float(params.get("profit_target", args.profit_target))
-    rr_override = params.get("rr")
-    if not isinstance(rr_override, (int, float)):
-        rr_override = params.get("risk_reward_ratio", args.risk_reward_ratio)
-    risk_reward_ratio = float(rr_override)
+    base_factor = _get_param_float(params, "base_factor", float(args.base_factor))
+    profit_target = _get_param_float(params, "profit_target", float(args.profit_target))
+    risk_reward_ratio = _get_param_float(
+        params, "risk_reward_ratio", float(args.risk_reward_ratio)
+    )
 
     cli_action_masking = _to_bool(args.action_masking)
     if "action_masking" in params:
diff --git a/ReforceXY/reward_space_analysis/test_reward_space_analysis.py b/ReforceXY/reward_space_analysis/test_reward_space_analysis.py
index 3d70992b59b93627009e1bc22aba275e3617f276..6212d925657f2af5a5366f4309819e36c7d855bf 100644
--- a/ReforceXY/reward_space_analysis/test_reward_space_analysis.py
+++ b/ReforceXY/reward_space_analysis/test_reward_space_analysis.py
@@ -31,6 +31,7 @@ try:
         Positions,
         RewardContext,
         _get_exit_factor,
+        _get_param_float,
         _get_pnl_factor,
         bootstrap_confidence_intervals,
         build_argument_parser,
@@ -636,10 +637,10 @@ class TestRewardAlignment(RewardSpaceTestBase):
             action_masking=True,
         )
         self.assertLess(br_mid.idle_penalty, 0.0)
-        idle_penalty_scale = float(params.get("idle_penalty_scale", 0.5))
-        idle_penalty_power = float(params.get("idle_penalty_power", 1.025))
+        idle_penalty_scale = _get_param_float(params, "idle_penalty_scale", 0.5)
+        idle_penalty_power = _get_param_float(params, "idle_penalty_power", 1.025)
         # Internal factor may come from params (overrides provided base_factor argument)
-        factor_used = float(params.get("base_factor", base_factor))
+        factor_used = _get_param_float(params, "base_factor", float(base_factor))
         idle_factor = factor_used * (profit_target * risk_reward_ratio) / 3.0
         observed_ratio = abs(br_mid.idle_penalty) / (idle_factor * idle_penalty_scale)
         if observed_ratio > 0:
@@ -660,7 +661,9 @@ class TestRewardAlignment(RewardSpaceTestBase):
         params = self.DEFAULT_PARAMS.copy()
         # Remove base_factor from params so that the function uses the provided argument (makes scaling observable)
         params.pop("base_factor", None)
-        threshold = float(params.get("exit_factor_threshold", 10_000.0))
+        exit_factor_threshold = _get_param_float(
+            params, "exit_factor_threshold", 10_000.0
+        )
 
         context = RewardContext(
             pnl=0.08,  # above typical profit_target * RR
@@ -688,7 +691,7 @@ class TestRewardAlignment(RewardSpaceTestBase):
         # Amplified: choose a much larger base_factor (ensure > threshold relative scale)
         amplified_base_factor = max(
             self.TEST_BASE_FACTOR * 50,
-            threshold * self.TEST_RR_HIGH / max(context.pnl, 1e-9),
+            exit_factor_threshold * self.TEST_RR_HIGH / max(context.pnl, 1e-9),
         )
         amplified = calculate_reward(
             context,