566 lines
21 KiB
Python
566 lines
21 KiB
Python
"""Tests for ``alfred.application.filesystem.manage_subtitles``.

``ManageSubtitlesUseCase`` orchestrates the subtitle pipeline:
    KB load → pattern resolution → identify → match → place → persist.

Strategy: mock the heavy collaborators (``SubtitleIdentifier``,
``PatternDetector``, ``SubtitleMatcher``, ``SubtitlePlacer``,
``RuleSetRepository``, ``SubtitleMetadataStore``, ``SubtitleKnowledgeBase``)
at the use-case module path. The use case instantiates them inline so each
patch targets a single class symbol.

Coverage:

- ``TestSourceMissing`` — source_not_found short-circuit when neither file
  nor parent dir exists.
- ``TestPatternResolution`` — confirmed_pattern_id wins; falls back to
  stored confirmed pattern; falls back to detector; falls back to
  "adjacent"; pattern_not_found error when KB has nothing.
- ``TestNoTracks`` — empty identifier output → status=ok, empty placed list.
- ``TestEmbeddedShortCircuit`` — EMBEDDED scan_strategy yields ``available``
  list and never calls the matcher/placer.
- ``TestMatcherFlow`` — unresolved → needs_clarification; no matches → ok
  with skipped_count; happy path runs placer + appends history.
- ``TestDryRun`` — dry_run skips placement, returns predicted destinations.
- ``TestHelpers`` — ``_infer_library_root``, ``_to_imdb_id``,
  ``_to_unresolved_dto``, ``_pair_placed_with_tracks``.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from alfred.application.filesystem.manage_subtitles import (
|
|
ManageSubtitlesUseCase,
|
|
_infer_library_root,
|
|
_pair_placed_with_tracks,
|
|
_to_imdb_id,
|
|
_to_unresolved_dto,
|
|
)
|
|
from alfred.domain.subtitles.entities import MediaSubtitleMetadata, SubtitleCandidate
|
|
from alfred.domain.subtitles.services.placer import PlacedTrack, PlaceResult
|
|
from alfred.domain.subtitles.value_objects import (
|
|
ScanStrategy,
|
|
SubtitleFormat,
|
|
SubtitleLanguage,
|
|
SubtitleType,
|
|
)
|
|
|
|
# Shared value objects reused by the track builders and assertions below.
SRT = SubtitleFormat(
    id="srt",
    extensions=[".srt"],
)
FRA = SubtitleLanguage(
    code="fra",
    tokens=["fr"],
)
ENG = SubtitleLanguage(
    code="eng",
    tokens=["en"],
)
|
|
|
|
|
|
def _track(
    *,
    lang=FRA,
    fmt=SRT,
    stype=SubtitleType.STANDARD,
    file_path: Path | None = None,
    is_embedded: bool = False,
    raw_tokens: list[str] | None = None,
    file_size_kb: float | None = None,
) -> SubtitleCandidate:
    """Build a ``SubtitleCandidate`` for tests.

    All arguments are keyword-only. With no arguments the candidate uses the
    module-level ``FRA`` language, the ``SRT`` format, the ``STANDARD``
    subtitle type, no file path, ``is_embedded=False`` and an empty token
    list. A falsy ``raw_tokens`` (``None`` or ``[]``) is replaced by a fresh
    empty list.
    """
    tokens = raw_tokens if raw_tokens else []
    candidate_fields = {
        "language": lang,
        "format": fmt,
        "subtitle_type": stype,
        "file_path": file_path,
        "is_embedded": is_embedded,
        "raw_tokens": tokens,
        "file_size_kb": file_size_kb,
    }
    return SubtitleCandidate(**candidate_fields)
|
|
|
|
|
|
def _pattern(
    pid: str = "adjacent", strategy: ScanStrategy = ScanStrategy.ADJACENT
) -> MagicMock:
    """Return a ``MagicMock`` standing in for a subtitle pattern.

    Only ``id`` and ``scan_strategy`` are pinned to the given values; every
    other attribute is an auto-created mock.
    """
    stub = MagicMock()
    stub.id, stub.scan_strategy = pid, strategy
    return stub
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
# Helper functions #
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestHelpers:
    """Unit tests for the private helper functions of the use-case module."""

    def test_infer_library_root_tv_show(self):
        # Expected root is two levels up: episode file → season dir → show dir.
        episode = Path("/lib/tv/Show/Season.01/E01.mkv")
        expected_root = Path("/lib/tv/Show")
        assert _infer_library_root(episode, "tv_show") == expected_root

    def test_infer_library_root_movie(self):
        # Expected root is the folder that directly contains the movie file.
        movie = Path("/lib/movies/Movie.2010/Movie.2010.mkv")
        expected_root = Path("/lib/movies/Movie.2010")
        assert _infer_library_root(movie, "movie") == expected_root

    def test_to_imdb_id_none_or_empty(self):
        for blank in (None, ""):
            assert _to_imdb_id(blank) is None

    def test_to_imdb_id_valid(self):
        parsed = _to_imdb_id("tt1375666")
        assert parsed is not None
        assert str(parsed) == "tt1375666"

    def test_to_imdb_id_invalid_returns_none(self):
        result = _to_imdb_id("not-an-imdb-id")
        assert result is None

    def test_to_unresolved_dto_unknown_language(self):
        candidate = _track(lang=None, raw_tokens=["fr", "x"], file_size_kb=12.0)
        # The path is attached after construction, then expected back as a string.
        candidate.file_path = Path("/x/a.srt")
        dto = _to_unresolved_dto(candidate)
        assert dto.reason == "unknown_language"
        assert dto.raw_tokens == ["fr", "x"]
        assert dto.file_path == "/x/a.srt"
        assert dto.file_size_kb == 12.0

    def test_to_unresolved_dto_low_confidence(self):
        # A track that does carry a language is reported as "low_confidence".
        candidate = _track(lang=FRA, raw_tokens=["fr"])
        dto = _to_unresolved_dto(candidate)
        assert dto.reason == "low_confidence"

    def test_to_unresolved_dto_no_file_path(self):
        dto = _to_unresolved_dto(_track(lang=None))
        assert dto.file_path is None

    def test_pair_placed_with_tracks_by_path(self):
        first_src = Path("/in/a.srt")
        second_src = Path("/in/b.srt")
        fra_track = _track(file_path=first_src, lang=FRA)
        eng_track = _track(file_path=second_src, lang=ENG)
        first_placed = PlacedTrack(
            source=first_src, destination=Path("/out/a"), filename="a"
        )
        second_placed = PlacedTrack(
            source=second_src, destination=Path("/out/b"), filename="b"
        )
        expected = [(first_placed, fra_track), (second_placed, eng_track)]
        actual = _pair_placed_with_tracks(
            [first_placed, second_placed], [fra_track, eng_track]
        )
        assert actual == expected

    def test_pair_placed_falls_back_to_positional(self):
        # The placed source matches no track.file_path, so the pairing is
        # expected to fall back to the track at the same position (tracks[0]).
        known_track = _track(file_path=Path("/in/known.srt"))
        orphan_placed = PlacedTrack(
            source=Path("/in/ghost.srt"), destination=Path("/x"), filename="x"
        )
        actual = _pair_placed_with_tracks([orphan_placed], [known_track])
        assert actual == [(orphan_placed, known_track)]

    def test_pair_placed_empty_inputs(self):
        nothing = _pair_placed_with_tracks([], [])
        assert nothing == []
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
# Use case shared fixtures #
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
# Dotted path of the module under test; used as the prefix for patch targets.
MOD = "alfred.application.filesystem.manage_subtitles"
|
|
|
|
|
|
@pytest.fixture
def video(tmp_path):
    """Create empty source and destination video files under ``tmp_path``.

    Returns a ``(source, destination)`` tuple: ``dl/Movie.2010.mkv`` and
    ``lib/Movie.2010/Movie.2010.mkv``. Both files and their folders exist.
    """
    download_dir = tmp_path / "dl"
    download_dir.mkdir()
    source = download_dir / "Movie.2010.mkv"
    source.write_bytes(b"")

    library_dir = tmp_path / "lib" / "Movie.2010"
    library_dir.mkdir(parents=True)
    destination = library_dir / "Movie.2010.mkv"
    destination.write_bytes(b"")

    return source, destination
|
|
|
|
|
|
@pytest.fixture
def patches():
    """Patch every collaborator of the use case and yield handles to the mocks.

    The yielded dict maps short names to the *instance* mocks (the
    ``return_value`` of each patched class), except ``"loader"`` and
    ``"get_memory"`` which are the patched symbols themselves. Defaults are
    set so that an unconfigured run finds an "adjacent" pattern, no stored
    pattern, no detection, zero tracks, no matches and nothing placed.
    """
    with (
        patch(f"{MOD}.KnowledgeLoader") as loader_mock,
        patch(f"{MOD}.SubtitleKnowledgeBase") as kb_class,
        patch(f"{MOD}.SubtitleMetadataStore") as store_class,
        patch(f"{MOD}.RuleSetRepository") as repo_class,
        patch(f"{MOD}.SubtitleIdentifier") as identifier_class,
        patch(f"{MOD}.PatternDetector") as detector_class,
        patch(f"{MOD}.SubtitleMatcher") as matcher_class,
        patch(f"{MOD}.SubtitlePlacer") as placer_class,
        patch(f"{MOD}.get_memory") as get_memory_mock,
    ):
        # Instance mocks: what the use case gets back when it calls each class.
        kb_instance = kb_class.return_value
        store_instance = store_class.return_value
        repo_instance = repo_class.return_value
        identifier_instance = identifier_class.return_value
        detector_instance = detector_class.return_value
        matcher_instance = matcher_class.return_value
        placer_instance = placer_class.return_value

        # Knowledge base: any pattern lookup yields the default "adjacent" stub.
        kb_instance.pattern.return_value = _pattern()

        # Metadata store: no confirmed pattern recorded yet.
        store_instance.confirmed_pattern.return_value = None

        # Detector: nothing detected.
        detector_instance.detect.return_value = {
            "detected": None,
            "confidence": 0.0,
        }

        # Identifier: metadata without any track.
        identifier_instance.identify.return_value = MediaSubtitleMetadata(
            media_id=None,
            media_type="movie",
            release_group=None,
            detected_pattern_id="adjacent",
        )

        # Matcher: nothing matched, nothing unresolved.
        matcher_instance.match.return_value = ([], [])

        # Placer: nothing placed, nothing skipped.
        placer_instance.place.return_value = PlaceResult(placed=[], skipped=[])

        # Rule set: opaque object that the use case only forwards.
        repo_instance.load.return_value.resolve.return_value = MagicMock(
            name="Rules"
        )

        # Memory accessor: succeeds and exposes subtitle preferences.
        get_memory_mock.return_value.ltm.subtitle_preferences = MagicMock()

        yield {
            "kb": kb_instance,
            "store": store_instance,
            "repo": repo_instance,
            "ident": identifier_instance,
            "det": detector_instance,
            "matcher": matcher_instance,
            "placer": placer_instance,
            "loader": loader_mock,
            "get_memory": get_memory_mock,
        }
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
# Source missing #
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestSourceMissing:
    """Early-exit behaviour when the source video cannot be found."""

    def test_source_and_parent_missing_returns_error(self, tmp_path):
        # Neither the source file nor its parent folder is created here.
        ghost_source = tmp_path / "ghost" / "ghost.mkv"
        destination = tmp_path / "lib" / "x.mkv"
        use_case = ManageSubtitlesUseCase()
        result = use_case.execute(
            source_video=str(ghost_source),
            destination_video=str(destination),
        )
        assert result.status == "error"
        assert result.error == "source_not_found"

    def test_source_missing_but_parent_exists_does_not_error_early(
        self, tmp_path, patches
    ):
        # Only the parent folders exist. The run is expected to continue and,
        # with the default mocks (identifier reports zero tracks), end in "ok".
        download_dir = tmp_path / "dl"
        library_dir = tmp_path / "lib"
        download_dir.mkdir()
        library_dir.mkdir()
        result = ManageSubtitlesUseCase().execute(
            source_video=str(download_dir / "missing.mkv"),
            destination_video=str(library_dir / "missing.mkv"),
            media_type="movie",
        )
        assert result.status == "ok"
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
# Pattern resolution #
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestPatternResolution:
    """Pattern-resolution precedence expected from ``execute``.

    Order exercised here: explicit ``confirmed_pattern_id`` → pattern id kept
    in the metadata store → detector result → the ``"adjacent"`` fallback →
    ``pattern_not_found`` error when the knowledge base knows nothing.
    """

    def test_confirmed_pattern_id_wins(self, video, patches):
        """A known ``confirmed_pattern_id`` is used and the detector never runs."""
        src, dest = video
        custom = _pattern("subs_flat")
        patches["kb"].pattern.side_effect = lambda pid: (
            custom if pid == "subs_flat" else _pattern()
        )
        ManageSubtitlesUseCase().execute(
            source_video=str(src),
            destination_video=str(dest),
            media_type="movie",
            confirmed_pattern_id="subs_flat",
        )
        # Identifier called with the confirmed pattern (not the default).
        # Read the kwargs directly instead of unpacking an unused ``args``.
        assert patches["ident"].identify.call_args.kwargs["pattern"].id == "subs_flat"
        # Detector should not even run when an explicit confirmation is given.
        patches["det"].detect.assert_not_called()

    def test_confirmed_pattern_id_unknown_falls_through_to_stored(self, video, patches):
        """An unknown override is ignored in favour of the stored pattern id."""
        src, dest = video
        # KB knows nothing about the requested override → returns None.
        # Stored value provides 'subs_flat'.
        patches["store"].confirmed_pattern.return_value = "subs_flat"
        flat = _pattern("subs_flat")
        patches["kb"].pattern.side_effect = lambda pid: {
            "subs_flat": flat,
            "adjacent": _pattern(),
        }.get(pid)
        ManageSubtitlesUseCase().execute(
            source_video=str(src),
            destination_video=str(dest),
            media_type="movie",
            confirmed_pattern_id="DOES_NOT_EXIST",
        )
        assert patches["ident"].identify.call_args.kwargs["pattern"].id == "subs_flat"

    def test_detector_used_when_no_confirmed_and_no_stored(self, video, patches):
        """With no override and nothing stored, a confident detection is used."""
        src, dest = video
        detected = _pattern("episode_subfolder")
        patches["det"].detect.return_value = {
            "detected": detected,
            "confidence": 0.9,
        }
        ManageSubtitlesUseCase().execute(
            source_video=str(src),
            destination_video=str(dest),
            media_type="movie",
        )
        assert (
            patches["ident"].identify.call_args.kwargs["pattern"].id
            == "episode_subfolder"
        )

    def test_detector_low_confidence_falls_back_to_adjacent(self, video, patches):
        """A low-confidence detection is discarded for the "adjacent" fallback."""
        src, dest = video
        patches["det"].detect.return_value = {
            "detected": _pattern("episode_subfolder"),
            "confidence": 0.1,
        }
        ManageSubtitlesUseCase().execute(
            source_video=str(src),
            destination_video=str(dest),
            media_type="movie",
        )
        # Falls back via kb.pattern('adjacent'): it must be the last KB lookup.
        assert patches["kb"].pattern.call_args_list[-1].args == ("adjacent",)

    def test_pattern_not_found_when_kb_returns_none(self, video, patches):
        """If no lookup yields a pattern, the run reports ``pattern_not_found``."""
        src, dest = video
        patches["kb"].pattern.return_value = None  # nothing known
        patches["det"].detect.return_value = {"detected": None, "confidence": 0.0}
        out = ManageSubtitlesUseCase().execute(
            source_video=str(src),
            destination_video=str(dest),
            media_type="movie",
        )
        assert out.status == "error"
        assert out.error == "pattern_not_found"
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
# No tracks #
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestNoTracks:
    """Behaviour when the identifier reports no subtitle track at all."""

    def test_zero_tracks_returns_ok_empty(self, video, patches):
        # The default mocks already make the identifier return zero tracks.
        source, destination = video
        use_case = ManageSubtitlesUseCase()
        result = use_case.execute(
            source_video=str(source),
            destination_video=str(destination),
            media_type="movie",
        )
        assert result.status == "ok"
        assert result.placed == []
        assert result.skipped_count == 0
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
# Embedded short-circuit #
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestEmbeddedShortCircuit:
    """Behaviour when the resolved pattern uses the EMBEDDED scan strategy."""

    def test_embedded_returns_available_and_skips_matcher(self, video, patches):
        source, destination = video
        embedded_pattern = _pattern("embedded", ScanStrategy.EMBEDDED)
        patches["kb"].pattern.return_value = embedded_pattern
        inside_container = [
            _track(lang=FRA, is_embedded=True),
            _track(lang=ENG, stype=SubtitleType.SDH, is_embedded=True),
        ]
        patches["ident"].identify.return_value = MediaSubtitleMetadata(
            media_id=None,
            media_type="movie",
            release_group=None,
            detected_pattern_id="embedded",
            embedded_tracks=inside_container,
        )

        result = ManageSubtitlesUseCase().execute(
            source_video=str(source),
            destination_video=str(destination),
            media_type="movie",
        )

        assert result.status == "ok"
        assert result.placed == []
        assert result.available is not None
        # Both embedded languages must be listed as available.
        listed_languages = {entry.language for entry in result.available}
        assert {"fra", "eng"} <= listed_languages
        # Neither the matcher nor the placer may be involved.
        patches["matcher"].match.assert_not_called()
        patches["placer"].place.assert_not_called()
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
# Matcher flow #
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestMatcherFlow:
    """Outcomes driven by what the matcher returns for identified tracks."""

    @staticmethod
    def _movie_metadata(**tracks):
        """Build "adjacent" movie metadata; ``tracks`` is forwarded unchanged."""
        return MediaSubtitleMetadata(
            media_id=None,
            media_type="movie",
            release_group=None,
            detected_pattern_id="adjacent",
            **tracks,
        )

    def test_unresolved_returns_needs_clarification(self, video, patches):
        """Unresolved tracks yield ``needs_clarification`` and no placement."""
        src, dest = video
        ext = [_track(file_path=src.parent / "a.srt")]
        patches["ident"].identify.return_value = self._movie_metadata(
            external_tracks=ext
        )
        unresolved_track = _track(
            lang=None, raw_tokens=["xx"], file_path=src.parent / "?.srt"
        )
        patches["matcher"].match.return_value = ([], [unresolved_track])
        out = ManageSubtitlesUseCase().execute(
            source_video=str(src),
            destination_video=str(dest),
            media_type="movie",
        )
        assert out.status == "needs_clarification"
        assert out.unresolved and out.unresolved[0].reason == "unknown_language"
        patches["placer"].place.assert_not_called()

    def test_no_matches_returns_ok_with_skipped(self, video, patches):
        """No match and nothing unresolved: ``ok`` with every track skipped."""
        src, dest = video
        patches["ident"].identify.return_value = self._movie_metadata(
            external_tracks=[_track(file_path=src.parent / "a.srt")],
            embedded_tracks=[_track(is_embedded=True)],
        )
        patches["matcher"].match.return_value = ([], [])  # no matches, no unresolved
        out = ManageSubtitlesUseCase().execute(
            source_video=str(src),
            destination_video=str(dest),
            media_type="movie",
        )
        assert out.status == "ok"
        assert out.placed == []
        # total_count = 1 ext + 1 emb = 2
        assert out.skipped_count == 2

    def test_happy_path_places_and_persists(self, video, patches):
        """Matched tracks are placed and one history entry is appended."""
        src, dest = video
        src_sub = src.parent / "a.srt"
        src_sub.write_text("")
        matched = [_track(file_path=src_sub, lang=FRA)]
        patches["ident"].identify.return_value = self._movie_metadata(
            external_tracks=matched
        )
        patches["matcher"].match.return_value = (matched, [])
        placed = PlacedTrack(
            source=src_sub,
            destination=dest.parent / "Movie.2010.fra.srt",
            filename="Movie.2010.fra.srt",
        )
        patches["placer"].place.return_value = PlaceResult(placed=[placed], skipped=[])

        out = ManageSubtitlesUseCase().execute(
            source_video=str(src),
            destination_video=str(dest),
            media_type="movie",
            release_group="KONTRAST",
            season=1,
            episode=2,
        )
        assert out.status == "ok"
        assert len(out.placed) == 1
        assert out.placed[0].filename == "Movie.2010.fra.srt"
        # History was appended with season/episode/group.
        patches["store"].append_history.assert_called_once()
        args = patches["store"].append_history.call_args.args
        # signature: append_history(pairs, season, episode, release_group)
        assert args[1] == 1
        assert args[2] == 2
        assert args[3] == "KONTRAST"

    def test_get_memory_failure_falls_through_to_rules_repo(self, video, patches):
        """A failing ``get_memory()`` must not stop the run before matching."""
        # The use case swallows get_memory() exceptions and continues with
        # subtitle_prefs=None. We assert: still progresses past matcher.
        src, dest = video
        patches["get_memory"].side_effect = RuntimeError("not initialised")
        patches["ident"].identify.return_value = self._movie_metadata(
            external_tracks=[_track(file_path=src.parent / "a.srt")]
        )
        patches["matcher"].match.return_value = ([], [])
        out = ManageSubtitlesUseCase().execute(
            source_video=str(src),
            destination_video=str(dest),
            media_type="movie",
        )
        assert out.status == "ok"
        # "ok" alone does not prove the matcher ran; check it explicitly so
        # the test verifies what its comment promises.
        patches["matcher"].match.assert_called()
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
# Dry run #
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestDryRun:
    """``dry_run=True``: results are predicted, placer and history stay unused."""

    @staticmethod
    def _adjacent_metadata(tracks):
        """Build "adjacent" movie metadata with ``tracks`` as external tracks."""
        return MediaSubtitleMetadata(
            media_id=None,
            media_type="movie",
            release_group=None,
            detected_pattern_id="adjacent",
            external_tracks=tracks,
        )

    def test_dry_run_skips_placer_and_returns_predicted(self, video, patches):
        source, destination = video
        subtitle_file = source.parent / "a.srt"
        subtitle_file.write_text("")
        candidates = [_track(file_path=subtitle_file, lang=FRA)]
        patches["ident"].identify.return_value = self._adjacent_metadata(candidates)
        patches["matcher"].match.return_value = (candidates, [])

        result = ManageSubtitlesUseCase().execute(
            source_video=str(source),
            destination_video=str(destination),
            media_type="movie",
            dry_run=True,
        )

        assert result.status == "ok"
        # A predicted entry must exist and carry the language-tagged filename.
        assert result.placed
        assert result.placed[0].filename.endswith(".fra.srt")
        # Nothing is really placed and no history is written.
        patches["placer"].place.assert_not_called()
        patches["store"].append_history.assert_not_called()

    def test_dry_run_skips_tracks_without_file_path(self, video, patches):
        source, destination = video
        # A matched track without a file_path is expected to be left out.
        candidates = [_track(file_path=None, lang=FRA)]
        patches["ident"].identify.return_value = self._adjacent_metadata(candidates)
        patches["matcher"].match.return_value = (candidates, [])

        result = ManageSubtitlesUseCase().execute(
            source_video=str(source),
            destination_video=str(destination),
            media_type="movie",
            dry_run=True,
        )

        assert result.placed == []
|