From: Jérôme Benoit Date: Mon, 22 Jun 2026 12:31:01 +0000 (+0200) Subject: fix(quickadapter): quarantine corrupt optuna journal log and recover (#102) X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=3a7303e7b64f3ea57987ad72e8b76e5092e98466;p=freqai-strategies.git fix(quickadapter): quarantine corrupt optuna journal log and recover (#102) When `JournalFileBackend`'s replay encounters a corrupt journal record, optuna raises during `JournalStorage.__init__` (Class A/B: immediate `KeyError`/`json.JSONDecodeError`) or defers the raise to the next `_sync_with_backend` (Class C1/C2/C3: truncated tail / malformed last line / bare trailing newline at EOF). Both paths previously caused HPO for the affected pair to be silently skipped on every fit cycle. Wrap `JournalStorage` construction in a narrow `try / except (KeyError, ValueError, json.JSONDecodeError)`, atomically rename the corrupt log aside as `optuna-{pair}.log.corrupt-{stamp}` (microsecond UTC stamp), log a `WARNING`, and retry once on a fresh path. A bounded O(1) tail probe runs BEFORE construction to detect deferred-raise EOF corruption. `OSError` is intentionally excluded so filesystem failures stay operator-actionable. The recovery follows RocksDB's documented WAL recovery philosophy (quarantine + restart), preserves forensic evidence (rename, no `unlink`), and keeps the live-journal glob `optuna-*.log` from matching quarantined artefacts. 
--- diff --git a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py index be17413..6a0311e 100644 --- a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py +++ b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py @@ -1,10 +1,12 @@ import copy +import json import logging import math import random import time import warnings from dataclasses import dataclass +from datetime import datetime, timezone from functools import lru_cache from pathlib import Path from typing import ( @@ -222,6 +224,14 @@ class QuickAdapterRegressorV3(BaseRegressionModel): optuna.study.StudyDirection.MAXIMIZE, ) * _OPTUNA_LABEL_N_OBJECTIVES _OPTUNA_STORAGE_BACKENDS: Final[tuple[str, ...]] = ("file", "sqlite") + _OPTUNA_JOURNAL_QUARANTINE_TAG: Final[str] = "corrupt" + _OPTUNA_JOURNAL_RECOVERABLE_ERRORS: Final[tuple[type[Exception], ...]] = ( + KeyError, + ValueError, + json.JSONDecodeError, + ) + _OPTUNA_JOURNAL_QUARANTINE_TIE_BREAK_LIMIT: Final[int] = 8 + _OPTUNA_JOURNAL_TAIL_PROBE_BYTES: Final[int] = 65536 _OPTUNA_SAMPLERS: Final[_OptunaSamplers] = _OptunaSamplers() _OPTUNA_HPO_SAMPLERS: Final[_OptunaHpoSamplers] = _OptunaHpoSamplers() _OPTUNA_HPO_SAMPLERS_SET: Final[frozenset[OptunaSampler]] = frozenset( @@ -4195,6 +4205,112 @@ class QuickAdapterRegressorV3(BaseRegressionModel): self.optuna_save_best_params(pair, namespace) return study + @staticmethod + def _optuna_quarantine_path(journal_path: Path, now: datetime) -> Path: + """Quarantine target path for a corrupt Optuna journal. + + The tag is appended *after* ``.log`` so the live-journal glob + ``optuna-*.log`` never matches quarantined artefacts. Collisions + are bounded by ``_OPTUNA_JOURNAL_QUARANTINE_TIE_BREAK_LIMIT``; + microsecond UTC stamping makes them practically impossible. 
+ """ + stamp = now.strftime("%Y%m%dT%H%M%S%fZ") + tag = QuickAdapterRegressorV3._OPTUNA_JOURNAL_QUARANTINE_TAG + candidate = journal_path.with_name(f"{journal_path.name}.{tag}-{stamp}") + for n in range( + 1, QuickAdapterRegressorV3._OPTUNA_JOURNAL_QUARANTINE_TIE_BREAK_LIMIT + 1 + ): + if not candidate.exists(): + return candidate + candidate = journal_path.with_name(f"{journal_path.name}.{tag}-{stamp}-{n}") + return candidate + + @staticmethod + def _optuna_quarantine_journal( + journal_path: Path, pair: str, cause: Exception + ) -> Optional[Path]: + """Atomically move a corrupt Optuna journal aside. + + Return the quarantine path on success, or ``None`` if + ``journal_path`` is missing (caller propagates the original + error). ``OSError`` from the rename is re-raised so operator- + actionable filesystem failures (RO mount, EXDEV, ENOSPC) are + not masked. + """ + if not journal_path.exists(): + return None + quarantine_path = QuickAdapterRegressorV3._optuna_quarantine_path( + journal_path, datetime.now(timezone.utc) + ) + try: + journal_path.rename(quarantine_path) + except OSError as rename_exc: + logger.error( + f"[{pair}] Optuna journal {journal_path.name} " + f"quarantine failed: {rename_exc!r}", + exc_info=True, + ) + raise + logger.warning( + f"[{pair}] Optuna journal {journal_path.name} corrupt ({cause!r}); " + f"quarantined to {quarantine_path.name}; resuming with fresh journal" + ) + return quarantine_path + + @staticmethod + def _optuna_journal_has_corrupt_tail(journal_path: Path) -> bool: + """Detect a deferred-raise hazard at EOF of a journal log file. + + ``JournalFileBackend.read_logs`` stores ``ValueError('Invalid + log format.')`` (missing trailing newline) and + ``json.JSONDecodeError`` (malformed or empty line) as + ``last_decode_error`` and re-raises only on the *next* + iteration. 
A bad trailing record with no successor therefore + does NOT raise during ``JournalStorage.__init__``; it surfaces + at the next ``_sync_with_backend``, outside the + ``optuna_create_storage`` try/except, where the broad handler + in ``optuna_create_study`` returns ``None`` (silent HPO skip). + + Bounded tail probe (last ``_OPTUNA_JOURNAL_TAIL_PROBE_BYTES``). + Return True iff the file is non-empty AND its trailing record + is (a) missing the newline, (b) empty (bare ``\\n``), or + (c) malformed JSON. Fail-open when the probe window cuts a + single line larger than the window; defer to the + post-construction handler. + """ + if not journal_path.exists(): + return False + try: + size = journal_path.stat().st_size + if size == 0: + return False + with journal_path.open("rb") as f: + f.seek( + max( + 0, + size - QuickAdapterRegressorV3._OPTUNA_JOURNAL_TAIL_PROBE_BYTES, + ) + ) + tail = f.read() + except OSError: + return False + if not tail.endswith(b"\n"): + return True + last_newline = tail.rfind(b"\n", 0, len(tail) - 1) + if last_newline == -1: + if size > QuickAdapterRegressorV3._OPTUNA_JOURNAL_TAIL_PROBE_BYTES: + return False + last_line = tail[:-1] + else: + last_line = tail[last_newline + 1 : -1] + if not last_line: + return True + try: + json.loads(last_line) + except json.JSONDecodeError: + return True + return False + def optuna_create_storage(self, pair: str) -> optuna.storages.BaseStorage: storage_dir = self.full_path storage_filename = f"optuna-{pair.split('/')[0]}" @@ -4202,11 +4318,34 @@ class QuickAdapterRegressorV3(BaseRegressionModel): if ( storage_backend == QuickAdapterRegressorV3._OPTUNA_STORAGE_BACKENDS[0] ): # "file" - storage = optuna.storages.JournalStorage( - optuna.storages.journal.JournalFileBackend( - f"{storage_dir}/{storage_filename}.log" + journal_path = storage_dir / f"{storage_filename}.log" + + # Pre-validate EOF: close the read_logs deferred-raise gap (see helper). 
+ if QuickAdapterRegressorV3._optuna_journal_has_corrupt_tail(journal_path): + QuickAdapterRegressorV3._optuna_quarantine_journal( + journal_path, + pair, + ValueError( + "trailing journal record is truncated or malformed JSON" + ), ) - ) + + def _build_journal_storage() -> optuna.storages.JournalStorage: + return optuna.storages.JournalStorage( + optuna.storages.journal.JournalFileBackend(str(journal_path)) + ) + + try: + storage = _build_journal_storage() + except QuickAdapterRegressorV3._OPTUNA_JOURNAL_RECOVERABLE_ERRORS as exc: + # Replay-time corruption: quarantine + retry once. OSError is + # excluded from the tuple — FS failures stay operator-actionable. + quarantined = QuickAdapterRegressorV3._optuna_quarantine_journal( + journal_path, pair, exc + ) + if quarantined is None: + raise + storage = _build_journal_storage() elif ( storage_backend == QuickAdapterRegressorV3._OPTUNA_STORAGE_BACKENDS[1] ): # "sqlite"