Files
alfred/alfred/infrastructure/metadata/store.py
T
francwa ba6f016d49 feat: generic MetadataStore + read_release_metadata + query_library
- Extract MetadataStore from SubtitleMetadataStore (alfred/infrastructure/metadata/).
  Generic load/save + typed update helpers (update_parse, update_probe, update_tmdb)
  for the per-release .alfred/metadata.yaml.
- SubtitleMetadataStore becomes a thin facade — owns subtitle_history shape,
  delegates I/O to MetadataStore.
- Agent._execute_tool_call auto-persists successful analyze_release / probe_media /
  find_media_imdb_id results to the release's .alfred file. find_media_imdb_id
  follows release_focus when it has no path argument.
- New tools:
  · read_release_metadata(release_path) — cacheable, key=release_path.
    Returns the .alfred content or has_metadata=false.
  · query_library(name) — substring scan across configured library roots.
- Both new tools added to CORE_TOOLS (always visible).
2026-05-15 11:02:25 +02:00

184 lines
6.8 KiB
Python

"""
MetadataStore — reads/writes the `.alfred/metadata.yaml` file colocated with
a release folder.
The store is intentionally domain-agnostic: it knows how to atomically
load/save the YAML and exposes typed update helpers for the broad facts a
release carries (parse, probe, TMDB lookup, detected pattern). Subtitle
history lives next to the same file but is appended through a dedicated
helper kept under `alfred/infrastructure/subtitle/` so the subtitle pipeline
keeps full ownership of its payload shape.
The file layout:

    <release_root>/
        .alfred/
            metadata.yaml

The store never raises on a missing file — it returns empty defaults. Writes
are atomic (write to .tmp then rename).
"""
from __future__ import annotations
import logging
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
import yaml
logger = logging.getLogger(__name__)
class MetadataStore:
"""Manages `.alfred/metadata.yaml` for one release folder."""
def __init__(self, release_root: str | Path):
self._root = Path(release_root)
self._alfred_dir = self._root / ".alfred"
self._metadata_path = self._alfred_dir / "metadata.yaml"
# ------------------------------------------------------------------
# Identity
# ------------------------------------------------------------------
@property
def release_root(self) -> Path:
return self._root
@property
def metadata_path(self) -> Path:
return self._metadata_path
def exists(self) -> bool:
return self._metadata_path.exists()
# ------------------------------------------------------------------
# Load / Save
# ------------------------------------------------------------------
def load(self) -> dict:
"""Return the full metadata dict. Empty dict if file absent."""
if not self._metadata_path.exists():
return {}
try:
with open(self._metadata_path, encoding="utf-8") as f:
return yaml.safe_load(f) or {}
except Exception as e:
logger.warning(f"MetadataStore: could not read {self._metadata_path}: {e}")
return {}
def save(self, data: dict) -> None:
"""Atomically write metadata.yaml. Creates .alfred/ if needed."""
self._alfred_dir.mkdir(parents=True, exist_ok=True)
tmp = self._metadata_path.with_suffix(".yaml.tmp")
try:
with open(tmp, "w", encoding="utf-8") as f:
yaml.safe_dump(
data,
f,
allow_unicode=True,
default_flow_style=False,
sort_keys=False,
)
tmp.rename(self._metadata_path)
except Exception as e:
logger.error(f"MetadataStore: could not write {self._metadata_path}: {e}")
tmp.unlink(missing_ok=True)
raise
# ------------------------------------------------------------------
# Generic update helper
# ------------------------------------------------------------------
def update_section(self, section: str, payload: dict[str, Any]) -> None:
"""
Merge `payload` into the top-level `section` block and stamp it.
The section is replaced wholesale (not deep-merged) so the last
successful tool run reflects the current truth. A `_updated_at`
ISO-8601 timestamp is added inside the section.
"""
data = self.load()
stamped = dict(payload)
stamped["_updated_at"] = datetime.now(UTC).isoformat()
data[section] = stamped
self.save(data)
# ------------------------------------------------------------------
# Typed update helpers — one per inspector tool
# ------------------------------------------------------------------
def update_parse(self, parse_result: dict[str, Any]) -> None:
"""Persist the result of analyze_release."""
clean = {k: v for k, v in parse_result.items() if k != "status"}
self.update_section("parse", clean)
def update_probe(self, probe_result: dict[str, Any]) -> None:
"""Persist the result of probe_media."""
clean = {k: v for k, v in probe_result.items() if k != "status"}
self.update_section("probe", clean)
def update_tmdb(self, tmdb_result: dict[str, Any]) -> None:
"""Persist the result of find_media_imdb_id."""
clean = {k: v for k, v in tmdb_result.items() if k != "status"}
self.update_section("tmdb", clean)
# Also promote core identity fields to the top level so they are
# cheap to read without parsing the full tmdb block.
data = self.load()
for key in ("imdb_id", "tmdb_id", "media_type"):
if key in clean and clean[key] is not None:
data[key] = clean[key]
if "title" in clean and clean["title"]:
data.setdefault("title", clean["title"])
self.save(data)
# ------------------------------------------------------------------
# Pattern (used by the subtitle pipeline)
# ------------------------------------------------------------------
def confirmed_pattern(self) -> str | None:
"""Return the confirmed pattern_id, or None."""
data = self.load()
if data.get("pattern_confirmed"):
return data.get("detected_pattern")
return None
def mark_pattern_confirmed(
self, pattern_id: str, media_info: dict | None = None
) -> None:
"""Persist detected_pattern + pattern_confirmed=true."""
data = self.load()
data["detected_pattern"] = pattern_id
data["pattern_confirmed"] = True
if media_info:
data.setdefault("media_type", media_info.get("media_type"))
data.setdefault("imdb_id", media_info.get("imdb_id"))
data.setdefault("title", media_info.get("title"))
self.save(data)
logger.info(
f"MetadataStore: confirmed pattern '{pattern_id}' for {self._root.name}"
)
# ------------------------------------------------------------------
# Subtitle history (kept for backwards compatibility with the
# subtitle pipeline — payload shape is owned by the caller).
# ------------------------------------------------------------------
def append_subtitle_history_entry(self, entry: dict[str, Any]) -> None:
"""Append one entry (raw dict) to subtitle_history."""
data = self.load()
history = data.setdefault("subtitle_history", [])
history.append(entry)
rg = entry.get("release_group")
if rg:
groups = data.setdefault("release_groups", [])
if rg not in groups:
groups.append(rg)
self.save(data)
def subtitle_history(self) -> list[dict]:
"""Return the raw subtitle history list."""
return self.load().get("subtitle_history", [])