Files
alfred/tests/application/test_manage_subtitles.py
francwa 88f156b7a4 refactor(subtitles): rename SubtitleCandidate → SubtitleScanResult
The old name conflated 'might become a placed subtitle' with 'what a
scan pass produced'. The class is the output of a scan/identify pass —
language/format may still be None while classification is in progress,
confidence reflects classifier certainty, raw_tokens holds filename
fragments under analysis. SubtitleScanResult says that directly.

Pure rename + refreshed docstring; no behavior change. Touches the
domain entity, the matcher/identifier/utils services, the
manage_subtitles use case, the placer, the metadata store, the
shared-media cross-ref comment, and 7 test modules.
2026-05-21 08:05:46 +02:00

566 lines
21 KiB
Python

"""Tests for ``alfred.application.filesystem.manage_subtitles``.
``ManageSubtitlesUseCase`` orchestrates the subtitle pipeline:
KB load → pattern resolution → identify → match → place → persist.
Strategy: mock the heavy collaborators (``SubtitleIdentifier``,
``PatternDetector``, ``SubtitleMatcher``, ``SubtitlePlacer``,
``RuleSetRepository``, ``SubtitleMetadataStore``, ``SubtitleKnowledgeBase``)
at the use-case module path. The use case instantiates them inline so each
patch targets a single class symbol.
Coverage:
- ``TestSourceMissing`` — source_not_found short-circuit when neither file
nor parent dir exists.
- ``TestPatternResolution`` — confirmed_pattern_id wins; falls back to
stored confirmed pattern; falls back to detector; falls back to
"adjacent"; pattern_not_found error when KB has nothing.
- ``TestNoTracks`` — empty identifier output → status=ok, empty placed list.
- ``TestEmbeddedShortCircuit`` — EMBEDDED scan_strategy yields ``available``
list and never calls the matcher/placer.
- ``TestMatcherFlow`` — unresolved → needs_clarification; no matches → ok
with skipped_count; happy path runs placer + appends history.
- ``TestDryRun`` — dry_run skips placement, returns predicted destinations.
- ``TestHelpers`` — ``_infer_library_root``, ``_to_imdb_id``,
``_to_unresolved_dto``, ``_pair_placed_with_tracks``.
"""
from __future__ import annotations
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from alfred.application.filesystem.manage_subtitles import (
ManageSubtitlesUseCase,
_infer_library_root,
_pair_placed_with_tracks,
_to_imdb_id,
_to_unresolved_dto,
)
from alfred.domain.subtitles.entities import MediaSubtitleMetadata, SubtitleScanResult
from alfred.application.subtitles.placer import PlacedTrack, PlaceResult
from alfred.domain.subtitles.value_objects import (
ScanStrategy,
SubtitleFormat,
SubtitleLanguage,
SubtitleType,
)
# Shared value objects reused by the tests below: one subtitle format and two
# languages. ``SRT`` and ``FRA`` are the defaults of the ``_track`` builder.
SRT = SubtitleFormat(id="srt", extensions=[".srt"])
FRA = SubtitleLanguage(code="fra", tokens=["fr"])
ENG = SubtitleLanguage(code="eng", tokens=["en"])
def _track(
    *,
    lang=FRA,
    fmt=SRT,
    stype=SubtitleType.STANDARD,
    file_path: Path | None = None,
    is_embedded: bool = False,
    raw_tokens: list[str] | None = None,
    file_size_kb: float | None = None,
) -> SubtitleScanResult:
    """Build a ``SubtitleScanResult`` for tests.

    All arguments are keyword-only. Defaults describe a French, SRT,
    standard-type track with no file path. A falsy ``raw_tokens`` (``None``
    or an empty list) is replaced by a fresh empty list.
    """
    fields = {
        "language": lang,
        "format": fmt,
        "subtitle_type": stype,
        "file_path": file_path,
        "is_embedded": is_embedded,
        # Never hand the entity ``None``: fall back to a new empty list.
        "raw_tokens": raw_tokens if raw_tokens else [],
        "file_size_kb": file_size_kb,
    }
    return SubtitleScanResult(**fields)
