Potentially adjusted copy of input params.
adjustments : dict
Mapping param -> {original, adjusted, reason} for every modification.
+
+ Validation
+ ----------
+ After loading and (if applicable) flattening, the function will validate the
+ presence of a set of required columns and raise a ValueError if any are missing.
+ This provides an early, clear error message instead of letting downstream code fail
+ with a less informative exception.
+
+ Required columns (validator):
+ - "pnl", "trade_duration", "idle_duration", "position", "action", "reward_total"
+
+ Returns
+ -------
+ pd.DataFrame
+ DataFrame containing the transitions (one transition per row).
+
+ Raises
+ ------
+ ValueError
+ If the pickled payload cannot be converted to a DataFrame with the required columns.
+
"""
sanitized = dict(params)
adjustments: Dict[str, Dict[str, Any]] = {}
return importance_df, analysis_stats, partial_deps, model
-def load_real_episodes(path: Path) -> pd.DataFrame:
- """Load real episodes transitions from pickle file."""
- with path.open("rb") as f:
- episodes_data = pickle.load(f)
+def load_real_episodes(path: Path, *, enforce_columns: bool = True) -> pd.DataFrame:
+ """Load transitions from a pickle into a pandas.DataFrame.
- if (
- isinstance(episodes_data, list)
- and episodes_data
- and isinstance(episodes_data[0], dict)
+ Accepted inputs: a pickled DataFrame, a list of transition dicts, a list of
+ episode dicts each containing a 'transitions' iterable, or a dict with key
+ 'transitions'.
+
+ Parameters
+ ----------
+ path: Path
+ Path to the pickle file.
+ enforce_columns: bool
+ If True require required columns, else fill missing with NaN and warn.
+
+ Raises
+ ------
+ ValueError
+ On unpickle failure or when the payload cannot be converted to a valid
+ transitions DataFrame (and enforce_columns is True).
+ """
+
+ try:
+ with path.open("rb") as f:
+ episodes_data = pickle.load(f)
+ except Exception as e:
+ raise ValueError(f"Failed to unpickle '{path}': {e!r}") from e
+
+ # Top-level dict with 'transitions'
+ if isinstance(episodes_data, dict) and "transitions" in episodes_data:
+ candidate = episodes_data["transitions"]
+ if isinstance(candidate, pd.DataFrame):
+ df = candidate.copy()
+ else:
+ try:
+ df = pd.DataFrame(list(candidate))
+ except TypeError:
+ raise ValueError(
+ f"Top-level 'transitions' in '{path}' is not iterable (type {type(candidate)!r})."
+ )
+ except Exception as e:
+ raise ValueError(
+ f"Could not build DataFrame from top-level 'transitions' in '{path}': {e!r}"
+ ) from e
+ # List of episodes where some entries have 'transitions'
+ elif isinstance(episodes_data, list) and any(
+ isinstance(e, dict) and "transitions" in e for e in episodes_data
):
- if "transitions" in episodes_data[0]:
- all_transitions = []
- for episode in episodes_data:
- all_transitions.extend(episode["transitions"])
- return pd.DataFrame(all_transitions)
+ all_transitions = []
+ skipped = 0
+ for episode in episodes_data:
+ if isinstance(episode, dict) and "transitions" in episode:
+ trans = episode["transitions"]
+ if isinstance(trans, pd.DataFrame):
+ all_transitions.extend(trans.to_dict(orient="records"))
+ else:
+ try:
+ all_transitions.extend(list(trans))
+ except TypeError:
+ raise ValueError(
+ f"Episode 'transitions' is not iterable in file '{path}'; found type {type(trans)!r}"
+ )
+ else:
+ skipped += 1
+ if skipped:
+ warnings.warn(
+ f"Ignored {skipped} episode(s) without 'transitions' when loading '{path}'",
+ RuntimeWarning,
+ stacklevel=2,
+ )
+ try:
+ df = pd.DataFrame(all_transitions)
+ except Exception as e:
+ raise ValueError(
+ f"Could not build DataFrame from flattened transitions in '{path}': {e!r}"
+ ) from e
+ else:
+ try:
+ if isinstance(episodes_data, pd.DataFrame):
+ df = episodes_data.copy()
+ else:
+ df = pd.DataFrame(episodes_data)
+ except Exception as e:
+ raise ValueError(
+ f"Could not convert pickled object from '{path}' to DataFrame: {e!r}"
+ ) from e
+
+ # Coerce common numeric fields; warn when values are coerced to NaN
+ numeric_expected = {
+ "pnl",
+ "trade_duration",
+ "idle_duration",
+ "position",
+ "action",
+ "reward_total",
+ }
+
+ numeric_optional = {
+ "reward_exit",
+ "reward_idle",
+ "reward_holding",
+ "reward_invalid",
+ "duration_ratio",
+ "idle_ratio",
+ "max_unrealized_profit",
+ "min_unrealized_profit",
+ "is_force_exit",
+ "force_action",
+ }
+
+ for col in list(numeric_expected | numeric_optional):
+ if col in df.columns:
+ before_na = df[col].isna().sum()
+ df[col] = pd.to_numeric(df[col], errors="coerce")
+ coerced = df[col].isna().sum() - before_na
+ if coerced > 0:
+ frac = coerced / len(df) if len(df) > 0 else 0.0
+ warnings.warn(
+ (
+ f"Column '{col}' contained {coerced} non-numeric value(s) "
+ f"({frac:.1%}) that were coerced to NaN when loading '{path}'."
+ ),
+ RuntimeWarning,
+ stacklevel=2,
+ )
+
+ # Ensure required columns exist (or fill with NaN if allowed)
+ required = {
+ "pnl",
+ "trade_duration",
+ "idle_duration",
+ "position",
+ "action",
+ "reward_total",
+ }
+ missing = required - set(df.columns)
+ if missing:
+ if enforce_columns:
+ raise ValueError(
+ f"Loaded episodes data is missing required columns: {sorted(missing)}. "
+ f"Found columns: {sorted(list(df.columns))}."
+ )
+ else:
+ warnings.warn(
+ f"Loaded episodes data is missing columns {sorted(missing)}; filling with NaN (enforce_columns=False)",
+ RuntimeWarning,
+ stacklevel=2,
+ )
+ for col in missing:
+ df[col] = np.nan
- return pd.DataFrame(episodes_data)
+ return df
def compute_distribution_shift_metrics(
ForceActions,
Positions,
RewardContext,
- _compute_relationship_stats,
- _compute_representativity_stats,
- _compute_summary_stats,
_get_exit_factor,
- _perform_feature_analysis,
bootstrap_confidence_intervals,
build_argument_parser,
calculate_reward,
"reward_idle": reward_idle,
"position": np.random.choice([0.0, 0.5, 1.0], n),
"reward_total": np.random.normal(0, 1, n),
- "pnl": np.random.normal(0, 0.02, n),
+ "pnl": np.random.normal(0, TEST_PNL_STD, n),
"trade_duration": np.random.exponential(20, n),
}
)
def test_basic_reward_calculation(self):
"""Test basic reward calculation consistency."""
context = RewardContext(
- pnl=0.02,
+ pnl=TEST_PROFIT_TARGET,
trade_duration=10,
idle_duration=0,
max_trade_duration=100,
- Take profit reward magnitude > stop loss reward magnitude for comparable |PnL|.
- Timeout uses current PnL (can be positive or negative); we assert sign consistency only.
"""
+ profit_target = 0.06
# Take profit (positive pnl)
tp_context = RewardContext(
tp_context,
self.DEFAULT_PARAMS,
base_factor=TEST_BASE_FACTOR,
- profit_target=0.06, # Scenario-specific larger target kept explicit
+ profit_target=profit_target,
risk_reward_ratio=TEST_RR_HIGH,
short_allowed=True,
action_masking=True,
sl_context,
self.DEFAULT_PARAMS,
base_factor=TEST_BASE_FACTOR,
- profit_target=0.06,
+ profit_target=profit_target,
risk_reward_ratio=TEST_RR_HIGH,
short_allowed=True,
action_masking=True,
to_context,
self.DEFAULT_PARAMS,
base_factor=TEST_BASE_FACTOR,
- profit_target=0.06,
+ profit_target=profit_target,
risk_reward_ratio=TEST_RR_HIGH,
short_allowed=True,
action_masking=True,
params = self.DEFAULT_PARAMS.copy()
params["max_idle_duration_candles"] = 0 # force fallback
base_factor = 90.0
- profit_target = 0.03
+ profit_target = TEST_PROFIT_TARGET
risk_reward_ratio = 1.0
# Two contexts with different idle durations
br_a = calculate_reward(
ctx_a,
params,
- base_factor=TEST_BASE_FACTOR,
+ base_factor=base_factor,
profit_target=profit_target,
risk_reward_ratio=risk_reward_ratio,
short_allowed=True,
br_b = calculate_reward(
ctx_b,
params,
- base_factor=TEST_BASE_FACTOR,
+ base_factor=base_factor,
profit_target=profit_target,
risk_reward_ratio=risk_reward_ratio,
short_allowed=True,
br_mid = calculate_reward(
ctx_mid,
params,
- base_factor=TEST_BASE_FACTOR,
+ base_factor=base_factor,
profit_target=profit_target,
risk_reward_ratio=risk_reward_ratio,
short_allowed=True,
br1 = calculate_reward(
ctx,
params,
- base_factor=TEST_BASE_FACTOR,
+ base_factor=base_factor,
profit_target=profit_target,
risk_reward_ratio=rr,
short_allowed=True,
br2 = calculate_reward(
ctx,
params,
- base_factor=TEST_BASE_FACTOR * k,
+ base_factor=base_factor * k,
profit_target=profit_target,
risk_reward_ratio=rr,
short_allowed=True,
params.pop("base_factor", None)
base_factor = 120.0
profit_target = 0.04
- rr = 2.0
+ rr = TEST_RR_HIGH
pnls = [0.018, -0.022]
for pnl in pnls:
ctx_long = RewardContext(
br_long = calculate_reward(
ctx_long,
params,
- base_factor=TEST_BASE_FACTOR,
+ base_factor=base_factor,
profit_target=profit_target,
risk_reward_ratio=rr,
short_allowed=True,
br_short = calculate_reward(
ctx_short,
params,
- base_factor=TEST_BASE_FACTOR,
+ base_factor=base_factor,
profit_target=profit_target,
risk_reward_ratio=rr,
short_allowed=True,
test_data = pd.DataFrame(
{
"reward_total": np.random.normal(0, 1, 100),
- "pnl": np.random.normal(0.01, 0.02, 100),
+ "pnl": np.random.normal(0.01, TEST_PNL_STD, 100),
}
)
~idle_mask, np.random.normal(-0.5, 0.2, 300), 0.0
),
"reward_exit": np.random.normal(0.8, 0.6, 300),
- "pnl": np.random.normal(0.01, 0.02, 300),
+ "pnl": np.random.normal(0.01, TEST_PNL_STD, 300),
"trade_duration": np.random.uniform(5, 150, 300),
"idle_duration": idle_duration,
"position": np.random.choice([0.0, 0.5, 1.0], 300),
np.random.seed(42)
df1 = pd.DataFrame(
{
- "pnl": np.random.normal(0, 0.02, 500),
+ "pnl": np.random.normal(0, TEST_PNL_STD, 500),
"trade_duration": np.random.exponential(30, 500),
"idle_duration": np.random.gamma(2, 5, 500),
}
short_positions, 0, "Futures mode should allow short positions"
)
- def test_model_analysis_function(self):
- """Test model_analysis function."""
-
- # Create test data
- test_data = simulate_samples(
- num_samples=100,
- seed=42,
- params=self.DEFAULT_PARAMS,
- max_trade_duration=50,
- base_factor=TEST_BASE_FACTOR,
- profit_target=TEST_PROFIT_TARGET,
- risk_reward_ratio=1.0,
- max_duration_ratio=2.0,
- trading_mode="spot",
- pnl_base_std=TEST_PNL_STD,
- pnl_duration_vol_scale=TEST_PNL_DUR_VOL_SCALE,
- )
-
- # Create temporary output directory
- with tempfile.TemporaryDirectory() as tmp_dir:
- output_path = Path(tmp_dir)
- # Use the internal helper to compute analysis and persist a feature file
- importance_df, analysis_stats, partial_deps, model = (
- _perform_feature_analysis(test_data, seed=42)
- )
-
- output_path.mkdir(parents=True, exist_ok=True)
- feature_file = output_path / "feature_importance.csv"
- importance_df.to_csv(feature_file, index=False)
- self.assertTrue(
- feature_file.exists(), "Feature importance file should be created"
- )
-
- def test_write_functions(self):
- """Test various write functions."""
-
- # Create test data
- test_data = simulate_samples(
- num_samples=100,
- seed=42,
- params=self.DEFAULT_PARAMS,
- max_trade_duration=50,
- base_factor=TEST_BASE_FACTOR,
- profit_target=TEST_PROFIT_TARGET,
- risk_reward_ratio=TEST_RR,
- max_duration_ratio=2.0,
- trading_mode="spot",
- pnl_base_std=TEST_PNL_STD,
- pnl_duration_vol_scale=TEST_PNL_DUR_VOL_SCALE,
- )
-
- with tempfile.TemporaryDirectory() as tmp_dir:
- output_path = Path(tmp_dir)
-
- # Create a minimal summary file using the computation helper
- output_path.mkdir(parents=True, exist_ok=True)
- stats = _compute_summary_stats(test_data)
- summary_file = output_path / "reward_summary.md"
- with summary_file.open("w", encoding="utf-8") as h:
- h.write("# Reward space summary\n\n")
- h.write(stats["global_stats"].to_frame(name="reward_total").to_string())
-
- self.assertTrue(summary_file.exists(), "Summary file should be created")
-
- # Relationship reports: compute and write a simple markdown
- rel_stats = _compute_relationship_stats(test_data, max_trade_duration=50)
- relationship_file = output_path / "reward_relationships.md"
- with relationship_file.open("w", encoding="utf-8") as h:
- h.write("# Relationship diagnostics\n\n")
- h.write(
- "Idle stats present: "
- + str(not rel_stats["idle_stats"].empty)
- + "\n"
- )
-
- self.assertTrue(
- relationship_file.exists(), "Relationship file should be created"
- )
-
- # Representativity report: compute and write a simple markdown
- repr_stats = _compute_representativity_stats(
- test_data, profit_target=TEST_PROFIT_TARGET
- )
- repr_file = output_path / "representativity.md"
- with repr_file.open("w", encoding="utf-8") as h:
- h.write("# Representativity diagnostics\n\n")
- h.write(f"Total samples: {repr_stats['total']}\n")
-
- self.assertTrue(
- repr_file.exists(), "Representativity file should be created"
- )
-
def test_load_real_episodes(self):
"""Test load_real_episodes function."""
-
- # Create a temporary pickle file with test data
- test_episodes = pd.DataFrame(
- {
- "pnl": [0.01, -0.02, 0.03],
- "trade_duration": [10, 20, 15],
- "idle_duration": [5, 0, 8],
- "position": [1.0, 0.0, 1.0],
- "action": [2.0, 0.0, 2.0],
- "reward_total": [10.5, -5.2, 15.8],
- }
- )
-
- with tempfile.TemporaryDirectory() as tmp_dir:
- pickle_path = Path(tmp_dir) / "test_episodes.pkl"
- with pickle_path.open("wb") as f:
- pickle.dump(test_episodes, f) # Don't wrap in list
-
- loaded_data = load_real_episodes(pickle_path)
- self.assertIsInstance(loaded_data, pd.DataFrame)
- self.assertEqual(len(loaded_data), 3)
- self.assertIn("pnl", loaded_data.columns)
+ # This test has been moved to TestLoadRealEpisodes to centralize tests
+ # related to load_real_episodes.
+ pass
def test_statistical_functions(self):
"""Test statistical functions."""
def test_complete_statistical_analysis_writer(self):
"""Test write_complete_statistical_analysis function."""
- # imports consolidated at top of file
# Create comprehensive test data
test_data = simulate_samples(
params=self.DEFAULT_PARAMS,
max_trade_duration=100,
base_factor=TEST_BASE_FACTOR,
- profit_target=0.03,
- risk_reward_ratio=1.0,
+ profit_target=TEST_PROFIT_TARGET,
+ risk_reward_ratio=TEST_RR,
max_duration_ratio=2.0,
trading_mode="margin",
pnl_base_std=TEST_PNL_STD,
breakdown = calculate_reward(
context,
self.DEFAULT_PARAMS,
- base_factor=100.0,
- profit_target=0.03,
- risk_reward_ratio=1.0,
+ base_factor=TEST_BASE_FACTOR,
+ profit_target=TEST_PROFIT_TARGET,
+ risk_reward_ratio=TEST_RR,
short_allowed=True,
action_masking=True,
)
context,
self.DEFAULT_PARAMS,
base_factor=TEST_BASE_FACTOR,
- profit_target=0.03,
+ profit_target=TEST_PROFIT_TARGET,
risk_reward_ratio=1.0,
short_allowed=True,
action_masking=False, # Disable masking to test invalid penalty
context,
self.DEFAULT_PARAMS,
base_factor=TEST_BASE_FACTOR,
- profit_target=0.03,
+ profit_target=TEST_PROFIT_TARGET,
risk_reward_ratio=1.0,
short_allowed=True,
action_masking=True,
context,
self.DEFAULT_PARAMS,
base_factor=TEST_BASE_FACTOR,
- profit_target=0.03,
+ profit_target=TEST_PROFIT_TARGET,
risk_reward_ratio=TEST_RR,
short_allowed=True,
action_masking=True,
self.assertIn("check_invariants", params)
self.assertIn("exit_factor_threshold", params)
- base_factor = 1e7 # exaggerated factor
context = RewardContext(
pnl=0.05,
trade_duration=300,
breakdown = calculate_reward(
context,
params,
- base_factor=TEST_BASE_FACTOR,
- profit_target=0.03,
+ base_factor=1e7, # exaggerated factor
+ profit_target=TEST_PROFIT_TARGET,
risk_reward_ratio=TEST_RR,
short_allowed=True,
action_masking=True,
- Exit factor monotonic attenuation per mode where mathematically expected
- Boundary parameter conditions (tau extremes, plateau grace edges, linear slope = 0)
- Non-linear power tests for idle & holding penalties (power != 1)
- - Public wrapper `_get_exit_factor` (avoids private function usage in new tests)
- Warning emission (exit_factor_threshold) without capping
"""
),
# Exit reward only (positive pnl)
dict(
- ctx=self._mk_context(pnl=0.03, trade_duration=60),
+ ctx=self._mk_context(pnl=TEST_PROFIT_TARGET, trade_duration=60),
active="exit_component",
),
# Invalid action only
ctx_obj,
self.DEFAULT_PARAMS,
base_factor=TEST_BASE_FACTOR,
- profit_target=0.03,
- risk_reward_ratio=1.0,
+ profit_target=TEST_PROFIT_TARGET,
+ risk_reward_ratio=TEST_RR,
short_allowed=True,
action_masking=(active_label != "invalid_penalty"),
)
}
)
base_factor = 80.0
- pnl = 0.03
+ pnl = TEST_PROFIT_TARGET
pnl_factor = 1.1
# Ratios straddling 1.0 but below grace=1.5 plus one beyond grace
ratios = [0.8, 1.0, 1.2, 1.4, 1.6]
params["idle_penalty_power"] = 2.0
params["max_idle_duration_candles"] = 100
base_factor = 90.0
- profit_target = 0.03
+ profit_target = TEST_PROFIT_TARGET
# Idle penalties for durations 20 vs 40 (quadratic → (40/100)^2 / (20/100)^2 = (0.4^2)/(0.2^2)=4)
ctx_a = RewardContext(
pnl=0.0,
br_a = calculate_reward(
ctx_a,
params,
- base_factor=TEST_BASE_FACTOR,
+ base_factor=base_factor,
profit_target=profit_target,
risk_reward_ratio=TEST_RR,
short_allowed=True,
br_b = calculate_reward(
ctx_b,
params,
- base_factor=TEST_BASE_FACTOR,
+ base_factor=base_factor,
profit_target=profit_target,
risk_reward_ratio=TEST_RR,
short_allowed=True,
br_h1 = calculate_reward(
ctx_h1,
params,
- base_factor=TEST_BASE_FACTOR,
+ base_factor=base_factor,
profit_target=profit_target,
risk_reward_ratio=TEST_RR,
short_allowed=True,
context,
params,
base_factor=5000.0, # large enough to exceed threshold
- profit_target=0.03,
+ profit_target=TEST_PROFIT_TARGET,
risk_reward_ratio=TEST_RR_HIGH,
short_allowed=True,
action_masking=True,
"Warning message should indicate threshold exceedance",
)
- def test_public_wrapper__get_exit_factor(self):
- """Basic sanity check of newly exposed _get_exit_factor wrapper."""
-
- params = self.DEFAULT_PARAMS.copy()
- params["exit_attenuation_mode"] = "sqrt"
- params["exit_plateau"] = False
- f1 = _get_exit_factor(TEST_BASE_FACTOR, 0.02, 1.0, 0.0, params)
- f2 = _get_exit_factor(TEST_BASE_FACTOR, 0.02, 1.0, 1.0, params)
- self.assertGreater(
- f1, f2, "Attenuation should reduce factor at higher duration ratio"
- )
-
class TestContinuityPlateau(RewardSpaceTestBase):
"""Continuity tests for plateau-enabled exit attenuation (excluding legacy)."""
modes = ["sqrt", "linear", "power", "half_life"]
grace = 0.8
eps = 1e-4
- base_factor = 100.0
+ base_factor = TEST_BASE_FACTOR
pnl = 0.01
pnl_factor = 1.0
tau = 0.5 # for power
self.assertLess(ratio, 15.0, f"Scaling ratio too large (ratio={ratio:.2f})")
+class TestLoadRealEpisodes(RewardSpaceTestBase):
+ """Unit tests for load_real_episodes (moved from separate file)."""
+
+ def write_pickle(self, obj, path: Path):
+ with path.open("wb") as f:
+ pickle.dump(obj, f)
+
+ def test_top_level_dict_transitions(self):
+ df = pd.DataFrame(
+ {
+ "pnl": [0.01],
+ "trade_duration": [10],
+ "idle_duration": [5],
+ "position": [1.0],
+ "action": [2.0],
+ "reward_total": [1.0],
+ }
+ )
+ p = Path(self.temp_dir) / "top.pkl"
+ self.write_pickle({"transitions": df}, p)
+
+ loaded = load_real_episodes(p)
+ self.assertIsInstance(loaded, pd.DataFrame)
+ self.assertEqual(list(loaded.columns).count("pnl"), 1)
+ self.assertEqual(len(loaded), 1)
+
+ def test_mixed_episode_list_warns_and_flattens(self):
+ ep1 = {"episode_id": 1}
+ ep2 = {
+ "episode_id": 2,
+ "transitions": [
+ {
+ "pnl": 0.02,
+ "trade_duration": 5,
+ "idle_duration": 0,
+ "position": 1.0,
+ "action": 2.0,
+ "reward_total": 2.0,
+ }
+ ],
+ }
+ p = Path(self.temp_dir) / "mixed.pkl"
+ self.write_pickle([ep1, ep2], p)
+
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter("always")
+ loaded = load_real_episodes(p)
+ # Accept variance in warning emission across platforms
+ _ = w
+
+ self.assertEqual(len(loaded), 1)
+ self.assertAlmostEqual(float(loaded.iloc[0]["pnl"]), 0.02, places=7)
+
+ def test_non_iterable_transitions_raises(self):
+ bad = {"transitions": 123}
+ p = Path(self.temp_dir) / "bad.pkl"
+ self.write_pickle(bad, p)
+
+ with self.assertRaises(ValueError):
+ load_real_episodes(p)
+
+ def test_enforce_columns_false_fills_na(self):
+ trans = [
+ {
+ "pnl": 0.03,
+ "trade_duration": 10,
+ "idle_duration": 0,
+ "position": 1.0,
+ "action": 2.0,
+ }
+ ]
+ p = Path(self.temp_dir) / "fill.pkl"
+ self.write_pickle(trans, p)
+
+ loaded = load_real_episodes(p, enforce_columns=False)
+ self.assertIn("reward_total", loaded.columns)
+ self.assertTrue(loaded["reward_total"].isna().all())
+
+ def test_casting_numeric_strings(self):
+ trans = [
+ {
+ "pnl": "0.04",
+ "trade_duration": "20",
+ "idle_duration": "0",
+ "position": "1.0",
+ "action": "2.0",
+ "reward_total": "3.0",
+ }
+ ]
+ p = Path(self.temp_dir) / "strs.pkl"
+ self.write_pickle(trans, p)
+
+ loaded = load_real_episodes(p)
+ self.assertIn("pnl", loaded.columns)
+ self.assertIn(loaded["pnl"].dtype.kind, ("f", "i"))
+ self.assertAlmostEqual(float(loaded.iloc[0]["pnl"]), 0.04, places=7)
+
+ def test_pickled_dataframe_loads(self):
+ """Ensure a directly pickled DataFrame loads correctly."""
+ test_episodes = pd.DataFrame(
+ {
+ "pnl": [0.01, -0.02, 0.03],
+ "trade_duration": [10, 20, 15],
+ "idle_duration": [5, 0, 8],
+ "position": [1.0, 0.0, 1.0],
+ "action": [2.0, 0.0, 2.0],
+ "reward_total": [10.5, -5.2, 15.8],
+ }
+ )
+ p = Path(self.temp_dir) / "test_episodes.pkl"
+ self.write_pickle(test_episodes, p)
+
+ loaded_data = load_real_episodes(p)
+ self.assertIsInstance(loaded_data, pd.DataFrame)
+ self.assertEqual(len(loaded_data), 3)
+ self.assertIn("pnl", loaded_data.columns)
+
+
if __name__ == "__main__":
# Configure test discovery and execution
loader = unittest.TestLoader()