]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
fix(quickadapter): quarantine corrupt optuna journal log and recover (#102)
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 22 Jun 2026 12:31:01 +0000 (14:31 +0200)
committerGitHub <noreply@github.com>
Mon, 22 Jun 2026 12:31:01 +0000 (14:31 +0200)
When `JournalFileBackend`'s replay encounters a corrupt journal record, optuna raises during `JournalStorage.__init__` (Class A/B: immediate `KeyError`/`json.JSONDecodeError`) or defers the raise to the next `_sync_with_backend` (Class C1/C2/C3: truncated tail / malformed last line / bare trailing newline at EOF). Both paths previously caused HPO for the affected pair to be silently skipped on every fit cycle.

Wrap `JournalStorage` construction in a narrow `try / except (KeyError, ValueError, json.JSONDecodeError)`, atomically rename the corrupt log aside as `optuna-<COIN>.log.corrupt-<UTC_µs>`, log a `WARNING`, and retry once on a fresh path. A bounded O(1) tail probe runs BEFORE construction to detect deferred-raise EOF corruption. `OSError` is intentionally excluded so filesystem failures stay operator-actionable.

The recovery follows RocksDB's documented WAL recovery philosophy (quarantine + restart), preserves forensic evidence (rename, no `unlink`), and keeps the live-journal glob `optuna-*.log` from matching quarantined artefacts.

quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py

index be1741316f3fe08a0153d8982f52da15330a98f5..6a0311e2b451c35188bc3fb0b9d9361fc9ecbb70 100644 (file)
@@ -1,10 +1,12 @@
 import copy
+import json
 import logging
 import math
 import random
 import time
 import warnings
 from dataclasses import dataclass
+from datetime import datetime, timezone
 from functools import lru_cache
 from pathlib import Path
 from typing import (
@@ -222,6 +224,14 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
         optuna.study.StudyDirection.MAXIMIZE,
     ) * _OPTUNA_LABEL_N_OBJECTIVES
     _OPTUNA_STORAGE_BACKENDS: Final[tuple[str, ...]] = ("file", "sqlite")
+    _OPTUNA_JOURNAL_QUARANTINE_TAG: Final[str] = "corrupt"
+    _OPTUNA_JOURNAL_RECOVERABLE_ERRORS: Final[tuple[type[Exception], ...]] = (
+        KeyError,
+        ValueError,
+        json.JSONDecodeError,
+    )
+    _OPTUNA_JOURNAL_QUARANTINE_TIE_BREAK_LIMIT: Final[int] = 8
+    _OPTUNA_JOURNAL_TAIL_PROBE_BYTES: Final[int] = 65536
     _OPTUNA_SAMPLERS: Final[_OptunaSamplers] = _OptunaSamplers()
     _OPTUNA_HPO_SAMPLERS: Final[_OptunaHpoSamplers] = _OptunaHpoSamplers()
     _OPTUNA_HPO_SAMPLERS_SET: Final[frozenset[OptunaSampler]] = frozenset(
@@ -4195,6 +4205,112 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
         self.optuna_save_best_params(pair, namespace)
         return study
 
+    @staticmethod
+    def _optuna_quarantine_path(journal_path: Path, now: datetime) -> Path:
+        """Quarantine target path for a corrupt Optuna journal.
+
+        The tag is appended *after* ``.log`` so the live-journal glob
+        ``optuna-*.log`` never matches quarantined artefacts. Collisions
+        are bounded by ``_OPTUNA_JOURNAL_QUARANTINE_TIE_BREAK_LIMIT``;
+        microsecond UTC stamping makes them practically impossible.
+        """
+        stamp = now.strftime("%Y%m%dT%H%M%S%fZ")
+        tag = QuickAdapterRegressorV3._OPTUNA_JOURNAL_QUARANTINE_TAG
+        candidate = journal_path.with_name(f"{journal_path.name}.{tag}-{stamp}")
+        for n in range(
+            1, QuickAdapterRegressorV3._OPTUNA_JOURNAL_QUARANTINE_TIE_BREAK_LIMIT + 1
+        ):
+            if not candidate.exists():
+                return candidate
+            candidate = journal_path.with_name(f"{journal_path.name}.{tag}-{stamp}-{n}")
+        return candidate
+
+    @staticmethod
+    def _optuna_quarantine_journal(
+        journal_path: Path, pair: str, cause: Exception
+    ) -> Optional[Path]:
+        """Atomically move a corrupt Optuna journal aside.
+
+        Return the quarantine path on success, or ``None`` if
+        ``journal_path`` is missing (caller propagates the original
+        error). ``OSError`` from the rename is re-raised so operator-
+        actionable filesystem failures (RO mount, EXDEV, ENOSPC) are
+        not masked.
+        """
+        if not journal_path.exists():
+            return None
+        quarantine_path = QuickAdapterRegressorV3._optuna_quarantine_path(
+            journal_path, datetime.now(timezone.utc)
+        )
+        try:
+            journal_path.rename(quarantine_path)
+        except OSError as rename_exc:
+            logger.error(
+                f"[{pair}] Optuna journal {journal_path.name} "
+                f"quarantine failed: {rename_exc!r}",
+                exc_info=True,
+            )
+            raise
+        logger.warning(
+            f"[{pair}] Optuna journal {journal_path.name} corrupt ({cause!r}); "
+            f"quarantined to {quarantine_path.name}; resuming with fresh journal"
+        )
+        return quarantine_path
+
+    @staticmethod
+    def _optuna_journal_has_corrupt_tail(journal_path: Path) -> bool:
+        """Detect a deferred-raise hazard at EOF of a journal log file.
+
+        ``JournalFileBackend.read_logs`` stores ``ValueError('Invalid
+        log format.')`` (missing trailing newline) and
+        ``json.JSONDecodeError`` (malformed or empty line) as
+        ``last_decode_error`` and re-raises only on the *next*
+        iteration. A bad trailing record with no successor therefore
+        does NOT raise during ``JournalStorage.__init__``; it surfaces
+        at the next ``_sync_with_backend``, outside the
+        ``optuna_create_storage`` try/except, where the broad handler
+        in ``optuna_create_study`` returns ``None`` (silent HPO skip).
+
+        Bounded tail probe (last ``_OPTUNA_JOURNAL_TAIL_PROBE_BYTES``).
+        Return True iff the file is non-empty AND its trailing record
+        is (a) missing the newline, (b) empty (bare ``\\n``), or
+        (c) malformed JSON. Fail-open when the probe window cuts a
+        single line larger than the window; defer to the
+        post-construction handler.
+        """
+        if not journal_path.exists():
+            return False
+        try:
+            size = journal_path.stat().st_size
+            if size == 0:
+                return False
+            with journal_path.open("rb") as f:
+                f.seek(
+                    max(
+                        0,
+                        size - QuickAdapterRegressorV3._OPTUNA_JOURNAL_TAIL_PROBE_BYTES,
+                    )
+                )
+                tail = f.read()
+        except OSError:
+            return False
+        if not tail.endswith(b"\n"):
+            return True
+        last_newline = tail.rfind(b"\n", 0, len(tail) - 1)
+        if last_newline == -1:
+            if size > QuickAdapterRegressorV3._OPTUNA_JOURNAL_TAIL_PROBE_BYTES:
+                return False
+            last_line = tail[:-1]
+        else:
+            last_line = tail[last_newline + 1 : -1]
+        if not last_line:
+            return True
+        try:
+            json.loads(last_line)
+        except json.JSONDecodeError:
+            return True
+        return False
+
     def optuna_create_storage(self, pair: str) -> optuna.storages.BaseStorage:
         storage_dir = self.full_path
         storage_filename = f"optuna-{pair.split('/')[0]}"
@@ -4202,11 +4318,34 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
         if (
             storage_backend == QuickAdapterRegressorV3._OPTUNA_STORAGE_BACKENDS[0]
         ):  # "file"
-            storage = optuna.storages.JournalStorage(
-                optuna.storages.journal.JournalFileBackend(
-                    f"{storage_dir}/{storage_filename}.log"
+            journal_path = storage_dir / f"{storage_filename}.log"
+
+            # Pre-validate EOF: close the read_logs deferred-raise gap (see helper).
+            if QuickAdapterRegressorV3._optuna_journal_has_corrupt_tail(journal_path):
+                QuickAdapterRegressorV3._optuna_quarantine_journal(
+                    journal_path,
+                    pair,
+                    ValueError(
+                        "trailing journal record is truncated or malformed JSON"
+                    ),
                 )
-            )
+
+            def _build_journal_storage() -> optuna.storages.JournalStorage:
+                return optuna.storages.JournalStorage(
+                    optuna.storages.journal.JournalFileBackend(str(journal_path))
+                )
+
+            try:
+                storage = _build_journal_storage()
+            except QuickAdapterRegressorV3._OPTUNA_JOURNAL_RECOVERABLE_ERRORS as exc:
+                # Replay-time corruption: quarantine + retry once. OSError is
+                # excluded from the tuple — FS failures stay operator-actionable.
+                quarantined = QuickAdapterRegressorV3._optuna_quarantine_journal(
+                    journal_path, pair, exc
+                )
+                if quarantined is None:
+                    raise
+                storage = _build_journal_storage()
         elif (
             storage_backend == QuickAdapterRegressorV3._OPTUNA_STORAGE_BACKENDS[1]
         ):  # "sqlite"