def _pattern(
    pid: str = "adjacent", strategy: ScanStrategy = ScanStrategy.ADJACENT
) -> MagicMock:
    """Return a ``MagicMock`` pattern exposing ``id`` and ``scan_strategy``."""
    fake_pattern = MagicMock()
    fake_pattern.configure_mock(id=pid, scan_strategy=strategy)
    return fake_pattern
# --------------------------------------------------------------------------- #
# Helper functions #
# --------------------------------------------------------------------------- #
class TestHelpers:
    """Tests for the module-level helper functions of the use case module."""

    def test_infer_library_root_tv_show(self):
        # Episode file → season folder → show folder.
        episode = Path("/lib/tv/Show/Season.01/E01.mkv")
        expected_root = Path("/lib/tv/Show")
        assert _infer_library_root(episode, "tv_show") == expected_root

    def test_infer_library_root_movie(self):
        movie_file = Path("/lib/movies/Movie.2010/Movie.2010.mkv")
        expected_root = Path("/lib/movies/Movie.2010")
        assert _infer_library_root(movie_file, "movie") == expected_root

    def test_to_imdb_id_none_or_empty(self):
        # Both "no value" spellings must map to None.
        for blank in (None, ""):
            assert _to_imdb_id(blank) is None

    def test_to_imdb_id_valid(self):
        imdb_id = _to_imdb_id("tt1375666")
        assert imdb_id is not None
        assert str(imdb_id) == "tt1375666"

    def test_to_imdb_id_invalid_returns_none(self):
        assert _to_imdb_id("not-an-imdb-id") is None

    def test_to_unresolved_dto_unknown_language(self):
        track = _track(lang=None, raw_tokens=["fr", "x"], file_size_kb=12.0)
        track.file_path = Path("/x/a.srt")

        dto = _to_unresolved_dto(track)

        assert dto.reason == "unknown_language"
        assert dto.raw_tokens == ["fr", "x"]
        assert dto.file_path == "/x/a.srt"
        assert dto.file_size_kb == 12.0

    def test_to_unresolved_dto_low_confidence(self):
        # A track whose language is known is reported as low confidence.
        track = _track(lang=FRA, raw_tokens=["fr"])
        dto = _to_unresolved_dto(track)
        assert dto.reason == "low_confidence"

    def test_to_unresolved_dto_no_file_path(self):
        dto = _to_unresolved_dto(_track(lang=None))
        assert dto.file_path is None

    def test_pair_placed_with_tracks_by_path(self):
        first_src = Path("/in/a.srt")
        second_src = Path("/in/b.srt")
        first_track = _track(file_path=first_src, lang=FRA)
        second_track = _track(file_path=second_src, lang=ENG)
        first_placed = PlacedTrack(
            source=first_src, destination=Path("/out/a"), filename="a"
        )
        second_placed = PlacedTrack(
            source=second_src, destination=Path("/out/b"), filename="b"
        )

        pairs = _pair_placed_with_tracks(
            [first_placed, second_placed], [first_track, second_track]
        )

        assert pairs == [(first_placed, first_track), (second_placed, second_track)]

    def test_pair_placed_falls_back_to_positional(self):
        # The placed source path matches no track.file_path, so the pairing
        # falls back to tracks[0].
        known_track = _track(file_path=Path("/in/known.srt"))
        ghost_placed = PlacedTrack(
            source=Path("/in/ghost.srt"), destination=Path("/x"), filename="x"
        )

        pairs = _pair_placed_with_tracks([ghost_placed], [known_track])

        assert pairs == [(ghost_placed, known_track)]

    def test_pair_placed_empty_inputs(self):
        assert _pair_placed_with_tracks([], []) == []
