From: Jérôme Benoit
Date: Wed, 8 Oct 2025 23:02:38 +0000 (+0200)
Subject: fix(reforcexy): fix efficiency factor computation logic
X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=42b1767d48ebf4ec679e96bbe0a77b5b1a791095;p=freqai-strategies.git

fix(reforcexy): fix efficiency factor computation logic

Signed-off-by: Jérôme Benoit
---
Review notes (this area is ignored when the patch is applied):
- compute_distribution_shift_metrics: numpy.isclose() takes rtol/atol, not
  math.isclose()'s rel_tol/abs_tol; the added line now uses rtol=0, atol=1e-12
  so it no longer raises TypeError.
- Line breaks and indentation were reconstructed from the hunk line counts and
  Python syntax; verify indentation depth against the target files.

diff --git a/ReforceXY/reward_space_analysis/reward_space_analysis.py b/ReforceXY/reward_space_analysis/reward_space_analysis.py
index 4e3ebad..3cf8384 100644
--- a/ReforceXY/reward_space_analysis/reward_space_analysis.py
+++ b/ReforceXY/reward_space_analysis/reward_space_analysis.py
@@ -89,7 +89,7 @@ def _get_param_float(params: Dict[str, float | str], key: str, default: float) -
             fval = float(value)
         except (ValueError, TypeError):
             return default
-        return fval if math.isfinite(fval) else default
+        return fval if np.isfinite(fval) else default
     # String parsing
     if isinstance(value, str):
         stripped = value.strip()
@@ -99,7 +99,7 @@ def _get_param_float(params: Dict[str, float | str], key: str, default: float) -
             fval = float(stripped)
         except ValueError:
             return default
-        return fval if math.isfinite(fval) else default
+        return fval if np.isfinite(fval) else default
     # Unsupported type
     return default
 
