"""Tests for ``alfred.domain.subtitles.services.identifier``. Coverage: - ``TestTokenize`` — ``_tokenize`` strips parentheses and splits on ``[.\\s_-]``; ``_tokenize_suffix`` peels the episode stem prefix. - ``TestCountEntries`` — last-cue-number heuristic for SRT files. - ``TestEmbedded`` — ffprobe is mocked; dispositions map to SDH/FORCED / STANDARD; non-existent file → empty list; ffprobe error → empty. - ``TestAdjacent`` — adjacent strategy: only known extensions, excludes the video file itself. - ``TestFlat`` — Subs/ folder adjacent or at release root. - ``TestEpisodeSubfolder`` — Subs/{stem}/*.srt; tokens after prefix. - ``TestClassify`` — language + type token detection, confidence math. - ``TestSizeDisambiguation`` — size_and_count post-processing rules (2-track → standard+sdh; 3+ → forced + standard + sdh). """ from __future__ import annotations from unittest.mock import patch import pytest from alfred.domain.subtitles.entities import SubtitleCandidate from alfred.domain.subtitles.knowledge.base import SubtitleKnowledgeBase from alfred.domain.subtitles.services.identifier import ( SubtitleIdentifier, _count_entries, _tokenize, _tokenize_suffix, ) from alfred.domain.subtitles.value_objects import ( ScanStrategy, SubtitleLanguage, SubtitlePattern, SubtitleType, TypeDetectionMethod, ) @pytest.fixture(scope="module") def kb(): return SubtitleKnowledgeBase() @pytest.fixture def identifier(kb): return SubtitleIdentifier(kb) def _pattern( strategy: ScanStrategy, root_folder: str | None = None, detection: TypeDetectionMethod = TypeDetectionMethod.TOKEN_IN_NAME, ) -> SubtitlePattern: return SubtitlePattern( id=f"test-{strategy.value}", description="", scan_strategy=strategy, root_folder=root_folder, type_detection=detection, ) # --------------------------------------------------------------------------- # # _tokenize / _tokenize_suffix # # --------------------------------------------------------------------------- # class TestTokenize: def test_basic_dotted(self): assert _tokenize("Show.S01E01.French") == ["show", "s01e01", "french"] def test_mixed_separators(self): assert _tokenize("Show_S01-E01 French") == ["show", "s01", "e01", "french"] def test_strips_parenthesized(self): assert _tokenize("episode (Brazil).French") == ["episode", "french"] def test_empty_string(self): assert _tokenize("") == [] def test_suffix_strips_episode_prefix(self): out = _tokenize_suffix("Show.S01E01.English", "Show.S01E01") assert out == ["english"] def test_suffix_falls_back_when_no_prefix(self): # filename doesn't start with episode_stem → full tokenize. out = _tokenize_suffix("Other.srt", "Show.S01E01") assert "other" in out def test_suffix_falls_back_when_suffix_is_empty(self): # Suffix would tokenize to nothing → fall back to full stem. out = _tokenize_suffix("Show.S01E01", "Show.S01E01") # full tokenize of "Show.S01E01" → ['show', 's01e01'] assert out == ["show", "s01e01"] # --------------------------------------------------------------------------- # # _count_entries # # --------------------------------------------------------------------------- # class TestCountEntries: def test_last_cue_number(self, tmp_path): srt = tmp_path / "x.srt" srt.write_text( "1\n00:00:01,000 --> 00:00:02,000\nHello\n\n" "2\n00:00:03,000 --> 00:00:04,000\nWorld\n\n" "42\n00:00:05,000 --> 00:00:06,000\nLast\n", encoding="utf-8", ) assert _count_entries(srt) == 42 def test_missing_file_returns_zero(self, tmp_path): assert _count_entries(tmp_path / "nope.srt") == 0 def test_empty_file_returns_zero(self, tmp_path): f = tmp_path / "x.srt" f.write_text("") assert _count_entries(f) == 0 # --------------------------------------------------------------------------- # # Embedded scan # # --------------------------------------------------------------------------- # class TestEmbedded: def test_missing_file_returns_empty(self, identifier, tmp_path): assert identifier._scan_embedded(tmp_path / "missing.mkv") == [] def test_ffprobe_failure_returns_empty(self, identifier, tmp_path): video = tmp_path / "v.mkv" video.write_bytes(b"") with patch( "alfred.domain.subtitles.services.identifier.subprocess.run", side_effect=FileNotFoundError("no ffprobe"), ): assert identifier._scan_embedded(video) == [] def test_disposition_to_subtitle_type(self, identifier, tmp_path): video = tmp_path / "v.mkv" video.write_bytes(b"") fake_output = ( '{"streams":[' '{"tags":{"language":"eng"},"disposition":{"hearing_impaired":1}},' '{"tags":{"language":"fre"},"disposition":{"forced":1}},' '{"tags":{"language":"spa"},"disposition":{}},' '{"tags":{},"disposition":{}}' "]}" ) class FakeResult: stdout = fake_output with patch( "alfred.domain.subtitles.services.identifier.subprocess.run", return_value=FakeResult(), ): tracks = identifier._scan_embedded(video) assert len(tracks) == 4 assert tracks[0].subtitle_type == SubtitleType.SDH assert tracks[0].language.code == "eng" assert tracks[1].subtitle_type == SubtitleType.FORCED assert tracks[1].language.code == "fre" assert tracks[2].subtitle_type == SubtitleType.STANDARD assert tracks[3].language is None # no language tag for t in tracks: assert t.is_embedded is True # --------------------------------------------------------------------------- # # Adjacent / Flat / Episode subfolder discovery # # --------------------------------------------------------------------------- # class TestAdjacent: def test_finds_only_known_subtitle_extensions(self, identifier, tmp_path): video = tmp_path / "Show.S01E01.mkv" video.write_bytes(b"") (tmp_path / "Show.S01E01.English.srt").write_text("") (tmp_path / "Show.S01E01.French.ass").write_text("") # Non-subtitle files must be ignored. (tmp_path / "Show.S01E01.nfo").write_text("") (tmp_path / "cover.jpg").write_bytes(b"") result = identifier._find_adjacent(video) names = sorted(p.name for p in result) assert names == ["Show.S01E01.English.srt", "Show.S01E01.French.ass"] def test_excludes_the_video_file(self, identifier, tmp_path): # An adjacent file with the *same stem* as the video would be the # video itself (e.g. a .mkv named like the .srt). Not expected here, # but the implementation guards via `p.stem != video.stem`. video = tmp_path / "Show.S01E01.mkv" video.write_bytes(b"") (tmp_path / "Show.S01E01.srt").write_text("") # same stem # Same stem → excluded; only subs with a different stem are returned. assert identifier._find_adjacent(video) == [] class TestFlat: def test_subs_folder_adjacent(self, identifier, tmp_path): video = tmp_path / "Show.S01E01.mkv" video.write_bytes(b"") subs = tmp_path / "Subs" subs.mkdir() (subs / "English.srt").write_text("") result = identifier._find_flat(video, "Subs") assert len(result) == 1 def test_subs_folder_at_release_root_fallback(self, identifier, tmp_path): season = tmp_path / "Season.1" season.mkdir() video = season / "Show.S01E01.mkv" video.write_bytes(b"") subs = tmp_path / "Subs" subs.mkdir() (subs / "English.srt").write_text("") result = identifier._find_flat(video, "Subs") assert len(result) == 1 def test_no_subs_folder_returns_empty(self, identifier, tmp_path): video = tmp_path / "v.mkv" video.write_bytes(b"") assert identifier._find_flat(video, "Subs") == [] class TestEpisodeSubfolder: def test_found_and_stem_returned(self, identifier, tmp_path): video = tmp_path / "Show.S01E01.mkv" video.write_bytes(b"") subs = tmp_path / "Subs" / "Show.S01E01" subs.mkdir(parents=True) (subs / "2_English.srt").write_text("") files, stem = identifier._find_episode_subfolder(video, "Subs") assert len(files) == 1 assert stem == "Show.S01E01" def test_not_found(self, identifier, tmp_path): video = tmp_path / "Show.S01E01.mkv" video.write_bytes(b"") files, stem = identifier._find_episode_subfolder(video, "Subs") assert files == [] assert stem == "Show.S01E01" # --------------------------------------------------------------------------- # # Classification # # --------------------------------------------------------------------------- # class TestClassify: def test_classifies_language_and_format(self, identifier, tmp_path): f = tmp_path / "Show.S01E01.English.srt" f.write_text("1\n00:00:01,000 --> 00:00:02,000\nHi\n") track = identifier._classify_single(f) assert track.language.code == "eng" assert track.format.id == "srt" assert track.confidence > 0 assert track.is_embedded is False def test_classifies_type_token(self, identifier, tmp_path): f = tmp_path / "Show.S01E01.English.sdh.srt" f.write_text("") track = identifier._classify_single(f) assert track.subtitle_type == SubtitleType.SDH def test_unknown_tokens_lower_confidence(self, identifier, tmp_path): f = tmp_path / "Show.S01E01.gibberish.srt" f.write_text("") track = identifier._classify_single(f) # No lang/type recognized → confidence is 0 or very low. assert track.language is None assert track.confidence < 0.5 def test_episode_stem_prefix_stripped(self, identifier, tmp_path): f = tmp_path / "Show.S01E01.English.srt" f.write_text("") track = identifier._classify_single(f, episode_stem="Show.S01E01") # Only "english" remains as meaningful token → confidence == 1.0 assert track.language.code == "eng" assert track.confidence == 1.0 # --------------------------------------------------------------------------- # # size_and_count post-processing # # --------------------------------------------------------------------------- # class TestSizeDisambiguation: @pytest.fixture def pattern_size(self): return _pattern( ScanStrategy.FLAT, root_folder="Subs", detection=TypeDetectionMethod.SIZE_AND_COUNT, ) def _track(self, lang_code: str, entries: int) -> SubtitleCandidate: return SubtitleCandidate( language=SubtitleLanguage(code=lang_code, tokens=[lang_code]), format=None, subtitle_type=SubtitleType.UNKNOWN, entry_count=entries, ) def test_two_tracks_split_into_standard_and_sdh(self, identifier, pattern_size): t1 = self._track("eng", 800) t2 = self._track("eng", 1200) result = identifier._disambiguate_by_size([t1, t2]) # Sorted ascending → smaller=standard, larger=sdh types = sorted([t.subtitle_type for t in result], key=lambda s: s.value) assert SubtitleType.STANDARD in types assert SubtitleType.SDH in types def test_three_tracks_split_into_forced_standard_sdh(self, identifier): t_small = self._track("eng", 50) t_mid = self._track("eng", 600) t_large = self._track("eng", 1200) result = identifier._disambiguate_by_size([t_large, t_small, t_mid]) # Sorted ascending → smallest=forced, middle=standard, largest=sdh by_count = sorted(result, key=lambda t: t.entry_count) assert by_count[0].subtitle_type == SubtitleType.FORCED assert by_count[1].subtitle_type == SubtitleType.STANDARD assert by_count[2].subtitle_type == SubtitleType.SDH def test_single_track_untouched(self, identifier): t = self._track("eng", 800) result = identifier._disambiguate_by_size([t]) assert result == [t] assert t.subtitle_type == SubtitleType.UNKNOWN def test_different_languages_grouped_independently(self, identifier): # Two eng + one fra → fra is alone, eng pair gets split. eng_small = self._track("eng", 800) eng_large = self._track("eng", 1500) fra_solo = self._track("fra", 1000) result = identifier._disambiguate_by_size([eng_small, eng_large, fra_solo]) # fra solo stays UNKNOWN assert fra_solo.subtitle_type == SubtitleType.UNKNOWN # eng pair gets STANDARD + SDH assert eng_small.subtitle_type == SubtitleType.STANDARD assert eng_large.subtitle_type == SubtitleType.SDH