# --------------------------------------------------------------------------- #
# Use case shared fixtures #
# --------------------------------------------------------------------------- #
# Dotted path of the module under test; used as the prefix of every
# ``patch(...)`` target in the ``patches`` fixture.
MOD = "alfred.application.filesystem.manage_subtitles"
@pytest.fixture
def video(tmp_path):
    """Create empty source and destination video files under ``tmp_path``.

    Returns a ``(source, destination)`` tuple: the source lives in ``dl/``,
    the destination in ``lib/Movie.2010/``.
    """
    source = tmp_path / "dl" / "Movie.2010.mkv"
    source.parent.mkdir()
    source.write_bytes(b"")

    destination = tmp_path / "lib" / "Movie.2010" / "Movie.2010.mkv"
    destination.parent.mkdir(parents=True)
    destination.write_bytes(b"")

    return source, destination
@pytest.fixture
def patches():
    """Patch every collaborator the use case instantiates inline.

    Yields a dict of the configured mocks (mostly the ``return_value`` of the
    patched class, i.e. the instance the use case will get) so each test can
    override only the behaviour it cares about.
    """
    with (
        patch(f"{MOD}.KnowledgeLoader") as loader_mock,
        patch(f"{MOD}.SubtitleKnowledgeBase") as kb_cls,
        patch(f"{MOD}.SubtitleMetadataStore") as store_cls,
        patch(f"{MOD}.RuleSetRepository") as repo_cls,
        patch(f"{MOD}.SubtitleIdentifier") as identifier_cls,
        patch(f"{MOD}.PatternDetector") as detector_cls,
        patch(f"{MOD}.SubtitleMatcher") as matcher_cls,
        patch(f"{MOD}.SubtitlePlacer") as placer_cls,
        patch(f"{MOD}.get_memory") as get_memory_mock,
    ):
        # Instances the use case obtains when it calls the patched classes.
        kb = kb_cls.return_value
        store = store_cls.return_value
        repo = repo_cls.return_value
        ident = identifier_cls.return_value
        det = detector_cls.return_value
        matcher = matcher_cls.return_value
        placer = placer_cls.return_value

        # Knowledge base: any lookup yields the default "adjacent" pattern.
        kb.pattern.return_value = _pattern()

        # Metadata store: no confirmed pattern recorded yet.
        store.confirmed_pattern.return_value = None

        # Detector: nothing detected.
        det.detect.return_value = {"detected": None, "confidence": 0.0}

        # Identifier: metadata with zero tracks.
        ident.identify.return_value = MediaSubtitleMetadata(
            media_id=None,
            media_type="movie",
            release_group=None,
            detected_pattern_id="adjacent",
        )

        # Matcher: nothing matched, nothing unresolved.
        matcher.match.return_value = ([], [])

        # Placer: empty result.
        placer.place.return_value = PlaceResult(placed=[], skipped=[])

        # Rules: opaque object; the use case only forwards it.
        repo.load.return_value.resolve.return_value = MagicMock(name="Rules")

        # Memory access succeeds by default.
        get_memory_mock.return_value.ltm.subtitle_preferences = MagicMock()

        yield {
            "kb": kb,
            "store": store,
            "repo": repo,
            "ident": ident,
            "det": det,
            "matcher": matcher,
            "placer": placer,
            "loader": loader_mock,
            "get_memory": get_memory_mock,
        }
# --------------------------------------------------------------------------- #
# Source missing #
# --------------------------------------------------------------------------- #
class TestSourceMissing:
    """Behaviour when the source video path does not exist."""

    def test_source_and_parent_missing_returns_error(self, tmp_path):
        # Neither the file nor its parent directory exists.
        missing_source = tmp_path / "ghost" / "ghost.mkv"
        destination = tmp_path / "lib" / "x.mkv"

        use_case = ManageSubtitlesUseCase()
        result = use_case.execute(
            source_video=str(missing_source),
            destination_video=str(destination),
        )

        assert result.status == "error"
        assert result.error == "source_not_found"

    def test_source_missing_but_parent_exists_does_not_error_early(
        self, tmp_path, patches
    ):
        # The parent directory exists, so the use case proceeds. With the
        # default mocks the identifier returns 0 tracks → status="ok".
        download_dir = tmp_path / "dl"
        library_dir = tmp_path / "lib"
        download_dir.mkdir()
        library_dir.mkdir()

        result = ManageSubtitlesUseCase().execute(
            source_video=str(download_dir / "missing.mkv"),
            destination_video=str(library_dir / "missing.mkv"),
            media_type="movie",
        )

        assert result.status == "ok"
