03aa844d7d
New application-layer entry point that composes the four inspection
layers in one call:
1. parse_release(name, kb) -> (ParsedRelease, ParseReport)
2. detect_media_type(parsed, path, kb) -> patch parsed.media_type
3. find_main_video(path, kb) -> Path | None (top-level scan)
4. prober.probe(video) + enrich -> only when a video exists and
media_type is not in {unknown, other}
Returns a frozen InspectedResult(parsed, report, source_path,
main_video, media_info, probe_used). kb and prober are injected — no
module-level singletons in inspect.py.
analyze_release tool now delegates to inspect_release; its output
gains two fields, confidence (0-100) and road (easy/shitty/path_of_pain),
surfaced from ParseReport so the LLM can route by confidence. Spec
updated to document them.
12 new tests covering happy paths, probe gating (no video, media_type
'other', probe failure), mutation contract (detect refining
parsed.media_type, enrich filling None fields), resilience
(nonexistent path), and frozen contract. Suite: 1058 passing.
266 lines
9.4 KiB
Python
266 lines
9.4 KiB
Python
"""Tests for the ``inspect_release`` orchestrator (Phase C).
|
|
|
|
Covers the four composition steps as a black box: a real
|
|
``YamlReleaseKnowledge``, real on-disk filesystem under ``tmp_path``,
|
|
and a stubbed ``MediaProber`` so we don't depend on a system ``ffprobe``.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
|
|
from alfred.application.release import InspectedResult, inspect_release
|
|
from alfred.domain.shared.media import AudioTrack, MediaInfo, VideoTrack
|
|
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
|
|
|
|
# Knowledge base instance passed to every ``inspect_release`` call below.
_KB = YamlReleaseKnowledge()

# Release names used as parser input: one movie, one TV episode.
_MOVIE_NAME = "Inception.2010.1080p.BluRay.x264-GROUP"
_TV_NAME = "Dexter.S01E01.1080p.WEB-DL.x264-GROUP"
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Test doubles #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class _StubProber:
    """Canned-answer stand-in for ``MediaProber``.

    Each ``probe`` call is logged, in order, in ``calls`` and answered with
    the ``MediaInfo`` (or ``None``) that was handed to the constructor.
    """

    def __init__(self, info: MediaInfo | None) -> None:
        self.calls: list[Path] = []
        self._info = info

    def probe(self, video: Path) -> MediaInfo | None:
        # Remember what was asked, then return the preset answer.
        self.calls += [video]
        return self._info

    def list_subtitle_streams(self, video: Path):  # pragma: no cover - unused here
        return list()
|
|
|
|
|
|
class _RaisingProber:
    """Tripwire prober: calling any method fails the test.

    Passed to ``inspect_release`` in scenarios where probing must be skipped.
    """

    def probe(self, video: Path):  # pragma: no cover
        message = "probe must not be called"
        raise AssertionError(message)

    def list_subtitle_streams(self, video: Path):  # pragma: no cover
        message = "list_subtitle_streams must not be called"
        raise AssertionError(message)
|
|
|
|
|
|
def _media_info_1080p_h264() -> MediaInfo:
    """Build the canned probe result shared by the tests.

    One 1920x1080 h264 video track, one default English AC3 5.1 audio track,
    no subtitle tracks, a duration of 7200 seconds and a bitrate of 8000 kbps.
    """
    video_track = VideoTrack(index=0, codec="h264", width=1920, height=1080)
    audio_track = AudioTrack(
        index=1,
        codec="ac3",
        channels=6,
        channel_layout="5.1",
        language="eng",
        is_default=True,
    )
    return MediaInfo(
        video_tracks=(video_track,),
        audio_tracks=(audio_track,),
        subtitle_tracks=(),
        duration_seconds=7200.0,
        bitrate_kbps=8000,
    )
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Happy paths #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestInspectMovieFolder:
    """Happy path: a movie release folder holding a single ``movie.mkv``."""

    @staticmethod
    def _release_dir(root: Path) -> tuple[Path, Path]:
        """Create ``<root>/<_MOVIE_NAME>/movie.mkv`` (empty); return (folder, video)."""
        release_dir = root / _MOVIE_NAME
        release_dir.mkdir()
        mkv = release_dir / "movie.mkv"
        mkv.write_bytes(b"")
        return release_dir, mkv

    def test_returns_inspected_result_with_all_fields(self, tmp_path: Path) -> None:
        release_dir, mkv = self._release_dir(tmp_path)
        stub = _StubProber(_media_info_1080p_h264())

        outcome = inspect_release(_MOVIE_NAME, release_dir, _KB, stub)

        assert isinstance(outcome, InspectedResult)
        assert outcome.source_path == release_dir
        assert outcome.main_video == mkv
        assert outcome.media_info is not None
        assert outcome.probe_used is True
        # The prober was consulted exactly once, on the discovered video.
        assert stub.calls == [mkv]

    def test_parsed_carries_token_level_fields(self, tmp_path: Path) -> None:
        release_dir, _ = self._release_dir(tmp_path)
        stub = _StubProber(_media_info_1080p_h264())

        parsed = inspect_release(_MOVIE_NAME, release_dir, _KB, stub).parsed

        assert parsed.title.lower().startswith("inception")
        assert parsed.year == 2010
        assert parsed.group == "GROUP"
        assert parsed.media_type == "movie"

    def test_report_has_confidence_and_road(self, tmp_path: Path) -> None:
        release_dir, _ = self._release_dir(tmp_path)
        stub = _StubProber(None)

        report = inspect_release(_MOVIE_NAME, release_dir, _KB, stub).report

        assert 0 <= report.confidence <= 100
        assert report.road in ("easy", "shitty", "path_of_pain")
|
|
|
|
|
|
class TestInspectSingleFile:
    """Inspection when the source path is a video file rather than a folder."""

    def test_file_is_its_own_main_video(self, tmp_path: Path) -> None:
        single = tmp_path / f"{_MOVIE_NAME}.mkv"
        single.write_bytes(b"")

        outcome = inspect_release(
            _MOVIE_NAME,
            single,
            _KB,
            _StubProber(_media_info_1080p_h264()),
        )

        # The file passed in is reported back as the main video, and probed.
        assert outcome.main_video == single
        assert outcome.probe_used is True
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Probe-gating logic #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestProbeGating:
    """Probe gating: when the prober is consulted and what a failed probe yields."""

    def test_no_video_means_no_probe(self, tmp_path: Path) -> None:
        """A folder without any video file must never reach the prober."""
        folder = tmp_path / _MOVIE_NAME
        folder.mkdir()
        # Only a non-video file present.
        (folder / "readme.txt").write_text("hi")
        prober = _RaisingProber()

        result = inspect_release(_MOVIE_NAME, folder, _KB, prober)

        assert result.main_video is None
        assert result.media_info is None
        assert result.probe_used is False

    def test_media_type_other_means_no_probe(self, tmp_path: Path) -> None:
        """A release refined to media_type "other" must not be probed."""
        # An ISO-only folder gets detect_media_type → "other".
        folder = tmp_path / _MOVIE_NAME
        folder.mkdir()
        (folder / "disc.iso").write_bytes(b"")
        prober = _RaisingProber()

        result = inspect_release(_MOVIE_NAME, folder, _KB, prober)

        assert result.parsed.media_type == "other"
        assert result.media_info is None
        assert result.probe_used is False

    def test_probe_failure_keeps_probe_used_false(self, tmp_path: Path) -> None:
        """A probe that is attempted but returns None leaves probe_used False."""
        folder = tmp_path / _MOVIE_NAME
        folder.mkdir()
        video = folder / "movie.mkv"
        video.write_bytes(b"")
        prober = _StubProber(None)  # ffprobe simulated as failing

        result = inspect_release(_MOVIE_NAME, folder, _KB, prober)

        assert result.main_video == video
        # Prove the probe was actually attempted: media_info None and
        # probe_used False are also what a *skipped* probe produces, so
        # without this check the test could not tell failure from gating.
        assert prober.calls == [video]
        assert result.media_info is None
        assert result.probe_used is False
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Mutation contract #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestMutationContract:
    """Later inspection steps are allowed to refine the parsed release."""

    def test_detect_media_type_refines_parsed(self, tmp_path: Path) -> None:
        # The name alone parses as "movie", but the folder holds a video next
        # to a non-video payload (an ISO beside the mkv). detect_media_type
        # then yields "unknown", which is in _NON_PROBABLE_MEDIA_TYPES, so the
        # prober must stay untouched.
        release_dir = tmp_path / _MOVIE_NAME
        release_dir.mkdir()
        for filename in ("movie.mkv", "extras.iso"):
            (release_dir / filename).write_bytes(b"")

        outcome = inspect_release(_MOVIE_NAME, release_dir, _KB, _RaisingProber())

        assert outcome.parsed.media_type == "unknown"
        assert outcome.probe_used is False

    def test_enrich_runs_when_probe_succeeds(self, tmp_path: Path) -> None:
        # The release name carries no codec token; the probe has to supply it.
        codecless_name = "Inception.2010.1080p.BluRay-GROUP"
        release_dir = tmp_path / codecless_name
        release_dir.mkdir()
        (release_dir / "movie.mkv").write_bytes(b"")
        stub = _StubProber(_media_info_1080p_h264())

        outcome = inspect_release(codecless_name, release_dir, _KB, stub)

        assert outcome.probe_used is True
        # enrich_from_probe is expected to have filled in the missing codec.
        assert outcome.parsed.codec is not None
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Resilience #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestResilience:
    """Edge inputs: a path that does not exist, and a TV-episode release."""

    def test_nonexistent_path_does_not_raise(self, tmp_path: Path) -> None:
        missing = tmp_path / "does-not-exist"

        outcome = inspect_release(_MOVIE_NAME, missing, _KB, _RaisingProber())

        # Nothing on disk: no video found, nothing probed.
        assert outcome.main_video is None
        assert outcome.media_info is None
        assert outcome.probe_used is False

    def test_tv_release_inspection(self, tmp_path: Path) -> None:
        release_dir = tmp_path / _TV_NAME
        release_dir.mkdir()
        episode_file = release_dir / "episode.mkv"
        episode_file.write_bytes(b"")
        stub = _StubProber(_media_info_1080p_h264())

        outcome = inspect_release(_TV_NAME, release_dir, _KB, stub)

        parsed = outcome.parsed
        assert parsed.media_type == "tv_show"
        assert parsed.season == 1
        assert parsed.episode == 1
        assert outcome.main_video == episode_file
        assert outcome.probe_used is True
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Frozen contract #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestFrozen:
    """``InspectedResult`` must reject attribute assignment."""

    def test_inspected_result_is_frozen(self, tmp_path: Path) -> None:
        import dataclasses

        release_dir = tmp_path / _MOVIE_NAME
        release_dir.mkdir()
        (release_dir / "movie.mkv").write_bytes(b"")

        outcome = inspect_release(_MOVIE_NAME, release_dir, _KB, _StubProber(None))

        # frozen=True → assigning a field raises FrozenInstanceError.
        rejected = False
        try:
            outcome.probe_used = True  # type: ignore[misc]
        except dataclasses.FrozenInstanceError:
            rejected = True
        if not rejected:  # pragma: no cover
            raise AssertionError("InspectedResult should be frozen")
|