5107cb32c0
Add a derived 'recommended_action' property on InspectedResult that collapses the orchestrator's go / wait / skip decision into one value: - 'skip' → no main_video, or media_type == 'other' - 'ask_user' → media_type == 'unknown', or road == 'path_of_pain' - 'process' → confident parse with a main video on disk The ordering is part of the contract (skip > ask_user > process) — documented in the property docstring. Until now every consumer (workflows, the agent, the orchestrator sketch) had to re-derive this from the road / media_type / main_video triple, with subtle drift between sites. One place, one rule. Exposed through the analyze_release tool so the LLM can route on it. Spec YAML updated to describe the new field. Suite: 1083 passed (+6 new tests in tests/application/test_inspect.py covering the four branches and the precedence rules).
357 lines
13 KiB
Python
357 lines
13 KiB
Python
"""Tests for the ``inspect_release`` orchestrator (Phase C).
|
|
|
|
Covers the four composition steps as a black box: a real
|
|
``YamlReleaseKnowledge``, real on-disk filesystem under ``tmp_path``,
|
|
and a stubbed ``MediaProber`` so we don't depend on a system ``ffprobe``.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
|
|
from alfred.application.release import InspectedResult, inspect_release
|
|
from alfred.domain.shared.media import AudioTrack, MediaInfo, VideoTrack
|
|
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
|
|
|
|
_KB = YamlReleaseKnowledge()
|
|
|
|
_MOVIE_NAME = "Inception.2010.1080p.BluRay.x264-GROUP"
|
|
_TV_NAME = "Dexter.S01E01.1080p.WEB-DL.x264-GROUP"
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Test doubles #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class _StubProber:
|
|
"""Minimal MediaProber stub. Records the path it was asked to probe."""
|
|
|
|
def __init__(self, info: MediaInfo | None) -> None:
|
|
self._info = info
|
|
self.calls: list[Path] = []
|
|
|
|
def list_subtitle_streams(self, video: Path): # pragma: no cover - unused here
|
|
return []
|
|
|
|
def probe(self, video: Path) -> MediaInfo | None:
|
|
self.calls.append(video)
|
|
return self._info
|
|
|
|
|
|
class _RaisingProber:
|
|
"""A prober that would explode if called — used to assert no probe."""
|
|
|
|
def list_subtitle_streams(self, video: Path): # pragma: no cover
|
|
raise AssertionError("list_subtitle_streams must not be called")
|
|
|
|
def probe(self, video: Path): # pragma: no cover
|
|
raise AssertionError("probe must not be called")
|
|
|
|
|
|
def _media_info_1080p_h264() -> MediaInfo:
|
|
return MediaInfo(
|
|
video_tracks=(VideoTrack(index=0, codec="h264", width=1920, height=1080),),
|
|
audio_tracks=(
|
|
AudioTrack(
|
|
index=1,
|
|
codec="ac3",
|
|
channels=6,
|
|
channel_layout="5.1",
|
|
language="eng",
|
|
is_default=True,
|
|
),
|
|
),
|
|
subtitle_tracks=(),
|
|
duration_seconds=7200.0,
|
|
bitrate_kbps=8000,
|
|
)
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Happy paths #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestInspectMovieFolder:
|
|
def test_returns_inspected_result_with_all_fields(self, tmp_path: Path) -> None:
|
|
folder = tmp_path / _MOVIE_NAME
|
|
folder.mkdir()
|
|
video = folder / "movie.mkv"
|
|
video.write_bytes(b"")
|
|
prober = _StubProber(_media_info_1080p_h264())
|
|
|
|
result = inspect_release(_MOVIE_NAME, folder, _KB, prober)
|
|
|
|
assert isinstance(result, InspectedResult)
|
|
assert result.source_path == folder
|
|
assert result.main_video == video
|
|
assert result.media_info is not None
|
|
assert result.probe_used is True
|
|
assert prober.calls == [video]
|
|
|
|
def test_parsed_carries_token_level_fields(self, tmp_path: Path) -> None:
|
|
folder = tmp_path / _MOVIE_NAME
|
|
folder.mkdir()
|
|
(folder / "movie.mkv").write_bytes(b"")
|
|
prober = _StubProber(_media_info_1080p_h264())
|
|
|
|
result = inspect_release(_MOVIE_NAME, folder, _KB, prober)
|
|
|
|
assert result.parsed.title.lower().startswith("inception")
|
|
assert result.parsed.year == 2010
|
|
assert result.parsed.group == "GROUP"
|
|
assert result.parsed.media_type == "movie"
|
|
|
|
def test_report_has_confidence_and_road(self, tmp_path: Path) -> None:
|
|
folder = tmp_path / _MOVIE_NAME
|
|
folder.mkdir()
|
|
(folder / "movie.mkv").write_bytes(b"")
|
|
prober = _StubProber(None)
|
|
|
|
result = inspect_release(_MOVIE_NAME, folder, _KB, prober)
|
|
|
|
assert 0 <= result.report.confidence <= 100
|
|
assert result.report.road in ("easy", "shitty", "path_of_pain")
|
|
|
|
|
|
class TestInspectSingleFile:
|
|
def test_file_is_its_own_main_video(self, tmp_path: Path) -> None:
|
|
f = tmp_path / f"{_MOVIE_NAME}.mkv"
|
|
f.write_bytes(b"")
|
|
prober = _StubProber(_media_info_1080p_h264())
|
|
|
|
result = inspect_release(_MOVIE_NAME, f, _KB, prober)
|
|
|
|
assert result.main_video == f
|
|
assert result.probe_used is True
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Probe-gating logic #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestProbeGating:
|
|
def test_no_video_means_no_probe(self, tmp_path: Path) -> None:
|
|
folder = tmp_path / _MOVIE_NAME
|
|
folder.mkdir()
|
|
# Only a non-video file present.
|
|
(folder / "readme.txt").write_text("hi")
|
|
prober = _RaisingProber()
|
|
|
|
result = inspect_release(_MOVIE_NAME, folder, _KB, prober)
|
|
|
|
assert result.main_video is None
|
|
assert result.media_info is None
|
|
assert result.probe_used is False
|
|
|
|
def test_media_type_other_means_no_probe(self, tmp_path: Path) -> None:
|
|
# An ISO-only folder gets detect_media_type → "other".
|
|
folder = tmp_path / _MOVIE_NAME
|
|
folder.mkdir()
|
|
(folder / "disc.iso").write_bytes(b"")
|
|
prober = _RaisingProber()
|
|
|
|
result = inspect_release(_MOVIE_NAME, folder, _KB, prober)
|
|
|
|
assert result.parsed.media_type == "other"
|
|
assert result.media_info is None
|
|
assert result.probe_used is False
|
|
|
|
def test_probe_failure_keeps_probe_used_false(self, tmp_path: Path) -> None:
|
|
folder = tmp_path / _MOVIE_NAME
|
|
folder.mkdir()
|
|
(folder / "movie.mkv").write_bytes(b"")
|
|
prober = _StubProber(None) # ffprobe simulated as failing
|
|
|
|
result = inspect_release(_MOVIE_NAME, folder, _KB, prober)
|
|
|
|
assert result.main_video is not None
|
|
assert result.media_info is None
|
|
assert result.probe_used is False
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Mutation contract #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestMutationContract:
|
|
def test_detect_media_type_refines_parsed(self, tmp_path: Path) -> None:
|
|
# Release name parses to "movie", but folder mixes video + non_video
|
|
# (e.g. an ISO sitting next to an mkv) → detect_media_type returns
|
|
# "unknown", which is in _NON_PROBABLE_MEDIA_TYPES → no probe.
|
|
folder = tmp_path / _MOVIE_NAME
|
|
folder.mkdir()
|
|
(folder / "movie.mkv").write_bytes(b"")
|
|
(folder / "extras.iso").write_bytes(b"")
|
|
prober = _RaisingProber()
|
|
|
|
result = inspect_release(_MOVIE_NAME, folder, _KB, prober)
|
|
|
|
assert result.parsed.media_type == "unknown"
|
|
assert result.probe_used is False
|
|
|
|
def test_enrich_runs_when_probe_succeeds(self, tmp_path: Path) -> None:
|
|
# Build a release name with no codec; probe should fill it in.
|
|
name = "Inception.2010.1080p.BluRay-GROUP"
|
|
folder = tmp_path / name
|
|
folder.mkdir()
|
|
(folder / "movie.mkv").write_bytes(b"")
|
|
prober = _StubProber(_media_info_1080p_h264())
|
|
|
|
result = inspect_release(name, folder, _KB, prober)
|
|
|
|
assert result.probe_used is True
|
|
# enrich_from_probe should have filled the missing codec field.
|
|
assert result.parsed.codec is not None
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Resilience #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestResilience:
|
|
def test_nonexistent_path_does_not_raise(self, tmp_path: Path) -> None:
|
|
ghost = tmp_path / "does-not-exist"
|
|
prober = _RaisingProber()
|
|
|
|
result = inspect_release(_MOVIE_NAME, ghost, _KB, prober)
|
|
|
|
assert result.main_video is None
|
|
assert result.media_info is None
|
|
assert result.probe_used is False
|
|
|
|
def test_tv_release_inspection(self, tmp_path: Path) -> None:
|
|
folder = tmp_path / _TV_NAME
|
|
folder.mkdir()
|
|
video = folder / "episode.mkv"
|
|
video.write_bytes(b"")
|
|
prober = _StubProber(_media_info_1080p_h264())
|
|
|
|
result = inspect_release(_TV_NAME, folder, _KB, prober)
|
|
|
|
assert result.parsed.media_type == "tv_show"
|
|
assert result.parsed.season == 1
|
|
assert result.parsed.episode == 1
|
|
assert result.main_video == video
|
|
assert result.probe_used is True
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Frozen contract #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestFrozen:
|
|
def test_inspected_result_is_frozen(self, tmp_path: Path) -> None:
|
|
folder = tmp_path / _MOVIE_NAME
|
|
folder.mkdir()
|
|
(folder / "movie.mkv").write_bytes(b"")
|
|
prober = _StubProber(None)
|
|
|
|
result = inspect_release(_MOVIE_NAME, folder, _KB, prober)
|
|
|
|
# frozen=True → assigning a field raises FrozenInstanceError.
|
|
import dataclasses
|
|
|
|
try:
|
|
result.probe_used = True # type: ignore[misc]
|
|
except dataclasses.FrozenInstanceError:
|
|
pass
|
|
else: # pragma: no cover
|
|
raise AssertionError("InspectedResult should be frozen")
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# recommended_action #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestRecommendedAction:
|
|
"""``recommended_action`` collapses the orchestrator's go / wait /
|
|
skip decision into a single property. The check ordering is part
|
|
of the contract (skip wins over ask_user, ask_user wins over
|
|
process) — see the property docstring."""
|
|
|
|
def test_skip_when_no_main_video(self, tmp_path: Path) -> None:
|
|
# Folder with no video at all → main_video is None → skip.
|
|
folder = tmp_path / _MOVIE_NAME
|
|
folder.mkdir()
|
|
(folder / "readme.txt").write_text("hi")
|
|
|
|
result = inspect_release(_MOVIE_NAME, folder, _KB, _RaisingProber())
|
|
|
|
assert result.main_video is None
|
|
assert result.recommended_action == "skip"
|
|
|
|
def test_skip_when_media_type_other(self, tmp_path: Path) -> None:
|
|
# Folder with only non-video files (ISO) → media_type == "other"
|
|
# AND main_video is None (find_main_video filters by video ext).
|
|
# Both branches resolve to "skip"; this asserts the contract holds.
|
|
folder = tmp_path / _MOVIE_NAME
|
|
folder.mkdir()
|
|
(folder / "disc.iso").write_bytes(b"")
|
|
|
|
result = inspect_release(_MOVIE_NAME, folder, _KB, _RaisingProber())
|
|
|
|
assert result.parsed.media_type == "other"
|
|
assert result.recommended_action == "skip"
|
|
|
|
def test_ask_user_when_media_type_unknown(self, tmp_path: Path) -> None:
|
|
# Mixed video + non-video → detect_media_type returns "unknown".
|
|
folder = tmp_path / _MOVIE_NAME
|
|
folder.mkdir()
|
|
(folder / "movie.mkv").write_bytes(b"")
|
|
(folder / "extras.iso").write_bytes(b"")
|
|
|
|
result = inspect_release(
|
|
_MOVIE_NAME, folder, _KB, _StubProber(_media_info_1080p_h264())
|
|
)
|
|
|
|
assert result.parsed.media_type == "unknown"
|
|
assert result.recommended_action == "ask_user"
|
|
|
|
def test_ask_user_when_path_of_pain_road(self, tmp_path: Path) -> None:
|
|
# Malformed name (forbidden chars) → road == "path_of_pain".
|
|
name = "garbage@#%name"
|
|
folder = tmp_path / "release"
|
|
folder.mkdir()
|
|
(folder / "movie.mkv").write_bytes(b"")
|
|
|
|
result = inspect_release(
|
|
name, folder, _KB, _StubProber(_media_info_1080p_h264())
|
|
)
|
|
|
|
assert result.report.road == "path_of_pain"
|
|
# main_video is found but the road still flags uncertainty.
|
|
assert result.main_video is not None
|
|
assert result.recommended_action == "ask_user"
|
|
|
|
def test_process_for_confident_movie(self, tmp_path: Path) -> None:
|
|
folder = tmp_path / _MOVIE_NAME
|
|
folder.mkdir()
|
|
(folder / "movie.mkv").write_bytes(b"")
|
|
|
|
result = inspect_release(
|
|
_MOVIE_NAME, folder, _KB, _StubProber(_media_info_1080p_h264())
|
|
)
|
|
|
|
assert result.parsed.media_type == "movie"
|
|
assert result.report.road in ("easy", "shitty")
|
|
assert result.recommended_action == "process"
|
|
|
|
def test_process_for_confident_tv_show(self, tmp_path: Path) -> None:
|
|
folder = tmp_path / _TV_NAME
|
|
folder.mkdir()
|
|
(folder / "episode.mkv").write_bytes(b"")
|
|
|
|
result = inspect_release(
|
|
_TV_NAME, folder, _KB, _StubProber(_media_info_1080p_h264())
|
|
)
|
|
|
|
assert result.parsed.media_type == "tv_show"
|
|
assert result.recommended_action == "process"
|