# --------------------------------------------------------------------------- #
# Pattern resolution #
# --------------------------------------------------------------------------- #
class TestPatternResolution:
    """Order in which the use case picks the subtitle pattern to scan with."""

    def test_confirmed_pattern_id_wins(self, video, patches):
        source, destination = video
        custom = _pattern("subs_flat")

        def lookup(pid):
            # Only "subs_flat" maps to the custom pattern; anything else
            # gets a fresh default pattern.
            if pid == "subs_flat":
                return custom
            return _pattern()

        patches["kb"].pattern.side_effect = lookup

        ManageSubtitlesUseCase().execute(
            source_video=str(source),
            destination_video=str(destination),
            media_type="movie",
            confirmed_pattern_id="subs_flat",
        )

        # The identifier received the confirmed pattern (not the default).
        identify_kwargs = patches["ident"].identify.call_args.kwargs
        assert identify_kwargs["pattern"].id == "subs_flat"
        # The detector should not even run when an explicit confirmation is given.
        patches["det"].detect.assert_not_called()

    def test_confirmed_pattern_id_unknown_falls_through_to_stored(self, video, patches):
        source, destination = video
        # The KB knows nothing about the requested override → returns None.
        # The stored value provides 'subs_flat'.
        patches["store"].confirmed_pattern.return_value = "subs_flat"
        flat = _pattern("subs_flat")

        def lookup(pid):
            known = {
                "subs_flat": flat,
                "adjacent": _pattern(),
            }
            return known.get(pid)

        patches["kb"].pattern.side_effect = lookup

        ManageSubtitlesUseCase().execute(
            source_video=str(source),
            destination_video=str(destination),
            media_type="movie",
            confirmed_pattern_id="DOES_NOT_EXIST",
        )

        used_pattern = patches["ident"].identify.call_args.kwargs["pattern"]
        assert used_pattern.id == "subs_flat"

    def test_detector_used_when_no_confirmed_and_no_stored(self, video, patches):
        source, destination = video
        patches["det"].detect.return_value = {
            "detected": _pattern("episode_subfolder"),
            "confidence": 0.9,
        }

        ManageSubtitlesUseCase().execute(
            source_video=str(source),
            destination_video=str(destination),
            media_type="movie",
        )

        used_pattern = patches["ident"].identify.call_args.kwargs["pattern"]
        assert used_pattern.id == "episode_subfolder"

    def test_detector_low_confidence_falls_back_to_adjacent(self, video, patches):
        source, destination = video
        patches["det"].detect.return_value = {
            "detected": _pattern("episode_subfolder"),
            "confidence": 0.1,
        }

        ManageSubtitlesUseCase().execute(
            source_video=str(source),
            destination_video=str(destination),
            media_type="movie",
        )

        # The fallback goes through kb.pattern('adjacent').
        last_lookup = patches["kb"].pattern.call_args_list[-1]
        assert last_lookup.args == ("adjacent",)

    def test_pattern_not_found_when_kb_returns_none(self, video, patches):
        source, destination = video
        patches["kb"].pattern.return_value = None  # nothing known
        patches["det"].detect.return_value = {"detected": None, "confidence": 0.0}

        result = ManageSubtitlesUseCase().execute(
            source_video=str(source),
            destination_video=str(destination),
            media_type="movie",
        )

        assert result.status == "error"
        assert result.error == "pattern_not_found"
