From: Jérôme Benoit Date: Sat, 11 Oct 2025 15:58:49 +0000 (+0200) Subject: refactor(reforcexy): remove force actions logic interfering with RL X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=11e07d6926f94a06cc057f38b997e6a2f4bfbf0a;p=freqai-strategies.git refactor(reforcexy): remove force actions logic interfering with RL reward logic Signed-off-by: Jérôme Benoit --- diff --git a/ReforceXY/reward_space_analysis/README.md b/ReforceXY/reward_space_analysis/README.md index 79502c7..63cf10b 100644 --- a/ReforceXY/reward_space_analysis/README.md +++ b/ReforceXY/reward_space_analysis/README.md @@ -269,7 +269,7 @@ _Holding penalty configuration:_ _Exit attenuation configuration:_ -- `exit_attenuation_mode` (default: linear) - Selects attenuation kernel (see table below: legacy|sqrt|linear|power|half_life). +- `exit_attenuation_mode` (default: linear) - Selects attenuation kernel (see table below: legacy|sqrt|linear|power|half_life). Fallback to linear. - `exit_plateau` (default: true) - Enables plateau (no attenuation until `exit_plateau_grace`). - `exit_plateau_grace` (default: 1.0) - Duration ratio boundary of full‑strength region (may exceed 1.0). - `exit_linear_slope` (default: 1.0) - Slope parameter used only when mode = linear. @@ -332,7 +332,7 @@ _Invariant / safety controls:_ | Component | Controlled By | Notes | |-----------|---------------|-------| -| Sample simulation | `--seed` | Drives action sampling, PnL noise, force actions. | +| Sample simulation | `--seed` | Drives action sampling, PnL noise generation. | | Statistical tests / bootstrap | `--stats_seed` (fallback `--seed`) | Local RNG; isolation prevents side‑effects in user code. | | RandomForest & permutation importance | `--seed` | Ensures identical splits and tree construction. | | Partial dependence grids | Deterministic | Depends only on fitted model & data. | diff --git a/ReforceXY/reward_space_analysis/reward_space_analysis.py b/ReforceXY/reward_space_analysis/reward_space_analysis.py index 36fea29..b87c5c3 100644 --- a/ReforceXY/reward_space_analysis/reward_space_analysis.py +++ b/ReforceXY/reward_space_analysis/reward_space_analysis.py @@ -6,12 +6,17 @@ Capabilities: - Percentile bootstrap confidence intervals (BCa not yet implemented). - Distribution diagnostics (Shapiro, Anderson, skewness, kurtosis, Q-Q R²). - Distribution shift metrics (KL divergence, JS distance, Wasserstein, KS test) with - degenerate (constant) distribution safe‑guards. + degenerate (constant) distribution safeguards. - Unified RandomForest feature importance + partial dependence. - Heteroscedastic PnL simulation (variance scales with duration). +Exit attenuation mode normalization: +- User supplied ``exit_attenuation_mode`` is taken as-is (case-sensitive) and validated + against the allowed set. Any invalid value (including casing mismatch) results in a + silent fallback to ``'linear'`` (parity with the live environment) – no warning. + Architecture principles: -- Single source of truth: `DEFAULT_MODEL_REWARD_PARAMETERS` for tunables + dynamic CLI. +- Single source of truth: ``DEFAULT_MODEL_REWARD_PARAMETERS`` (dynamic CLI generation). - Determinism: explicit seeding, parameter hashing for manifest traceability. - Extensibility: modular helpers (sampling, reward calculation, statistics, reporting). 
""" @@ -26,7 +31,7 @@ import random import warnings from enum import Enum, IntEnum from pathlib import Path -from typing import Any, Dict, Iterable, List, Optional, Tuple +from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, Mapping import numpy as np import pandas as pd @@ -53,12 +58,6 @@ class Positions(Enum): Neutral = 0.5 -class ForceActions(IntEnum): - Take_profit = 0 - Stop_loss = 1 - Timeout = 2 - - def _to_bool(value: Any) -> bool: if isinstance(value, bool): return value @@ -74,12 +73,14 @@ def _to_bool(value: Any) -> bool: return bool(text) -def _get_param_float(params: Dict[str, float | str], key: str, default: float) -> float: +def _get_param_float( + params: Mapping[str, RewardParamValue], key: str, default: RewardParamValue +) -> float: """Extract float parameter with type safety and default fallback.""" value = params.get(key, default) - # None -> default + # None -> NaN if value is None: - return default + return np.nan # Bool: treat explicitly (avoid surprising True->1.0 unless intentional) if isinstance(value, bool): return float(int(value)) @@ -88,20 +89,20 @@ def _get_param_float(params: Dict[str, float | str], key: str, default: float) - try: fval = float(value) except (ValueError, TypeError): - return default - return fval if np.isfinite(fval) else default + return np.nan + return fval if np.isfinite(fval) else np.nan # String parsing if isinstance(value, str): stripped = value.strip() if stripped == "": - return default + return np.nan try: fval = float(stripped) except ValueError: - return default - return fval if np.isfinite(fval) else default + return np.nan + return fval if np.isfinite(fval) else np.nan # Unsupported type - return default + return np.nan def _compute_duration_ratio(trade_duration: int, max_trade_duration: int) -> float: @@ -121,14 +122,21 @@ def _is_short_allowed(trading_mode: str) -> bool: # Mathematical constants pre-computed for performance _LOG_2 = math.log(2.0) -DEFAULT_MODEL_REWARD_PARAMETERS: Dict[str, float | str] = { +RewardParamValue = Union[float, str, bool, None] +RewardParams = Dict[str, RewardParamValue] + + +# Allowed exit attenuation modes +ALLOWED_EXIT_MODES = {"legacy", "sqrt", "linear", "power", "half_life"} + +DEFAULT_MODEL_REWARD_PARAMETERS: RewardParams = { "invalid_action": -2.0, "base_factor": 100.0, # Idle penalty (env defaults) "idle_penalty_scale": 0.5, "idle_penalty_power": 1.025, - # Fallback semantics: if <=0 or unset → 2 * max_trade_duration_candles (grace window before full idle penalty) - "max_idle_duration_candles": 0, + # Fallback semantics: 2 * max_trade_duration_candles + "max_idle_duration_candles": None, # Holding keys (env defaults) "holding_penalty_scale": 0.25, "holding_penalty_power": 1.025, @@ -155,7 +163,7 @@ DEFAULT_MODEL_REWARD_PARAMETERS_HELP: Dict[str, str] = { "base_factor": "Base reward factor used inside the environment.", "idle_penalty_power": "Power applied to idle penalty scaling.", "idle_penalty_scale": "Scale of idle penalty.", - "max_idle_duration_candles": "Maximum idle duration candles before full idle penalty scaling; 0 = use 2 * max_trade_duration_candles.", + "max_idle_duration_candles": "Maximum idle duration candles before full idle penalty scaling.", "holding_penalty_scale": "Scale of holding penalty.", "holding_penalty_power": "Power applied to holding penalty scaling.", "exit_attenuation_mode": "Attenuation kernel (legacy|sqrt|linear|power|half_life).", @@ -198,37 +206,30 @@ _PARAMETER_BOUNDS: Dict[str, Dict[str, float]] = { def validate_reward_parameters( - 
params: Dict[str, float | str], -) -> Tuple[Dict[str, float | str], Dict[str, Dict[str, Any]]]: + params: RewardParams, +) -> Tuple[RewardParams, Dict[str, Dict[str, Any]]]: """Validate and clamp reward parameter values. - Returns - ------- - sanitized_params : dict - Potentially adjusted copy of input params. - adjustments : dict - Mapping param -> {original, adjusted, reason} for every modification. + This function enforces numeric bounds declared in ``_PARAMETER_BOUNDS``. Values + outside their allowed range are clamped and an entry is recorded in the + ``adjustments`` mapping describing the original value, the adjusted value and the + reason (which bound triggered the change). Non‑finite values are reset to the + minimum bound (or 0.0 if no explicit minimum is defined). - Validation - ---------- - After loading and (if applicable) flattening, the function will validate the - presence of a set of required columns and raise a ValueError if any are missing. - This provides an early, clear error message instead of letting downstream code fail - with a less informative exception. + It does NOT perform schema validation of any DataFrame (legacy text removed). - Required columns (validator): - - "pnl", "trade_duration", "idle_duration", "position", "action", "reward_total" + Parameters + ---------- + params : dict + Raw user supplied reward parameter overrides (already merged with defaults + upstream). The dict is not mutated in‑place; a sanitized copy is returned. Returns ------- - pd.DataFrame - DataFrame containing the transitions (one transition per row). - - Raises - ------ - ValueError - If the pickled payload cannot be converted to a DataFrame with the required columns. - + sanitized_params : dict + Possibly adjusted copy of the provided parameters. + adjustments : dict[str, dict] + Mapping: param -> {original, adjusted, reason} for every modified entry. """ sanitized = dict(params) adjustments: Dict[str, Dict[str, Any]] = {} @@ -260,13 +261,32 @@ def validate_reward_parameters( return sanitized, adjustments +def _normalize_and_validate_mode(params: RewardParams) -> None: + """Align normalization of ``exit_attenuation_mode`` with ReforceXY environment. + + Behaviour (mirrors in-env logic): + - Do not force lowercase or strip user formatting; use the value as provided. + - Supported modes (case-sensitive): {legacy, sqrt, linear, power, half_life}. + - If the value is not among supported keys, silently fall back to 'linear' + without emitting a warning (environment side performs a silent fallback). + - If the key is absent or value is ``None``: leave untouched (upstream defaults + will inject 'linear'). + """ + exit_attenuation_mode = params.get("exit_attenuation_mode") + if exit_attenuation_mode is None: + return + exit_attenuation_mode = str(exit_attenuation_mode) + if exit_attenuation_mode not in ALLOWED_EXIT_MODES: + params["exit_attenuation_mode"] = "linear" + + def add_tunable_cli_args(parser: argparse.ArgumentParser) -> None: """Dynamically add CLI options for each tunable in DEFAULT_MODEL_REWARD_PARAMETERS. Rules: - Use the same underscored names as option flags (e.g., --idle_penalty_scale). - Defaults are None so only user-provided values override params. - - For exit_attenuation_mode, enforce allowed choices and lowercase conversion. + - For exit_attenuation_mode, enforce allowed choices (case-sensitive; invalid value will later silently fallback to 'linear'). - Skip keys already managed as top-level options (e.g., base_factor) to avoid duplicates. 
""" skip_keys = {"base_factor"} # already defined as top-level @@ -279,8 +299,8 @@ def add_tunable_cli_args(parser: argparse.ArgumentParser) -> None: if key == "exit_attenuation_mode": parser.add_argument( f"--{key}", - type=str.lower, - choices=["legacy", "sqrt", "linear", "power", "half_life"], + type=str, # case preserved; validation + silent fallback occurs before factor computation + choices=sorted(ALLOWED_EXIT_MODES), default=None, help=help_text, ) @@ -312,7 +332,6 @@ class RewardContext: min_unrealized_profit: float position: Positions action: Actions - force_action: Optional[ForceActions] @dataclasses.dataclass @@ -329,18 +348,23 @@ def _get_exit_factor( pnl: float, pnl_factor: float, duration_ratio: float, - params: Dict[str, float | str], + params: RewardParams, ) -> float: - """Compute exit factor = time attenuation kernel (with optional plateau) * pnl_factor. + """Compute exit factor = time attenuation kernel (with optional plateau) * ``pnl_factor``. + + Parity: mirrors the live environment's logic (``ReforceXY._get_exit_factor``). - Parity: mirrors `ReforceXY._get_exit_factor`. + Assumptions: + - ``_normalize_and_validate_mode`` has already run (invalid modes replaced by 'linear'). + - ``exit_attenuation_mode`` is therefore either a member of ``ALLOWED_EXIT_MODES`` or 'linear'. + - All numeric tunables are accessed through ``_get_param_float`` for safety. - Steps: - 1. Sanitize inputs (finite, non-negative duration_ratio). - 2. Derive effective duration ratio: if plateau enabled and r <= grace ⇒ 0 else r' = r - grace. - 3. Apply kernel (legacy|sqrt|linear|power|half_life). Unknown ⇒ linear. - 4. Multiply by externally supplied pnl_factor (includes profit amplification & efficiency). - 5. Enforce invariants (finite, non-negative when pnl ≥ 0, warn if |factor| exceeds threshold). + Algorithm steps: + 1. Finiteness & non-negative guard on inputs. + 2. Plateau handling: effective duration ratio = 0 within grace region else (r - grace). + 3. Kernel application (legacy|sqrt|linear|power|half_life). + 4. Multiply by externally supplied ``pnl_factor`` (already includes profit & efficiency effects). + 5. Invariants: ensure finiteness; clamp negative factor when pnl >= 0; emit threshold warning. 
""" # Basic finiteness checks if ( @@ -354,7 +378,7 @@ def _get_exit_factor( if duration_ratio < 0.0: duration_ratio = 0.0 - exit_attenuation_mode = str(params.get("exit_attenuation_mode", "linear")).lower() + exit_attenuation_mode = str(params.get("exit_attenuation_mode", "linear")) exit_plateau = _to_bool(params.get("exit_plateau", True)) exit_plateau_grace = _get_param_float(params, "exit_plateau_grace", 1.0) @@ -449,7 +473,7 @@ def _get_exit_factor( def _get_pnl_factor( - params: Dict[str, float | str], context: RewardContext, profit_target: float + params: RewardParams, context: RewardContext, profit_target: float ) -> float: """Env-aligned PnL factor combining profit amplification and exit efficiency.""" pnl = context.pnl @@ -508,13 +532,7 @@ def _is_valid_action( action: Actions, *, short_allowed: bool, - force_action: Optional[ForceActions], ) -> bool: - if force_action is not None and position in (Positions.Long, Positions.Short): - if position == Positions.Long: - return action == Actions.Long_exit - return action == Actions.Short_exit - if action == Actions.Neutral: return True if action == Actions.Long_enter: @@ -529,7 +547,7 @@ def _is_valid_action( def _idle_penalty( - context: RewardContext, idle_factor: float, params: Dict[str, float | str] + context: RewardContext, idle_factor: float, params: RewardParams ) -> float: """Mirror the environment's idle penalty behaviour.""" idle_penalty_scale = _get_param_float( @@ -559,15 +577,13 @@ def _idle_penalty( max_idle_duration = int(max_idle_duration_candles) except (TypeError, ValueError): max_idle_duration = 2 * max_trade_duration_candles - if max_idle_duration <= 0: - max_idle_duration = 2 * max_trade_duration_candles idle_duration_ratio = context.idle_duration / max(1, max_idle_duration) return -idle_factor * idle_penalty_scale * idle_duration_ratio**idle_penalty_power def _holding_penalty( - context: RewardContext, holding_factor: float, params: Dict[str, float | str] + context: RewardContext, holding_factor: float, params: RewardParams ) -> float: """Mirror the environment's holding penalty behaviour.""" holding_penalty_scale = _get_param_float( @@ -598,7 +614,7 @@ def _compute_exit_reward( base_factor: float, pnl_factor: float, context: RewardContext, - params: Dict[str, float | str], + params: RewardParams, ) -> float: """Compose the exit reward: pnl * exit_factor.""" duration_ratio = _compute_duration_ratio( @@ -612,7 +628,7 @@ def _compute_exit_reward( def calculate_reward( context: RewardContext, - params: Dict[str, float | str], + params: RewardParams, base_factor: float, profit_target: float, risk_reward_ratio: float, @@ -626,7 +642,6 @@ def calculate_reward( context.position, context.action, short_allowed=short_allowed, - force_action=context.force_action, ) if not is_valid and not action_masking: breakdown.invalid_penalty = _get_param_float(params, "invalid_action", -2.0) @@ -650,21 +665,6 @@ def calculate_reward( pnl_factor = _get_pnl_factor(params, context, profit_target_final) holding_factor = idle_factor - if context.force_action in ( - ForceActions.Take_profit, - ForceActions.Stop_loss, - ForceActions.Timeout, - ): - exit_reward = _compute_exit_reward( - factor, - pnl_factor, - context, - params, - ) - breakdown.exit_component = exit_reward - breakdown.total = exit_reward - return breakdown - if context.action == Actions.Neutral and context.position == Positions.Neutral: breakdown.idle_penalty = _idle_penalty(context, idle_factor, params) breakdown.total = breakdown.idle_penalty @@ -726,23 +726,23 @@ def 
_sample_action( return rng.choices(choices, weights=weights, k=1)[0] -def parse_overrides(overrides: Iterable[str]) -> Dict[str, float | str]: - parsed: Dict[str, float | str] = {} +def parse_overrides(overrides: Iterable[str]) -> RewardParams: + parsed: RewardParams = {} for override in overrides: if "=" not in override: raise ValueError(f"Invalid override format: '{override}'") - key, raw_value = override.split("=", 1) + key, value = override.split("=", 1) try: - parsed[key] = float(raw_value) + parsed[key] = float(value) except ValueError: - parsed[key] = raw_value + parsed[key] = value return parsed def simulate_samples( num_samples: int, seed: int, - params: Dict[str, float | str], + params: RewardParams, max_trade_duration: int, base_factor: float, profit_target: float, @@ -765,24 +765,7 @@ def simulate_samples( position_weights = [0.6, 0.4] position = rng.choices(position_choices, weights=position_weights, k=1)[0] - force_action: Optional[ForceActions] - if position != Positions.Neutral and rng.random() < 0.08: - force_action = rng.choice( - [ForceActions.Take_profit, ForceActions.Stop_loss, ForceActions.Timeout] - ) - else: - force_action = None - - if ( - action_masking - and force_action is not None - and position != Positions.Neutral - ): - action = ( - Actions.Long_exit if position == Positions.Long else Actions.Short_exit - ) - else: - action = _sample_action(position, rng, short_allowed=short_allowed) + action = _sample_action(position, rng, short_allowed=short_allowed) if position == Positions.Neutral: trade_duration = 0 @@ -797,9 +780,6 @@ def simulate_samples( except (TypeError, ValueError): max_idle_duration_candles = int(max_trade_duration * max_duration_ratio) - if max_idle_duration_candles <= 0: - max_idle_duration_candles = int(max_trade_duration * max_duration_ratio) - idle_duration = int(rng.uniform(0, max_idle_duration_candles)) else: trade_duration = int( @@ -824,14 +804,6 @@ def simulate_samples( elif position == Positions.Short: pnl -= 0.005 * duration_factor - # Force actions should correlate with PnL sign - if force_action == ForceActions.Take_profit: - # Take profit exits should have positive PnL - pnl = abs(pnl) + rng.uniform(0.01, 0.05) - elif force_action == ForceActions.Stop_loss: - # Stop loss exits should have negative PnL - pnl = -abs(pnl) - rng.uniform(0.01, 0.05) - # Clip PnL to realistic range pnl = max(min(pnl, 0.15), -0.15) @@ -855,7 +827,6 @@ def simulate_samples( min_unrealized_profit=min_unrealized_profit, position=position, action=action, - force_action=force_action, ) breakdown = calculate_reward( @@ -877,15 +848,11 @@ def simulate_samples( "idle_ratio": context.idle_duration / max(1, max_trade_duration), "position": float(context.position.value), "action": float(context.action.value), - "force_action": float( - -1 if context.force_action is None else context.force_action.value - ), "reward_total": breakdown.total, "reward_invalid": breakdown.invalid_penalty, "reward_idle": breakdown.idle_penalty, "reward_holding": breakdown.holding_penalty, "reward_exit": breakdown.exit_component, - "is_force_exit": float(context.force_action is not None), "is_invalid": float(breakdown.invalid_penalty != 0.0), } ) @@ -1109,7 +1076,6 @@ def _compute_representativity_stats( idle_activated = float((df["reward_idle"] != 0).mean()) holding_activated = float((df["reward_holding"] != 0).mean()) exit_activated = float((df["reward_exit"] != 0).mean()) - force_exit_share = float(df["is_force_exit"].mean()) return { "total": total, @@ -1122,7 +1088,6 @@ def 
_compute_representativity_stats( "idle_activated": idle_activated, "holding_activated": holding_activated, "exit_activated": exit_activated, - "force_exit_share": force_exit_share, } @@ -1152,11 +1117,12 @@ def _perform_feature_analysis( "idle_ratio", "position", "action", - "force_action", - "is_force_exit", "is_invalid", ] X = df[feature_cols] + for col in ("trade_duration", "idle_duration"): + if col in X.columns and pd.api.types.is_integer_dtype(X[col]): + X[col] = X[col].astype(float) y = df["reward_total"] X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=0.25, random_state=seed @@ -1328,8 +1294,6 @@ def load_real_episodes(path: Path, *, enforce_columns: bool = True) -> pd.DataFr "idle_ratio", "max_unrealized_profit", "min_unrealized_profit", - "is_force_exit", - "force_action", } for col in list(numeric_expected | numeric_optional): @@ -1552,38 +1516,7 @@ def statistical_hypothesis_tests( "n_groups": len(position_groups), } - # Test 3: Force vs regular exits - force_exits = df[df["is_force_exit"] == 1]["reward_exit"].dropna() - regular_exits = df[(df["is_force_exit"] == 0) & (df["reward_exit"] != 0)][ - "reward_exit" - ].dropna() - - if len(force_exits) >= 30 and len(regular_exits) >= 30: - u_stat, p_val = stats.mannwhitneyu( - force_exits, regular_exits, alternative="two-sided" - ) - n1, n2 = len(force_exits), len(regular_exits) - # Rank-biserial correlation (directional): r = 2*U1/(n1*n2) - 1 - # Compute U1 from sum of ranks of the first group for a robust sign. - combined = np.concatenate([force_exits.values, regular_exits.values]) - ranks = stats.rankdata(combined, method="average") - R1 = float(ranks[:n1].sum()) - U1 = R1 - n1 * (n1 + 1) / 2.0 - r_rb = (2.0 * U1) / (n1 * n2) - 1.0 - - results["force_vs_regular_exits"] = { - "test": "Mann-Whitney U", - "statistic": float(u_stat), - "p_value": float(p_val), - "significant": bool(p_val < alpha), - "effect_size_rank_biserial": float(r_rb), - "median_force": float(force_exits.median()), - "median_regular": float(regular_exits.median()), - "n_force": len(force_exits), - "n_regular": len(regular_exits), - } - - # Test 4: PnL sign differences + # Test 3: PnL sign differences pnl_positive = df[df["pnl"] > 0]["reward_total"].dropna() pnl_negative = df[df["pnl"] < 0]["reward_total"].dropna() @@ -2139,7 +2072,6 @@ def write_complete_statistical_analysis( f"| Holding penalty | {representativity_stats['holding_activated']:.1%} |\n" ) f.write(f"| Exit reward | {representativity_stats['exit_activated']:.1%} |\n") - f.write(f"| Force exit | {representativity_stats['force_exit_share']:.1%} |\n") f.write("\n") # Section 3: Reward Component Relationships @@ -2251,25 +2183,6 @@ def write_complete_statistical_analysis( ) f.write(f"- **Interpretation:** {h['interpretation']} effect\n\n") - if "force_vs_regular_exits" in hypothesis_tests: - h = hypothesis_tests["force_vs_regular_exits"] - f.write("#### 5.1.3 Force vs Regular Exits Comparison\n\n") - f.write(f"**Test Method:** {h['test']}\n\n") - f.write(f"- U-statistic: **{h['statistic']:.4f}**\n") - f.write(f"- p-value: {h['p_value']:.4g}\n") - if "p_value_adj" in h: - f.write( - f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅' if h['significant_adj'] else '❌'} (α=0.05)\n" - ) - f.write( - f"- Effect size (rank-biserial): {h['effect_size_rank_biserial']:.4f}\n" - ) - f.write(f"- Median (force): {h['median_force']:.4f}\n") - f.write(f"- Median (regular): {h['median_regular']:.4f}\n") - f.write( - f"- Significant (α=0.05): {'✅ Yes' if h['significant'] else '❌ No'}\n\n" - ) - if 
"pnl_sign_reward_difference" in hypothesis_tests: h = hypothesis_tests["pnl_sign_reward_difference"] f.write("#### 5.1.4 Positive vs Negative PnL Comparison\n\n") @@ -2413,6 +2326,8 @@ def main() -> None: # Early parameter validation (moved before simulation for alignment with docs) params_validated, adjustments = validate_reward_parameters(params) params = params_validated + # Normalize attenuation mode + _normalize_and_validate_mode(params) base_factor = _get_param_float(params, "base_factor", float(args.base_factor)) profit_target = _get_param_float(params, "profit_target", float(args.profit_target)) diff --git a/ReforceXY/reward_space_analysis/test_reward_space_analysis.py b/ReforceXY/reward_space_analysis/test_reward_space_analysis.py index 623ac2c..0e608db 100644 --- a/ReforceXY/reward_space_analysis/test_reward_space_analysis.py +++ b/ReforceXY/reward_space_analysis/test_reward_space_analysis.py @@ -27,7 +27,6 @@ try: from reward_space_analysis import ( DEFAULT_MODEL_REWARD_PARAMETERS, Actions, - ForceActions, Positions, RewardContext, _get_exit_factor, @@ -341,7 +340,6 @@ class TestRewardAlignment(RewardSpaceTestBase): min_unrealized_profit=0.015, position=Positions.Long, action=Actions.Long_exit, - force_action=None, ) breakdown = calculate_reward( @@ -363,128 +361,6 @@ class TestRewardAlignment(RewardSpaceTestBase): breakdown.exit_component, 0, "Profitable exit should have positive reward" ) - def test_force_action_logic(self): - """Validate forced exits (take profit, stop loss, timeout) produce consistent exit rewards. - - Algorithmic expectations: - - ForceActions override the provided action and trigger exit reward path. - - Exit reward sign should match PnL sign (exit_factor is positive under invariants). - - Take profit reward magnitude > stop loss reward magnitude for comparable |PnL|. - - Timeout uses current PnL (can be positive or negative); we assert sign consistency only. 
- """ - profit_target = 0.06 - - # Take profit (positive pnl) - tp_context = RewardContext( - pnl=0.05, - trade_duration=50, - idle_duration=0, - max_trade_duration=100, - max_unrealized_profit=0.07, - min_unrealized_profit=0.01, - position=Positions.Long, - action=Actions.Neutral, # action ignored due to force_action - force_action=ForceActions.Take_profit, - ) - tp_breakdown = calculate_reward( - tp_context, - self.DEFAULT_PARAMS, - base_factor=self.TEST_BASE_FACTOR, - profit_target=profit_target, - risk_reward_ratio=self.TEST_RR_HIGH, - short_allowed=True, - action_masking=True, - ) - self.assertGreater( - tp_breakdown.exit_component, - 0.0, - "Take profit should yield positive exit reward", - ) - # Exit reward should be the only active component - self.assertEqual(tp_breakdown.invalid_penalty, 0.0) - self.assertEqual(tp_breakdown.idle_penalty, 0.0) - self.assertEqual(tp_breakdown.holding_penalty, 0.0) - self.assertEqual(tp_breakdown.total, tp_breakdown.exit_component) - self.assertAlmostEqualFloat( - math.copysign(1, tp_breakdown.exit_component), - 1.0, - msg="TP reward sign mismatch", - ) - - # Stop loss (negative pnl) - sl_context = RewardContext( - pnl=-0.03, - trade_duration=50, - idle_duration=0, - max_trade_duration=100, - max_unrealized_profit=0.01, - min_unrealized_profit=-0.05, - position=Positions.Long, - action=Actions.Neutral, - force_action=ForceActions.Stop_loss, - ) - sl_breakdown = calculate_reward( - sl_context, - self.DEFAULT_PARAMS, - base_factor=self.TEST_BASE_FACTOR, - profit_target=profit_target, - risk_reward_ratio=self.TEST_RR_HIGH, - short_allowed=True, - action_masking=True, - ) - self.assertLess( - sl_breakdown.exit_component, - 0.0, - "Stop loss should yield negative exit reward", - ) - self.assertEqual(sl_breakdown.invalid_penalty, 0.0) - self.assertEqual(sl_breakdown.idle_penalty, 0.0) - self.assertEqual(sl_breakdown.holding_penalty, 0.0) - self.assertEqual(sl_breakdown.total, sl_breakdown.exit_component) - self.assertAlmostEqualFloat( - math.copysign(1, sl_breakdown.exit_component), - -1.0, - msg="SL reward sign mismatch", - ) - - # Timeout (use small positive pnl) - to_context = RewardContext( - pnl=0.01, - trade_duration=120, # beyond default max - idle_duration=0, - max_trade_duration=100, - max_unrealized_profit=0.02, - min_unrealized_profit=-0.01, - position=Positions.Long, - action=Actions.Neutral, - force_action=ForceActions.Timeout, - ) - to_breakdown = calculate_reward( - to_context, - self.DEFAULT_PARAMS, - base_factor=self.TEST_BASE_FACTOR, - profit_target=profit_target, - risk_reward_ratio=self.TEST_RR_HIGH, - short_allowed=True, - action_masking=True, - ) - self.assertGreaterEqual( - to_breakdown.exit_component, - 0.0, - "Timeout reward should be non-negative with positive PnL", - ) - self.assertEqual(to_breakdown.invalid_penalty, 0.0) - self.assertEqual(to_breakdown.idle_penalty, 0.0) - self.assertEqual(to_breakdown.holding_penalty, 0.0) - self.assertEqual(to_breakdown.total, to_breakdown.exit_component) - - # Magnitude ordering: TP reward magnitude > SL reward magnitude (absolute values, given larger |pnl| for TP) - self.assertGreater( - abs(tp_breakdown.exit_component), - abs(sl_breakdown.exit_component), - "Take profit reward magnitude should exceed stop loss reward magnitude", - ) - def test_efficiency_zero_policy(self): """Ensure pnl == 0 with max_unrealized_profit == 0 does not get boosted. 
@@ -501,7 +377,6 @@ class TestRewardAlignment(RewardSpaceTestBase): min_unrealized_profit=-0.02, position=Positions.Long, action=Actions.Long_exit, - force_action=None, ) params = self.DEFAULT_PARAMS.copy() @@ -531,7 +406,6 @@ class TestRewardAlignment(RewardSpaceTestBase): min_unrealized_profit=0.0, position=Positions.Neutral, action=Actions.Neutral, - force_action=None, ) breakdown_small = calculate_reward( @@ -566,13 +440,13 @@ class TestRewardAlignment(RewardSpaceTestBase): """Fallback & proportionality validation. Semantics: - - When max_idle_duration_candles <= 0, fallback must be 2 * max_trade_duration (updated rule). + - When max_idle_duration_candles is unset, fallback must be 2 * max_trade_duration. - Idle penalty scales ~ linearly with idle_duration (power=1), so doubling idle_duration doubles penalty magnitude. - We also infer the implicit denominator from a mid-range idle duration (>1x and <2x trade duration) to ensure the 2x fallback. """ params = self.DEFAULT_PARAMS.copy() - params["max_idle_duration_candles"] = 0 # force fallback + params["max_idle_duration_candles"] = None base_factor = 90.0 profit_target = self.TEST_PROFIT_TARGET risk_reward_ratio = 1.0 @@ -587,7 +461,6 @@ class TestRewardAlignment(RewardSpaceTestBase): min_unrealized_profit=0.0, position=Positions.Neutral, action=Actions.Neutral, - force_action=None, ) ctx_b = dataclasses.replace(ctx_a, idle_duration=40) @@ -674,7 +547,6 @@ class TestRewardAlignment(RewardSpaceTestBase): min_unrealized_profit=0.0, position=Positions.Long, action=Actions.Long_exit, - force_action=None, ) # Baseline with moderate base_factor @@ -845,7 +717,6 @@ class TestRewardAlignment(RewardSpaceTestBase): min_unrealized_profit=0.0, position=Positions.Neutral, action=Actions.Neutral, - force_action=None, ) br = calculate_reward( context, @@ -921,7 +792,6 @@ class TestRewardAlignment(RewardSpaceTestBase): min_unrealized_profit=0.0, position=Positions.Long, action=Actions.Long_exit, - force_action=None, ) br = calculate_reward( context, @@ -994,7 +864,6 @@ class TestRewardAlignment(RewardSpaceTestBase): min_unrealized_profit=0.0, position=Positions.Long, action=Actions.Long_exit, - force_action=None, ), # Losing exit RewardContext( @@ -1006,7 +875,6 @@ class TestRewardAlignment(RewardSpaceTestBase): min_unrealized_profit=-0.04, position=Positions.Long, action=Actions.Long_exit, - force_action=None, ), # Idle penalty RewardContext( @@ -1018,7 +886,6 @@ class TestRewardAlignment(RewardSpaceTestBase): min_unrealized_profit=0.0, position=Positions.Neutral, action=Actions.Neutral, - force_action=None, ), # Holding penalty (maintained position) RewardContext( @@ -1030,7 +897,6 @@ class TestRewardAlignment(RewardSpaceTestBase): min_unrealized_profit=-0.01, position=Positions.Long, action=Actions.Neutral, - force_action=None, ), ] @@ -1116,7 +982,6 @@ class TestRewardAlignment(RewardSpaceTestBase): min_unrealized_profit=pnl if pnl < 0 else -0.01, position=Positions.Long, action=Actions.Long_exit, - force_action=None, ) ctx_short = RewardContext( pnl=pnl, @@ -1127,7 +992,6 @@ class TestRewardAlignment(RewardSpaceTestBase): min_unrealized_profit=pnl if pnl < 0 else -0.01, position=Positions.Short, action=Actions.Short_exit, - force_action=None, ) br_long = calculate_reward( ctx_long, @@ -1441,7 +1305,6 @@ class TestStatisticalValidation(RewardSpaceTestBase): min_unrealized_profit=0.04, position=Positions.Long, action=Actions.Long_exit, - force_action=None, ) params = self.DEFAULT_PARAMS.copy() @@ -1731,30 +1594,16 @@ class 
TestStatisticalValidation(RewardSpaceTestBase): """Test reward calculation scenarios.""" # Test different reward scenarios test_cases = [ - # (position, action, force_action, expected_reward_type) - (Positions.Neutral, Actions.Neutral, None, "idle_penalty"), - (Positions.Long, Actions.Long_exit, None, "exit_component"), - (Positions.Short, Actions.Short_exit, None, "exit_component"), - ( - Positions.Long, - Actions.Neutral, - ForceActions.Take_profit, - "exit_component", - ), - ( - Positions.Short, - Actions.Neutral, - ForceActions.Stop_loss, - "exit_component", - ), + # (position, action, expected_reward_type) + (Positions.Neutral, Actions.Neutral, "idle_penalty"), + (Positions.Long, Actions.Long_exit, "exit_component"), + (Positions.Short, Actions.Short_exit, "exit_component"), ] - for position, action, force_action, expected_type in test_cases: - with self.subTest( - position=position, action=action, force_action=force_action - ): + for position, action, expected_type in test_cases: + with self.subTest(position=position, action=action): context = RewardContext( - pnl=0.02 if force_action == ForceActions.Take_profit else -0.02, + pnl=0.02 if expected_type == "exit_component" else 0.0, trade_duration=50 if position != Positions.Neutral else 0, idle_duration=10 if position == Positions.Neutral else 0, max_trade_duration=100, @@ -1762,7 +1611,6 @@ class TestStatisticalValidation(RewardSpaceTestBase): min_unrealized_profit=-0.01, position=position, action=action, - force_action=force_action, ) breakdown = calculate_reward( @@ -1815,7 +1663,6 @@ class TestBoundaryConditions(RewardSpaceTestBase): min_unrealized_profit=0.02, position=Positions.Long, action=Actions.Long_exit, - force_action=None, ) breakdown = calculate_reward( @@ -1851,7 +1698,6 @@ class TestBoundaryConditions(RewardSpaceTestBase): min_unrealized_profit=0.01, position=Positions.Long, action=Actions.Long_exit, - force_action=None, ) breakdown = calculate_reward( @@ -2033,7 +1879,6 @@ class TestPrivateFunctions(RewardSpaceTestBase): min_unrealized_profit=0.0, position=Positions.Neutral, action=Actions.Neutral, - force_action=None, ) breakdown = calculate_reward( @@ -2063,7 +1908,6 @@ class TestPrivateFunctions(RewardSpaceTestBase): min_unrealized_profit=0.0, position=Positions.Long, action=Actions.Neutral, - force_action=None, ) breakdown = calculate_reward( @@ -2105,7 +1949,6 @@ class TestPrivateFunctions(RewardSpaceTestBase): min_unrealized_profit=min(pnl - 0.01, -0.01), position=position, action=action, - force_action=None, ) breakdown = calculate_reward( @@ -2140,7 +1983,6 @@ class TestPrivateFunctions(RewardSpaceTestBase): min_unrealized_profit=0.01, position=Positions.Short, action=Actions.Long_exit, - force_action=None, # Invalid: can't long_exit from short ) breakdown = calculate_reward( @@ -2186,7 +2028,6 @@ class TestPrivateFunctions(RewardSpaceTestBase): min_unrealized_profit=0.0, position=Positions.Long, action=Actions.Neutral, - force_action=None, ) breakdown = calculate_reward( @@ -2246,7 +2087,6 @@ class TestPrivateFunctions(RewardSpaceTestBase): min_unrealized_profit=0.0, position=Positions.Long, action=Actions.Neutral, - force_action=None, ) breakdown = calculate_reward( @@ -2287,7 +2127,6 @@ class TestPrivateFunctions(RewardSpaceTestBase): min_unrealized_profit=0.0, position=Positions.Long, action=Actions.Long_exit, - force_action=None, ) breakdown = calculate_reward( context, @@ -2324,7 +2163,6 @@ class TestRewardRobustness(RewardSpaceTestBase): min_unrealized_profit: float = 0.01, position: Positions = 
Positions.Long, action: Actions = Actions.Long_exit, - force_action: ForceActions | None = None, ) -> RewardContext: return RewardContext( pnl=pnl, @@ -2335,7 +2173,6 @@ class TestRewardRobustness(RewardSpaceTestBase): min_unrealized_profit=min_unrealized_profit, position=position, action=action, - force_action=force_action, ) def test_decomposition_integrity(self): @@ -2355,7 +2192,6 @@ class TestRewardRobustness(RewardSpaceTestBase): min_unrealized_profit=0.0, position=Positions.Neutral, action=Actions.Neutral, - force_action=None, ), active="idle_penalty", ), @@ -2370,7 +2206,6 @@ class TestRewardRobustness(RewardSpaceTestBase): min_unrealized_profit=0.0, position=Positions.Long, action=Actions.Neutral, - force_action=None, ), active="holding_penalty", ), @@ -2390,7 +2225,6 @@ class TestRewardRobustness(RewardSpaceTestBase): min_unrealized_profit=0.0, position=Positions.Short, action=Actions.Long_exit, # invalid - force_action=None, ), active="invalid_penalty", ), @@ -2696,7 +2530,6 @@ class TestParameterValidation(RewardSpaceTestBase): min_unrealized_profit=0.0, position=Positions.Neutral, action=Actions.Neutral, - force_action=None, ) ctx_b = dataclasses.replace(ctx_a, idle_duration=40) br_a = calculate_reward( @@ -2738,7 +2571,6 @@ class TestParameterValidation(RewardSpaceTestBase): min_unrealized_profit=0.0, position=Positions.Long, action=Actions.Neutral, - force_action=None, ) ctx_h2 = dataclasses.replace(ctx_h1, trade_duration=140) # Compute baseline and comparison holding penalties @@ -2788,7 +2620,6 @@ class TestParameterValidation(RewardSpaceTestBase): min_unrealized_profit=0.0, position=Positions.Long, action=Actions.Long_exit, - force_action=None, ) with warnings.catch_warnings(record=True) as w: warnings.simplefilter("always") diff --git a/ReforceXY/user_data/config-template.json b/ReforceXY/user_data/config-template.json index d62ce9e..f98c23a 100644 --- a/ReforceXY/user_data/config-template.json +++ b/ReforceXY/user_data/config-template.json @@ -15,10 +15,6 @@ // "trading_mode": "futures", // "margin_mode": "isolated", "trading_mode": "spot", - // "minimal_roi": { - // "0": 0.03 - // }, // Take_profit exit value used with force_actions - // "stoploss": -0.02, // Stop_loss exit value used with force_actions "stoploss": -0.99, "unfilledtimeout": { "entry": 10, @@ -170,8 +166,7 @@ "add_state_info": true, "cpu_count": 4, "max_training_drawdown_pct": 0.02, - "max_trade_duration_candles": 96, // Timeout exit value used with force_actions - "force_actions": false, // Utilize minimal_roi, stoploss, and max_trade_duration_candles as TP/SL/Timeout in the environment + "max_trade_duration_candles": 96, // Maximum trade duration in candles "n_envs": 8, // Number of DummyVecEnv or SubProcVecEnv training environments "multiprocessing": true, // Use SubprocVecEnv if n_envs>1 (otherwise DummyVecEnv) "frame_stacking": 2, // Number of VecFrameStack stacks (set > 1 to use) diff --git a/ReforceXY/user_data/freqaimodels/ReforceXY.py b/ReforceXY/user_data/freqaimodels/ReforceXY.py index b0269ac..a41a237 100644 --- a/ReforceXY/user_data/freqaimodels/ReforceXY.py +++ b/ReforceXY/user_data/freqaimodels/ReforceXY.py @@ -7,7 +7,7 @@ import time import warnings from collections import defaultdict from collections.abc import Mapping -from enum import IntEnum + from pathlib import Path from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Type, Union @@ -66,12 +66,6 @@ warnings.filterwarnings("ignore", category=ExperimentalWarning) logger = logging.getLogger(__name__) -class 
ForceActions(IntEnum): - Take_profit = 0 - Stop_loss = 1 - Timeout = 2 - - class ReforceXY(BaseReinforcementLearningModel): """ Custom Freqtrade Freqai reinforcement learning prediction model. @@ -79,15 +73,12 @@ class ReforceXY(BaseReinforcementLearningModel): { "freqaimodel": "ReforceXY", "strategy": "RLAgentStrategy", - "minimal_roi": {"0": 0.03}, // Take_profit exit value used with force_actions - "stoploss": -0.02, // Stop_loss exit value used with force_actions ... "freqai": { ... "rl_config": { ... - "max_trade_duration_candles": 96, // Timeout exit value used with force_actions - "force_actions": false, // Utilize minimal_roi, stoploss, and max_trade_duration_candles as TP/SL/Timeout in the environment + "max_trade_duration_candles": 96, // Maximum trade duration in candles "n_envs": 1, // Number of DummyVecEnv or SubProcVecEnv training environments "n_eval_envs": 1, // Number of DummyVecEnv or SubProcVecEnv evaluation environments "multiprocessing": false, // Use SubprocVecEnv if n_envs>1 (otherwise DummyVecEnv) @@ -126,7 +117,7 @@ class ReforceXY(BaseReinforcementLearningModel): """ _LOG_2 = math.log(2.0) - _action_masks_cache: Dict[Tuple[str, int, Optional[int]], NDArray[np.bool_]] = {} + _action_masks_cache: Dict[Tuple[bool, int], NDArray[np.bool_]] = {} def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -195,28 +186,18 @@ class ReforceXY(BaseReinforcementLearningModel): def get_action_masks( can_short: bool, position: Positions, - force_action: Optional[ForceActions] = None, ) -> NDArray[np.bool_]: position = ReforceXY._normalize_position(position) cache_key = ( can_short, position.value, - force_action.value if force_action else None, ) if cache_key in ReforceXY._action_masks_cache: return ReforceXY._action_masks_cache[cache_key] action_masks = np.zeros(len(Actions), dtype=np.bool_) - if force_action is not None and position in (Positions.Long, Positions.Short): - if position == Positions.Long: - action_masks[Actions.Long_exit.value] = True - else: - action_masks[Actions.Short_exit.value] = True - ReforceXY._action_masks_cache[cache_key] = action_masks - return ReforceXY._action_masks_cache[cache_key] - action_masks[Actions.Neutral.value] = True if position == Positions.Neutral: action_masks[Actions.Long_enter.value] = True @@ -1311,25 +1292,13 @@ class MyRLEnv(Base5ActionRLEnv): super().__init__(*args, **kwargs) self._set_observation_space() self.action_masking: bool = self.rl_config.get("action_masking", False) - self.force_actions: bool = self.rl_config.get("force_actions", False) - self._force_action: Optional[ForceActions] = None - self.take_profit: float = self.config.get("minimal_roi", {}).get("0", 0.03) - self.stop_loss: float = self.config.get("stoploss", -0.02) - self.timeout: int = self.rl_config.get("max_trade_duration_candles", 128) + self.max_trade_duration_candles: int = self.rl_config.get( + "max_trade_duration_candles", 128 + ) self._last_closed_position: Optional[Positions] = None self._last_closed_trade_tick: int = 0 self._max_unrealized_profit: float = -np.inf self._min_unrealized_profit: float = np.inf - if self.force_actions: - logger.info( - "%s - take_profit: %s, stop_loss: %s, timeout: %s candles (%s days), observation_space: %s", - self.id, - self.take_profit, - self.stop_loss, - self.timeout, - steps_to_days(self.timeout, self.config.get("timeframe")), - self.observation_space, - ) def _set_observation_space(self) -> None: """ @@ -1350,9 +1319,7 @@ class MyRLEnv(Base5ActionRLEnv): ) def _is_valid(self, action: int) -> bool: - 
return ReforceXY.get_action_masks( - self.can_short, self._position, self._force_action - )[action] + return ReforceXY.get_action_masks(self.can_short, self._position)[action] def reset_env( self, @@ -1373,7 +1340,6 @@ class MyRLEnv(Base5ActionRLEnv): Reset is called at the beginning of every episode """ observation, history = super().reset(seed, **kwargs) - self._force_action: Optional[ForceActions] = None self._last_closed_position: Optional[Positions] = None self._last_closed_trade_tick: int = 0 self._max_unrealized_profit = -np.inf @@ -1569,7 +1535,7 @@ class MyRLEnv(Base5ActionRLEnv): # mrr = self.get_most_recent_return() # mrp = self.get_most_recent_profit() - max_trade_duration = max(1, self.timeout) + max_trade_duration = max(self.max_trade_duration_candles, 1) trade_duration = self.get_trade_duration() duration_ratio = trade_duration / max_trade_duration @@ -1578,14 +1544,6 @@ class MyRLEnv(Base5ActionRLEnv): idle_factor = base_factor * pnl_target / 3.0 holding_factor = idle_factor - # Force exits - if self._force_action in ( - ForceActions.Take_profit, - ForceActions.Stop_loss, - ForceActions.Timeout, - ): - return pnl * self._get_exit_factor(base_factor, pnl, duration_ratio) - # # you can use feature values from dataframe # rsi_now = self.get_feature_value( # name="%-rsi", @@ -1613,8 +1571,6 @@ class MyRLEnv(Base5ActionRLEnv): "max_idle_duration_candles", 2 * max_trade_duration ) ) - if max_idle_duration <= 0: - max_idle_duration = 2 * max_trade_duration idle_penalty_scale = float( model_reward_parameters.get("idle_penalty_scale", 0.5) ) @@ -1700,23 +1656,6 @@ class MyRLEnv(Base5ActionRLEnv): return np.ascontiguousarray(observations) - def _get_force_action(self) -> Optional[ForceActions]: - if not self.force_actions or self._position == Positions.Neutral: - return None - - trade_duration = self.get_trade_duration() - if trade_duration <= 1: - return None - if trade_duration >= self.timeout: - return ForceActions.Timeout - - pnl = self.get_unrealized_profit() - if pnl >= self.take_profit: - return ForceActions.Take_profit - if pnl <= self.stop_loss: - return ForceActions.Stop_loss - return None - def _get_position(self, action: int) -> Positions: return { Actions.Long_enter.value: Positions.Long, @@ -1742,11 +1681,6 @@ class MyRLEnv(Base5ActionRLEnv): """ Execute trade based on the given action """ - # Force exit trade - if self._force_action is not None: - self._exit_trade() - self.tensorboard_log(f"{self._force_action.name}", category="actions/force") - return f"{self._force_action.name}" if not self.is_tradesignal(action): return None @@ -1779,7 +1713,6 @@ class MyRLEnv(Base5ActionRLEnv): self._update_unrealized_total_profit() pre_pnl = self.get_unrealized_profit() self._update_portfolio_log_returns() - self._force_action = self._get_force_action() reward = self.calculate_reward(action) self.total_reward += reward self.tensorboard_log(Actions._member_names_[action], category="actions") @@ -1795,7 +1728,6 @@ class MyRLEnv(Base5ActionRLEnv): "tick": self._current_tick, "position": self._position.value, "action": action, - "force_action": (self._force_action.name if self._force_action else None), "pre_pnl": round(pre_pnl, 5), "pnl": round(pnl, 5), "delta_pnl": round(delta_pnl, 5), @@ -1857,9 +1789,7 @@ class MyRLEnv(Base5ActionRLEnv): ) def action_masks(self) -> NDArray[np.bool_]: - return ReforceXY.get_action_masks( - self.can_short, self._position, self._force_action - ) + return ReforceXY.get_action_masks(self.can_short, self._position) def get_feature_value( self, @@ -2468,7 
+2398,7 @@ class InfoMetricsCallback(TensorboardCallback): self._safe_logger_record(f"info/{metric}", value, exclude=logger_exclude) if isinstance(infos_list, list) and infos_list: - cat_keys = ("force_action", "action", "position") + cat_keys = ("action", "position") cat_counts: Dict[str, Dict[Any, int]] = { k: defaultdict(int) for k in cat_keys }
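
Note (illustrative, not part of the patch): the module docstring and _normalize_and_validate_mode added above document that an invalid or case-mismatched exit_attenuation_mode is silently replaced by 'linear', with no warning, to stay in parity with the live environment. The minimal standalone sketch below reproduces that behaviour under those assumptions; ALLOWED_EXIT_MODES and RewardParams mirror the names in the patch, but normalize_exit_mode here is a simplified stand-in for the real helper, not the module itself.

    # Sketch of the silent, case-sensitive fallback described in the patch.
    from typing import Dict, Union

    ALLOWED_EXIT_MODES = {"legacy", "sqrt", "linear", "power", "half_life"}
    RewardParams = Dict[str, Union[float, str, bool, None]]

    def normalize_exit_mode(params: RewardParams) -> None:
        """Replace an unknown exit_attenuation_mode with 'linear', silently."""
        mode = params.get("exit_attenuation_mode")
        if mode is None:
            # Absent or None: leave untouched, upstream defaults inject 'linear'.
            return
        if str(mode) not in ALLOWED_EXIT_MODES:
            # No warning is emitted; the environment side also falls back silently.
            params["exit_attenuation_mode"] = "linear"

    params: RewardParams = {"exit_attenuation_mode": "Half_Life"}  # casing mismatch
    normalize_exit_mode(params)
    assert params["exit_attenuation_mode"] == "linear"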