refactor(release): thread ReleaseKnowledge through callers

Wires the new explicit-kb signatures into every caller of the release
parser and the filesystem-extension helpers.

- application/filesystem/resolve_destination.py: module-level singleton
  _KB: ReleaseKnowledge = YamlReleaseKnowledge(); each use case now calls
  parse_release(release_name, _KB) and sanitizes TMDB strings via
  _KB.sanitize_for_fs(...) before passing them to the pure ParsedRelease
  builders. Local _sanitize helper + _WIN_FORBIDDEN regex dropped.
- application/filesystem/detect_media_type.py: signature is now
  detect_media_type(parsed, source_path, kb); uses kb.metadata_extensions,
  kb.video_extensions, kb.non_video_extensions.
- infrastructure/filesystem/find_video.py: find_video_file(path, kb) uses
  kb.video_extensions instead of an imported constant.
- agent/tools/filesystem.py::analyze_release imports the application _KB
  singleton and passes it through to parse_release / detect_media_type /
  find_video_file.
This commit is contained in:
2026-05-19 22:05:19 +02:00
parent 4a74fff9cc
commit bf37a9d09e
4 changed files with 52 additions and 39 deletions
+4 -3
View File
@@ -190,15 +190,16 @@ def set_path_for_folder(folder_name: str, path_value: str) -> dict[str, Any]:
def analyze_release(release_name: str, source_path: str) -> dict[str, Any]: def analyze_release(release_name: str, source_path: str) -> dict[str, Any]:
"""Thin tool wrapper — semantics live in alfred/agent/tools/specs/analyze_release.yaml.""" """Thin tool wrapper — semantics live in alfred/agent/tools/specs/analyze_release.yaml."""
from alfred.application.filesystem.resolve_destination import _KB # noqa: PLC0415
from alfred.domain.release.services import parse_release # noqa: PLC0415 from alfred.domain.release.services import parse_release # noqa: PLC0415
path = Path(source_path) path = Path(source_path)
parsed = parse_release(release_name) parsed = parse_release(release_name, _KB)
parsed.media_type = detect_media_type(parsed, path) parsed.media_type = detect_media_type(parsed, path, _KB)
probe_used = False probe_used = False
if parsed.media_type not in ("unknown", "other"): if parsed.media_type not in ("unknown", "other"):
video_file = find_video_file(path) video_file = find_video_file(path, _KB)
if video_file: if video_file:
media_info = probe(video_file) media_info = probe(video_file)
if media_info: if media_info:
@@ -19,15 +19,13 @@ from __future__ import annotations
from pathlib import Path from pathlib import Path
from alfred.domain.release.value_objects import ( from alfred.domain.release.ports import ReleaseKnowledge
_METADATA_EXTENSIONS, from alfred.domain.release.value_objects import ParsedRelease
_NON_VIDEO_EXTENSIONS,
_VIDEO_EXTENSIONS,
ParsedRelease,
)
def detect_media_type(parsed: ParsedRelease, source_path: Path) -> str: def detect_media_type(
parsed: ParsedRelease, source_path: Path, kb: ReleaseKnowledge
) -> str:
""" """
Return a refined media_type string for the given source_path. Return a refined media_type string for the given source_path.
@@ -37,10 +35,10 @@ def detect_media_type(parsed: ParsedRelease, source_path: Path) -> str:
extensions = _collect_extensions(source_path) extensions = _collect_extensions(source_path)
# Metadata extensions (.nfo, .srt, …) are always present alongside releases # Metadata extensions (.nfo, .srt, …) are always present alongside releases
# and must not influence the type decision. # and must not influence the type decision.
conclusive = extensions - _METADATA_EXTENSIONS conclusive = extensions - kb.metadata_extensions
has_video = bool(conclusive & _VIDEO_EXTENSIONS) has_video = bool(conclusive & kb.video_extensions)
has_non_video = bool(conclusive & _NON_VIDEO_EXTENSIONS) has_non_video = bool(conclusive & kb.non_video_extensions)
if has_video and has_non_video: if has_video and has_non_video:
return "unknown" return "unknown"
@@ -8,34 +8,39 @@ Four distinct use cases, one per release type:
- resolve_series_destination : complete series multi-season pack (folder move) - resolve_series_destination : complete series multi-season pack (folder move)
Each returns a dedicated DTO with only the fields that make sense for that type. Each returns a dedicated DTO with only the fields that make sense for that type.
These use cases follow Option B of the snapshot-VO design: ``ParsedRelease``
arrives with ``title_sanitized`` already computed, and TMDB-supplied strings
are sanitized **at the use-case boundary** (here) before being passed into
``ParsedRelease`` builder methods. The builders themselves perform no I/O and
no sanitization.
""" """
from __future__ import annotations from __future__ import annotations
import logging import logging
import re
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from alfred.domain.release import parse_release from alfred.domain.release import parse_release
from alfred.domain.release.ports import ReleaseKnowledge
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
from alfred.infrastructure.persistence import get_memory from alfred.infrastructure.persistence import get_memory
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_WIN_FORBIDDEN = re.compile(r'[?:*"<>|\\]') # Single module-level knowledge instance. YAML is loaded once at first import.
# Tests that need a custom KB can monkeypatch this attribute.
_KB: ReleaseKnowledge = YamlReleaseKnowledge()
def _sanitize(text: str) -> str:
return _WIN_FORBIDDEN.sub("", text)
def _find_existing_tvshow_folders( def _find_existing_tvshow_folders(
tv_root: Path, tmdb_title: str, tmdb_year: int tv_root: Path, tmdb_title_safe: str, tmdb_year: int
) -> list[str]: ) -> list[str]:
"""Return folder names in tv_root that match title + year prefix.""" """Return folder names in tv_root that match title + year prefix."""
if not tv_root.exists(): if not tv_root.exists():
return [] return []
clean_title = _sanitize(tmdb_title).replace(" ", ".") clean_title = tmdb_title_safe.replace(" ", ".")
prefix = f"{clean_title}.{tmdb_year}".lower() prefix = f"{clean_title}.{tmdb_year}".lower()
return sorted( return sorted(
entry.name entry.name
@@ -66,6 +71,7 @@ class _Clarification:
def _resolve_series_folder( def _resolve_series_folder(
tv_root: Path, tv_root: Path,
tmdb_title: str, tmdb_title: str,
tmdb_title_safe: str,
tmdb_year: int, tmdb_year: int,
computed_name: str, computed_name: str,
confirmed_folder: str | None, confirmed_folder: str | None,
@@ -80,7 +86,7 @@ def _resolve_series_folder(
if confirmed_folder: if confirmed_folder:
return confirmed_folder, not (tv_root / confirmed_folder).exists() return confirmed_folder, not (tv_root / confirmed_folder).exists()
existing = _find_existing_tvshow_folders(tv_root, tmdb_title, tmdb_year) existing = _find_existing_tvshow_folders(tv_root, tmdb_title_safe, tmdb_year)
if not existing: if not existing:
return computed_name, True return computed_name, True
@@ -246,11 +252,12 @@ def resolve_season_destination(
message="TV show library path is not configured.", message="TV show library path is not configured.",
) )
parsed = parse_release(release_name) parsed = parse_release(release_name, _KB)
computed_name = _sanitize(parsed.show_folder_name(tmdb_title, tmdb_year)) tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year)
resolved = _resolve_series_folder( resolved = _resolve_series_folder(
tv_root, tmdb_title, tmdb_year, computed_name, confirmed_folder tv_root, tmdb_title, tmdb_title_safe, tmdb_year, computed_name, confirmed_folder
) )
if isinstance(resolved, _Clarification): if isinstance(resolved, _Clarification):
return ResolvedSeasonDestination( return ResolvedSeasonDestination(
@@ -295,12 +302,16 @@ def resolve_episode_destination(
message="TV show library path is not configured.", message="TV show library path is not configured.",
) )
parsed = parse_release(release_name) parsed = parse_release(release_name, _KB)
ext = Path(source_file).suffix ext = Path(source_file).suffix
computed_name = _sanitize(parsed.show_folder_name(tmdb_title, tmdb_year)) tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
tmdb_episode_title_safe = (
_KB.sanitize_for_fs(tmdb_episode_title) if tmdb_episode_title else None
)
computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year)
resolved = _resolve_series_folder( resolved = _resolve_series_folder(
tv_root, tmdb_title, tmdb_year, computed_name, confirmed_folder tv_root, tmdb_title, tmdb_title_safe, tmdb_year, computed_name, confirmed_folder
) )
if isinstance(resolved, _Clarification): if isinstance(resolved, _Clarification):
return ResolvedEpisodeDestination( return ResolvedEpisodeDestination(
@@ -311,7 +322,7 @@ def resolve_episode_destination(
series_folder_name, is_new = resolved series_folder_name, is_new = resolved
season_folder_name = parsed.season_folder_name() season_folder_name = parsed.season_folder_name()
filename = _sanitize(parsed.episode_filename(tmdb_episode_title, ext)) filename = parsed.episode_filename(tmdb_episode_title_safe, ext)
series_path = tv_root / series_folder_name series_path = tv_root / series_folder_name
season_path = series_path / season_folder_name season_path = series_path / season_folder_name
@@ -349,11 +360,12 @@ def resolve_movie_destination(
message="Movie library path is not configured.", message="Movie library path is not configured.",
) )
parsed = parse_release(release_name) parsed = parse_release(release_name, _KB)
ext = Path(source_file).suffix ext = Path(source_file).suffix
tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
folder_name = _sanitize(parsed.movie_folder_name(tmdb_title, tmdb_year)) folder_name = parsed.movie_folder_name(tmdb_title_safe, tmdb_year)
filename = _sanitize(parsed.movie_filename(tmdb_title, tmdb_year, ext)) filename = parsed.movie_filename(tmdb_title_safe, tmdb_year, ext)
folder_path = Path(movies_root) / folder_name folder_path = Path(movies_root) / folder_name
file_path = folder_path / filename file_path = folder_path / filename
@@ -387,11 +399,12 @@ def resolve_series_destination(
message="TV show library path is not configured.", message="TV show library path is not configured.",
) )
parsed = parse_release(release_name) parsed = parse_release(release_name, _KB)
computed_name = _sanitize(parsed.show_folder_name(tmdb_title, tmdb_year)) tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year)
resolved = _resolve_series_folder( resolved = _resolve_series_folder(
tv_root, tmdb_title, tmdb_year, computed_name, confirmed_folder tv_root, tmdb_title, tmdb_title_safe, tmdb_year, computed_name, confirmed_folder
) )
if isinstance(resolved, _Clarification): if isinstance(resolved, _Clarification):
return ResolvedSeriesDestination( return ResolvedSeriesDestination(
@@ -4,10 +4,10 @@ from __future__ import annotations
from pathlib import Path from pathlib import Path
from alfred.domain.release.value_objects import _VIDEO_EXTENSIONS from alfred.domain.release.ports import ReleaseKnowledge
def find_video_file(path: Path) -> Path | None: def find_video_file(path: Path, kb: ReleaseKnowledge) -> Path | None:
""" """
Return the first video file found at path. Return the first video file found at path.
@@ -15,11 +15,12 @@ def find_video_file(path: Path) -> Path | None:
- If path is a folder — scan recursively, return the first video found - If path is a folder — scan recursively, return the first video found
(sorted by name for determinism, picks S01E01 before S01E02 etc.). (sorted by name for determinism, picks S01E01 before S01E02 etc.).
""" """
video_exts = kb.video_extensions
if path.is_file(): if path.is_file():
return path if path.suffix.lower() in _VIDEO_EXTENSIONS else None return path if path.suffix.lower() in video_exts else None
for candidate in sorted(path.rglob("*")): for candidate in sorted(path.rglob("*")):
if candidate.is_file() and candidate.suffix.lower() in _VIDEO_EXTENSIONS: if candidate.is_file() and candidate.suffix.lower() in video_exts:
return candidate return candidate
return None return None