# --------------------------------------------------------------------------- #
# No tracks #
# --------------------------------------------------------------------------- #
class TestNoTracks:
    """The identifier finds no subtitle tracks at all."""

    def test_zero_tracks_returns_ok_empty(self, video, patches):
        source, destination = video

        result = ManageSubtitlesUseCase().execute(
            source_video=str(source),
            destination_video=str(destination),
            media_type="movie",
        )

        assert result.status == "ok"
        assert result.placed == []
        assert result.skipped_count == 0
# --------------------------------------------------------------------------- #
# Embedded short-circuit #
# --------------------------------------------------------------------------- #
class TestEmbeddedShortCircuit:
    """An EMBEDDED scan strategy reports tracks without matching or placing."""

    def test_embedded_returns_available_and_skips_matcher(self, video, patches):
        source, destination = video
        embedded_pattern = _pattern("embedded", ScanStrategy.EMBEDDED)
        patches["kb"].pattern.return_value = embedded_pattern

        french = _track(lang=FRA, is_embedded=True)
        english_sdh = _track(lang=ENG, stype=SubtitleType.SDH, is_embedded=True)
        patches["ident"].identify.return_value = MediaSubtitleMetadata(
            media_id=None,
            media_type="movie",
            release_group=None,
            detected_pattern_id="embedded",
            embedded_tracks=[french, english_sdh],
        )

        result = ManageSubtitlesUseCase().execute(
            source_video=str(source),
            destination_video=str(destination),
            media_type="movie",
        )

        assert result.status == "ok"
        assert result.placed == []
        assert result.available is not None
        reported_languages = {entry.language for entry in result.available}
        assert {"fra", "eng"} <= reported_languages
        # Neither the matcher nor the placer may be touched on this path.
        patches["matcher"].match.assert_not_called()
        patches["placer"].place.assert_not_called()
# --------------------------------------------------------------------------- #
# Matcher flow #
# --------------------------------------------------------------------------- #
class TestMatcherFlow:
    """Paths through the use case once the identifier has produced tracks."""

    def test_unresolved_returns_needs_clarification(self, video, patches):
        """Unresolved tracks stop the flow before placement."""
        src, dest = video
        ext = [_track(file_path=src.parent / "a.srt")]
        patches["ident"].identify.return_value = MediaSubtitleMetadata(
            media_id=None,
            media_type="movie",
            release_group=None,
            detected_pattern_id="adjacent",
            external_tracks=ext,
        )
        unresolved_track = _track(
            lang=None, raw_tokens=["xx"], file_path=src.parent / "?.srt"
        )
        patches["matcher"].match.return_value = ([], [unresolved_track])

        out = ManageSubtitlesUseCase().execute(
            source_video=str(src),
            destination_video=str(dest),
            media_type="movie",
        )

        assert out.status == "needs_clarification"
        assert out.unresolved and out.unresolved[0].reason == "unknown_language"
        patches["placer"].place.assert_not_called()

    def test_no_matches_returns_ok_with_skipped(self, video, patches):
        """No match and nothing unresolved → ok, every track counted as skipped."""
        src, dest = video
        patches["ident"].identify.return_value = MediaSubtitleMetadata(
            media_id=None,
            media_type="movie",
            release_group=None,
            detected_pattern_id="adjacent",
            external_tracks=[_track(file_path=src.parent / "a.srt")],
            embedded_tracks=[_track(is_embedded=True)],
        )
        patches["matcher"].match.return_value = ([], [])  # no matches, no unresolved

        out = ManageSubtitlesUseCase().execute(
            source_video=str(src),
            destination_video=str(dest),
            media_type="movie",
        )

        assert out.status == "ok"
        assert out.placed == []
        # total_count = 1 ext + 1 emb = 2
        assert out.skipped_count == 2

    def test_happy_path_places_and_persists(self, video, patches):
        """Matched tracks are placed and the placement is recorded in history."""
        src, dest = video
        src_sub = src.parent / "a.srt"
        src_sub.write_text("")
        matched = [_track(file_path=src_sub, lang=FRA)]
        patches["ident"].identify.return_value = MediaSubtitleMetadata(
            media_id=None,
            media_type="movie",
            release_group=None,
            detected_pattern_id="adjacent",
            external_tracks=matched,
        )
        patches["matcher"].match.return_value = (matched, [])
        placed = PlacedTrack(
            source=src_sub,
            destination=dest.parent / "Movie.2010.fra.srt",
            filename="Movie.2010.fra.srt",
        )
        patches["placer"].place.return_value = PlaceResult(placed=[placed], skipped=[])

        out = ManageSubtitlesUseCase().execute(
            source_video=str(src),
            destination_video=str(dest),
            media_type="movie",
            release_group="KONTRAST",
            season=1,
            episode=2,
        )

        assert out.status == "ok"
        assert len(out.placed) == 1
        assert out.placed[0].filename == "Movie.2010.fra.srt"
        # History was appended with season/episode/group.
        patches["store"].append_history.assert_called_once()
        args, _ = patches["store"].append_history.call_args
        # signature: append_history(pairs, season, episode, release_group)
        assert args[1] == 1
        assert args[2] == 2
        assert args[3] == "KONTRAST"

    def test_get_memory_failure_falls_through_to_rules_repo(self, video, patches):
        """A failing ``get_memory()`` must not abort the flow."""
        # The use case swallows get_memory() exceptions and continues with
        # subtitle_prefs=None. We assert: still progresses past matcher.
        src, dest = video
        patches["get_memory"].side_effect = RuntimeError("not initialised")
        patches["ident"].identify.return_value = MediaSubtitleMetadata(
            media_id=None,
            media_type="movie",
            release_group=None,
            detected_pattern_id="adjacent",
            external_tracks=[_track(file_path=src.parent / "a.srt")],
        )
        patches["matcher"].match.return_value = ([], [])

        out = ManageSubtitlesUseCase().execute(
            source_video=str(src),
            destination_video=str(dest),
            media_type="movie",
        )

        assert out.status == "ok"
        # status == "ok" alone is also what earlier short-circuits return, so
        # check explicitly that the matcher was reached.
        patches["matcher"].match.assert_called()
