From e6ee7008259878ac80cc54be8ed71c1c0b944433 Mon Sep 17 00:00:00 2001 From: Francwa Date: Tue, 19 May 2026 14:52:24 +0200 Subject: [PATCH] refactor(subtitles): inject MediaProber/FilesystemScanner ports into domain services MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Domain services no longer call subprocess or pathlib directly. Introduces two Protocol ports in domain/shared/ports/: MediaProber.list_subtitle_streams(video) -> list[SubtitleStreamInfo] FilesystemScanner.scan_dir / stat / read_text -> list[FileEntry] | ... Concrete adapters live in infrastructure/: FfprobeMediaProber (wraps subprocess + ffprobe + JSON) PathlibFilesystemScanner (wraps pathlib + os reads) SubtitleIdentifier and PatternDetector now take (kb, prober, scanner) at construction time. Their internals work over FileEntry snapshots and SubtitleStreamInfo records — no more ad-hoc Path.is_file/iterdir/stat or embedded subprocess.run loops. _count_entries now takes raw SRT text (returned by scanner.read_text) so SRT-only entry counting stays out of the FS layer. manage_subtitles use case instantiates the two adapters once and injects them into both services. Tests pass real adapters and patch `alfred.infrastructure.probe.ffprobe_prober.subprocess.run` for the ffprobe-failure cases. _classify_single tests build FileEntry via a small helper. Domain is now free of subprocess / direct filesystem reads in the subtitle pipeline. The only remaining I/O hooks are FilePath VO convenience methods (exists/is_file/is_dir) which stay as a deliberate affordance on the value object. --- .../filesystem/manage_subtitles.py | 22 ++- alfred/domain/shared/ports/__init__.py | 17 ++ .../domain/shared/ports/filesystem_scanner.py | 59 ++++++ alfred/domain/shared/ports/media_prober.py | 39 ++++ .../domain/subtitles/services/identifier.py | 183 +++++++----------- .../subtitles/services/pattern_detector.py | 130 ++++++------- alfred/infrastructure/filesystem/scanner.py | 66 +++++++ alfred/infrastructure/probe/__init__.py | 5 + alfred/infrastructure/probe/ffprobe_prober.py | 65 +++++++ tests/domain/test_subtitle_identifier.py | 51 +++-- .../domain/test_subtitle_pattern_detector.py | 6 +- 11 files changed, 432 insertions(+), 211 deletions(-) create mode 100644 alfred/domain/shared/ports/__init__.py create mode 100644 alfred/domain/shared/ports/filesystem_scanner.py create mode 100644 alfred/domain/shared/ports/media_prober.py create mode 100644 alfred/infrastructure/filesystem/scanner.py create mode 100644 alfred/infrastructure/probe/__init__.py create mode 100644 alfred/infrastructure/probe/ffprobe_prober.py diff --git a/alfred/application/filesystem/manage_subtitles.py b/alfred/application/filesystem/manage_subtitles.py index f37f923..66a68b9 100644 --- a/alfred/application/filesystem/manage_subtitles.py +++ b/alfred/application/filesystem/manage_subtitles.py @@ -5,8 +5,6 @@ from pathlib import Path from alfred.domain.shared.value_objects import ImdbId from alfred.domain.subtitles.entities import SubtitleCandidate -from alfred.infrastructure.knowledge.subtitles.base import SubtitleKnowledgeBase -from alfred.infrastructure.knowledge.subtitles.loader import KnowledgeLoader from alfred.domain.subtitles.services.identifier import SubtitleIdentifier from alfred.domain.subtitles.services.matcher import SubtitleMatcher from alfred.domain.subtitles.services.pattern_detector import PatternDetector @@ -17,7 +15,11 @@ from alfred.domain.subtitles.services.placer import ( ) from alfred.domain.subtitles.services.utils import available_subtitles from alfred.domain.subtitles.value_objects import ScanStrategy +from alfred.infrastructure.filesystem.scanner import PathlibFilesystemScanner +from alfred.infrastructure.knowledge.subtitles.base import SubtitleKnowledgeBase +from alfred.infrastructure.knowledge.subtitles.loader import KnowledgeLoader from alfred.infrastructure.persistence.context import get_memory +from alfred.infrastructure.probe.ffprobe_prober import FfprobeMediaProber from alfred.infrastructure.subtitle.metadata_store import SubtitleMetadataStore from alfred.infrastructure.subtitle.rule_repository import RuleSetRepository @@ -91,13 +93,21 @@ class ManageSubtitlesUseCase: ) kb = SubtitleKnowledgeBase(KnowledgeLoader()) + prober = FfprobeMediaProber() + scanner = PathlibFilesystemScanner() library_root = _infer_library_root(dest_path, media_type) store = SubtitleMetadataStore(library_root) repo = RuleSetRepository(library_root) # --- Pattern resolution --- pattern = self._resolve_pattern( - kb, store, source_path, confirmed_pattern_id, release_group + kb, + prober, + scanner, + store, + source_path, + confirmed_pattern_id, + release_group, ) if pattern is None: return ManageSubtitlesResponse( @@ -108,7 +118,7 @@ class ManageSubtitlesUseCase: # --- Identify --- media_id = _to_imdb_id(imdb_id) - identifier = SubtitleIdentifier(kb) + identifier = SubtitleIdentifier(kb, prober, scanner) metadata = identifier.identify( video_path=source_path, pattern=pattern, @@ -228,6 +238,8 @@ class ManageSubtitlesUseCase: def _resolve_pattern( self, kb: SubtitleKnowledgeBase, + prober: FfprobeMediaProber, + scanner: PathlibFilesystemScanner, store: SubtitleMetadataStore, source_path: Path, confirmed_pattern_id: str | None, @@ -250,7 +262,7 @@ class ManageSubtitlesUseCase: # 3. Auto-detect release_root = source_path.parent - detector = PatternDetector(kb) + detector = PatternDetector(kb, prober, scanner) result = detector.detect(release_root, source_path) if result["detected"] and result["confidence"] >= 0.6: diff --git a/alfred/domain/shared/ports/__init__.py b/alfred/domain/shared/ports/__init__.py new file mode 100644 index 0000000..ee7b70c --- /dev/null +++ b/alfred/domain/shared/ports/__init__.py @@ -0,0 +1,17 @@ +"""Ports — Protocol interfaces the domain depends on. + +Adapters live in ``alfred/infrastructure/`` and implement these protocols. +Domain code never imports infrastructure; it accepts a port via constructor +injection and calls it. Tests can pass in-memory fakes that satisfy the +Protocol without going through real I/O. +""" + +from .filesystem_scanner import FileEntry, FilesystemScanner +from .media_prober import MediaProber, SubtitleStreamInfo + +__all__ = [ + "FileEntry", + "FilesystemScanner", + "MediaProber", + "SubtitleStreamInfo", +] diff --git a/alfred/domain/shared/ports/filesystem_scanner.py b/alfred/domain/shared/ports/filesystem_scanner.py new file mode 100644 index 0000000..9cadbdc --- /dev/null +++ b/alfred/domain/shared/ports/filesystem_scanner.py @@ -0,0 +1,59 @@ +"""FilesystemScanner port — abstracts filesystem inspection. + +The domain never calls ``Path.iterdir``, ``Path.is_file``, ``Path.stat`` or +``open()`` directly. It asks the scanner for a ``FileEntry`` snapshot and +reasons from there. One scan = one I/O round-trip; no callbacks back to disk. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Protocol + + +@dataclass(frozen=True) +class FileEntry: + """Frozen snapshot of one filesystem entry, taken at scan time. + + The entry carries enough metadata for the domain to classify and order + files without re-querying the OS. ``size_kb`` is ``None`` for directories + and for files whose size could not be read. + """ + + path: Path + is_file: bool + is_dir: bool + size_kb: float | None + + @property + def name(self) -> str: + return self.path.name + + @property + def stem(self) -> str: + return self.path.stem + + @property + def suffix(self) -> str: + return self.path.suffix + + +class FilesystemScanner(Protocol): + """Read-only filesystem inspection.""" + + def scan_dir(self, path: Path) -> list[FileEntry]: + """Return sorted entries directly inside ``path``. + + Returns an empty list when ``path`` is not a directory or is + unreadable. Adapters must not raise. + """ + ... + + def stat(self, path: Path) -> FileEntry | None: + """Stat a single path; ``None`` when it doesn't exist or is unreadable.""" + ... + + def read_text(self, path: Path, encoding: str = "utf-8") -> str | None: + """Read a text file in one go; ``None`` on any error.""" + ... diff --git a/alfred/domain/shared/ports/media_prober.py b/alfred/domain/shared/ports/media_prober.py new file mode 100644 index 0000000..d06df09 --- /dev/null +++ b/alfred/domain/shared/ports/media_prober.py @@ -0,0 +1,39 @@ +"""MediaProber port — abstracts media stream inspection (e.g. ffprobe). + +The adapter (typically wrapping ffprobe) maps low-level container metadata +into the small set of stream attributes the domain reasons about. Replacing +ffprobe with another tool only requires a new adapter — domain stays put. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Protocol + + +@dataclass(frozen=True) +class SubtitleStreamInfo: + """A single embedded subtitle stream, as seen by the prober. + + ``language`` is the raw language tag emitted by the container (typically + ISO 639-2 like ``"fre"``, ``"eng"``); may be empty/None when the stream + has no language tag. The domain resolves it to a canonical ``Language`` + via the knowledge base. + """ + + language: str | None + is_hearing_impaired: bool + is_forced: bool + + +class MediaProber(Protocol): + """Inspect a media file's stream metadata.""" + + def list_subtitle_streams(self, video: Path) -> list[SubtitleStreamInfo]: + """Return all subtitle streams in ``video``. + + Returns an empty list when the file is missing, unreadable, or has + no subtitle streams. Adapters must not raise. + """ + ... diff --git a/alfred/domain/subtitles/services/identifier.py b/alfred/domain/subtitles/services/identifier.py index f529332..f5248c6 100644 --- a/alfred/domain/subtitles/services/identifier.py +++ b/alfred/domain/subtitles/services/identifier.py @@ -1,13 +1,12 @@ """SubtitleIdentifier — finds and classifies all subtitle tracks for a video file.""" -import json import logging import re -import subprocess from pathlib import Path from alfred.infrastructure.knowledge.subtitles.base import SubtitleKnowledgeBase +from ...shared.ports import FilesystemScanner, MediaProber from ...shared.value_objects import ImdbId from ..entities import MediaSubtitleMetadata, SubtitleCandidate from ..value_objects import ScanStrategy, SubtitlePattern, SubtitleType @@ -38,17 +37,14 @@ def _tokenize_suffix(stem: str, episode_stem: str) -> list[str]: return _tokenize(stem) -def _count_entries(path: Path) -> int: - """Return the entry count of an SRT file by finding the last cue number.""" - try: - with open(path, encoding="utf-8", errors="replace") as f: - lines = f.read().splitlines() - for line in reversed(lines): - if line.strip().isdigit(): - return int(line.strip()) - return 0 - except Exception: - return 0 +def _count_entries(text: str | None) -> int | None: + """Return the entry count of an SRT body by finding the last cue number.""" + if text is None: + return None + for line in reversed(text.splitlines()): + if line.strip().isdigit(): + return int(line.strip()) + return 0 class SubtitleIdentifier: @@ -61,8 +57,15 @@ class SubtitleIdentifier: the caller (use case) decides whether to ask the user for clarification. """ - def __init__(self, kb: SubtitleKnowledgeBase): + def __init__( + self, + kb: SubtitleKnowledgeBase, + prober: MediaProber, + scanner: FilesystemScanner, + ): self.kb = kb + self.prober = prober + self.scanner = scanner def identify( self, @@ -89,52 +92,21 @@ class SubtitleIdentifier: return metadata # ------------------------------------------------------------------ - # Embedded tracks — ffprobe + # Embedded tracks — via MediaProber # ------------------------------------------------------------------ def _scan_embedded(self, video_path: Path) -> list[SubtitleCandidate]: - if not video_path.exists(): - return [] - try: - result = subprocess.run( - [ - "ffprobe", - "-v", - "quiet", - "-print_format", - "json", - "-show_streams", - "-select_streams", - "s", - str(video_path), - ], - capture_output=True, - text=True, - timeout=30, - check=False, - ) - data = json.loads(result.stdout) - except ( - subprocess.TimeoutExpired, - json.JSONDecodeError, - FileNotFoundError, - ) as e: - logger.debug( - f"SubtitleIdentifier: ffprobe failed for {video_path.name}: {e}" - ) - return [] + streams = self.prober.list_subtitle_streams(video_path) tracks = [] - for stream in data.get("streams", []): - tags = stream.get("tags", {}) - disposition = stream.get("disposition", {}) - lang_code = tags.get("language", "") + for stream in streams: + lang = ( + self.kb.language_for_token(stream.language) if stream.language else None + ) - lang = self.kb.language_for_token(lang_code) if lang_code else None - - if disposition.get("hearing_impaired"): + if stream.is_hearing_impaired: stype = SubtitleType.SDH - elif disposition.get("forced"): + elif stream.is_forced: stype = SubtitleType.FORCED else: stype = SubtitleType.STANDARD @@ -145,7 +117,7 @@ class SubtitleIdentifier: format=None, subtitle_type=stype, is_embedded=True, - raw_tokens=[lang_code] if lang_code else [], + raw_tokens=[stream.language] if stream.language else [], ) ) @@ -177,57 +149,47 @@ class SubtitleIdentifier: return self._classify_files(candidates, pattern, episode_stem=episode_stem) - def _find_adjacent(self, video_path: Path) -> list[Path]: + def _find_adjacent(self, video_path: Path) -> list: + known = self.kb.known_extensions() return [ - p - for p in sorted(video_path.parent.iterdir()) - if p.is_file() - and p.suffix.lower() in self.kb.known_extensions() - and p.stem != video_path.stem + entry + for entry in self.scanner.scan_dir(video_path.parent) + if entry.is_file + and entry.suffix.lower() in known + and entry.stem != video_path.stem ] - def _find_flat(self, video_path: Path, root_folder: str) -> list[Path]: - subs_dir = video_path.parent / root_folder - if not subs_dir.is_dir(): - # Also look at release root (one level up) - subs_dir = video_path.parent.parent / root_folder - if not subs_dir.is_dir(): - return [] - return [ - p - for p in sorted(subs_dir.iterdir()) - if p.is_file() and p.suffix.lower() in self.kb.known_extensions() - ] + def _find_flat(self, video_path: Path, root_folder: str) -> list: + known = self.kb.known_extensions() + # Adjacent first, then release root (one level up) + for subs_dir in ( + video_path.parent / root_folder, + video_path.parent.parent / root_folder, + ): + entries = self.scanner.scan_dir(subs_dir) + if entries: + return [ + e for e in entries if e.is_file and e.suffix.lower() in known + ] + return [] def _find_episode_subfolder( self, video_path: Path, root_folder: str - ) -> tuple[list[Path], str]: - """ - Look for Subs/{episode_stem}/*.srt - - Checks two locations: - 1. Adjacent to the video: video_path.parent / root_folder / video_path.stem - 2. Release root (one level up): video_path.parent.parent / root_folder / video_path.stem - - Returns (files, episode_stem) so the classifier can strip the prefix. - """ + ) -> tuple[list, str]: + """Look for Subs/{episode_stem}/*.srt — adjacent or one level up.""" episode_stem = video_path.stem - candidates_dirs = [ + known = self.kb.known_extensions() + for subs_dir in ( video_path.parent / root_folder / episode_stem, video_path.parent.parent / root_folder / episode_stem, - ] - for subs_dir in candidates_dirs: - if subs_dir.is_dir(): - files = [ - p - for p in sorted(subs_dir.iterdir()) - if p.is_file() and p.suffix.lower() in self.kb.known_extensions() - ] - if files: - logger.debug( - f"SubtitleIdentifier: found {len(files)} file(s) in {subs_dir}" - ) - return files, episode_stem + ): + entries = self.scanner.scan_dir(subs_dir) + files = [e for e in entries if e.is_file and e.suffix.lower() in known] + if files: + logger.debug( + f"SubtitleIdentifier: found {len(files)} file(s) in {subs_dir}" + ) + return files, episode_stem return [], episode_stem # ------------------------------------------------------------------ @@ -236,14 +198,13 @@ class SubtitleIdentifier: def _classify_files( self, - paths: list[Path], + entries: list, pattern: SubtitlePattern, episode_stem: str | None = None, ) -> list[SubtitleCandidate]: - tracks = [] - for path in paths: - track = self._classify_single(path, episode_stem=episode_stem) - tracks.append(track) + tracks = [ + self._classify_single(entry, episode_stem=episode_stem) for entry in entries + ] # Post-process: if multiple tracks share same language but type is ambiguous, # apply size_and_count disambiguation @@ -253,13 +214,13 @@ class SubtitleIdentifier: return tracks def _classify_single( - self, path: Path, episode_stem: str | None = None + self, entry, episode_stem: str | None = None ) -> SubtitleCandidate: - fmt = self.kb.format_for_extension(path.suffix) + fmt = self.kb.format_for_extension(entry.suffix) tokens = ( - _tokenize_suffix(path.stem, episode_stem) + _tokenize_suffix(entry.stem, episode_stem) if episode_stem - else _tokenize(path.stem) + else _tokenize(entry.stem) ) language = None @@ -285,19 +246,21 @@ class SubtitleIdentifier: if unknown_tokens: logger.debug( - f"SubtitleIdentifier: unknown tokens in '{path.name}': {unknown_tokens}" + f"SubtitleIdentifier: unknown tokens in '{entry.name}': {unknown_tokens}" ) - size_kb = path.stat().st_size / 1024 if path.exists() else None - entry_count = _count_entries(path) if path.exists() else None + # Entry count: only meaningful for SRT files; read text on demand. + entry_count: int | None = None + if entry.suffix.lower() == ".srt": + entry_count = _count_entries(self.scanner.read_text(entry.path)) return SubtitleCandidate( language=language, format=fmt, subtitle_type=subtitle_type, is_embedded=False, - file_path=path, - file_size_kb=size_kb, + file_path=entry.path, + file_size_kb=entry.size_kb, entry_count=entry_count, confidence=confidence, raw_tokens=tokens, diff --git a/alfred/domain/subtitles/services/pattern_detector.py b/alfred/domain/subtitles/services/pattern_detector.py index cf96f0f..774f38b 100644 --- a/alfred/domain/subtitles/services/pattern_detector.py +++ b/alfred/domain/subtitles/services/pattern_detector.py @@ -1,12 +1,11 @@ """PatternDetector — discovers the subtitle structure of a release folder.""" -import json import logging -import subprocess from pathlib import Path from alfred.infrastructure.knowledge.subtitles.base import SubtitleKnowledgeBase +from ...shared.ports import FilesystemScanner, MediaProber from ..value_objects import ScanStrategy, SubtitlePattern logger = logging.getLogger(__name__) @@ -21,8 +20,15 @@ class PatternDetector: a release follows. The result is proposed to the user for confirmation. """ - def __init__(self, kb: SubtitleKnowledgeBase): + def __init__( + self, + kb: SubtitleKnowledgeBase, + prober: MediaProber, + scanner: FilesystemScanner, + ): self.kb = kb + self.prober = prober + self.scanner = scanner def detect(self, release_root: Path, sample_video: Path) -> dict: """ @@ -46,29 +52,7 @@ class PatternDetector: } def _has_embedded_subtitles(self, video_path: Path) -> bool: - """Run ffprobe to check whether the video has embedded subtitle streams.""" - try: - result = subprocess.run( - [ - "ffprobe", - "-v", - "quiet", - "-print_format", - "json", - "-show_streams", - "-select_streams", - "s", - str(video_path), - ], - capture_output=True, - text=True, - timeout=30, - check=False, - ) - data = json.loads(result.stdout) - return len(data.get("streams", [])) > 0 - except (subprocess.TimeoutExpired, json.JSONDecodeError, FileNotFoundError): - return False + return len(self.prober.list_subtitle_streams(video_path)) > 0 def _inspect(self, release_root: Path, sample_video: Path) -> dict: """Gather structural facts about the release.""" @@ -85,61 +69,59 @@ class PatternDetector: } # Check for Subs/ folder — adjacent or at release root - for subs_candidate in [ + for subs_candidate in ( sample_video.parent / "Subs", release_root / "Subs", - ]: - if subs_candidate.is_dir(): - findings["has_subs_folder"] = True - findings["subs_root"] = str(subs_candidate) + ): + children = self.scanner.scan_dir(subs_candidate) + if not children: + continue - # Is it flat or episode_subfolder? - children = list(subs_candidate.iterdir()) - sub_files = [ - c - for c in children - if c.is_file() and c.suffix.lower() in known_exts + findings["has_subs_folder"] = True + findings["subs_root"] = str(subs_candidate) + + # Is it flat or episode_subfolder? + sub_files = [ + c for c in children if c.is_file and c.suffix.lower() in known_exts + ] + sub_dirs = [c for c in children if c.is_dir] + + if sub_dirs and not sub_files: + findings["subs_strategy"] = "episode_subfolder" + # Count files in a sample subfolder + sample_files = [ + f + for f in self.scanner.scan_dir(sub_dirs[0].path) + if f.is_file and f.suffix.lower() in known_exts ] - sub_dirs = [c for c in children if c.is_dir()] - - if sub_dirs and not sub_files: - findings["subs_strategy"] = "episode_subfolder" - # Count files in a sample subfolder - sample_sub = sub_dirs[0] - sample_files = [ - f - for f in sample_sub.iterdir() - if f.is_file() and f.suffix.lower() in known_exts - ] - findings["files_per_episode"] = len(sample_files) - # Check naming conventions - for f in sample_files: - stem = f.stem - parts = stem.split("_") - if parts[0].isdigit(): - findings["has_numeric_prefix"] = True - if any( - self.kb.is_known_lang_token(t.lower()) - for t in stem.replace("_", ".").split(".") - ): - findings["has_lang_tokens"] = True - else: - findings["subs_strategy"] = "flat" - findings["files_per_episode"] = len(sub_files) - for f in sub_files: - if any( - self.kb.is_known_lang_token(t.lower()) - for t in f.stem.replace("_", ".").split(".") - ): - findings["has_lang_tokens"] = True - break + findings["files_per_episode"] = len(sample_files) + # Check naming conventions + for f in sample_files: + parts = f.stem.split("_") + if parts[0].isdigit(): + findings["has_numeric_prefix"] = True + if any( + self.kb.is_known_lang_token(t.lower()) + for t in f.stem.replace("_", ".").split(".") + ): + findings["has_lang_tokens"] = True + else: + findings["subs_strategy"] = "flat" + findings["files_per_episode"] = len(sub_files) + for f in sub_files: + if any( + self.kb.is_known_lang_token(t.lower()) + for t in f.stem.replace("_", ".").split(".") + ): + findings["has_lang_tokens"] = True + break # Check adjacent subs (next to the video) if not findings["has_subs_folder"]: adjacent = [ - p - for p in sample_video.parent.iterdir() - if p.is_file() and p.suffix.lower() in known_exts + e + for e in self.scanner.scan_dir(sample_video.parent) + if e.is_file and e.suffix.lower() in known_exts ] if adjacent: findings["adjacent_subs"] = True @@ -222,6 +204,6 @@ class PatternDetector: parts.append("no external subtitle files found") if findings.get("has_embedded"): - parts.append("embedded tracks detected (ffprobe)") + parts.append("embedded tracks detected") return " — ".join(parts) if parts else "nothing found" diff --git a/alfred/infrastructure/filesystem/scanner.py b/alfred/infrastructure/filesystem/scanner.py new file mode 100644 index 0000000..e424b5e --- /dev/null +++ b/alfred/infrastructure/filesystem/scanner.py @@ -0,0 +1,66 @@ +"""PathlibFilesystemScanner — FilesystemScanner adapter backed by pathlib.""" + +from __future__ import annotations + +import logging +from pathlib import Path + +from alfred.domain.shared.ports import FileEntry + +logger = logging.getLogger(__name__) + + +class PathlibFilesystemScanner: + """Read-only filesystem scanner using ``pathlib``. + + Implements :class:`alfred.domain.shared.ports.FilesystemScanner` + structurally. Never raises — failures are logged and surfaced as + empty results. + """ + + def scan_dir(self, path: Path) -> list[FileEntry]: + try: + if not path.is_dir(): + return [] + children = sorted(path.iterdir()) + except OSError as e: + logger.debug(f"PathlibFilesystemScanner: scan_dir failed for {path}: {e}") + return [] + + entries: list[FileEntry] = [] + for child in children: + entry = self._make_entry(child) + if entry is not None: + entries.append(entry) + return entries + + def stat(self, path: Path) -> FileEntry | None: + return self._make_entry(path) + + def read_text(self, path: Path, encoding: str = "utf-8") -> str | None: + try: + with open(path, encoding=encoding, errors="replace") as f: + return f.read() + except OSError as e: + logger.debug(f"PathlibFilesystemScanner: read_text failed for {path}: {e}") + return None + + # ------------------------------------------------------------------ + + def _make_entry(self, path: Path) -> FileEntry | None: + try: + is_file = path.is_file() + is_dir = path.is_dir() + except OSError: + return None + if not (is_file or is_dir): + return None + + size_kb: float | None = None + if is_file: + try: + size_kb = path.stat().st_size / 1024 + except OSError: + size_kb = None + + return FileEntry(path=path, is_file=is_file, is_dir=is_dir, size_kb=size_kb) diff --git a/alfred/infrastructure/probe/__init__.py b/alfred/infrastructure/probe/__init__.py new file mode 100644 index 0000000..b487a33 --- /dev/null +++ b/alfred/infrastructure/probe/__init__.py @@ -0,0 +1,5 @@ +"""Media probing adapters — concrete implementations of MediaProber.""" + +from .ffprobe_prober import FfprobeMediaProber + +__all__ = ["FfprobeMediaProber"] diff --git a/alfred/infrastructure/probe/ffprobe_prober.py b/alfred/infrastructure/probe/ffprobe_prober.py new file mode 100644 index 0000000..5ae4017 --- /dev/null +++ b/alfred/infrastructure/probe/ffprobe_prober.py @@ -0,0 +1,65 @@ +"""FfprobeMediaProber — MediaProber adapter backed by the ffprobe CLI.""" + +from __future__ import annotations + +import json +import logging +import subprocess +from pathlib import Path + +from alfred.domain.shared.ports import SubtitleStreamInfo + +logger = logging.getLogger(__name__) + +_FFPROBE_TIMEOUT_SECONDS = 30 + + +class FfprobeMediaProber: + """Inspect media files by shelling out to ``ffprobe``. + + Implements :class:`alfred.domain.shared.ports.MediaProber` structurally. + Never raises — failures are logged and surfaced as empty results. + """ + + def list_subtitle_streams(self, video: Path) -> list[SubtitleStreamInfo]: + if not video.exists(): + return [] + try: + result = subprocess.run( + [ + "ffprobe", + "-v", + "quiet", + "-print_format", + "json", + "-show_streams", + "-select_streams", + "s", + str(video), + ], + capture_output=True, + text=True, + timeout=_FFPROBE_TIMEOUT_SECONDS, + check=False, + ) + data = json.loads(result.stdout) + except ( + subprocess.TimeoutExpired, + json.JSONDecodeError, + FileNotFoundError, + ) as e: + logger.debug(f"FfprobeMediaProber: ffprobe failed for {video.name}: {e}") + return [] + + streams: list[SubtitleStreamInfo] = [] + for stream in data.get("streams", []): + tags = stream.get("tags", {}) or {} + disposition = stream.get("disposition", {}) or {} + streams.append( + SubtitleStreamInfo( + language=tags.get("language") or None, + is_hearing_impaired=bool(disposition.get("hearing_impaired")), + is_forced=bool(disposition.get("forced")), + ) + ) + return streams diff --git a/tests/domain/test_subtitle_identifier.py b/tests/domain/test_subtitle_identifier.py index d99a1e2..36251a7 100644 --- a/tests/domain/test_subtitle_identifier.py +++ b/tests/domain/test_subtitle_identifier.py @@ -22,8 +22,8 @@ from unittest.mock import patch import pytest +from alfred.domain.shared.ports import FileEntry from alfred.domain.subtitles.entities import SubtitleCandidate -from alfred.infrastructure.knowledge.subtitles.base import SubtitleKnowledgeBase from alfred.domain.subtitles.services.identifier import ( SubtitleIdentifier, _count_entries, @@ -37,6 +37,19 @@ from alfred.domain.subtitles.value_objects import ( SubtitleType, TypeDetectionMethod, ) +from alfred.infrastructure.filesystem.scanner import PathlibFilesystemScanner +from alfred.infrastructure.knowledge.subtitles.base import SubtitleKnowledgeBase +from alfred.infrastructure.probe.ffprobe_prober import FfprobeMediaProber + + +def _file_entry(path) -> FileEntry: + """Helper: build a FileEntry from a real tmp_path Path.""" + return FileEntry( + path=path, + is_file=path.is_file(), + is_dir=path.is_dir(), + size_kb=(path.stat().st_size / 1024) if path.is_file() else None, + ) @pytest.fixture(scope="module") @@ -46,7 +59,7 @@ def kb(): @pytest.fixture def identifier(kb): - return SubtitleIdentifier(kb) + return SubtitleIdentifier(kb, FfprobeMediaProber(), PathlibFilesystemScanner()) def _pattern( @@ -103,23 +116,19 @@ class TestTokenize: class TestCountEntries: - def test_last_cue_number(self, tmp_path): - srt = tmp_path / "x.srt" - srt.write_text( + def test_last_cue_number(self): + text = ( "1\n00:00:01,000 --> 00:00:02,000\nHello\n\n" "2\n00:00:03,000 --> 00:00:04,000\nWorld\n\n" - "42\n00:00:05,000 --> 00:00:06,000\nLast\n", - encoding="utf-8", + "42\n00:00:05,000 --> 00:00:06,000\nLast\n" ) - assert _count_entries(srt) == 42 + assert _count_entries(text) == 42 - def test_missing_file_returns_zero(self, tmp_path): - assert _count_entries(tmp_path / "nope.srt") == 0 + def test_missing_file_returns_none(self): + assert _count_entries(None) is None - def test_empty_file_returns_zero(self, tmp_path): - f = tmp_path / "x.srt" - f.write_text("") - assert _count_entries(f) == 0 + def test_empty_file_returns_zero(self): + assert _count_entries("") == 0 # --------------------------------------------------------------------------- # @@ -135,7 +144,7 @@ class TestEmbedded: video = tmp_path / "v.mkv" video.write_bytes(b"") with patch( - "alfred.domain.subtitles.services.identifier.subprocess.run", + "alfred.infrastructure.probe.ffprobe_prober.subprocess.run", side_effect=FileNotFoundError("no ffprobe"), ): assert identifier._scan_embedded(video) == [] @@ -156,7 +165,7 @@ class TestEmbedded: stdout = fake_output with patch( - "alfred.domain.subtitles.services.identifier.subprocess.run", + "alfred.infrastructure.probe.ffprobe_prober.subprocess.run", return_value=FakeResult(), ): tracks = identifier._scan_embedded(video) @@ -256,7 +265,7 @@ class TestClassify: def test_classifies_language_and_format(self, identifier, tmp_path): f = tmp_path / "Show.S01E01.English.srt" f.write_text("1\n00:00:01,000 --> 00:00:02,000\nHi\n") - track = identifier._classify_single(f) + track = identifier._classify_single(_file_entry(f)) assert track.language.code == "eng" assert track.format.id == "srt" assert track.confidence > 0 @@ -265,13 +274,13 @@ class TestClassify: def test_classifies_type_token(self, identifier, tmp_path): f = tmp_path / "Show.S01E01.English.sdh.srt" f.write_text("") - track = identifier._classify_single(f) + track = identifier._classify_single(_file_entry(f)) assert track.subtitle_type == SubtitleType.SDH def test_unknown_tokens_lower_confidence(self, identifier, tmp_path): f = tmp_path / "Show.S01E01.gibberish.srt" f.write_text("") - track = identifier._classify_single(f) + track = identifier._classify_single(_file_entry(f)) # No lang/type recognized → confidence is 0 or very low. assert track.language is None assert track.confidence < 0.5 @@ -279,7 +288,9 @@ class TestClassify: def test_episode_stem_prefix_stripped(self, identifier, tmp_path): f = tmp_path / "Show.S01E01.English.srt" f.write_text("") - track = identifier._classify_single(f, episode_stem="Show.S01E01") + track = identifier._classify_single( + _file_entry(f), episode_stem="Show.S01E01" + ) # Only "english" remains as meaningful token → confidence == 1.0 assert track.language.code == "eng" assert track.confidence == 1.0 diff --git a/tests/domain/test_subtitle_pattern_detector.py b/tests/domain/test_subtitle_pattern_detector.py index 57c8992..3afc5f1 100644 --- a/tests/domain/test_subtitle_pattern_detector.py +++ b/tests/domain/test_subtitle_pattern_detector.py @@ -25,8 +25,10 @@ from unittest.mock import patch import pytest -from alfred.infrastructure.knowledge.subtitles.base import SubtitleKnowledgeBase from alfred.domain.subtitles.services.pattern_detector import PatternDetector +from alfred.infrastructure.filesystem.scanner import PathlibFilesystemScanner +from alfred.infrastructure.knowledge.subtitles.base import SubtitleKnowledgeBase +from alfred.infrastructure.probe.ffprobe_prober import FfprobeMediaProber @pytest.fixture(scope="module") @@ -36,7 +38,7 @@ def kb(): @pytest.fixture def detector(kb): - return PatternDetector(kb) + return PatternDetector(kb, FfprobeMediaProber(), PathlibFilesystemScanner()) def _make_video(folder: Path, name: str = "Show.S01E01.mkv") -> Path: