# diff --git a/alfred/domain/subtitles/scanner.py b/alfred/domain/subtitles/scanner.py
# deleted file mode 100644
# index 0a30d00..0000000
# --- a/alfred/domain/subtitles/scanner.py
# +++ /dev/null
# @@ -1,207 +0,0 @@
"""SubtitleScanner — inspects local subtitle files and filters them per user preferences.

Given a video file path, the scanner:
  1. Looks for subtitle files in the same directory as the video.
  2. Also inspects a Subs/ subfolder adjacent to the video, when one exists.
  3. Classifies each file (language, SDH, forced) from its filename, delegating
     all token knowledge to SubtitleKnowledgeBase (which itself merges
     LanguageRegistry + subtitle-specific tokens from subtitles.yaml).
  4. Filters according to SubtitlePreferences (languages, min_size_kb, keep_sdh,
     keep_forced).
  5. Returns a list of SubtitleCandidate — one per file that passes the filter,
     with the destination filename already computed.

Filename classification heuristics
-----------------------------------
We parse the stem of each subtitle file looking for known patterns:

  fre.srt                         → lang=fre, sdh=False, forced=False
  fre.sdh.srt                     → lang=fre, sdh=True
  fre.forced.srt                  → lang=fre, forced=True
  Breaking.Bad.S01E01.French.srt  → lang=fre (alias match via LanguageRegistry)
  Breaking.Bad.S01E01.VOSTFR.srt  → lang=fre (subtitle-specific token)

ISO 639-2/B codes are used throughout (matching the project-wide canonical form
from iso_languages.yaml — what ffprobe emits).

Output naming convention (matches SubtitlePreferences docstring):
  {lang}.srt
  {lang}.sdh.srt
  {lang}.forced.srt
"""

import logging
import re
from dataclasses import dataclass
from pathlib import Path

from .knowledge.base import SubtitleKnowledgeBase
from .value_objects import SubtitleType

logger = logging.getLogger(__name__)

# Filename stems are tokenised on runs of dots, whitespace, underscores and hyphens.
_TOKEN_SPLIT = re.compile(r"[\.\s_\-]+")


@dataclass
class SubtitleCandidate:
    """A subtitle file that passed the filter, ready to be placed."""

    source_path: Path
    language: str  # ISO 639-2/B code, e.g. "fre"
    is_sdh: bool
    is_forced: bool
    extension: str  # e.g. ".srt"

    @property
    def destination_name(self) -> str:
        """
        Compute the destination filename per naming convention:
            {lang}.srt
            {lang}.sdh.srt
            {lang}.forced.srt

        Leading dots are stripped from ``extension`` before it is appended.
        When both ``is_sdh`` and ``is_forced`` are set, only the ``sdh``
        marker is emitted (SDH takes precedence over forced).
        """
        ext = self.extension.lstrip(".")
        parts = [self.language]
        # The convention has a single qualifier slot, hence if/elif rather
        # than two independent checks.
        if self.is_sdh:
            parts.append("sdh")
        elif self.is_forced:
            parts.append("forced")
        return ".".join(parts) + "." + ext


# Module-level KB instance — built lazily on first use to avoid loading YAML at import.
_KB: SubtitleKnowledgeBase | None = None


def _kb() -> SubtitleKnowledgeBase:
    """Return the shared SubtitleKnowledgeBase, creating it on the first call."""
    global _KB  # noqa: PLW0603 — intentional lazy module-level cache
    if _KB is None:
        _KB = SubtitleKnowledgeBase()
    return _KB


def _classify(path: Path) -> tuple[str | None, bool, bool]:
    """
    Parse a subtitle filename and return (language_code, is_sdh, is_forced).

    The stem is lower-cased and split on ``_TOKEN_SPLIT``. The first token the
    knowledge base recognises as a language fixes the language; once a
    language is set, later tokens are no longer tried as languages. Every
    token that is not consumed as the language is checked for an SDH or
    forced marker.

    ``language_code`` is the ISO 639-2/B canonical code (e.g. ``"fre"``).
    The language is ``None`` if it cannot be determined; the SDH/forced flags
    are still reported from whatever marker tokens were found.
    """
    stem = path.stem.lower()
    tokens = _TOKEN_SPLIT.split(stem)
    kb = _kb()

    language: str | None = None
    is_sdh = False
    is_forced = False

    for token in tokens:
        if not token:
            # Leading/trailing separators make re.split yield empty strings.
            continue
        if language is None:
            lang = kb.language_for_token(token)
            if lang is not None:
                language = lang.code
                # A token used as the language is not also tried as a type marker.
                continue
        stype = kb.type_for_token(token)
        if stype is SubtitleType.SDH:
            is_sdh = True
        elif stype is SubtitleType.FORCED:
            is_forced = True

    return language, is_sdh, is_forced


class SubtitleScanner:
    """
    Scans subtitle files next to a video and filters them per SubtitlePreferences.

    Usage:
        scanner = SubtitleScanner(
            languages=["fre", "eng"],
            min_size_kb=1,
            keep_sdh=True,
            keep_forced=True,
        )
        candidates = scanner.scan(video_path)
        # Each candidate has .source_path and .destination_name
    """

    def __init__(
        self, languages: list[str], min_size_kb: int, keep_sdh: bool, keep_forced: bool
    ):
        """
        Store the filter settings and load the known subtitle extensions.

        Args:
            languages: Accepted language codes; stored lower-cased and compared
                against the code returned by ``_classify``.
            min_size_kb: Files smaller than this many KiB are dropped.
            keep_sdh: Whether files classified as SDH are kept.
            keep_forced: Whether files classified as forced are kept.
        """
        self.languages = [lang.lower() for lang in languages]
        self.min_size_kb = min_size_kb
        self.keep_sdh = keep_sdh
        self.keep_forced = keep_forced
        self._kb = _kb()
        # Lower-cased so the comparison with ``path.suffix.lower()`` is case-insensitive.
        self._subtitle_extensions = {e.lower() for e in self._kb.known_extensions()}

    def scan(self, video_path: Path) -> list[SubtitleCandidate]:
        """
        Return all subtitle candidates found next to the video that pass the filter.

        Scans:
          - Same directory as the video (flat siblings)
          - Subs/ subfolder if present

        Entries are visited in sorted order within each directory, the video's
        own directory first. The video's directory is listed unconditionally,
        so an unreadable or missing directory surfaces as the ``OSError``
        raised by ``Path.iterdir``.
        """
        candidates: list[SubtitleCandidate] = []
        search_dirs = [video_path.parent]

        subs_dir = video_path.parent / "Subs"
        if subs_dir.is_dir():
            search_dirs.append(subs_dir)
            logger.debug("SubtitleScanner: found Subs/ folder at %s", subs_dir)

        for directory in search_dirs:
            for path in sorted(directory.iterdir()):
                if not path.is_file():
                    continue
                if path.suffix.lower() not in self._subtitle_extensions:
                    continue

                candidate = self._evaluate(path)
                if candidate is not None:
                    candidates.append(candidate)

        logger.info(
            "SubtitleScanner: %d candidate(s) found for %s",
            len(candidates),
            video_path.name,
        )
        return candidates

    def _evaluate(self, path: Path) -> SubtitleCandidate | None:
        """Apply all filters to a single subtitle file. Returns None if it should be dropped."""
        # Size filter
        size_kb = path.stat().st_size / 1024
        if size_kb < self.min_size_kb:
            logger.debug(
                "SubtitleScanner: skip %s (too small: %.1f KB)", path.name, size_kb
            )
            return None

        language, is_sdh, is_forced = _classify(path)

        # Language filter
        if language is None:
            logger.debug("SubtitleScanner: skip %s (language unknown)", path.name)
            return None

        if language not in self.languages:
            logger.debug(
                "SubtitleScanner: skip %s (language '%s' not in prefs)",
                path.name,
                language,
            )
            return None

        # SDH filter
        if is_sdh and not self.keep_sdh:
            logger.debug("SubtitleScanner: skip %s (SDH not wanted)", path.name)
            return None

        # Forced filter
        if is_forced and not self.keep_forced:
            logger.debug("SubtitleScanner: skip %s (forced not wanted)", path.name)
            return None

        return SubtitleCandidate(
            source_path=path,
            language=language,
            is_sdh=is_sdh,
            is_forced=is_forced,
            extension=path.suffix.lower(),
        )


# diff --git a/tests/domain/test_subtitle_scanner.py b/tests/domain/test_subtitle_scanner.py
# deleted file mode 100644
# index 5251de3..0000000
# ---
# a/tests/domain/test_subtitle_scanner.py
# +++ /dev/null
# @@ -1,232 +0,0 @@
"""Tests for SubtitleScanner and _classify helper."""

from pathlib import Path

from alfred.domain.subtitles.scanner import (
    SubtitleCandidate,
    SubtitleScanner,
    _classify,
)

# ---------------------------------------------------------------------------
# _classify — unit tests for the filename parser
# ---------------------------------------------------------------------------


class TestClassify:
    """Filename → (language, is_sdh, is_forced) classification."""

    def test_iso_lang_code_639_1_alias(self, tmp_path):
        # ``fr`` is an alias of the canonical ISO 639-2/B code ``fre``.
        p = tmp_path / "fr.srt"
        p.write_text("")
        lang, is_sdh, is_forced = _classify(p)
        assert lang == "fre"
        assert not is_sdh
        assert not is_forced

    def test_english_keyword(self, tmp_path):
        p = tmp_path / "english.srt"
        p.write_text("")
        lang, _, _ = _classify(p)
        assert lang == "eng"

    def test_french_keyword(self, tmp_path):
        p = tmp_path / "Show.S01E01.French.srt"
        p.write_text("")
        lang, _, _ = _classify(p)
        assert lang == "fre"

    def test_vostfr_is_french(self, tmp_path):
        p = tmp_path / "Show.S01E01.VOSTFR.srt"
        p.write_text("")
        lang, _, _ = _classify(p)
        assert lang == "fre"

    def test_sdh_token(self, tmp_path):
        p = tmp_path / "fre.sdh.srt"
        p.write_text("")
        lang, is_sdh, _ = _classify(p)
        assert lang == "fre"
        assert is_sdh

    def test_hi_no_longer_marks_sdh(self, tmp_path):
        # ``hi`` is the ISO 639-1 alias for Hindi; it must not mark a file as
        # SDH any more (regression of the previous collision between SDH and
        # Hindi tokens). Use ``sdh`` / ``cc`` / ``hearing`` to flag SDH instead.
        p = tmp_path / "en.hi.srt"
        p.write_text("")
        lang, is_sdh, _ = _classify(p)
        assert lang == "eng"
        assert not is_sdh

    def test_forced_token(self, tmp_path):
        p = tmp_path / "fre.forced.srt"
        p.write_text("")
        _, _, is_forced = _classify(p)
        assert is_forced

    def test_unknown_language_returns_none(self, tmp_path):
        p = tmp_path / "Show.S01E01.720p.srt"
        p.write_text("")
        lang, _, _ = _classify(p)
        assert lang is None

    def test_dot_separator(self, tmp_path):
        p = tmp_path / "fre.sdh.srt"
        p.write_text("")
        lang, is_sdh, _ = _classify(p)
        assert lang == "fre"
        assert is_sdh

    def test_hyphen_separator(self, tmp_path):
        p = tmp_path / "fre-forced.srt"
        p.write_text("")
        lang, _, is_forced = _classify(p)
        assert lang == "fre"
        assert is_forced

    def test_underscore_separator(self, tmp_path):
        # The tokenizer also splits on underscores.
        p = tmp_path / "fre_forced.srt"
        p.write_text("")
        lang, _, is_forced = _classify(p)
        assert lang == "fre"
        assert is_forced

    def test_whitespace_separator(self, tmp_path):
        # The tokenizer also splits on whitespace.
        p = tmp_path / "fre sdh.srt"
        p.write_text("")
        lang, is_sdh, _ = _classify(p)
        assert lang == "fre"
        assert is_sdh


# ---------------------------------------------------------------------------
# SubtitleCandidate.destination_name
# ---------------------------------------------------------------------------


class TestSubtitleCandidateDestinationName:
    """Destination filename convention: {lang}[.sdh|.forced].{ext}."""

    def _make(self, lang="fre", is_sdh=False, is_forced=False, ext=".srt", path=None):
        return SubtitleCandidate(
            source_path=path or Path("/fake/fre.srt"),
            language=lang,
            is_sdh=is_sdh,
            is_forced=is_forced,
            extension=ext,
        )

    def test_standard(self):
        assert self._make().destination_name == "fre.srt"

    def test_sdh(self):
        assert self._make(is_sdh=True).destination_name == "fre.sdh.srt"

    def test_forced(self):
        assert self._make(is_forced=True).destination_name == "fre.forced.srt"

    def test_ass_extension(self):
        assert self._make(ext=".ass").destination_name == "fre.ass"

    def test_english_standard(self):
        assert self._make(lang="eng").destination_name == "eng.srt"

    def test_sdh_takes_precedence_over_forced(self):
        # Pins current behaviour: the name has a single qualifier slot and
        # SDH wins when both flags are set.
        candidate = self._make(is_sdh=True, is_forced=True)
        assert candidate.destination_name == "fre.sdh.srt"


# ---------------------------------------------------------------------------
# SubtitleScanner — integration with real filesystem
# ---------------------------------------------------------------------------


class TestSubtitleScanner:
    """End-to-end scans against files created under ``tmp_path``."""

    def _scanner(self, languages=None, min_size_kb=0, keep_sdh=True, keep_forced=True):
        return SubtitleScanner(
            languages=languages or ["fre", "eng"],
            min_size_kb=min_size_kb,
            keep_sdh=keep_sdh,
            keep_forced=keep_forced,
        )

    def _video(self, tmp_path):
        video = tmp_path / "Movie.mkv"
        video.write_bytes(b"video")
        return video

    def test_finds_adjacent_subtitle(self, tmp_path):
        video = self._video(tmp_path)
        (tmp_path / "fre.srt").write_text("subtitle content")

        candidates = self._scanner().scan(video)

        assert len(candidates) == 1
        assert candidates[0].language == "fre"

    def test_finds_adjacent_subtitle_legacy_639_1(self, tmp_path):
        # Reading existing media libraries: ``fr.srt`` is still recognized as
        # French and classified canonically as ``fre`` — covers user libraries
        # written before the ISO 639-2/B migration.
        video = self._video(tmp_path)
        (tmp_path / "fr.srt").write_text("subtitle content")

        candidates = self._scanner().scan(video)

        assert len(candidates) == 1
        assert candidates[0].language == "fre"

    def test_finds_multiple_languages(self, tmp_path):
        video = self._video(tmp_path)
        (tmp_path / "fre.srt").write_text("fr subtitle")
        (tmp_path / "eng.srt").write_text("en subtitle")

        candidates = self._scanner().scan(video)
        langs = {c.language for c in candidates}
        assert langs == {"fre", "eng"}

    def test_scans_subs_subfolder(self, tmp_path):
        video = self._video(tmp_path)
        subs = tmp_path / "Subs"
        subs.mkdir()
        (subs / "fre.srt").write_text("subtitle")

        candidates = self._scanner().scan(video)
        assert any(c.language == "fre" for c in candidates)

    def test_filters_unknown_language(self, tmp_path):
        video = self._video(tmp_path)
        (tmp_path / "unknown.srt").write_text("subtitle")

        candidates = self._scanner().scan(video)
        assert len(candidates) == 0

    def test_filters_wrong_language(self, tmp_path):
        video = self._video(tmp_path)
        (tmp_path / "ger.srt").write_text("german subtitle")

        candidates = self._scanner(languages=["fre"]).scan(video)
        assert len(candidates) == 0

    def test_language_preferences_are_case_insensitive(self, tmp_path):
        # The scanner lower-cases the preferred codes it is given.
        video = self._video(tmp_path)
        (tmp_path / "fre.srt").write_text("subtitle content")

        candidates = self._scanner(languages=["FRE"]).scan(video)
        assert len(candidates) == 1
        assert candidates[0].language == "fre"

    def test_filters_too_small_file(self, tmp_path):
        video = self._video(tmp_path)
        small = tmp_path / "fre.srt"
        small.write_bytes(b"x")  # 1 byte, well below any min_size_kb

        candidates = self._scanner(min_size_kb=10).scan(video)
        assert len(candidates) == 0

    def test_filters_sdh_when_not_wanted(self, tmp_path):
        video = self._video(tmp_path)
        (tmp_path / "fre.sdh.srt").write_text("sdh subtitle")

        candidates = self._scanner(keep_sdh=False).scan(video)
        assert len(candidates) == 0

    def test_filters_forced_when_not_wanted(self, tmp_path):
        video = self._video(tmp_path)
        (tmp_path / "fre.forced.srt").write_text("forced subtitle")

        candidates = self._scanner(keep_forced=False).scan(video)
        assert len(candidates) == 0

    def test_keeps_sdh_when_wanted(self, tmp_path):
        video = self._video(tmp_path)
        (tmp_path / "fre.sdh.srt").write_text("sdh subtitle")

        candidates = self._scanner(keep_sdh=True).scan(video)
        assert len(candidates) == 1
        assert candidates[0].is_sdh

    def test_keeps_forced_when_wanted(self, tmp_path):
        video = self._video(tmp_path)
        (tmp_path / "fre.forced.srt").write_text("forced subtitle")

        candidates = self._scanner(keep_forced=True).scan(video)
        assert len(candidates) == 1
        assert candidates[0].is_forced

    def test_ignores_non_subtitle_files(self, tmp_path):
        video = self._video(tmp_path)
        (tmp_path / "fre.nfo").write_text("nfo file")
        (tmp_path / "fre.jpg").write_bytes(b"image")

        candidates = self._scanner().scan(video)
        assert len(candidates) == 0

    def test_returns_empty_when_no_subtitles(self, tmp_path):
        video = self._video(tmp_path)
        candidates = self._scanner().scan(video)
        assert candidates == []