import copy
+import json
import logging
import math
import random
import time
import warnings
from dataclasses import dataclass
+from datetime import datetime, timezone
from functools import lru_cache
from pathlib import Path
from typing import (
optuna.study.StudyDirection.MAXIMIZE,
) * _OPTUNA_LABEL_N_OBJECTIVES
_OPTUNA_STORAGE_BACKENDS: Final[tuple[str, ...]] = ("file", "sqlite")
+ _OPTUNA_JOURNAL_QUARANTINE_TAG: Final[str] = "corrupt"
+ _OPTUNA_JOURNAL_RECOVERABLE_ERRORS: Final[tuple[type[Exception], ...]] = (
+ KeyError,
+ ValueError,
+ json.JSONDecodeError,
+ )
+ _OPTUNA_JOURNAL_QUARANTINE_TIE_BREAK_LIMIT: Final[int] = 8
+ _OPTUNA_JOURNAL_TAIL_PROBE_BYTES: Final[int] = 65536
_OPTUNA_SAMPLERS: Final[_OptunaSamplers] = _OptunaSamplers()
_OPTUNA_HPO_SAMPLERS: Final[_OptunaHpoSamplers] = _OptunaHpoSamplers()
_OPTUNA_HPO_SAMPLERS_SET: Final[frozenset[OptunaSampler]] = frozenset(
self.optuna_save_best_params(pair, namespace)
return study
+ @staticmethod
+ def _optuna_quarantine_path(journal_path: Path, now: datetime) -> Path:
+ """Quarantine target path for a corrupt Optuna journal.
+
+ The tag is appended *after* ``.log`` so the live-journal glob
+ ``optuna-*.log`` never matches quarantined artefacts. Collisions
+ are bounded by ``_OPTUNA_JOURNAL_QUARANTINE_TIE_BREAK_LIMIT``;
+ microsecond UTC stamping makes them practically impossible.
+ """
+ stamp = now.strftime("%Y%m%dT%H%M%S%fZ")
+ tag = QuickAdapterRegressorV3._OPTUNA_JOURNAL_QUARANTINE_TAG
+ candidate = journal_path.with_name(f"{journal_path.name}.{tag}-{stamp}")
+ for n in range(
+ 1, QuickAdapterRegressorV3._OPTUNA_JOURNAL_QUARANTINE_TIE_BREAK_LIMIT + 1
+ ):
+ if not candidate.exists():
+ return candidate
+ candidate = journal_path.with_name(f"{journal_path.name}.{tag}-{stamp}-{n}")
+ return candidate
+
+ @staticmethod
+ def _optuna_quarantine_journal(
+ journal_path: Path, pair: str, cause: Exception
+ ) -> Optional[Path]:
+ """Atomically move a corrupt Optuna journal aside.
+
+ Return the quarantine path on success, or ``None`` if
+ ``journal_path`` is missing (caller propagates the original
+ error). ``OSError`` from the rename is re-raised so operator-
+ actionable filesystem failures (RO mount, EXDEV, ENOSPC) are
+ not masked.
+ """
+ if not journal_path.exists():
+ return None
+ quarantine_path = QuickAdapterRegressorV3._optuna_quarantine_path(
+ journal_path, datetime.now(timezone.utc)
+ )
+ try:
+ journal_path.rename(quarantine_path)
+ except OSError as rename_exc:
+ logger.error(
+ f"[{pair}] Optuna journal {journal_path.name} "
+ f"quarantine failed: {rename_exc!r}",
+ exc_info=True,
+ )
+ raise
+ logger.warning(
+ f"[{pair}] Optuna journal {journal_path.name} corrupt ({cause!r}); "
+ f"quarantined to {quarantine_path.name}; resuming with fresh journal"
+ )
+ return quarantine_path
+
+ @staticmethod
+ def _optuna_journal_has_corrupt_tail(journal_path: Path) -> bool:
+ """Detect a deferred-raise hazard at EOF of a journal log file.
+
+ ``JournalFileBackend.read_logs`` stores ``ValueError('Invalid
+ log format.')`` (missing trailing newline) and
+ ``json.JSONDecodeError`` (malformed or empty line) as
+ ``last_decode_error`` and re-raises only on the *next*
+ iteration. A bad trailing record with no successor therefore
+ does NOT raise during ``JournalStorage.__init__``; it surfaces
+ at the next ``_sync_with_backend``, outside the
+ ``optuna_create_storage`` try/except, where the broad handler
+ in ``optuna_create_study`` returns ``None`` (silent HPO skip).
+
+ Bounded tail probe (last ``_OPTUNA_JOURNAL_TAIL_PROBE_BYTES``).
+ Return True iff the file is non-empty AND its trailing record
+ is (a) missing the newline, (b) empty (bare ``\\n``), or
+ (c) malformed JSON. Fail-open when the probe window cuts a
+ single line larger than the window; defer to the
+ post-construction handler.
+ """
+ if not journal_path.exists():
+ return False
+ try:
+ size = journal_path.stat().st_size
+ if size == 0:
+ return False
+ with journal_path.open("rb") as f:
+ f.seek(
+ max(
+ 0,
+ size - QuickAdapterRegressorV3._OPTUNA_JOURNAL_TAIL_PROBE_BYTES,
+ )
+ )
+ tail = f.read()
+ except OSError:
+ return False
+ if not tail.endswith(b"\n"):
+ return True
+ last_newline = tail.rfind(b"\n", 0, len(tail) - 1)
+ if last_newline == -1:
+ if size > QuickAdapterRegressorV3._OPTUNA_JOURNAL_TAIL_PROBE_BYTES:
+ return False
+ last_line = tail[:-1]
+ else:
+ last_line = tail[last_newline + 1 : -1]
+ if not last_line:
+ return True
+ try:
+ json.loads(last_line)
+ except json.JSONDecodeError:
+ return True
+ return False
+
def optuna_create_storage(self, pair: str) -> optuna.storages.BaseStorage:
storage_dir = self.full_path
storage_filename = f"optuna-{pair.split('/')[0]}"
if (
storage_backend == QuickAdapterRegressorV3._OPTUNA_STORAGE_BACKENDS[0]
): # "file"
- storage = optuna.storages.JournalStorage(
- optuna.storages.journal.JournalFileBackend(
- f"{storage_dir}/{storage_filename}.log"
+ journal_path = storage_dir / f"{storage_filename}.log"
+
+ # Pre-validate EOF: close the read_logs deferred-raise gap (see helper).
+ if QuickAdapterRegressorV3._optuna_journal_has_corrupt_tail(journal_path):
+ QuickAdapterRegressorV3._optuna_quarantine_journal(
+ journal_path,
+ pair,
+ ValueError(
+ "trailing journal record is truncated or malformed JSON"
+ ),
)
- )
+
+ def _build_journal_storage() -> optuna.storages.JournalStorage:
+ return optuna.storages.JournalStorage(
+ optuna.storages.journal.JournalFileBackend(str(journal_path))
+ )
+
+ try:
+ storage = _build_journal_storage()
+ except QuickAdapterRegressorV3._OPTUNA_JOURNAL_RECOVERABLE_ERRORS as exc:
+ # Replay-time corruption: quarantine + retry once. OSError is
+ # excluded from the tuple — FS failures stay operator-actionable.
+ quarantined = QuickAdapterRegressorV3._optuna_quarantine_journal(
+ journal_path, pair, exc
+ )
+ if quarantined is None:
+ raise
+ storage = _build_journal_storage()
elif (
storage_backend == QuickAdapterRegressorV3._OPTUNA_STORAGE_BACKENDS[1]
): # "sqlite"