@@ -247,10 +247,10 @@ def validate_reward_parameters(
         if "max" in bounds and adjusted > bounds["max"]:
             adjusted = bounds["max"]
             reason_parts.append(f"max={bounds['max']}")
-        if not math.isfinite(adjusted):
+        if not np.isfinite(adjusted):
             adjusted = bounds.get("min", 0.0)
             reason_parts.append("non_finite_reset")
-        if not math.isclose(adjusted, original):
+        if not np.isclose(adjusted, original):
             sanitized[key] = adjusted
             adjustments[key] = {
                 "original": original,
@@ -344,9 +344,9 @@ def _get_exit_factor(
     """
     # Basic finiteness checks
     if (
-        not math.isfinite(base_factor)
-        or not math.isfinite(pnl)
-        or not math.isfinite(duration_ratio)
+        not np.isfinite(base_factor)
+        or not np.isfinite(pnl)
+        or not np.isfinite(duration_ratio)
     ):
         return 0.0
 
@@ -425,13 +425,13 @@ def _get_exit_factor(
 
     # Invariant & safety checks
     if _to_bool(params.get("check_invariants", True)):
-        if not math.isfinite(base_factor):
+        if not np.isfinite(base_factor):
             return 0.0
         if base_factor < 0.0 and pnl >= 0.0:
             # Clamp: avoid negative amplification on non-negative pnl
             base_factor = 0.0
         thr = params.get("exit_factor_threshold")
-        if isinstance(thr, (int, float)) and thr > 0 and math.isfinite(thr):
+        if isinstance(thr, (int, float)) and thr > 0 and np.isfinite(thr):
             if abs(base_factor) > thr:
                 warnings.warn(
                     (
@@ -450,7 +450,7 @@ def _get_pnl_factor(
     """Env-aligned PnL factor combining profit amplification and exit efficiency."""
     pnl = context.pnl
 
-    if not math.isfinite(pnl) or not math.isfinite(profit_target):
+    if not np.isfinite(pnl) or not np.isfinite(profit_target):
         return 0.0
 
     profit_target_factor = 1.0
@@ -465,15 +465,20 @@ def _get_pnl_factor(
     efficiency_factor = 1.0
     efficiency_weight = float(params.get("efficiency_weight", 1.0))
     efficiency_center = float(params.get("efficiency_center", 0.5))
-    if efficiency_weight != 0.0 and pnl >= 0.0:
+    if efficiency_weight != 0.0 and not np.isclose(pnl, 0.0):
         max_pnl = max(context.max_unrealized_profit, pnl)
         min_pnl = min(context.min_unrealized_profit, pnl)
         range_pnl = max_pnl - min_pnl
-        if math.isfinite(range_pnl) and not math.isclose(range_pnl, 0.0):
+        if np.isfinite(range_pnl) and not np.isclose(range_pnl, 0.0):
             efficiency_ratio = (pnl - min_pnl) / range_pnl
-            efficiency_factor = 1.0 + efficiency_weight * (
-                efficiency_ratio - efficiency_center
-            )
+            if pnl > 0.0:
+                efficiency_factor = 1.0 + efficiency_weight * (
+                    efficiency_ratio - efficiency_center
+                )
+            elif pnl < 0.0:
+                efficiency_factor = 1.0 + efficiency_weight * (
+                    efficiency_center - efficiency_ratio
+                )
 
     return max(0.0, profit_target_factor * efficiency_factor)
 
@@ -1008,7 +1013,7 @@ def _compute_relationship_stats(
     trade_bins = np.linspace(0, max_trade_duration * 3.0, 13)
     pnl_min = float(df["pnl"].min())
     pnl_max = float(df["pnl"].max())
-    if math.isclose(pnl_min, pnl_max):
+    if np.isclose(pnl_min, pnl_max):
         pnl_max = pnl_min + 1e-6
     pnl_bins = np.linspace(pnl_min, pnl_max, 13)
 
@@ -1357,7 +1362,7 @@ def compute_distribution_shift_metrics(
         # Guard against degenerate distributions (all values identical)
         if not np.isfinite(min_val) or not np.isfinite(max_val):
             continue
-        if math.isclose(max_val, min_val, rel_tol=0, abs_tol=1e-12):
+        if np.isclose(max_val, min_val, rtol=0, atol=1e-12):
             # All mass at a single point -> shift metrics are all zero by definition
             metrics[f"{feature}_kl_divergence"] = 0.0
             metrics[f"{feature}_js_distance"] = 0.0
diff --git a/ReforceXY/reward_space_analysis/test_reward_space_analysis.py b/ReforceXY/reward_space_analysis/test_reward_space_analysis.py
index 92bc2ac..9bfe856 100644
--- a/ReforceXY/reward_space_analysis/test_reward_space_analysis.py
+++ b/ReforceXY/reward_space_analysis/test_reward_space_analysis.py
@@ -31,6 +31,7 @@ try:
         Positions,
         RewardContext,
         _get_exit_factor,
+        _get_pnl_factor,
         bootstrap_confidence_intervals,
         build_argument_parser,
         calculate_reward,
@@ -82,7 +83,7 @@ class RewardSpaceTestBase(unittest.TestCase):
         msg: str | None = None,
     ) -> None:
         """Absolute tolerance compare with explicit failure and finite check."""
-        if not (math.isfinite(first) and math.isfinite(second)):
+        if not (np.isfinite(first) and np.isfinite(second)):
             self.fail(msg or f"Non-finite comparison (a={first}, b={second})")
         diff = abs(first - second)
         if diff > tolerance:
@@ -483,6 +484,33 @@ class TestRewardAlignment(RewardSpaceTestBase):
             "Take profit reward magnitude should exceed stop loss reward magnitude",
         )
 
+    def test_efficiency_zero_policy(self):
+        """Ensure pnl == 0 with max_unrealized_profit == 0 does not get boosted.
+
+        This verifies the policy: near-zero pnl -> no efficiency modulation.
+        """
+
+        # Build context where pnl == 0.0 and max_unrealized_profit == pnl
+        ctx = RewardContext(
+            pnl=0.0,
+            trade_duration=1,
+            idle_duration=0,
+            max_trade_duration=100,
+            max_unrealized_profit=0.0,
+            min_unrealized_profit=-0.02,
+            position=Positions.Long,
+            action=Actions.Long_exit,
+            force_action=None,
+        )
+
+        params = self.DEFAULT_PARAMS.copy()
+        profit_target = self.TEST_PROFIT_TARGET * self.TEST_RR
+
+        pnl_factor = _get_pnl_factor(params, ctx, profit_target)
+        # Expect no efficiency modulation: factor should be >= 0 and close to 1.0
+        self.assertTrue(np.isfinite(pnl_factor))
+        self.assertAlmostEqualFloat(pnl_factor, 1.0, tolerance=1e-6)
+
     def test_max_idle_duration_candles_logic(self):
         """Idle penalty scaling test with explicit max_idle_duration_candles."""
         params_small = self.DEFAULT_PARAMS.copy()
@@ -914,7 +942,7 @@ class TestRewardAlignment(RewardSpaceTestBase):
         asymptote = 1.0 + win_reward_factor
         final_ratio = ratios_observed[-1]
         # Expect to be very close to asymptote (tanh(0.5*(10-1)) ≈ 0.9997)
-        if not math.isfinite(final_ratio):
+        if not np.isfinite(final_ratio):
             self.fail(f"Final ratio is not finite: {final_ratio}")
         self.assertLess(
             abs(final_ratio - asymptote),
@@ -930,7 +958,7 @@ class TestRewardAlignment(RewardSpaceTestBase):
             expected_ratios.append(expected)
         # Compare each observed to expected within loose tolerance (model parity)
         for obs, exp in zip(ratios_observed, expected_ratios):
-            if not (math.isfinite(obs) and math.isfinite(exp)):
+            if not (np.isfinite(obs) and np.isfinite(exp)):
                 self.fail(f"Non-finite observed/expected ratio: obs={obs}, exp={exp}")
             self.assertLess(
                 abs(obs - exp),
diff --git a/ReforceXY/user_data/freqaimodels/ReforceXY.py b/ReforceXY/user_data/freqaimodels/ReforceXY.py
index 593064c..01fa6d8 100644
--- a/ReforceXY/user_data/freqaimodels/ReforceXY.py
+++ b/ReforceXY/user_data/freqaimodels/ReforceXY.py
@@ -1526,15 +1526,20 @@ class MyRLEnv(Base5ActionRLEnv):
         efficiency_factor = 1.0
         efficiency_weight = float(model_reward_parameters.get("efficiency_weight", 1.0))
         efficiency_center = float(model_reward_parameters.get("efficiency_center", 0.5))
-        if efficiency_weight != 0.0 and pnl >= 0.0:
+        if efficiency_weight != 0.0 and not np.isclose(pnl, 0.0):
             max_pnl = max(self.get_max_unrealized_profit(), pnl)
             min_pnl = min(self.get_min_unrealized_profit(), pnl)
             range_pnl = max_pnl - min_pnl
             if np.isfinite(range_pnl) and not np.isclose(range_pnl, 0.0):
                 efficiency_ratio = (pnl - min_pnl) / range_pnl
-                efficiency_factor = 1.0 + efficiency_weight * (
-                    efficiency_ratio - efficiency_center
-                )
+                if pnl > 0.0:
+                    efficiency_factor = 1.0 + efficiency_weight * (
+                        efficiency_ratio - efficiency_center
+                    )
+                elif pnl < 0.0:
+                    efficiency_factor = 1.0 + efficiency_weight * (
+                        efficiency_center - efficiency_ratio
+                    )
 
         return max(0.0, pnl_target_factor * efficiency_factor)
 