diff --git a/CHANGELOG.md b/CHANGELOG.md index d46a86a..bcc0ff1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,77 @@ callers). ## [Unreleased] +### Internal + +- **Moved `detect_media_type` and `enrich_from_probe` from + `alfred.application.filesystem` to `alfred.application.release`**. + They are inspection-pipeline helpers — their natural home is next to + `inspect_release`, not next to the filesystem use cases. The move + also eliminates a circular-import workaround in + `resolve_destination.py`: `inspect_release` can now be imported at + module top instead of lazily inside `_resolve_parsed`. Public + surface is unchanged for callers that imported the helpers from + their full module paths (the only call sites — `inspect.py`, two + tests, one testing script — were updated in this commit). + +### Added + +- **`resolve_*_destination` use cases now consume `inspect_release`**. + `resolve_episode_destination` and `resolve_movie_destination` reuse + their existing `source_file` parameter as the inspection target; + `resolve_season_destination` and `resolve_series_destination` gain + a new **optional** `source_path` parameter (also threaded through + the tool wrappers and YAML specs). When the path exists, ffprobe + data fills tokens missing from the release name (e.g. quality) and + refreshes `tech_string`, so the destination folder / file names + end up more accurate. When the path is missing or absent (back-compat + callers), the use cases fall back to parse-only — same behavior as + before. + +### Fixed + +- **`enrich_from_probe` now refreshes `tech_string`** after filling + `quality` / `source` / `codec`. Previously the field stayed at its + parser-time value, so filename builders saw stale tech tokens even + after a successful probe. New `TestTechString` class in + `tests/application/test_enrich_from_probe.py` locks the behavior. + +### Added + +- **`inspect_release` orchestrator + `InspectedResult` VO** + (`alfred/application/release/inspect.py`). Single composition of the + four inspection layers: `parse_release` → `detect_media_type` (patches + `parsed.media_type`) → `find_main_video` (top-level scan) → + `prober.probe` + `enrich_from_probe` when a video exists and the + refined media type isn't in `{"unknown", "other"}`. Returns a frozen + `InspectedResult(parsed, report, source_path, main_video, media_info, + probe_used)` that downstream callers consume directly instead of + rebuilding the same chain. `kb` and `prober` are injected — no + module-level singletons. Never raises. + +### Changed + +- **`analyze_release` tool now delegates to `inspect_release`** — same + output shape, plus two new fields: `confidence` (0–100) and `road` + (`"easy"` / `"shitty"` / `"path_of_pain"`) surfaced from the parser's + `ParseReport`. The tool spec (`specs/analyze_release.yaml`) documents + both fields so the LLM can route releases by confidence. + +- **`MediaProber` port now covers full media probing**: added + `probe(video) -> MediaInfo | None` alongside the existing + `list_subtitle_streams`. `FfprobeMediaProber` (in + `alfred/infrastructure/probe/`) implements both methods and is now + the single adapter shelling out to `ffprobe`. The standalone + `alfred/infrastructure/filesystem/ffprobe.py` module was removed — + all callers (tools, testing scripts) instantiate + `FfprobeMediaProber` instead. Unblocks the upcoming + `inspect_release` orchestrator, which depends on the port. + +### Removed + +- `alfred/infrastructure/filesystem/ffprobe.py` (folded into the + `FfprobeMediaProber` adapter). + --- ## [2026-05-20] — Release parser confidence scoring + exclusion diff --git a/alfred/agent/tools/filesystem.py b/alfred/agent/tools/filesystem.py index 3f73f17..5eb29a2 100644 --- a/alfred/agent/tools/filesystem.py +++ b/alfred/agent/tools/filesystem.py @@ -13,8 +13,6 @@ from alfred.application.filesystem import ( MoveMediaUseCase, SetFolderPathUseCase, ) -from alfred.application.filesystem.detect_media_type import detect_media_type -from alfred.application.filesystem.enrich_from_probe import enrich_from_probe from alfred.application.filesystem.resolve_destination import ( resolve_episode_destination as _resolve_episode_destination, ) @@ -28,10 +26,11 @@ from alfred.application.filesystem.resolve_destination import ( resolve_series_destination as _resolve_series_destination, ) from alfred.infrastructure.filesystem import FileManager, create_folder, move -from alfred.infrastructure.filesystem.ffprobe import probe -from alfred.infrastructure.filesystem.find_video import find_video_file from alfred.infrastructure.metadata import MetadataStore from alfred.infrastructure.persistence import get_memory +from alfred.infrastructure.probe import FfprobeMediaProber + +_PROBER = FfprobeMediaProber() _LEARNED_ROOT = Path(_alfred_pkg.__file__).parent.parent / "data" / "knowledge" @@ -57,10 +56,11 @@ def resolve_season_destination( tmdb_title: str, tmdb_year: int, confirmed_folder: str | None = None, + source_path: str | None = None, ) -> dict[str, Any]: """Thin tool wrapper — semantics live in alfred/agent/tools/specs/resolve_season_destination.yaml.""" return _resolve_season_destination( - release_name, tmdb_title, tmdb_year, confirmed_folder + release_name, tmdb_title, tmdb_year, confirmed_folder, source_path ).to_dict() @@ -100,10 +100,11 @@ def resolve_series_destination( tmdb_title: str, tmdb_year: int, confirmed_folder: str | None = None, + source_path: str | None = None, ) -> dict[str, Any]: """Thin tool wrapper — semantics live in alfred/agent/tools/specs/resolve_series_destination.yaml.""" return _resolve_series_destination( - release_name, tmdb_title, tmdb_year, confirmed_folder + release_name, tmdb_title, tmdb_year, confirmed_folder, source_path ).to_dict() @@ -191,21 +192,10 @@ def set_path_for_folder(folder_name: str, path_value: str) -> dict[str, Any]: def analyze_release(release_name: str, source_path: str) -> dict[str, Any]: """Thin tool wrapper — semantics live in alfred/agent/tools/specs/analyze_release.yaml.""" from alfred.application.filesystem.resolve_destination import _KB # noqa: PLC0415 - from alfred.domain.release.services import parse_release # noqa: PLC0415 - - path = Path(source_path) - parsed, _ = parse_release(release_name, _KB) - parsed.media_type = detect_media_type(parsed, path, _KB) - - probe_used = False - if parsed.media_type not in ("unknown", "other"): - video_file = find_video_file(path, _KB) - if video_file: - media_info = probe(video_file) - if media_info: - enrich_from_probe(parsed, media_info) - probe_used = True + from alfred.application.release import inspect_release # noqa: PLC0415 + result = inspect_release(release_name, Path(source_path), _KB, _PROBER) + parsed = result.parsed return { "status": "ok", "media_type": parsed.media_type, @@ -227,7 +217,9 @@ def analyze_release(release_name: str, source_path: str) -> dict[str, Any]: "edition": parsed.edition, "site_tag": parsed.site_tag, "is_season_pack": parsed.is_season_pack, - "probe_used": probe_used, + "probe_used": result.probe_used, + "confidence": result.report.confidence, + "road": result.report.road, } @@ -241,7 +233,7 @@ def probe_media(source_path: str) -> dict[str, Any]: "message": f"{source_path} does not exist", } - media_info = probe(path) + media_info = _PROBER.probe(path) if media_info is None: return { "status": "error", diff --git a/alfred/agent/tools/specs/analyze_release.yaml b/alfred/agent/tools/specs/analyze_release.yaml index c701fc6..17d4ba2 100644 --- a/alfred/agent/tools/specs/analyze_release.yaml +++ b/alfred/agent/tools/specs/analyze_release.yaml @@ -80,3 +80,5 @@ returns: site_tag: Source-site tag if present. is_season_pack: True when the folder contains a full season. probe_used: True when ffprobe successfully enriched the result. + confidence: Parser confidence score, 0–100 (higher = more reliable). + road: "Parser road: 'easy' (group schema matched), 'shitty' (heuristic but acceptable), or 'path_of_pain' (low confidence — ask the user before auto-routing)." diff --git a/alfred/agent/tools/specs/resolve_season_destination.yaml b/alfred/agent/tools/specs/resolve_season_destination.yaml index 38b9806..f13c8f2 100644 --- a/alfred/agent/tools/specs/resolve_season_destination.yaml +++ b/alfred/agent/tools/specs/resolve_season_destination.yaml @@ -61,6 +61,17 @@ parameters: one. example: Oz.1997.1080p.WEBRip.x265-KONTRAST + source_path: + description: | + Absolute path to the release folder on disk. Optional. + why_needed: | + When provided, the tool runs ffprobe on the main video inside the + folder and uses the probe data to fill quality/codec tokens that + may be missing from the release name. The enriched tech tokens + end up in the destination folder name, so providing source_path + gives more accurate names for releases with sparse metadata. + example: /downloads/Oz.S03.1080p.WEBRip.x265-KONTRAST + returns: ok: description: Paths resolved unambiguously; ready to move. diff --git a/alfred/agent/tools/specs/resolve_series_destination.yaml b/alfred/agent/tools/specs/resolve_series_destination.yaml index e4224a6..a9db5ba 100644 --- a/alfred/agent/tools/specs/resolve_series_destination.yaml +++ b/alfred/agent/tools/specs/resolve_series_destination.yaml @@ -56,6 +56,16 @@ parameters: Forces the use case to use this exact folder name and skip detection. example: The.Wire.2002.1080p.BluRay.x265-GROUP + source_path: + description: | + Absolute path to the release folder on disk. Optional. + why_needed: | + When provided, the tool runs ffprobe on the main video inside the + folder and uses probe data to fill quality/codec tokens that may + be missing from the release name, producing a more accurate + destination folder name. + example: /downloads/The.Wire.S01-S05.1080p.BluRay.x265-GROUP + returns: ok: description: Path resolved; ready to move the pack. diff --git a/alfred/application/filesystem/resolve_destination.py b/alfred/application/filesystem/resolve_destination.py index de106b8..437b768 100644 --- a/alfred/application/filesystem/resolve_destination.py +++ b/alfred/application/filesystem/resolve_destination.py @@ -22,10 +22,13 @@ import logging from dataclasses import dataclass from pathlib import Path +from alfred.application.release import inspect_release from alfred.domain.release import parse_release from alfred.domain.release.ports import ReleaseKnowledge +from alfred.domain.release.value_objects import ParsedRelease from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge from alfred.infrastructure.persistence import get_memory +from alfred.infrastructure.probe import FfprobeMediaProber logger = logging.getLogger(__name__) @@ -33,6 +36,26 @@ logger = logging.getLogger(__name__) # Tests that need a custom KB can monkeypatch this attribute. _KB: ReleaseKnowledge = YamlReleaseKnowledge() +# Module-level prober — same singleton style as _KB. Tests that need a custom +# adapter can monkeypatch this attribute. +_PROBER = FfprobeMediaProber() + + +def _resolve_parsed(release_name: str, source_path: str | None) -> ParsedRelease: + """Pick the right entry point depending on whether we have a path. + + When ``source_path`` is provided and points to something that exists, + we run the full inspection pipeline so probe data can refresh + ``tech_string`` (which feeds every filename builder). Otherwise we + fall back to a parse-only path — same behavior as before. + """ + if source_path: + path = Path(source_path) + if path.exists(): + return inspect_release(release_name, path, _KB, _PROBER).parsed + parsed, _ = parse_release(release_name, _KB) + return parsed + def _find_existing_tvshow_folders( tv_root: Path, tmdb_title_safe: str, tmdb_year: int @@ -237,12 +260,17 @@ def resolve_season_destination( tmdb_title: str, tmdb_year: int, confirmed_folder: str | None = None, + source_path: str | None = None, ) -> ResolvedSeasonDestination: """ Compute destination paths for a season pack. Returns series_folder + season_folder. No file paths — the whole source folder is moved as-is into season_folder. + + When ``source_path`` points to the release on disk, the parser is + augmented with ffprobe data so tech tokens missing from the release + name (quality / codec) end up in the folder names. """ tv_root = _get_tv_root() if not tv_root: @@ -252,7 +280,7 @@ def resolve_season_destination( message="TV show library path is not configured.", ) - parsed, _ = parse_release(release_name, _KB) + parsed = _resolve_parsed(release_name, source_path) tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title) computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year) @@ -293,6 +321,8 @@ def resolve_episode_destination( Compute destination paths for a single episode file. Returns series_folder + season_folder + library_file (full path to .mkv). + ``source_file`` doubles as the inspection target — when it exists, + ffprobe enrichment refreshes tech tokens missing from the release name. """ tv_root = _get_tv_root() if not tv_root: @@ -302,7 +332,7 @@ def resolve_episode_destination( message="TV show library path is not configured.", ) - parsed, _ = parse_release(release_name, _KB) + parsed = _resolve_parsed(release_name, source_file) ext = Path(source_file).suffix tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title) tmdb_episode_title_safe = ( @@ -350,6 +380,8 @@ def resolve_movie_destination( Compute destination paths for a movie file. Returns movie_folder + library_file (full path to .mkv). + ``source_file`` doubles as the inspection target — when it exists, + ffprobe enrichment refreshes tech tokens missing from the release name. """ memory = get_memory() movies_root = memory.ltm.library_paths.get("movie") @@ -360,7 +392,7 @@ def resolve_movie_destination( message="Movie library path is not configured.", ) - parsed, _ = parse_release(release_name, _KB) + parsed = _resolve_parsed(release_name, source_file) ext = Path(source_file).suffix tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title) @@ -385,11 +417,15 @@ def resolve_series_destination( tmdb_title: str, tmdb_year: int, confirmed_folder: str | None = None, + source_path: str | None = None, ) -> ResolvedSeriesDestination: """ Compute destination path for a complete multi-season series pack. Returns only series_folder — the whole pack lands directly inside it. + + When ``source_path`` points to the release on disk, ffprobe + enrichment refreshes tech tokens missing from the release name. """ tv_root = _get_tv_root() if not tv_root: @@ -399,7 +435,7 @@ def resolve_series_destination( message="TV show library path is not configured.", ) - parsed, _ = parse_release(release_name, _KB) + parsed = _resolve_parsed(release_name, source_path) tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title) computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year) diff --git a/alfred/application/release/__init__.py b/alfred/application/release/__init__.py index c00e603..1d75168 100644 --- a/alfred/application/release/__init__.py +++ b/alfred/application/release/__init__.py @@ -1,11 +1,20 @@ """Release application layer — orchestrators sitting between domain parsing and infrastructure I/O. -Today it exposes the pre-pipeline exclusion helpers -(:mod:`supported_media`). Phase C will add the ``inspect_release`` -orchestrator here. +Public surface: + +- :func:`is_supported_video` / :func:`find_main_video` — pre-pipeline + filesystem helpers (extension-only filtering, top-level video pick). +- :func:`inspect_release` / :class:`InspectedResult` — full inspection + pipeline combining parse + filesystem refinement + probe enrichment. """ +from .inspect import InspectedResult, inspect_release from .supported_media import find_main_video, is_supported_video -__all__ = ["find_main_video", "is_supported_video"] +__all__ = [ + "InspectedResult", + "find_main_video", + "inspect_release", + "is_supported_video", +] diff --git a/alfred/application/filesystem/detect_media_type.py b/alfred/application/release/detect_media_type.py similarity index 100% rename from alfred/application/filesystem/detect_media_type.py rename to alfred/application/release/detect_media_type.py diff --git a/alfred/application/filesystem/enrich_from_probe.py b/alfred/application/release/enrich_from_probe.py similarity index 89% rename from alfred/application/filesystem/enrich_from_probe.py rename to alfred/application/release/enrich_from_probe.py index 5aba9ee..dd66400 100644 --- a/alfred/application/filesystem/enrich_from_probe.py +++ b/alfred/application/release/enrich_from_probe.py @@ -80,3 +80,10 @@ def enrich_from_probe(parsed: ParsedRelease, info: MediaInfo) -> None: for lang in info.audio_languages: if lang.lower() != "und" and lang.upper() not in existing: parsed.languages.append(lang) + + # Re-derive tech_string so filename builders see the enriched + # quality/source/codec. Built the same way as in the parser pipeline: + # the non-None parts joined by dots, in order. + parsed.tech_string = ".".join( + p for p in (parsed.quality, parsed.source, parsed.codec) if p + ) diff --git a/alfred/application/release/inspect.py b/alfred/application/release/inspect.py new file mode 100644 index 0000000..9361fa6 --- /dev/null +++ b/alfred/application/release/inspect.py @@ -0,0 +1,140 @@ +"""Release inspection orchestrator — the canonical "look at this thing" +entry point. + +``inspect_release`` is the single composition of the four layers we +care about for a freshly-arrived release: + +1. **Parse the name** — :func:`alfred.domain.release.services.parse_release` + gives a ``ParsedRelease`` plus a ``ParseReport`` (confidence + road). +2. **Pick the main video** — :func:`find_main_video` runs a top-level + scan over the source path. If nothing qualifies the result still + completes; downstream callers decide what to do with a videoless + release. +3. **Refine the media type** — :func:`detect_media_type` uses the + on-disk extension mix to override any token-level guess (e.g. a + bare ``.iso`` folder becomes ``"other"``). The refined value is + patched onto ``parsed`` in place — same convention as + ``analyze_release`` had before. +4. **Probe the video** — the injected :class:`MediaProber` fills in + missing technical fields via :func:`enrich_from_probe`. Skipped + when there is no main video or when ``media_type`` ended up in + ``{"unknown", "other"}`` (the probe would tell us nothing useful). + +The return type is :class:`InspectedResult`, a frozen VO that bundles +everything downstream callers need (``analyze_release`` tool, +``resolve_destination``, future workflow stages) without forcing them +to redo the same four calls. + +Design notes: + +- **Application layer.** This module touches both domain + (``parse_release``) and infrastructure (``MediaProber`` port). That + is exactly application's job — orchestrate. +- **Knowledge base is injected.** ``inspect_release`` takes ``kb`` and + ``prober`` as parameters; no module-level singletons here. Callers + (the tool wrapper, tests) decide what to plug in. +- **Mutation is contained.** We still mutate ``parsed.media_type`` and + let ``enrich_from_probe`` fill its ``None`` fields, because + ``ParsedRelease`` is intentionally a mutable dataclass. The outer + ``InspectedResult`` is frozen so the *bundle* is immutable from the + caller's perspective. +- **Never raises.** Filesystem / probe errors surface as ``None`` + fields on the result, never as exceptions — same contract as the + underlying adapters. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path + +from alfred.application.release.detect_media_type import detect_media_type +from alfred.application.release.enrich_from_probe import enrich_from_probe +from alfred.application.release.supported_media import find_main_video +from alfred.domain.release.ports import ReleaseKnowledge +from alfred.domain.release.services import parse_release +from alfred.domain.release.value_objects import ParsedRelease, ParseReport +from alfred.domain.shared.media import MediaInfo +from alfred.domain.shared.ports import MediaProber + + +@dataclass(frozen=True) +class InspectedResult: + """The full picture of a release: parsed name + filesystem reality. + + Bundles everything the downstream pipeline needs after a single + inspection pass: + + - ``parsed`` — :class:`ParsedRelease`, with ``media_type`` already + refined by :func:`detect_media_type` and ``None`` tech fields + filled in by :func:`enrich_from_probe` when a probe ran. + - ``report`` — :class:`ParseReport` from the parser (confidence + + road, untouched by inspection). + - ``source_path`` — the path the inspector was pointed at (file or + folder), as supplied by the caller. + - ``main_video`` — the canonical video file inside ``source_path``, + or ``None`` if no eligible file was found. + - ``media_info`` — the :class:`MediaInfo` snapshot when a probe + succeeded; ``None`` when no video was probed (no main video, or + ``media_type`` in ``{"unknown", "other"}``) or when ffprobe + failed. + - ``probe_used`` — ``True`` iff ``media_info`` is non-``None`` and + ``enrich_from_probe`` actually ran. Explicit flag so callers + don't have to re-derive the condition. + """ + + parsed: ParsedRelease + report: ParseReport + source_path: Path + main_video: Path | None + media_info: MediaInfo | None + probe_used: bool + + +# Media types for which a probe carries no useful information. +_NON_PROBABLE_MEDIA_TYPES = frozenset({"unknown", "other"}) + + +def inspect_release( + release_name: str, + source_path: Path, + kb: ReleaseKnowledge, + prober: MediaProber, +) -> InspectedResult: + """Run the full inspection pipeline on ``release_name`` / + ``source_path``. + + See module docstring for the four-step flow. ``kb`` and ``prober`` + are injected so the caller controls the knowledge base layering + and the probe adapter (real ffprobe in production, stubs in tests). + + Never raises. A missing or unreadable ``source_path`` simply + results in ``main_video=None`` and ``media_info=None``. + """ + parsed, report = parse_release(release_name, kb) + + # Step 2: refine media_type from the on-disk extension mix. + # detect_media_type tolerates non-existent paths (returns parsed.media_type + # untouched), so no need to guard here. + parsed.media_type = detect_media_type(parsed, source_path, kb) + + # Step 3: pick the canonical main video (top-level scan only). + main_video = find_main_video(source_path, kb) + + # Step 4: probe + enrich, when it makes sense. + media_info: MediaInfo | None = None + probe_used = False + if main_video is not None and parsed.media_type not in _NON_PROBABLE_MEDIA_TYPES: + media_info = prober.probe(main_video) + if media_info is not None: + enrich_from_probe(parsed, media_info) + probe_used = True + + return InspectedResult( + parsed=parsed, + report=report, + source_path=source_path, + main_video=main_video, + media_info=media_info, + probe_used=probe_used, + ) diff --git a/alfred/domain/shared/ports/media_prober.py b/alfred/domain/shared/ports/media_prober.py index d06df09..8dca6c2 100644 --- a/alfred/domain/shared/ports/media_prober.py +++ b/alfred/domain/shared/ports/media_prober.py @@ -9,7 +9,10 @@ from __future__ import annotations from dataclasses import dataclass from pathlib import Path -from typing import Protocol +from typing import TYPE_CHECKING, Protocol + +if TYPE_CHECKING: + from alfred.domain.shared.media import MediaInfo @dataclass(frozen=True) @@ -37,3 +40,13 @@ class MediaProber(Protocol): no subtitle streams. Adapters must not raise. """ ... + + def probe(self, video: Path) -> MediaInfo | None: + """Return the full :class:`MediaInfo` for ``video``, or ``None``. + + Covers all stream families (video, audio, subtitle) plus + file-level duration / bitrate. ``None`` signals that ffprobe is + unavailable or the file can't be read — adapters must not + raise. + """ + ... diff --git a/alfred/infrastructure/filesystem/ffprobe.py b/alfred/infrastructure/filesystem/ffprobe.py deleted file mode 100644 index df12878..0000000 --- a/alfred/infrastructure/filesystem/ffprobe.py +++ /dev/null @@ -1,121 +0,0 @@ -"""ffprobe — infrastructure adapter for extracting MediaInfo from a video file.""" - -from __future__ import annotations - -import json -import logging -import subprocess -from pathlib import Path - -from alfred.domain.shared.media import AudioTrack, MediaInfo, SubtitleTrack, VideoTrack - -logger = logging.getLogger(__name__) - -_FFPROBE_CMD = [ - "ffprobe", - "-v", - "quiet", - "-print_format", - "json", - "-show_streams", - "-show_format", -] - - -def probe(path: Path) -> MediaInfo | None: - """ - Run ffprobe on path and return a MediaInfo. - - Returns None if ffprobe is not available or the file cannot be probed. - """ - try: - result = subprocess.run( - [*_FFPROBE_CMD, str(path)], - capture_output=True, - text=True, - timeout=30, - check=False, - ) - except subprocess.TimeoutExpired: - logger.warning("ffprobe timed out on %s", path) - return None - - if result.returncode != 0: - logger.warning("ffprobe failed on %s: %s", path, result.stderr.strip()) - return None - - try: - data = json.loads(result.stdout) - except json.JSONDecodeError: - logger.warning("ffprobe returned invalid JSON for %s", path) - return None - - return _parse(data) - - -def _parse(data: dict) -> MediaInfo: - streams = data.get("streams", []) - fmt = data.get("format", {}) - - # File-level duration/bitrate (ffprobe ``format`` block — independent of streams) - duration_seconds: float | None = None - bitrate_kbps: int | None = None - if "duration" in fmt: - try: - duration_seconds = float(fmt["duration"]) - except ValueError: - pass - if "bit_rate" in fmt: - try: - bitrate_kbps = int(fmt["bit_rate"]) // 1000 - except ValueError: - pass - - video_tracks: list[VideoTrack] = [] - audio_tracks: list[AudioTrack] = [] - subtitle_tracks: list[SubtitleTrack] = [] - - for stream in streams: - codec_type = stream.get("codec_type") - - if codec_type == "video": - video_tracks.append( - VideoTrack( - index=stream.get("index", len(video_tracks)), - codec=stream.get("codec_name"), - width=stream.get("width"), - height=stream.get("height"), - is_default=stream.get("disposition", {}).get("default", 0) == 1, - ) - ) - - elif codec_type == "audio": - audio_tracks.append( - AudioTrack( - index=stream.get("index", len(audio_tracks)), - codec=stream.get("codec_name"), - channels=stream.get("channels"), - channel_layout=stream.get("channel_layout"), - language=stream.get("tags", {}).get("language"), - is_default=stream.get("disposition", {}).get("default", 0) == 1, - ) - ) - - elif codec_type == "subtitle": - subtitle_tracks.append( - SubtitleTrack( - index=stream.get("index", len(subtitle_tracks)), - codec=stream.get("codec_name"), - language=stream.get("tags", {}).get("language"), - is_default=stream.get("disposition", {}).get("default", 0) == 1, - is_forced=stream.get("disposition", {}).get("forced", 0) == 1, - ) - ) - - return MediaInfo( - video_tracks=tuple(video_tracks), - audio_tracks=tuple(audio_tracks), - subtitle_tracks=tuple(subtitle_tracks), - duration_seconds=duration_seconds, - bitrate_kbps=bitrate_kbps, - ) diff --git a/alfred/infrastructure/probe/ffprobe_prober.py b/alfred/infrastructure/probe/ffprobe_prober.py index 5ae4017..a8cea9b 100644 --- a/alfred/infrastructure/probe/ffprobe_prober.py +++ b/alfred/infrastructure/probe/ffprobe_prober.py @@ -7,12 +7,23 @@ import logging import subprocess from pathlib import Path +from alfred.domain.shared.media import AudioTrack, MediaInfo, SubtitleTrack, VideoTrack from alfred.domain.shared.ports import SubtitleStreamInfo logger = logging.getLogger(__name__) _FFPROBE_TIMEOUT_SECONDS = 30 +_FFPROBE_FULL_CMD = [ + "ffprobe", + "-v", + "quiet", + "-print_format", + "json", + "-show_streams", + "-show_format", +] + class FfprobeMediaProber: """Inspect media files by shelling out to ``ffprobe``. @@ -63,3 +74,101 @@ class FfprobeMediaProber: ) ) return streams + + def probe(self, video: Path) -> MediaInfo | None: + """Run ffprobe on ``video`` and return a :class:`MediaInfo`. + + Returns ``None`` when ffprobe is not available, times out, or + the file cannot be parsed. Never raises. + """ + try: + result = subprocess.run( + [*_FFPROBE_FULL_CMD, str(video)], + capture_output=True, + text=True, + timeout=_FFPROBE_TIMEOUT_SECONDS, + check=False, + ) + except (subprocess.TimeoutExpired, FileNotFoundError) as e: + logger.warning("ffprobe failed on %s: %s", video, e) + return None + + if result.returncode != 0: + logger.warning("ffprobe failed on %s: %s", video, result.stderr.strip()) + return None + + try: + data = json.loads(result.stdout) + except json.JSONDecodeError: + logger.warning("ffprobe returned invalid JSON for %s", video) + return None + + return _parse_media_info(data) + + +def _parse_media_info(data: dict) -> MediaInfo: + """Translate raw ffprobe JSON into a :class:`MediaInfo` snapshot.""" + streams = data.get("streams", []) + fmt = data.get("format", {}) + + duration_seconds: float | None = None + bitrate_kbps: int | None = None + if "duration" in fmt: + try: + duration_seconds = float(fmt["duration"]) + except ValueError: + pass + if "bit_rate" in fmt: + try: + bitrate_kbps = int(fmt["bit_rate"]) // 1000 + except ValueError: + pass + + video_tracks: list[VideoTrack] = [] + audio_tracks: list[AudioTrack] = [] + subtitle_tracks: list[SubtitleTrack] = [] + + for stream in streams: + codec_type = stream.get("codec_type") + + if codec_type == "video": + video_tracks.append( + VideoTrack( + index=stream.get("index", len(video_tracks)), + codec=stream.get("codec_name"), + width=stream.get("width"), + height=stream.get("height"), + is_default=stream.get("disposition", {}).get("default", 0) == 1, + ) + ) + + elif codec_type == "audio": + audio_tracks.append( + AudioTrack( + index=stream.get("index", len(audio_tracks)), + codec=stream.get("codec_name"), + channels=stream.get("channels"), + channel_layout=stream.get("channel_layout"), + language=stream.get("tags", {}).get("language"), + is_default=stream.get("disposition", {}).get("default", 0) == 1, + ) + ) + + elif codec_type == "subtitle": + subtitle_tracks.append( + SubtitleTrack( + index=stream.get("index", len(subtitle_tracks)), + codec=stream.get("codec_name"), + language=stream.get("tags", {}).get("language"), + is_default=stream.get("disposition", {}).get("default", 0) == 1, + is_forced=stream.get("disposition", {}).get("forced", 0) == 1, + ) + ) + + return MediaInfo( + video_tracks=tuple(video_tracks), + audio_tracks=tuple(audio_tracks), + subtitle_tracks=tuple(subtitle_tracks), + duration_seconds=duration_seconds, + bitrate_kbps=bitrate_kbps, + ) diff --git a/testing/debug_release.py b/testing/debug_release.py index fa28591..e639b4a 100644 --- a/testing/debug_release.py +++ b/testing/debug_release.py @@ -88,13 +88,13 @@ def analyze(release_name: str, source_path: str | None = None) -> None: if not path.exists(): print(" (chemin inexistant, probe skipped)") else: - from alfred.infrastructure.filesystem.ffprobe import probe from alfred.infrastructure.filesystem.find_video import find_video_file + from alfred.infrastructure.probe import FfprobeMediaProber video = find_video_file(path) if path.is_dir() else path if video: print(f" video file: {video.name}") - info = probe(video) + info = FfprobeMediaProber().probe(video) if info: print(f" codec: {info.video_codec}") print(f" resolution: {info.resolution}") diff --git a/testing/probe_video.py b/testing/probe_video.py index 078a6bd..6dd84e2 100644 --- a/testing/probe_video.py +++ b/testing/probe_video.py @@ -98,9 +98,9 @@ def main() -> None: print(c(f"Error: {path} does not exist", RED), file=sys.stderr) sys.exit(1) - from alfred.infrastructure.filesystem.ffprobe import probe + from alfred.infrastructure.probe import FfprobeMediaProber - info = probe(path) + info = FfprobeMediaProber().probe(path) if info is None: print(c("Error: ffprobe failed to probe the file", RED), file=sys.stderr) sys.exit(1) diff --git a/testing/recognize_folders_in_downloads.py b/testing/recognize_folders_in_downloads.py index 498716d..8dd2d41 100644 --- a/testing/recognize_folders_in_downloads.py +++ b/testing/recognize_folders_in_downloads.py @@ -100,11 +100,13 @@ def main() -> None: print(c(f"Error: {downloads} does not exist", RED), file=sys.stderr) sys.exit(1) - from alfred.application.filesystem.detect_media_type import detect_media_type - from alfred.application.filesystem.enrich_from_probe import enrich_from_probe + from alfred.application.release.detect_media_type import detect_media_type + from alfred.application.release.enrich_from_probe import enrich_from_probe from alfred.domain.release.services import parse_release - from alfred.infrastructure.filesystem.ffprobe import probe from alfred.infrastructure.filesystem.find_video import find_video_file + from alfred.infrastructure.probe import FfprobeMediaProber + + _prober = FfprobeMediaProber() entries = sorted(downloads.iterdir(), key=lambda p: p.name.lower()) total = len(entries) @@ -126,7 +128,7 @@ def main() -> None: if p.media_type not in ("unknown", "other"): video_file = find_video_file(entry) if video_file: - media_info = probe(video_file) + media_info = _prober.probe(video_file) if media_info: enrich_from_probe(p, media_info) warnings = _assess(p) diff --git a/tests/application/test_detect_media_type.py b/tests/application/test_detect_media_type.py index 468675b..1d55a62 100644 --- a/tests/application/test_detect_media_type.py +++ b/tests/application/test_detect_media_type.py @@ -1,4 +1,4 @@ -"""Tests for ``alfred.application.filesystem.detect_media_type``. +"""Tests for ``alfred.application.release.detect_media_type``. The function refines a ``ParsedRelease.media_type`` using filesystem evidence. @@ -18,7 +18,7 @@ from pathlib import Path import pytest -from alfred.application.filesystem.detect_media_type import detect_media_type +from alfred.application.release.detect_media_type import detect_media_type from alfred.domain.release.services import parse_release from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge diff --git a/tests/application/test_enrich_from_probe.py b/tests/application/test_enrich_from_probe.py index c192912..cf77ff9 100644 --- a/tests/application/test_enrich_from_probe.py +++ b/tests/application/test_enrich_from_probe.py @@ -1,4 +1,4 @@ -"""Tests for ``alfred.application.filesystem.enrich_from_probe``. +"""Tests for ``alfred.application.release.enrich_from_probe``. The function mutates a ``ParsedRelease`` in place using ffprobe ``MediaInfo``. Token-level values from the release name always win — only ``None`` fields @@ -18,7 +18,7 @@ Uses real ``ParsedRelease`` / ``MediaInfo`` instances — no mocking needed. from __future__ import annotations -from alfred.application.filesystem.enrich_from_probe import enrich_from_probe +from alfred.application.release.enrich_from_probe import enrich_from_probe from alfred.domain.release.value_objects import ParsedRelease from alfred.domain.shared.media import AudioTrack, MediaInfo, VideoTrack @@ -210,3 +210,42 @@ class TestLanguages: p = _bare() enrich_from_probe(p, MediaInfo()) assert p.languages == [] + + +# --------------------------------------------------------------------------- # +# tech_string # +# --------------------------------------------------------------------------- # + + +class TestTechString: + """tech_string drives the filename builders; it must be re-derived + whenever quality / source / codec change.""" + + def test_rebuilt_from_filled_quality_and_codec(self): + p = _bare() + enrich_from_probe( + p, _info_with_video(width=1920, height=1080, codec="hevc") + ) + assert p.quality == "1080p" + assert p.codec == "x265" + assert p.tech_string == "1080p.x265" + + def test_keeps_existing_source_when_enriching(self): + # Token-level source must stay; probe fills only None fields. + p = _bare(source="BluRay") + enrich_from_probe( + p, _info_with_video(width=1920, height=1080, codec="hevc") + ) + assert p.tech_string == "1080p.BluRay.x265" + + def test_unchanged_when_no_enrichable_video_info(self): + # No video info → nothing to fill → tech_string stays as it was. + p = _bare(quality="2160p", source="WEB-DL", codec="x265") + p.tech_string = "2160p.WEB-DL.x265" + enrich_from_probe(p, MediaInfo()) + assert p.tech_string == "2160p.WEB-DL.x265" + + def test_empty_when_nothing_known(self): + p = _bare() + enrich_from_probe(p, MediaInfo()) + assert p.tech_string == "" diff --git a/tests/application/test_inspect.py b/tests/application/test_inspect.py new file mode 100644 index 0000000..f095a1f --- /dev/null +++ b/tests/application/test_inspect.py @@ -0,0 +1,265 @@ +"""Tests for the ``inspect_release`` orchestrator (Phase C). + +Covers the four composition steps as a black box: a real +``YamlReleaseKnowledge``, real on-disk filesystem under ``tmp_path``, +and a stubbed ``MediaProber`` so we don't depend on a system ``ffprobe``. +""" + +from __future__ import annotations + +from pathlib import Path + +from alfred.application.release import InspectedResult, inspect_release +from alfred.domain.shared.media import AudioTrack, MediaInfo, VideoTrack +from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge + +_KB = YamlReleaseKnowledge() + +_MOVIE_NAME = "Inception.2010.1080p.BluRay.x264-GROUP" +_TV_NAME = "Dexter.S01E01.1080p.WEB-DL.x264-GROUP" + + +# --------------------------------------------------------------------------- # +# Test doubles # +# --------------------------------------------------------------------------- # + + +class _StubProber: + """Minimal MediaProber stub. Records the path it was asked to probe.""" + + def __init__(self, info: MediaInfo | None) -> None: + self._info = info + self.calls: list[Path] = [] + + def list_subtitle_streams(self, video: Path): # pragma: no cover - unused here + return [] + + def probe(self, video: Path) -> MediaInfo | None: + self.calls.append(video) + return self._info + + +class _RaisingProber: + """A prober that would explode if called — used to assert no probe.""" + + def list_subtitle_streams(self, video: Path): # pragma: no cover + raise AssertionError("list_subtitle_streams must not be called") + + def probe(self, video: Path): # pragma: no cover + raise AssertionError("probe must not be called") + + +def _media_info_1080p_h264() -> MediaInfo: + return MediaInfo( + video_tracks=(VideoTrack(index=0, codec="h264", width=1920, height=1080),), + audio_tracks=( + AudioTrack( + index=1, + codec="ac3", + channels=6, + channel_layout="5.1", + language="eng", + is_default=True, + ), + ), + subtitle_tracks=(), + duration_seconds=7200.0, + bitrate_kbps=8000, + ) + + +# --------------------------------------------------------------------------- # +# Happy paths # +# --------------------------------------------------------------------------- # + + +class TestInspectMovieFolder: + def test_returns_inspected_result_with_all_fields(self, tmp_path: Path) -> None: + folder = tmp_path / _MOVIE_NAME + folder.mkdir() + video = folder / "movie.mkv" + video.write_bytes(b"") + prober = _StubProber(_media_info_1080p_h264()) + + result = inspect_release(_MOVIE_NAME, folder, _KB, prober) + + assert isinstance(result, InspectedResult) + assert result.source_path == folder + assert result.main_video == video + assert result.media_info is not None + assert result.probe_used is True + assert prober.calls == [video] + + def test_parsed_carries_token_level_fields(self, tmp_path: Path) -> None: + folder = tmp_path / _MOVIE_NAME + folder.mkdir() + (folder / "movie.mkv").write_bytes(b"") + prober = _StubProber(_media_info_1080p_h264()) + + result = inspect_release(_MOVIE_NAME, folder, _KB, prober) + + assert result.parsed.title.lower().startswith("inception") + assert result.parsed.year == 2010 + assert result.parsed.group == "GROUP" + assert result.parsed.media_type == "movie" + + def test_report_has_confidence_and_road(self, tmp_path: Path) -> None: + folder = tmp_path / _MOVIE_NAME + folder.mkdir() + (folder / "movie.mkv").write_bytes(b"") + prober = _StubProber(None) + + result = inspect_release(_MOVIE_NAME, folder, _KB, prober) + + assert 0 <= result.report.confidence <= 100 + assert result.report.road in ("easy", "shitty", "path_of_pain") + + +class TestInspectSingleFile: + def test_file_is_its_own_main_video(self, tmp_path: Path) -> None: + f = tmp_path / f"{_MOVIE_NAME}.mkv" + f.write_bytes(b"") + prober = _StubProber(_media_info_1080p_h264()) + + result = inspect_release(_MOVIE_NAME, f, _KB, prober) + + assert result.main_video == f + assert result.probe_used is True + + +# --------------------------------------------------------------------------- # +# Probe-gating logic # +# --------------------------------------------------------------------------- # + + +class TestProbeGating: + def test_no_video_means_no_probe(self, tmp_path: Path) -> None: + folder = tmp_path / _MOVIE_NAME + folder.mkdir() + # Only a non-video file present. + (folder / "readme.txt").write_text("hi") + prober = _RaisingProber() + + result = inspect_release(_MOVIE_NAME, folder, _KB, prober) + + assert result.main_video is None + assert result.media_info is None + assert result.probe_used is False + + def test_media_type_other_means_no_probe(self, tmp_path: Path) -> None: + # An ISO-only folder gets detect_media_type → "other". + folder = tmp_path / _MOVIE_NAME + folder.mkdir() + (folder / "disc.iso").write_bytes(b"") + prober = _RaisingProber() + + result = inspect_release(_MOVIE_NAME, folder, _KB, prober) + + assert result.parsed.media_type == "other" + assert result.media_info is None + assert result.probe_used is False + + def test_probe_failure_keeps_probe_used_false(self, tmp_path: Path) -> None: + folder = tmp_path / _MOVIE_NAME + folder.mkdir() + (folder / "movie.mkv").write_bytes(b"") + prober = _StubProber(None) # ffprobe simulated as failing + + result = inspect_release(_MOVIE_NAME, folder, _KB, prober) + + assert result.main_video is not None + assert result.media_info is None + assert result.probe_used is False + + +# --------------------------------------------------------------------------- # +# Mutation contract # +# --------------------------------------------------------------------------- # + + +class TestMutationContract: + def test_detect_media_type_refines_parsed(self, tmp_path: Path) -> None: + # Release name parses to "movie", but folder mixes video + non_video + # (e.g. an ISO sitting next to an mkv) → detect_media_type returns + # "unknown", which is in _NON_PROBABLE_MEDIA_TYPES → no probe. + folder = tmp_path / _MOVIE_NAME + folder.mkdir() + (folder / "movie.mkv").write_bytes(b"") + (folder / "extras.iso").write_bytes(b"") + prober = _RaisingProber() + + result = inspect_release(_MOVIE_NAME, folder, _KB, prober) + + assert result.parsed.media_type == "unknown" + assert result.probe_used is False + + def test_enrich_runs_when_probe_succeeds(self, tmp_path: Path) -> None: + # Build a release name with no codec; probe should fill it in. + name = "Inception.2010.1080p.BluRay-GROUP" + folder = tmp_path / name + folder.mkdir() + (folder / "movie.mkv").write_bytes(b"") + prober = _StubProber(_media_info_1080p_h264()) + + result = inspect_release(name, folder, _KB, prober) + + assert result.probe_used is True + # enrich_from_probe should have filled the missing codec field. + assert result.parsed.codec is not None + + +# --------------------------------------------------------------------------- # +# Resilience # +# --------------------------------------------------------------------------- # + + +class TestResilience: + def test_nonexistent_path_does_not_raise(self, tmp_path: Path) -> None: + ghost = tmp_path / "does-not-exist" + prober = _RaisingProber() + + result = inspect_release(_MOVIE_NAME, ghost, _KB, prober) + + assert result.main_video is None + assert result.media_info is None + assert result.probe_used is False + + def test_tv_release_inspection(self, tmp_path: Path) -> None: + folder = tmp_path / _TV_NAME + folder.mkdir() + video = folder / "episode.mkv" + video.write_bytes(b"") + prober = _StubProber(_media_info_1080p_h264()) + + result = inspect_release(_TV_NAME, folder, _KB, prober) + + assert result.parsed.media_type == "tv_show" + assert result.parsed.season == 1 + assert result.parsed.episode == 1 + assert result.main_video == video + assert result.probe_used is True + + +# --------------------------------------------------------------------------- # +# Frozen contract # +# --------------------------------------------------------------------------- # + + +class TestFrozen: + def test_inspected_result_is_frozen(self, tmp_path: Path) -> None: + folder = tmp_path / _MOVIE_NAME + folder.mkdir() + (folder / "movie.mkv").write_bytes(b"") + prober = _StubProber(None) + + result = inspect_release(_MOVIE_NAME, folder, _KB, prober) + + # frozen=True → assigning a field raises FrozenInstanceError. + import dataclasses + + try: + result.probe_used = True # type: ignore[misc] + except dataclasses.FrozenInstanceError: + pass + else: # pragma: no cover + raise AssertionError("InspectedResult should be frozen") diff --git a/tests/application/test_resolve_destination.py b/tests/application/test_resolve_destination.py index 1c67359..8cd57fa 100644 --- a/tests/application/test_resolve_destination.py +++ b/tests/application/test_resolve_destination.py @@ -322,6 +322,104 @@ class TestSeries: assert out.status == "needs_clarification" +# --------------------------------------------------------------------------- # +# Probe enrichment wiring # +# --------------------------------------------------------------------------- # + + +class _StubProber: + """Minimal MediaProber stub used to drive enrich_from_probe.""" + + def __init__(self, info): + self._info = info + + def list_subtitle_streams(self, video): # pragma: no cover - unused here + return [] + + def probe(self, video): + return self._info + + +def _stereo_movie_info(): + """A MediaInfo that fills quality+codec when the release name omits them.""" + from alfred.domain.shared.media import AudioTrack, MediaInfo, VideoTrack + + return MediaInfo( + video_tracks=(VideoTrack(index=0, codec="hevc", width=1920, height=1080),), + audio_tracks=( + AudioTrack( + index=1, + codec="aac", + channels=2, + channel_layout="stereo", + language="eng", + is_default=True, + ), + ), + subtitle_tracks=(), + ) + + +class TestProbeEnrichmentWiring: + """When source_path/source_file points to a real file, the resolver + should pick up ffprobe data via inspect_release and let the enriched + tech_string land in the destination name.""" + + def test_movie_picks_up_probe_quality( + self, cfg_memory, tmp_path, monkeypatch + ): + from alfred.application.filesystem import resolve_destination as rd + + monkeypatch.setattr(rd, "_PROBER", _StubProber(_stereo_movie_info())) + # Release name parses to "movie" but is missing the quality token; + # probe must supply 1080p and refresh tech_string. + bare_name = "Inception.2010.BluRay.x264-GROUP" + video = tmp_path / "movie.mkv" + video.write_bytes(b"") + + out = resolve_movie_destination(bare_name, str(video), "Inception", 2010) + + assert out.status == "ok" + # tech_string -> "1080p.BluRay.x264" -> "1080p" shows up in names. + assert "1080p" in out.movie_folder_name + assert "1080p" in out.filename + + def test_movie_skips_probe_when_path_missing(self, cfg_memory, monkeypatch): + # If the file doesn't exist, no probe runs (the stub would have + # injected 1080p — its absence proves the skip). + from alfred.application.filesystem import resolve_destination as rd + + monkeypatch.setattr(rd, "_PROBER", _StubProber(_stereo_movie_info())) + out = resolve_movie_destination( + "Inception.2010.BluRay.x264-GROUP", + "/nowhere/m.mkv", + "Inception", + 2010, + ) + assert out.status == "ok" + assert "1080p" not in out.movie_folder_name + + def test_season_picks_up_probe_via_source_path( + self, cfg_memory, tmp_path, monkeypatch + ): + from alfred.application.filesystem import resolve_destination as rd + + monkeypatch.setattr(rd, "_PROBER", _StubProber(_stereo_movie_info())) + # Season pack name missing quality token; probe must add it. + bare_name = "Oz.S03.BluRay.x265-KONTRAST" + release_dir = tmp_path / bare_name + release_dir.mkdir() + (release_dir / "episode.mkv").write_bytes(b"") + + out = resolve_season_destination( + bare_name, "Oz", 1997, source_path=str(release_dir) + ) + + assert out.status == "ok" + # Series folder name embeds tech_string -> "1080p" surfaced by probe. + assert "1080p" in out.series_folder_name + + # --------------------------------------------------------------------------- # # DTO to_dict() # # --------------------------------------------------------------------------- # diff --git a/tests/infrastructure/test_ffprobe_prober.py b/tests/infrastructure/test_ffprobe_prober.py new file mode 100644 index 0000000..c791208 --- /dev/null +++ b/tests/infrastructure/test_ffprobe_prober.py @@ -0,0 +1,155 @@ +"""Tests for :class:`FfprobeMediaProber`. + +Covers the full-probe path (``probe()`` returning a ``MediaInfo``) by +patching ``subprocess.run`` at the adapter module level. The +subtitle-streams path is exercised by the subtitle domain tests via +the same adapter. +""" + +from __future__ import annotations + +import json +import subprocess +from unittest.mock import MagicMock, patch + +from alfred.infrastructure.probe import FfprobeMediaProber + +_PROBER = FfprobeMediaProber() +_PATCH_TARGET = "alfred.infrastructure.probe.ffprobe_prober.subprocess.run" + + +def _ffprobe_result(returncode=0, stdout="{}", stderr="") -> MagicMock: + return MagicMock(returncode=returncode, stdout=stdout, stderr=stderr) + + +class TestProbe: + def test_timeout_returns_none(self, tmp_path): + f = tmp_path / "x.mkv" + f.write_bytes(b"") + with patch( + _PATCH_TARGET, + side_effect=subprocess.TimeoutExpired(cmd="ffprobe", timeout=30), + ): + assert _PROBER.probe(f) is None + + def test_nonzero_returncode_returns_none(self, tmp_path): + f = tmp_path / "x.mkv" + f.write_bytes(b"") + with patch( + _PATCH_TARGET, + return_value=_ffprobe_result(returncode=1, stderr="not a media file"), + ): + assert _PROBER.probe(f) is None + + def test_invalid_json_returns_none(self, tmp_path): + f = tmp_path / "x.mkv" + f.write_bytes(b"") + with patch( + _PATCH_TARGET, + return_value=_ffprobe_result(stdout="not json {"), + ): + assert _PROBER.probe(f) is None + + def test_parses_format_duration_and_bitrate(self, tmp_path): + f = tmp_path / "x.mkv" + f.write_bytes(b"") + payload = { + "format": {"duration": "1234.5", "bit_rate": "5000000"}, + "streams": [], + } + with patch( + _PATCH_TARGET, + return_value=_ffprobe_result(stdout=json.dumps(payload)), + ): + info = _PROBER.probe(f) + assert info is not None + assert info.duration_seconds == 1234.5 + assert info.bitrate_kbps == 5000 # bit_rate // 1000 + + def test_invalid_numeric_format_fields_skipped(self, tmp_path): + f = tmp_path / "x.mkv" + f.write_bytes(b"") + payload = { + "format": {"duration": "garbage", "bit_rate": "also-bad"}, + "streams": [], + } + with patch( + _PATCH_TARGET, + return_value=_ffprobe_result(stdout=json.dumps(payload)), + ): + info = _PROBER.probe(f) + assert info is not None + assert info.duration_seconds is None + assert info.bitrate_kbps is None + + def test_parses_streams(self, tmp_path): + f = tmp_path / "x.mkv" + f.write_bytes(b"") + payload = { + "format": {}, + "streams": [ + { + "index": 0, + "codec_type": "video", + "codec_name": "h264", + "width": 1920, + "height": 1080, + }, + { + "index": 1, + "codec_type": "audio", + "codec_name": "ac3", + "channels": 6, + "channel_layout": "5.1", + "tags": {"language": "eng"}, + "disposition": {"default": 1}, + }, + { + "index": 2, + "codec_type": "audio", + "codec_name": "aac", + "channels": 2, + "tags": {"language": "fra"}, + }, + { + "index": 3, + "codec_type": "subtitle", + "codec_name": "subrip", + "tags": {"language": "fra"}, + "disposition": {"forced": 1}, + }, + ], + } + with patch( + _PATCH_TARGET, + return_value=_ffprobe_result(stdout=json.dumps(payload)), + ): + info = _PROBER.probe(f) + assert info.video_codec == "h264" + assert info.width == 1920 and info.height == 1080 + assert len(info.audio_tracks) == 2 + eng = info.audio_tracks[0] + assert eng.language == "eng" + assert eng.is_default is True + assert info.audio_tracks[1].is_default is False + assert len(info.subtitle_tracks) == 1 + assert info.subtitle_tracks[0].is_forced is True + + def test_first_video_stream_wins(self, tmp_path): + # The implementation only fills video_codec on the FIRST video stream. + f = tmp_path / "x.mkv" + f.write_bytes(b"") + payload = { + "format": {}, + "streams": [ + {"codec_type": "video", "codec_name": "h264", "width": 1920}, + {"codec_type": "video", "codec_name": "hevc", "width": 3840}, + ], + } + with patch( + _PATCH_TARGET, + return_value=_ffprobe_result(stdout=json.dumps(payload)), + ): + info = _PROBER.probe(f) + assert info.video_codec == "h264" + assert info.width == 1920 diff --git a/tests/infrastructure/test_filesystem_extras.py b/tests/infrastructure/test_filesystem_extras.py index 6988489..ba867ae 100644 --- a/tests/infrastructure/test_filesystem_extras.py +++ b/tests/infrastructure/test_filesystem_extras.py @@ -1,21 +1,19 @@ """Tests for the smaller ``alfred.infrastructure.filesystem`` helpers. -Covers four siblings of ``FileManager`` that had near-zero coverage: +Covers three siblings of ``FileManager`` that had near-zero coverage: -- ``ffprobe.probe`` — wraps ``ffprobe`` JSON output into a ``MediaInfo``. - ``filesystem_operations.create_folder`` / ``move`` — thin ``mkdir`` / ``mv`` wrappers returning dict-shaped responses. - ``organizer.MediaOrganizer`` — computes destination paths for movies and TV episodes; creates folders for them. - ``find_video.find_video_file`` — first-video lookup in a folder. -External commands (``ffprobe`` / ``mv``) are patched via ``subprocess.run``. +(``ffprobe`` coverage now lives in ``test_ffprobe_prober.py`` alongside +its adapter.) """ from __future__ import annotations -import json -import subprocess from unittest.mock import MagicMock, patch from alfred.domain.movies.entities import Movie @@ -27,7 +25,6 @@ from alfred.domain.tv_shows.value_objects import ( SeasonNumber, ShowStatus, ) -from alfred.infrastructure.filesystem import ffprobe from alfred.infrastructure.filesystem.filesystem_operations import ( create_folder, move, @@ -38,147 +35,6 @@ from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge _KB = YamlReleaseKnowledge() -# --------------------------------------------------------------------------- # -# ffprobe.probe # -# --------------------------------------------------------------------------- # - - -def _ffprobe_result(returncode=0, stdout="{}", stderr="") -> MagicMock: - return MagicMock(returncode=returncode, stdout=stdout, stderr=stderr) - - -class TestFfprobe: - def test_timeout_returns_none(self, tmp_path): - f = tmp_path / "x.mkv" - f.write_bytes(b"") - with patch( - "alfred.infrastructure.filesystem.ffprobe.subprocess.run", - side_effect=subprocess.TimeoutExpired(cmd="ffprobe", timeout=30), - ): - assert ffprobe.probe(f) is None - - def test_nonzero_returncode_returns_none(self, tmp_path): - f = tmp_path / "x.mkv" - f.write_bytes(b"") - with patch( - "alfred.infrastructure.filesystem.ffprobe.subprocess.run", - return_value=_ffprobe_result(returncode=1, stderr="not a media file"), - ): - assert ffprobe.probe(f) is None - - def test_invalid_json_returns_none(self, tmp_path): - f = tmp_path / "x.mkv" - f.write_bytes(b"") - with patch( - "alfred.infrastructure.filesystem.ffprobe.subprocess.run", - return_value=_ffprobe_result(stdout="not json {"), - ): - assert ffprobe.probe(f) is None - - def test_parses_format_duration_and_bitrate(self, tmp_path): - f = tmp_path / "x.mkv" - f.write_bytes(b"") - payload = { - "format": {"duration": "1234.5", "bit_rate": "5000000"}, - "streams": [], - } - with patch( - "alfred.infrastructure.filesystem.ffprobe.subprocess.run", - return_value=_ffprobe_result(stdout=json.dumps(payload)), - ): - info = ffprobe.probe(f) - assert info is not None - assert info.duration_seconds == 1234.5 - assert info.bitrate_kbps == 5000 # bit_rate // 1000 - - def test_invalid_numeric_format_fields_skipped(self, tmp_path): - f = tmp_path / "x.mkv" - f.write_bytes(b"") - payload = { - "format": {"duration": "garbage", "bit_rate": "also-bad"}, - "streams": [], - } - with patch( - "alfred.infrastructure.filesystem.ffprobe.subprocess.run", - return_value=_ffprobe_result(stdout=json.dumps(payload)), - ): - info = ffprobe.probe(f) - assert info is not None - assert info.duration_seconds is None - assert info.bitrate_kbps is None - - def test_parses_streams(self, tmp_path): - f = tmp_path / "x.mkv" - f.write_bytes(b"") - payload = { - "format": {}, - "streams": [ - { - "index": 0, - "codec_type": "video", - "codec_name": "h264", - "width": 1920, - "height": 1080, - }, - { - "index": 1, - "codec_type": "audio", - "codec_name": "ac3", - "channels": 6, - "channel_layout": "5.1", - "tags": {"language": "eng"}, - "disposition": {"default": 1}, - }, - { - "index": 2, - "codec_type": "audio", - "codec_name": "aac", - "channels": 2, - "tags": {"language": "fra"}, - }, - { - "index": 3, - "codec_type": "subtitle", - "codec_name": "subrip", - "tags": {"language": "fra"}, - "disposition": {"forced": 1}, - }, - ], - } - with patch( - "alfred.infrastructure.filesystem.ffprobe.subprocess.run", - return_value=_ffprobe_result(stdout=json.dumps(payload)), - ): - info = ffprobe.probe(f) - assert info.video_codec == "h264" - assert info.width == 1920 and info.height == 1080 - assert len(info.audio_tracks) == 2 - eng = info.audio_tracks[0] - assert eng.language == "eng" - assert eng.is_default is True - assert info.audio_tracks[1].is_default is False - assert len(info.subtitle_tracks) == 1 - assert info.subtitle_tracks[0].is_forced is True - - def test_first_video_stream_wins(self, tmp_path): - # The implementation only fills video_codec on the FIRST video stream. - f = tmp_path / "x.mkv" - f.write_bytes(b"") - payload = { - "format": {}, - "streams": [ - {"codec_type": "video", "codec_name": "h264", "width": 1920}, - {"codec_type": "video", "codec_name": "hevc", "width": 3840}, - ], - } - with patch( - "alfred.infrastructure.filesystem.ffprobe.subprocess.run", - return_value=_ffprobe_result(stdout=json.dumps(payload)), - ): - info = ffprobe.probe(f) - assert info.video_codec == "h264" - assert info.width == 1920 - # --------------------------------------------------------------------------- # # filesystem_operations #