# --------------------------------------------------------------------------- #
# Dry run #
# --------------------------------------------------------------------------- #
class TestDryRun:
    """``dry_run=True`` predicts destinations without touching the filesystem."""

    def test_dry_run_skips_placer_and_returns_predicted(self, video, patches):
        source, destination = video
        subtitle_file = source.parent / "a.srt"
        subtitle_file.write_text("")
        french_tracks = [_track(file_path=subtitle_file, lang=FRA)]
        patches["ident"].identify.return_value = MediaSubtitleMetadata(
            media_id=None,
            media_type="movie",
            release_group=None,
            detected_pattern_id="adjacent",
            external_tracks=french_tracks,
        )
        patches["matcher"].match.return_value = (french_tracks, [])

        result = ManageSubtitlesUseCase().execute(
            source_video=str(source),
            destination_video=str(destination),
            media_type="movie",
            dry_run=True,
        )

        assert result.status == "ok"
        assert result.placed and result.placed[0].filename.endswith(".fra.srt")
        # Nothing is placed and nothing is persisted in a dry run.
        patches["placer"].place.assert_not_called()
        patches["store"].append_history.assert_not_called()

    def test_dry_run_skips_tracks_without_file_path(self, video, patches):
        source, destination = video
        # No file_path → the track is skipped from the prediction.
        pathless_tracks = [_track(file_path=None, lang=FRA)]
        patches["ident"].identify.return_value = MediaSubtitleMetadata(
            media_id=None,
            media_type="movie",
            release_group=None,
            detected_pattern_id="adjacent",
            external_tracks=pathless_tracks,
        )
        patches["matcher"].match.return_value = (pathless_tracks, [])

        result = ManageSubtitlesUseCase().execute(
            source_video=str(source),
            destination_video=str(destination),
            media_type="movie",
            dry_run=True,
        )

        assert result.placed == []