Merge branch 'feat/release-inspect-orchestrator'
Inspection pipeline groundwork: - MediaProber.probe() port extension (full media inspection on the port) - inspect_release orchestrator + InspectedResult frozen VO - enrich_from_probe now refreshes tech_string - resolve_*_destination use cases consume inspect_release - detect_media_type & enrich_from_probe moved to application/release
This commit is contained in:
@@ -15,6 +15,77 @@ callers).
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### Internal
|
||||
|
||||
- **Moved `detect_media_type` and `enrich_from_probe` from
|
||||
`alfred.application.filesystem` to `alfred.application.release`**.
|
||||
They are inspection-pipeline helpers — their natural home is next to
|
||||
`inspect_release`, not next to the filesystem use cases. The move
|
||||
also eliminates a circular-import workaround in
|
||||
`resolve_destination.py`: `inspect_release` can now be imported at
|
||||
module top instead of lazily inside `_resolve_parsed`. Public
|
||||
surface is unchanged for callers that imported the helpers from
|
||||
their full module paths (the only call sites — `inspect.py`, two
|
||||
tests, one testing script — were updated in this commit).
|
||||
|
||||
### Added
|
||||
|
||||
- **`resolve_*_destination` use cases now consume `inspect_release`**.
|
||||
`resolve_episode_destination` and `resolve_movie_destination` reuse
|
||||
their existing `source_file` parameter as the inspection target;
|
||||
`resolve_season_destination` and `resolve_series_destination` gain
|
||||
a new **optional** `source_path` parameter (also threaded through
|
||||
the tool wrappers and YAML specs). When the path exists, ffprobe
|
||||
data fills tokens missing from the release name (e.g. quality) and
|
||||
refreshes `tech_string`, so the destination folder / file names
|
||||
end up more accurate. When the path is missing or absent (back-compat
|
||||
callers), the use cases fall back to parse-only — same behavior as
|
||||
before.
|
||||
|
||||
### Fixed
|
||||
|
||||
- **`enrich_from_probe` now refreshes `tech_string`** after filling
|
||||
`quality` / `source` / `codec`. Previously the field stayed at its
|
||||
parser-time value, so filename builders saw stale tech tokens even
|
||||
after a successful probe. New `TestTechString` class in
|
||||
`tests/application/test_enrich_from_probe.py` locks the behavior.
|
||||
|
||||
### Added
|
||||
|
||||
- **`inspect_release` orchestrator + `InspectedResult` VO**
|
||||
(`alfred/application/release/inspect.py`). Single composition of the
|
||||
four inspection layers: `parse_release` → `detect_media_type` (patches
|
||||
`parsed.media_type`) → `find_main_video` (top-level scan) →
|
||||
`prober.probe` + `enrich_from_probe` when a video exists and the
|
||||
refined media type isn't in `{"unknown", "other"}`. Returns a frozen
|
||||
`InspectedResult(parsed, report, source_path, main_video, media_info,
|
||||
probe_used)` that downstream callers consume directly instead of
|
||||
rebuilding the same chain. `kb` and `prober` are injected — no
|
||||
module-level singletons. Never raises.
|
||||
|
||||
### Changed
|
||||
|
||||
- **`analyze_release` tool now delegates to `inspect_release`** — same
|
||||
output shape, plus two new fields: `confidence` (0–100) and `road`
|
||||
(`"easy"` / `"shitty"` / `"path_of_pain"`) surfaced from the parser's
|
||||
`ParseReport`. The tool spec (`specs/analyze_release.yaml`) documents
|
||||
both fields so the LLM can route releases by confidence.
|
||||
|
||||
- **`MediaProber` port now covers full media probing**: added
|
||||
`probe(video) -> MediaInfo | None` alongside the existing
|
||||
`list_subtitle_streams`. `FfprobeMediaProber` (in
|
||||
`alfred/infrastructure/probe/`) implements both methods and is now
|
||||
the single adapter shelling out to `ffprobe`. The standalone
|
||||
`alfred/infrastructure/filesystem/ffprobe.py` module was removed —
|
||||
all callers (tools, testing scripts) instantiate
|
||||
`FfprobeMediaProber` instead. Unblocks the upcoming
|
||||
`inspect_release` orchestrator, which depends on the port.
|
||||
|
||||
### Removed
|
||||
|
||||
- `alfred/infrastructure/filesystem/ffprobe.py` (folded into the
|
||||
`FfprobeMediaProber` adapter).
|
||||
|
||||
---
|
||||
|
||||
## [2026-05-20] — Release parser confidence scoring + exclusion
|
||||
|
||||
@@ -13,8 +13,6 @@ from alfred.application.filesystem import (
|
||||
MoveMediaUseCase,
|
||||
SetFolderPathUseCase,
|
||||
)
|
||||
from alfred.application.filesystem.detect_media_type import detect_media_type
|
||||
from alfred.application.filesystem.enrich_from_probe import enrich_from_probe
|
||||
from alfred.application.filesystem.resolve_destination import (
|
||||
resolve_episode_destination as _resolve_episode_destination,
|
||||
)
|
||||
@@ -28,10 +26,11 @@ from alfred.application.filesystem.resolve_destination import (
|
||||
resolve_series_destination as _resolve_series_destination,
|
||||
)
|
||||
from alfred.infrastructure.filesystem import FileManager, create_folder, move
|
||||
from alfred.infrastructure.filesystem.ffprobe import probe
|
||||
from alfred.infrastructure.filesystem.find_video import find_video_file
|
||||
from alfred.infrastructure.metadata import MetadataStore
|
||||
from alfred.infrastructure.persistence import get_memory
|
||||
from alfred.infrastructure.probe import FfprobeMediaProber
|
||||
|
||||
_PROBER = FfprobeMediaProber()
|
||||
|
||||
_LEARNED_ROOT = Path(_alfred_pkg.__file__).parent.parent / "data" / "knowledge"
|
||||
|
||||
@@ -57,10 +56,11 @@ def resolve_season_destination(
|
||||
tmdb_title: str,
|
||||
tmdb_year: int,
|
||||
confirmed_folder: str | None = None,
|
||||
source_path: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Thin tool wrapper — semantics live in alfred/agent/tools/specs/resolve_season_destination.yaml."""
|
||||
return _resolve_season_destination(
|
||||
release_name, tmdb_title, tmdb_year, confirmed_folder
|
||||
release_name, tmdb_title, tmdb_year, confirmed_folder, source_path
|
||||
).to_dict()
|
||||
|
||||
|
||||
@@ -100,10 +100,11 @@ def resolve_series_destination(
|
||||
tmdb_title: str,
|
||||
tmdb_year: int,
|
||||
confirmed_folder: str | None = None,
|
||||
source_path: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Thin tool wrapper — semantics live in alfred/agent/tools/specs/resolve_series_destination.yaml."""
|
||||
return _resolve_series_destination(
|
||||
release_name, tmdb_title, tmdb_year, confirmed_folder
|
||||
release_name, tmdb_title, tmdb_year, confirmed_folder, source_path
|
||||
).to_dict()
|
||||
|
||||
|
||||
@@ -191,21 +192,10 @@ def set_path_for_folder(folder_name: str, path_value: str) -> dict[str, Any]:
|
||||
def analyze_release(release_name: str, source_path: str) -> dict[str, Any]:
|
||||
"""Thin tool wrapper — semantics live in alfred/agent/tools/specs/analyze_release.yaml."""
|
||||
from alfred.application.filesystem.resolve_destination import _KB # noqa: PLC0415
|
||||
from alfred.domain.release.services import parse_release # noqa: PLC0415
|
||||
|
||||
path = Path(source_path)
|
||||
parsed, _ = parse_release(release_name, _KB)
|
||||
parsed.media_type = detect_media_type(parsed, path, _KB)
|
||||
|
||||
probe_used = False
|
||||
if parsed.media_type not in ("unknown", "other"):
|
||||
video_file = find_video_file(path, _KB)
|
||||
if video_file:
|
||||
media_info = probe(video_file)
|
||||
if media_info:
|
||||
enrich_from_probe(parsed, media_info)
|
||||
probe_used = True
|
||||
from alfred.application.release import inspect_release # noqa: PLC0415
|
||||
|
||||
result = inspect_release(release_name, Path(source_path), _KB, _PROBER)
|
||||
parsed = result.parsed
|
||||
return {
|
||||
"status": "ok",
|
||||
"media_type": parsed.media_type,
|
||||
@@ -227,7 +217,9 @@ def analyze_release(release_name: str, source_path: str) -> dict[str, Any]:
|
||||
"edition": parsed.edition,
|
||||
"site_tag": parsed.site_tag,
|
||||
"is_season_pack": parsed.is_season_pack,
|
||||
"probe_used": probe_used,
|
||||
"probe_used": result.probe_used,
|
||||
"confidence": result.report.confidence,
|
||||
"road": result.report.road,
|
||||
}
|
||||
|
||||
|
||||
@@ -241,7 +233,7 @@ def probe_media(source_path: str) -> dict[str, Any]:
|
||||
"message": f"{source_path} does not exist",
|
||||
}
|
||||
|
||||
media_info = probe(path)
|
||||
media_info = _PROBER.probe(path)
|
||||
if media_info is None:
|
||||
return {
|
||||
"status": "error",
|
||||
|
||||
@@ -80,3 +80,5 @@ returns:
|
||||
site_tag: Source-site tag if present.
|
||||
is_season_pack: True when the folder contains a full season.
|
||||
probe_used: True when ffprobe successfully enriched the result.
|
||||
confidence: Parser confidence score, 0–100 (higher = more reliable).
|
||||
road: "Parser road: 'easy' (group schema matched), 'shitty' (heuristic but acceptable), or 'path_of_pain' (low confidence — ask the user before auto-routing)."
|
||||
|
||||
@@ -61,6 +61,17 @@ parameters:
|
||||
one.
|
||||
example: Oz.1997.1080p.WEBRip.x265-KONTRAST
|
||||
|
||||
source_path:
|
||||
description: |
|
||||
Absolute path to the release folder on disk. Optional.
|
||||
why_needed: |
|
||||
When provided, the tool runs ffprobe on the main video inside the
|
||||
folder and uses the probe data to fill quality/codec tokens that
|
||||
may be missing from the release name. The enriched tech tokens
|
||||
end up in the destination folder name, so providing source_path
|
||||
gives more accurate names for releases with sparse metadata.
|
||||
example: /downloads/Oz.S03.1080p.WEBRip.x265-KONTRAST
|
||||
|
||||
returns:
|
||||
ok:
|
||||
description: Paths resolved unambiguously; ready to move.
|
||||
|
||||
@@ -56,6 +56,16 @@ parameters:
|
||||
Forces the use case to use this exact folder name and skip detection.
|
||||
example: The.Wire.2002.1080p.BluRay.x265-GROUP
|
||||
|
||||
source_path:
|
||||
description: |
|
||||
Absolute path to the release folder on disk. Optional.
|
||||
why_needed: |
|
||||
When provided, the tool runs ffprobe on the main video inside the
|
||||
folder and uses probe data to fill quality/codec tokens that may
|
||||
be missing from the release name, producing a more accurate
|
||||
destination folder name.
|
||||
example: /downloads/The.Wire.S01-S05.1080p.BluRay.x265-GROUP
|
||||
|
||||
returns:
|
||||
ok:
|
||||
description: Path resolved; ready to move the pack.
|
||||
|
||||
@@ -22,10 +22,13 @@ import logging
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from alfred.application.release import inspect_release
|
||||
from alfred.domain.release import parse_release
|
||||
from alfred.domain.release.ports import ReleaseKnowledge
|
||||
from alfred.domain.release.value_objects import ParsedRelease
|
||||
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
|
||||
from alfred.infrastructure.persistence import get_memory
|
||||
from alfred.infrastructure.probe import FfprobeMediaProber
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -33,6 +36,26 @@ logger = logging.getLogger(__name__)
|
||||
# Tests that need a custom KB can monkeypatch this attribute.
|
||||
_KB: ReleaseKnowledge = YamlReleaseKnowledge()
|
||||
|
||||
# Module-level prober — same singleton style as _KB. Tests that need a custom
|
||||
# adapter can monkeypatch this attribute.
|
||||
_PROBER = FfprobeMediaProber()
|
||||
|
||||
|
||||
def _resolve_parsed(release_name: str, source_path: str | None) -> ParsedRelease:
|
||||
"""Pick the right entry point depending on whether we have a path.
|
||||
|
||||
When ``source_path`` is provided and points to something that exists,
|
||||
we run the full inspection pipeline so probe data can refresh
|
||||
``tech_string`` (which feeds every filename builder). Otherwise we
|
||||
fall back to a parse-only path — same behavior as before.
|
||||
"""
|
||||
if source_path:
|
||||
path = Path(source_path)
|
||||
if path.exists():
|
||||
return inspect_release(release_name, path, _KB, _PROBER).parsed
|
||||
parsed, _ = parse_release(release_name, _KB)
|
||||
return parsed
|
||||
|
||||
|
||||
def _find_existing_tvshow_folders(
|
||||
tv_root: Path, tmdb_title_safe: str, tmdb_year: int
|
||||
@@ -237,12 +260,17 @@ def resolve_season_destination(
|
||||
tmdb_title: str,
|
||||
tmdb_year: int,
|
||||
confirmed_folder: str | None = None,
|
||||
source_path: str | None = None,
|
||||
) -> ResolvedSeasonDestination:
|
||||
"""
|
||||
Compute destination paths for a season pack.
|
||||
|
||||
Returns series_folder + season_folder. No file paths — the whole
|
||||
source folder is moved as-is into season_folder.
|
||||
|
||||
When ``source_path`` points to the release on disk, the parser is
|
||||
augmented with ffprobe data so tech tokens missing from the release
|
||||
name (quality / codec) end up in the folder names.
|
||||
"""
|
||||
tv_root = _get_tv_root()
|
||||
if not tv_root:
|
||||
@@ -252,7 +280,7 @@ def resolve_season_destination(
|
||||
message="TV show library path is not configured.",
|
||||
)
|
||||
|
||||
parsed, _ = parse_release(release_name, _KB)
|
||||
parsed = _resolve_parsed(release_name, source_path)
|
||||
tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
|
||||
computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year)
|
||||
|
||||
@@ -293,6 +321,8 @@ def resolve_episode_destination(
|
||||
Compute destination paths for a single episode file.
|
||||
|
||||
Returns series_folder + season_folder + library_file (full path to .mkv).
|
||||
``source_file`` doubles as the inspection target — when it exists,
|
||||
ffprobe enrichment refreshes tech tokens missing from the release name.
|
||||
"""
|
||||
tv_root = _get_tv_root()
|
||||
if not tv_root:
|
||||
@@ -302,7 +332,7 @@ def resolve_episode_destination(
|
||||
message="TV show library path is not configured.",
|
||||
)
|
||||
|
||||
parsed, _ = parse_release(release_name, _KB)
|
||||
parsed = _resolve_parsed(release_name, source_file)
|
||||
ext = Path(source_file).suffix
|
||||
tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
|
||||
tmdb_episode_title_safe = (
|
||||
@@ -350,6 +380,8 @@ def resolve_movie_destination(
|
||||
Compute destination paths for a movie file.
|
||||
|
||||
Returns movie_folder + library_file (full path to .mkv).
|
||||
``source_file`` doubles as the inspection target — when it exists,
|
||||
ffprobe enrichment refreshes tech tokens missing from the release name.
|
||||
"""
|
||||
memory = get_memory()
|
||||
movies_root = memory.ltm.library_paths.get("movie")
|
||||
@@ -360,7 +392,7 @@ def resolve_movie_destination(
|
||||
message="Movie library path is not configured.",
|
||||
)
|
||||
|
||||
parsed, _ = parse_release(release_name, _KB)
|
||||
parsed = _resolve_parsed(release_name, source_file)
|
||||
ext = Path(source_file).suffix
|
||||
tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
|
||||
|
||||
@@ -385,11 +417,15 @@ def resolve_series_destination(
|
||||
tmdb_title: str,
|
||||
tmdb_year: int,
|
||||
confirmed_folder: str | None = None,
|
||||
source_path: str | None = None,
|
||||
) -> ResolvedSeriesDestination:
|
||||
"""
|
||||
Compute destination path for a complete multi-season series pack.
|
||||
|
||||
Returns only series_folder — the whole pack lands directly inside it.
|
||||
|
||||
When ``source_path`` points to the release on disk, ffprobe
|
||||
enrichment refreshes tech tokens missing from the release name.
|
||||
"""
|
||||
tv_root = _get_tv_root()
|
||||
if not tv_root:
|
||||
@@ -399,7 +435,7 @@ def resolve_series_destination(
|
||||
message="TV show library path is not configured.",
|
||||
)
|
||||
|
||||
parsed, _ = parse_release(release_name, _KB)
|
||||
parsed = _resolve_parsed(release_name, source_path)
|
||||
tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
|
||||
computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year)
|
||||
|
||||
|
||||
@@ -1,11 +1,20 @@
|
||||
"""Release application layer — orchestrators sitting between domain
|
||||
parsing and infrastructure I/O.
|
||||
|
||||
Today it exposes the pre-pipeline exclusion helpers
|
||||
(:mod:`supported_media`). Phase C will add the ``inspect_release``
|
||||
orchestrator here.
|
||||
Public surface:
|
||||
|
||||
- :func:`is_supported_video` / :func:`find_main_video` — pre-pipeline
|
||||
filesystem helpers (extension-only filtering, top-level video pick).
|
||||
- :func:`inspect_release` / :class:`InspectedResult` — full inspection
|
||||
pipeline combining parse + filesystem refinement + probe enrichment.
|
||||
"""
|
||||
|
||||
from .inspect import InspectedResult, inspect_release
|
||||
from .supported_media import find_main_video, is_supported_video
|
||||
|
||||
__all__ = ["find_main_video", "is_supported_video"]
|
||||
__all__ = [
|
||||
"InspectedResult",
|
||||
"find_main_video",
|
||||
"inspect_release",
|
||||
"is_supported_video",
|
||||
]
|
||||
|
||||
+7
@@ -80,3 +80,10 @@ def enrich_from_probe(parsed: ParsedRelease, info: MediaInfo) -> None:
|
||||
for lang in info.audio_languages:
|
||||
if lang.lower() != "und" and lang.upper() not in existing:
|
||||
parsed.languages.append(lang)
|
||||
|
||||
# Re-derive tech_string so filename builders see the enriched
|
||||
# quality/source/codec. Built the same way as in the parser pipeline:
|
||||
# the non-None parts joined by dots, in order.
|
||||
parsed.tech_string = ".".join(
|
||||
p for p in (parsed.quality, parsed.source, parsed.codec) if p
|
||||
)
|
||||
@@ -0,0 +1,140 @@
|
||||
"""Release inspection orchestrator — the canonical "look at this thing"
|
||||
entry point.
|
||||
|
||||
``inspect_release`` is the single composition of the four layers we
|
||||
care about for a freshly-arrived release:
|
||||
|
||||
1. **Parse the name** — :func:`alfred.domain.release.services.parse_release`
|
||||
gives a ``ParsedRelease`` plus a ``ParseReport`` (confidence + road).
|
||||
2. **Pick the main video** — :func:`find_main_video` runs a top-level
|
||||
scan over the source path. If nothing qualifies the result still
|
||||
completes; downstream callers decide what to do with a videoless
|
||||
release.
|
||||
3. **Refine the media type** — :func:`detect_media_type` uses the
|
||||
on-disk extension mix to override any token-level guess (e.g. a
|
||||
bare ``.iso`` folder becomes ``"other"``). The refined value is
|
||||
patched onto ``parsed`` in place — same convention as
|
||||
``analyze_release`` had before.
|
||||
4. **Probe the video** — the injected :class:`MediaProber` fills in
|
||||
missing technical fields via :func:`enrich_from_probe`. Skipped
|
||||
when there is no main video or when ``media_type`` ended up in
|
||||
``{"unknown", "other"}`` (the probe would tell us nothing useful).
|
||||
|
||||
The return type is :class:`InspectedResult`, a frozen VO that bundles
|
||||
everything downstream callers need (``analyze_release`` tool,
|
||||
``resolve_destination``, future workflow stages) without forcing them
|
||||
to redo the same four calls.
|
||||
|
||||
Design notes:
|
||||
|
||||
- **Application layer.** This module touches both domain
|
||||
(``parse_release``) and infrastructure (``MediaProber`` port). That
|
||||
is exactly application's job — orchestrate.
|
||||
- **Knowledge base is injected.** ``inspect_release`` takes ``kb`` and
|
||||
``prober`` as parameters; no module-level singletons here. Callers
|
||||
(the tool wrapper, tests) decide what to plug in.
|
||||
- **Mutation is contained.** We still mutate ``parsed.media_type`` and
|
||||
let ``enrich_from_probe`` fill its ``None`` fields, because
|
||||
``ParsedRelease`` is intentionally a mutable dataclass. The outer
|
||||
``InspectedResult`` is frozen so the *bundle* is immutable from the
|
||||
caller's perspective.
|
||||
- **Never raises.** Filesystem / probe errors surface as ``None``
|
||||
fields on the result, never as exceptions — same contract as the
|
||||
underlying adapters.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from alfred.application.release.detect_media_type import detect_media_type
|
||||
from alfred.application.release.enrich_from_probe import enrich_from_probe
|
||||
from alfred.application.release.supported_media import find_main_video
|
||||
from alfred.domain.release.ports import ReleaseKnowledge
|
||||
from alfred.domain.release.services import parse_release
|
||||
from alfred.domain.release.value_objects import ParsedRelease, ParseReport
|
||||
from alfred.domain.shared.media import MediaInfo
|
||||
from alfred.domain.shared.ports import MediaProber
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class InspectedResult:
|
||||
"""The full picture of a release: parsed name + filesystem reality.
|
||||
|
||||
Bundles everything the downstream pipeline needs after a single
|
||||
inspection pass:
|
||||
|
||||
- ``parsed`` — :class:`ParsedRelease`, with ``media_type`` already
|
||||
refined by :func:`detect_media_type` and ``None`` tech fields
|
||||
filled in by :func:`enrich_from_probe` when a probe ran.
|
||||
- ``report`` — :class:`ParseReport` from the parser (confidence +
|
||||
road, untouched by inspection).
|
||||
- ``source_path`` — the path the inspector was pointed at (file or
|
||||
folder), as supplied by the caller.
|
||||
- ``main_video`` — the canonical video file inside ``source_path``,
|
||||
or ``None`` if no eligible file was found.
|
||||
- ``media_info`` — the :class:`MediaInfo` snapshot when a probe
|
||||
succeeded; ``None`` when no video was probed (no main video, or
|
||||
``media_type`` in ``{"unknown", "other"}``) or when ffprobe
|
||||
failed.
|
||||
- ``probe_used`` — ``True`` iff ``media_info`` is non-``None`` and
|
||||
``enrich_from_probe`` actually ran. Explicit flag so callers
|
||||
don't have to re-derive the condition.
|
||||
"""
|
||||
|
||||
parsed: ParsedRelease
|
||||
report: ParseReport
|
||||
source_path: Path
|
||||
main_video: Path | None
|
||||
media_info: MediaInfo | None
|
||||
probe_used: bool
|
||||
|
||||
|
||||
# Media types for which a probe carries no useful information.
|
||||
_NON_PROBABLE_MEDIA_TYPES = frozenset({"unknown", "other"})
|
||||
|
||||
|
||||
def inspect_release(
|
||||
release_name: str,
|
||||
source_path: Path,
|
||||
kb: ReleaseKnowledge,
|
||||
prober: MediaProber,
|
||||
) -> InspectedResult:
|
||||
"""Run the full inspection pipeline on ``release_name`` /
|
||||
``source_path``.
|
||||
|
||||
See module docstring for the four-step flow. ``kb`` and ``prober``
|
||||
are injected so the caller controls the knowledge base layering
|
||||
and the probe adapter (real ffprobe in production, stubs in tests).
|
||||
|
||||
Never raises. A missing or unreadable ``source_path`` simply
|
||||
results in ``main_video=None`` and ``media_info=None``.
|
||||
"""
|
||||
parsed, report = parse_release(release_name, kb)
|
||||
|
||||
# Step 2: refine media_type from the on-disk extension mix.
|
||||
# detect_media_type tolerates non-existent paths (returns parsed.media_type
|
||||
# untouched), so no need to guard here.
|
||||
parsed.media_type = detect_media_type(parsed, source_path, kb)
|
||||
|
||||
# Step 3: pick the canonical main video (top-level scan only).
|
||||
main_video = find_main_video(source_path, kb)
|
||||
|
||||
# Step 4: probe + enrich, when it makes sense.
|
||||
media_info: MediaInfo | None = None
|
||||
probe_used = False
|
||||
if main_video is not None and parsed.media_type not in _NON_PROBABLE_MEDIA_TYPES:
|
||||
media_info = prober.probe(main_video)
|
||||
if media_info is not None:
|
||||
enrich_from_probe(parsed, media_info)
|
||||
probe_used = True
|
||||
|
||||
return InspectedResult(
|
||||
parsed=parsed,
|
||||
report=report,
|
||||
source_path=source_path,
|
||||
main_video=main_video,
|
||||
media_info=media_info,
|
||||
probe_used=probe_used,
|
||||
)
|
||||
@@ -9,7 +9,10 @@ from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Protocol
|
||||
from typing import TYPE_CHECKING, Protocol
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from alfred.domain.shared.media import MediaInfo
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
@@ -37,3 +40,13 @@ class MediaProber(Protocol):
|
||||
no subtitle streams. Adapters must not raise.
|
||||
"""
|
||||
...
|
||||
|
||||
def probe(self, video: Path) -> MediaInfo | None:
|
||||
"""Return the full :class:`MediaInfo` for ``video``, or ``None``.
|
||||
|
||||
Covers all stream families (video, audio, subtitle) plus
|
||||
file-level duration / bitrate. ``None`` signals that ffprobe is
|
||||
unavailable or the file can't be read — adapters must not
|
||||
raise.
|
||||
"""
|
||||
...
|
||||
|
||||
@@ -1,121 +0,0 @@
|
||||
"""ffprobe — infrastructure adapter for extracting MediaInfo from a video file."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from alfred.domain.shared.media import AudioTrack, MediaInfo, SubtitleTrack, VideoTrack
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_FFPROBE_CMD = [
|
||||
"ffprobe",
|
||||
"-v",
|
||||
"quiet",
|
||||
"-print_format",
|
||||
"json",
|
||||
"-show_streams",
|
||||
"-show_format",
|
||||
]
|
||||
|
||||
|
||||
def probe(path: Path) -> MediaInfo | None:
|
||||
"""
|
||||
Run ffprobe on path and return a MediaInfo.
|
||||
|
||||
Returns None if ffprobe is not available or the file cannot be probed.
|
||||
"""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[*_FFPROBE_CMD, str(path)],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30,
|
||||
check=False,
|
||||
)
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.warning("ffprobe timed out on %s", path)
|
||||
return None
|
||||
|
||||
if result.returncode != 0:
|
||||
logger.warning("ffprobe failed on %s: %s", path, result.stderr.strip())
|
||||
return None
|
||||
|
||||
try:
|
||||
data = json.loads(result.stdout)
|
||||
except json.JSONDecodeError:
|
||||
logger.warning("ffprobe returned invalid JSON for %s", path)
|
||||
return None
|
||||
|
||||
return _parse(data)
|
||||
|
||||
|
||||
def _parse(data: dict) -> MediaInfo:
|
||||
streams = data.get("streams", [])
|
||||
fmt = data.get("format", {})
|
||||
|
||||
# File-level duration/bitrate (ffprobe ``format`` block — independent of streams)
|
||||
duration_seconds: float | None = None
|
||||
bitrate_kbps: int | None = None
|
||||
if "duration" in fmt:
|
||||
try:
|
||||
duration_seconds = float(fmt["duration"])
|
||||
except ValueError:
|
||||
pass
|
||||
if "bit_rate" in fmt:
|
||||
try:
|
||||
bitrate_kbps = int(fmt["bit_rate"]) // 1000
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
video_tracks: list[VideoTrack] = []
|
||||
audio_tracks: list[AudioTrack] = []
|
||||
subtitle_tracks: list[SubtitleTrack] = []
|
||||
|
||||
for stream in streams:
|
||||
codec_type = stream.get("codec_type")
|
||||
|
||||
if codec_type == "video":
|
||||
video_tracks.append(
|
||||
VideoTrack(
|
||||
index=stream.get("index", len(video_tracks)),
|
||||
codec=stream.get("codec_name"),
|
||||
width=stream.get("width"),
|
||||
height=stream.get("height"),
|
||||
is_default=stream.get("disposition", {}).get("default", 0) == 1,
|
||||
)
|
||||
)
|
||||
|
||||
elif codec_type == "audio":
|
||||
audio_tracks.append(
|
||||
AudioTrack(
|
||||
index=stream.get("index", len(audio_tracks)),
|
||||
codec=stream.get("codec_name"),
|
||||
channels=stream.get("channels"),
|
||||
channel_layout=stream.get("channel_layout"),
|
||||
language=stream.get("tags", {}).get("language"),
|
||||
is_default=stream.get("disposition", {}).get("default", 0) == 1,
|
||||
)
|
||||
)
|
||||
|
||||
elif codec_type == "subtitle":
|
||||
subtitle_tracks.append(
|
||||
SubtitleTrack(
|
||||
index=stream.get("index", len(subtitle_tracks)),
|
||||
codec=stream.get("codec_name"),
|
||||
language=stream.get("tags", {}).get("language"),
|
||||
is_default=stream.get("disposition", {}).get("default", 0) == 1,
|
||||
is_forced=stream.get("disposition", {}).get("forced", 0) == 1,
|
||||
)
|
||||
)
|
||||
|
||||
return MediaInfo(
|
||||
video_tracks=tuple(video_tracks),
|
||||
audio_tracks=tuple(audio_tracks),
|
||||
subtitle_tracks=tuple(subtitle_tracks),
|
||||
duration_seconds=duration_seconds,
|
||||
bitrate_kbps=bitrate_kbps,
|
||||
)
|
||||
@@ -7,12 +7,23 @@ import logging
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from alfred.domain.shared.media import AudioTrack, MediaInfo, SubtitleTrack, VideoTrack
|
||||
from alfred.domain.shared.ports import SubtitleStreamInfo
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_FFPROBE_TIMEOUT_SECONDS = 30
|
||||
|
||||
_FFPROBE_FULL_CMD = [
|
||||
"ffprobe",
|
||||
"-v",
|
||||
"quiet",
|
||||
"-print_format",
|
||||
"json",
|
||||
"-show_streams",
|
||||
"-show_format",
|
||||
]
|
||||
|
||||
|
||||
class FfprobeMediaProber:
|
||||
"""Inspect media files by shelling out to ``ffprobe``.
|
||||
@@ -63,3 +74,101 @@ class FfprobeMediaProber:
|
||||
)
|
||||
)
|
||||
return streams
|
||||
|
||||
def probe(self, video: Path) -> MediaInfo | None:
|
||||
"""Run ffprobe on ``video`` and return a :class:`MediaInfo`.
|
||||
|
||||
Returns ``None`` when ffprobe is not available, times out, or
|
||||
the file cannot be parsed. Never raises.
|
||||
"""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[*_FFPROBE_FULL_CMD, str(video)],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=_FFPROBE_TIMEOUT_SECONDS,
|
||||
check=False,
|
||||
)
|
||||
except (subprocess.TimeoutExpired, FileNotFoundError) as e:
|
||||
logger.warning("ffprobe failed on %s: %s", video, e)
|
||||
return None
|
||||
|
||||
if result.returncode != 0:
|
||||
logger.warning("ffprobe failed on %s: %s", video, result.stderr.strip())
|
||||
return None
|
||||
|
||||
try:
|
||||
data = json.loads(result.stdout)
|
||||
except json.JSONDecodeError:
|
||||
logger.warning("ffprobe returned invalid JSON for %s", video)
|
||||
return None
|
||||
|
||||
return _parse_media_info(data)
|
||||
|
||||
|
||||
def _parse_media_info(data: dict) -> MediaInfo:
|
||||
"""Translate raw ffprobe JSON into a :class:`MediaInfo` snapshot."""
|
||||
streams = data.get("streams", [])
|
||||
fmt = data.get("format", {})
|
||||
|
||||
duration_seconds: float | None = None
|
||||
bitrate_kbps: int | None = None
|
||||
if "duration" in fmt:
|
||||
try:
|
||||
duration_seconds = float(fmt["duration"])
|
||||
except ValueError:
|
||||
pass
|
||||
if "bit_rate" in fmt:
|
||||
try:
|
||||
bitrate_kbps = int(fmt["bit_rate"]) // 1000
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
video_tracks: list[VideoTrack] = []
|
||||
audio_tracks: list[AudioTrack] = []
|
||||
subtitle_tracks: list[SubtitleTrack] = []
|
||||
|
||||
for stream in streams:
|
||||
codec_type = stream.get("codec_type")
|
||||
|
||||
if codec_type == "video":
|
||||
video_tracks.append(
|
||||
VideoTrack(
|
||||
index=stream.get("index", len(video_tracks)),
|
||||
codec=stream.get("codec_name"),
|
||||
width=stream.get("width"),
|
||||
height=stream.get("height"),
|
||||
is_default=stream.get("disposition", {}).get("default", 0) == 1,
|
||||
)
|
||||
)
|
||||
|
||||
elif codec_type == "audio":
|
||||
audio_tracks.append(
|
||||
AudioTrack(
|
||||
index=stream.get("index", len(audio_tracks)),
|
||||
codec=stream.get("codec_name"),
|
||||
channels=stream.get("channels"),
|
||||
channel_layout=stream.get("channel_layout"),
|
||||
language=stream.get("tags", {}).get("language"),
|
||||
is_default=stream.get("disposition", {}).get("default", 0) == 1,
|
||||
)
|
||||
)
|
||||
|
||||
elif codec_type == "subtitle":
|
||||
subtitle_tracks.append(
|
||||
SubtitleTrack(
|
||||
index=stream.get("index", len(subtitle_tracks)),
|
||||
codec=stream.get("codec_name"),
|
||||
language=stream.get("tags", {}).get("language"),
|
||||
is_default=stream.get("disposition", {}).get("default", 0) == 1,
|
||||
is_forced=stream.get("disposition", {}).get("forced", 0) == 1,
|
||||
)
|
||||
)
|
||||
|
||||
return MediaInfo(
|
||||
video_tracks=tuple(video_tracks),
|
||||
audio_tracks=tuple(audio_tracks),
|
||||
subtitle_tracks=tuple(subtitle_tracks),
|
||||
duration_seconds=duration_seconds,
|
||||
bitrate_kbps=bitrate_kbps,
|
||||
)
|
||||
|
||||
@@ -88,13 +88,13 @@ def analyze(release_name: str, source_path: str | None = None) -> None:
|
||||
if not path.exists():
|
||||
print(" (chemin inexistant, probe skipped)")
|
||||
else:
|
||||
from alfred.infrastructure.filesystem.ffprobe import probe
|
||||
from alfred.infrastructure.filesystem.find_video import find_video_file
|
||||
from alfred.infrastructure.probe import FfprobeMediaProber
|
||||
|
||||
video = find_video_file(path) if path.is_dir() else path
|
||||
if video:
|
||||
print(f" video file: {video.name}")
|
||||
info = probe(video)
|
||||
info = FfprobeMediaProber().probe(video)
|
||||
if info:
|
||||
print(f" codec: {info.video_codec}")
|
||||
print(f" resolution: {info.resolution}")
|
||||
|
||||
@@ -98,9 +98,9 @@ def main() -> None:
|
||||
print(c(f"Error: {path} does not exist", RED), file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
from alfred.infrastructure.filesystem.ffprobe import probe
|
||||
from alfred.infrastructure.probe import FfprobeMediaProber
|
||||
|
||||
info = probe(path)
|
||||
info = FfprobeMediaProber().probe(path)
|
||||
if info is None:
|
||||
print(c("Error: ffprobe failed to probe the file", RED), file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
@@ -100,11 +100,13 @@ def main() -> None:
|
||||
print(c(f"Error: {downloads} does not exist", RED), file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
from alfred.application.filesystem.detect_media_type import detect_media_type
|
||||
from alfred.application.filesystem.enrich_from_probe import enrich_from_probe
|
||||
from alfred.application.release.detect_media_type import detect_media_type
|
||||
from alfred.application.release.enrich_from_probe import enrich_from_probe
|
||||
from alfred.domain.release.services import parse_release
|
||||
from alfred.infrastructure.filesystem.ffprobe import probe
|
||||
from alfred.infrastructure.filesystem.find_video import find_video_file
|
||||
from alfred.infrastructure.probe import FfprobeMediaProber
|
||||
|
||||
_prober = FfprobeMediaProber()
|
||||
|
||||
entries = sorted(downloads.iterdir(), key=lambda p: p.name.lower())
|
||||
total = len(entries)
|
||||
@@ -126,7 +128,7 @@ def main() -> None:
|
||||
if p.media_type not in ("unknown", "other"):
|
||||
video_file = find_video_file(entry)
|
||||
if video_file:
|
||||
media_info = probe(video_file)
|
||||
media_info = _prober.probe(video_file)
|
||||
if media_info:
|
||||
enrich_from_probe(p, media_info)
|
||||
warnings = _assess(p)
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
"""Tests for ``alfred.application.filesystem.detect_media_type``.
|
||||
"""Tests for ``alfred.application.release.detect_media_type``.
|
||||
|
||||
The function refines a ``ParsedRelease.media_type`` using filesystem evidence.
|
||||
|
||||
@@ -18,7 +18,7 @@ from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from alfred.application.filesystem.detect_media_type import detect_media_type
|
||||
from alfred.application.release.detect_media_type import detect_media_type
|
||||
from alfred.domain.release.services import parse_release
|
||||
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
"""Tests for ``alfred.application.filesystem.enrich_from_probe``.
|
||||
"""Tests for ``alfred.application.release.enrich_from_probe``.
|
||||
|
||||
The function mutates a ``ParsedRelease`` in place using ffprobe ``MediaInfo``.
|
||||
Token-level values from the release name always win — only ``None`` fields
|
||||
@@ -18,7 +18,7 @@ Uses real ``ParsedRelease`` / ``MediaInfo`` instances — no mocking needed.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from alfred.application.filesystem.enrich_from_probe import enrich_from_probe
|
||||
from alfred.application.release.enrich_from_probe import enrich_from_probe
|
||||
from alfred.domain.release.value_objects import ParsedRelease
|
||||
from alfred.domain.shared.media import AudioTrack, MediaInfo, VideoTrack
|
||||
|
||||
@@ -210,3 +210,42 @@ class TestLanguages:
|
||||
p = _bare()
|
||||
enrich_from_probe(p, MediaInfo())
|
||||
assert p.languages == []
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# tech_string #
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
|
||||
class TestTechString:
|
||||
"""tech_string drives the filename builders; it must be re-derived
|
||||
whenever quality / source / codec change."""
|
||||
|
||||
def test_rebuilt_from_filled_quality_and_codec(self):
|
||||
p = _bare()
|
||||
enrich_from_probe(
|
||||
p, _info_with_video(width=1920, height=1080, codec="hevc")
|
||||
)
|
||||
assert p.quality == "1080p"
|
||||
assert p.codec == "x265"
|
||||
assert p.tech_string == "1080p.x265"
|
||||
|
||||
def test_keeps_existing_source_when_enriching(self):
|
||||
# Token-level source must stay; probe fills only None fields.
|
||||
p = _bare(source="BluRay")
|
||||
enrich_from_probe(
|
||||
p, _info_with_video(width=1920, height=1080, codec="hevc")
|
||||
)
|
||||
assert p.tech_string == "1080p.BluRay.x265"
|
||||
|
||||
def test_unchanged_when_no_enrichable_video_info(self):
|
||||
# No video info → nothing to fill → tech_string stays as it was.
|
||||
p = _bare(quality="2160p", source="WEB-DL", codec="x265")
|
||||
p.tech_string = "2160p.WEB-DL.x265"
|
||||
enrich_from_probe(p, MediaInfo())
|
||||
assert p.tech_string == "2160p.WEB-DL.x265"
|
||||
|
||||
def test_empty_when_nothing_known(self):
|
||||
p = _bare()
|
||||
enrich_from_probe(p, MediaInfo())
|
||||
assert p.tech_string == ""
|
||||
|
||||
@@ -0,0 +1,265 @@
|
||||
"""Tests for the ``inspect_release`` orchestrator (Phase C).
|
||||
|
||||
Covers the four composition steps as a black box: a real
|
||||
``YamlReleaseKnowledge``, real on-disk filesystem under ``tmp_path``,
|
||||
and a stubbed ``MediaProber`` so we don't depend on a system ``ffprobe``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from alfred.application.release import InspectedResult, inspect_release
|
||||
from alfred.domain.shared.media import AudioTrack, MediaInfo, VideoTrack
|
||||
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
|
||||
|
||||
_KB = YamlReleaseKnowledge()
|
||||
|
||||
_MOVIE_NAME = "Inception.2010.1080p.BluRay.x264-GROUP"
|
||||
_TV_NAME = "Dexter.S01E01.1080p.WEB-DL.x264-GROUP"
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Test doubles #
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
|
||||
class _StubProber:
|
||||
"""Minimal MediaProber stub. Records the path it was asked to probe."""
|
||||
|
||||
def __init__(self, info: MediaInfo | None) -> None:
|
||||
self._info = info
|
||||
self.calls: list[Path] = []
|
||||
|
||||
def list_subtitle_streams(self, video: Path): # pragma: no cover - unused here
|
||||
return []
|
||||
|
||||
def probe(self, video: Path) -> MediaInfo | None:
|
||||
self.calls.append(video)
|
||||
return self._info
|
||||
|
||||
|
||||
class _RaisingProber:
|
||||
"""A prober that would explode if called — used to assert no probe."""
|
||||
|
||||
def list_subtitle_streams(self, video: Path): # pragma: no cover
|
||||
raise AssertionError("list_subtitle_streams must not be called")
|
||||
|
||||
def probe(self, video: Path): # pragma: no cover
|
||||
raise AssertionError("probe must not be called")
|
||||
|
||||
|
||||
def _media_info_1080p_h264() -> MediaInfo:
|
||||
return MediaInfo(
|
||||
video_tracks=(VideoTrack(index=0, codec="h264", width=1920, height=1080),),
|
||||
audio_tracks=(
|
||||
AudioTrack(
|
||||
index=1,
|
||||
codec="ac3",
|
||||
channels=6,
|
||||
channel_layout="5.1",
|
||||
language="eng",
|
||||
is_default=True,
|
||||
),
|
||||
),
|
||||
subtitle_tracks=(),
|
||||
duration_seconds=7200.0,
|
||||
bitrate_kbps=8000,
|
||||
)
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Happy paths #
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
|
||||
class TestInspectMovieFolder:
|
||||
def test_returns_inspected_result_with_all_fields(self, tmp_path: Path) -> None:
|
||||
folder = tmp_path / _MOVIE_NAME
|
||||
folder.mkdir()
|
||||
video = folder / "movie.mkv"
|
||||
video.write_bytes(b"")
|
||||
prober = _StubProber(_media_info_1080p_h264())
|
||||
|
||||
result = inspect_release(_MOVIE_NAME, folder, _KB, prober)
|
||||
|
||||
assert isinstance(result, InspectedResult)
|
||||
assert result.source_path == folder
|
||||
assert result.main_video == video
|
||||
assert result.media_info is not None
|
||||
assert result.probe_used is True
|
||||
assert prober.calls == [video]
|
||||
|
||||
def test_parsed_carries_token_level_fields(self, tmp_path: Path) -> None:
|
||||
folder = tmp_path / _MOVIE_NAME
|
||||
folder.mkdir()
|
||||
(folder / "movie.mkv").write_bytes(b"")
|
||||
prober = _StubProber(_media_info_1080p_h264())
|
||||
|
||||
result = inspect_release(_MOVIE_NAME, folder, _KB, prober)
|
||||
|
||||
assert result.parsed.title.lower().startswith("inception")
|
||||
assert result.parsed.year == 2010
|
||||
assert result.parsed.group == "GROUP"
|
||||
assert result.parsed.media_type == "movie"
|
||||
|
||||
def test_report_has_confidence_and_road(self, tmp_path: Path) -> None:
|
||||
folder = tmp_path / _MOVIE_NAME
|
||||
folder.mkdir()
|
||||
(folder / "movie.mkv").write_bytes(b"")
|
||||
prober = _StubProber(None)
|
||||
|
||||
result = inspect_release(_MOVIE_NAME, folder, _KB, prober)
|
||||
|
||||
assert 0 <= result.report.confidence <= 100
|
||||
assert result.report.road in ("easy", "shitty", "path_of_pain")
|
||||
|
||||
|
||||
class TestInspectSingleFile:
|
||||
def test_file_is_its_own_main_video(self, tmp_path: Path) -> None:
|
||||
f = tmp_path / f"{_MOVIE_NAME}.mkv"
|
||||
f.write_bytes(b"")
|
||||
prober = _StubProber(_media_info_1080p_h264())
|
||||
|
||||
result = inspect_release(_MOVIE_NAME, f, _KB, prober)
|
||||
|
||||
assert result.main_video == f
|
||||
assert result.probe_used is True
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Probe-gating logic #
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
|
||||
class TestProbeGating:
|
||||
def test_no_video_means_no_probe(self, tmp_path: Path) -> None:
|
||||
folder = tmp_path / _MOVIE_NAME
|
||||
folder.mkdir()
|
||||
# Only a non-video file present.
|
||||
(folder / "readme.txt").write_text("hi")
|
||||
prober = _RaisingProber()
|
||||
|
||||
result = inspect_release(_MOVIE_NAME, folder, _KB, prober)
|
||||
|
||||
assert result.main_video is None
|
||||
assert result.media_info is None
|
||||
assert result.probe_used is False
|
||||
|
||||
def test_media_type_other_means_no_probe(self, tmp_path: Path) -> None:
|
||||
# An ISO-only folder gets detect_media_type → "other".
|
||||
folder = tmp_path / _MOVIE_NAME
|
||||
folder.mkdir()
|
||||
(folder / "disc.iso").write_bytes(b"")
|
||||
prober = _RaisingProber()
|
||||
|
||||
result = inspect_release(_MOVIE_NAME, folder, _KB, prober)
|
||||
|
||||
assert result.parsed.media_type == "other"
|
||||
assert result.media_info is None
|
||||
assert result.probe_used is False
|
||||
|
||||
def test_probe_failure_keeps_probe_used_false(self, tmp_path: Path) -> None:
|
||||
folder = tmp_path / _MOVIE_NAME
|
||||
folder.mkdir()
|
||||
(folder / "movie.mkv").write_bytes(b"")
|
||||
prober = _StubProber(None) # ffprobe simulated as failing
|
||||
|
||||
result = inspect_release(_MOVIE_NAME, folder, _KB, prober)
|
||||
|
||||
assert result.main_video is not None
|
||||
assert result.media_info is None
|
||||
assert result.probe_used is False
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Mutation contract #
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
|
||||
class TestMutationContract:
|
||||
def test_detect_media_type_refines_parsed(self, tmp_path: Path) -> None:
|
||||
# Release name parses to "movie", but folder mixes video + non_video
|
||||
# (e.g. an ISO sitting next to an mkv) → detect_media_type returns
|
||||
# "unknown", which is in _NON_PROBABLE_MEDIA_TYPES → no probe.
|
||||
folder = tmp_path / _MOVIE_NAME
|
||||
folder.mkdir()
|
||||
(folder / "movie.mkv").write_bytes(b"")
|
||||
(folder / "extras.iso").write_bytes(b"")
|
||||
prober = _RaisingProber()
|
||||
|
||||
result = inspect_release(_MOVIE_NAME, folder, _KB, prober)
|
||||
|
||||
assert result.parsed.media_type == "unknown"
|
||||
assert result.probe_used is False
|
||||
|
||||
def test_enrich_runs_when_probe_succeeds(self, tmp_path: Path) -> None:
|
||||
# Build a release name with no codec; probe should fill it in.
|
||||
name = "Inception.2010.1080p.BluRay-GROUP"
|
||||
folder = tmp_path / name
|
||||
folder.mkdir()
|
||||
(folder / "movie.mkv").write_bytes(b"")
|
||||
prober = _StubProber(_media_info_1080p_h264())
|
||||
|
||||
result = inspect_release(name, folder, _KB, prober)
|
||||
|
||||
assert result.probe_used is True
|
||||
# enrich_from_probe should have filled the missing codec field.
|
||||
assert result.parsed.codec is not None
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Resilience #
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
|
||||
class TestResilience:
|
||||
def test_nonexistent_path_does_not_raise(self, tmp_path: Path) -> None:
|
||||
ghost = tmp_path / "does-not-exist"
|
||||
prober = _RaisingProber()
|
||||
|
||||
result = inspect_release(_MOVIE_NAME, ghost, _KB, prober)
|
||||
|
||||
assert result.main_video is None
|
||||
assert result.media_info is None
|
||||
assert result.probe_used is False
|
||||
|
||||
def test_tv_release_inspection(self, tmp_path: Path) -> None:
|
||||
folder = tmp_path / _TV_NAME
|
||||
folder.mkdir()
|
||||
video = folder / "episode.mkv"
|
||||
video.write_bytes(b"")
|
||||
prober = _StubProber(_media_info_1080p_h264())
|
||||
|
||||
result = inspect_release(_TV_NAME, folder, _KB, prober)
|
||||
|
||||
assert result.parsed.media_type == "tv_show"
|
||||
assert result.parsed.season == 1
|
||||
assert result.parsed.episode == 1
|
||||
assert result.main_video == video
|
||||
assert result.probe_used is True
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Frozen contract #
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
|
||||
class TestFrozen:
|
||||
def test_inspected_result_is_frozen(self, tmp_path: Path) -> None:
|
||||
folder = tmp_path / _MOVIE_NAME
|
||||
folder.mkdir()
|
||||
(folder / "movie.mkv").write_bytes(b"")
|
||||
prober = _StubProber(None)
|
||||
|
||||
result = inspect_release(_MOVIE_NAME, folder, _KB, prober)
|
||||
|
||||
# frozen=True → assigning a field raises FrozenInstanceError.
|
||||
import dataclasses
|
||||
|
||||
try:
|
||||
result.probe_used = True # type: ignore[misc]
|
||||
except dataclasses.FrozenInstanceError:
|
||||
pass
|
||||
else: # pragma: no cover
|
||||
raise AssertionError("InspectedResult should be frozen")
|
||||
@@ -322,6 +322,104 @@ class TestSeries:
|
||||
assert out.status == "needs_clarification"
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Probe enrichment wiring #
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
|
||||
class _StubProber:
|
||||
"""Minimal MediaProber stub used to drive enrich_from_probe."""
|
||||
|
||||
def __init__(self, info):
|
||||
self._info = info
|
||||
|
||||
def list_subtitle_streams(self, video): # pragma: no cover - unused here
|
||||
return []
|
||||
|
||||
def probe(self, video):
|
||||
return self._info
|
||||
|
||||
|
||||
def _stereo_movie_info():
|
||||
"""A MediaInfo that fills quality+codec when the release name omits them."""
|
||||
from alfred.domain.shared.media import AudioTrack, MediaInfo, VideoTrack
|
||||
|
||||
return MediaInfo(
|
||||
video_tracks=(VideoTrack(index=0, codec="hevc", width=1920, height=1080),),
|
||||
audio_tracks=(
|
||||
AudioTrack(
|
||||
index=1,
|
||||
codec="aac",
|
||||
channels=2,
|
||||
channel_layout="stereo",
|
||||
language="eng",
|
||||
is_default=True,
|
||||
),
|
||||
),
|
||||
subtitle_tracks=(),
|
||||
)
|
||||
|
||||
|
||||
class TestProbeEnrichmentWiring:
|
||||
"""When source_path/source_file points to a real file, the resolver
|
||||
should pick up ffprobe data via inspect_release and let the enriched
|
||||
tech_string land in the destination name."""
|
||||
|
||||
def test_movie_picks_up_probe_quality(
|
||||
self, cfg_memory, tmp_path, monkeypatch
|
||||
):
|
||||
from alfred.application.filesystem import resolve_destination as rd
|
||||
|
||||
monkeypatch.setattr(rd, "_PROBER", _StubProber(_stereo_movie_info()))
|
||||
# Release name parses to "movie" but is missing the quality token;
|
||||
# probe must supply 1080p and refresh tech_string.
|
||||
bare_name = "Inception.2010.BluRay.x264-GROUP"
|
||||
video = tmp_path / "movie.mkv"
|
||||
video.write_bytes(b"")
|
||||
|
||||
out = resolve_movie_destination(bare_name, str(video), "Inception", 2010)
|
||||
|
||||
assert out.status == "ok"
|
||||
# tech_string -> "1080p.BluRay.x264" -> "1080p" shows up in names.
|
||||
assert "1080p" in out.movie_folder_name
|
||||
assert "1080p" in out.filename
|
||||
|
||||
def test_movie_skips_probe_when_path_missing(self, cfg_memory, monkeypatch):
|
||||
# If the file doesn't exist, no probe runs (the stub would have
|
||||
# injected 1080p — its absence proves the skip).
|
||||
from alfred.application.filesystem import resolve_destination as rd
|
||||
|
||||
monkeypatch.setattr(rd, "_PROBER", _StubProber(_stereo_movie_info()))
|
||||
out = resolve_movie_destination(
|
||||
"Inception.2010.BluRay.x264-GROUP",
|
||||
"/nowhere/m.mkv",
|
||||
"Inception",
|
||||
2010,
|
||||
)
|
||||
assert out.status == "ok"
|
||||
assert "1080p" not in out.movie_folder_name
|
||||
|
||||
def test_season_picks_up_probe_via_source_path(
|
||||
self, cfg_memory, tmp_path, monkeypatch
|
||||
):
|
||||
from alfred.application.filesystem import resolve_destination as rd
|
||||
|
||||
monkeypatch.setattr(rd, "_PROBER", _StubProber(_stereo_movie_info()))
|
||||
# Season pack name missing quality token; probe must add it.
|
||||
bare_name = "Oz.S03.BluRay.x265-KONTRAST"
|
||||
release_dir = tmp_path / bare_name
|
||||
release_dir.mkdir()
|
||||
(release_dir / "episode.mkv").write_bytes(b"")
|
||||
|
||||
out = resolve_season_destination(
|
||||
bare_name, "Oz", 1997, source_path=str(release_dir)
|
||||
)
|
||||
|
||||
assert out.status == "ok"
|
||||
# Series folder name embeds tech_string -> "1080p" surfaced by probe.
|
||||
assert "1080p" in out.series_folder_name
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# DTO to_dict() #
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
@@ -0,0 +1,155 @@
|
||||
"""Tests for :class:`FfprobeMediaProber`.
|
||||
|
||||
Covers the full-probe path (``probe()`` returning a ``MediaInfo``) by
|
||||
patching ``subprocess.run`` at the adapter module level. The
|
||||
subtitle-streams path is exercised by the subtitle domain tests via
|
||||
the same adapter.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from alfred.infrastructure.probe import FfprobeMediaProber
|
||||
|
||||
_PROBER = FfprobeMediaProber()
|
||||
_PATCH_TARGET = "alfred.infrastructure.probe.ffprobe_prober.subprocess.run"
|
||||
|
||||
|
||||
def _ffprobe_result(returncode=0, stdout="{}", stderr="") -> MagicMock:
|
||||
return MagicMock(returncode=returncode, stdout=stdout, stderr=stderr)
|
||||
|
||||
|
||||
class TestProbe:
|
||||
def test_timeout_returns_none(self, tmp_path):
|
||||
f = tmp_path / "x.mkv"
|
||||
f.write_bytes(b"")
|
||||
with patch(
|
||||
_PATCH_TARGET,
|
||||
side_effect=subprocess.TimeoutExpired(cmd="ffprobe", timeout=30),
|
||||
):
|
||||
assert _PROBER.probe(f) is None
|
||||
|
||||
def test_nonzero_returncode_returns_none(self, tmp_path):
|
||||
f = tmp_path / "x.mkv"
|
||||
f.write_bytes(b"")
|
||||
with patch(
|
||||
_PATCH_TARGET,
|
||||
return_value=_ffprobe_result(returncode=1, stderr="not a media file"),
|
||||
):
|
||||
assert _PROBER.probe(f) is None
|
||||
|
||||
def test_invalid_json_returns_none(self, tmp_path):
|
||||
f = tmp_path / "x.mkv"
|
||||
f.write_bytes(b"")
|
||||
with patch(
|
||||
_PATCH_TARGET,
|
||||
return_value=_ffprobe_result(stdout="not json {"),
|
||||
):
|
||||
assert _PROBER.probe(f) is None
|
||||
|
||||
def test_parses_format_duration_and_bitrate(self, tmp_path):
|
||||
f = tmp_path / "x.mkv"
|
||||
f.write_bytes(b"")
|
||||
payload = {
|
||||
"format": {"duration": "1234.5", "bit_rate": "5000000"},
|
||||
"streams": [],
|
||||
}
|
||||
with patch(
|
||||
_PATCH_TARGET,
|
||||
return_value=_ffprobe_result(stdout=json.dumps(payload)),
|
||||
):
|
||||
info = _PROBER.probe(f)
|
||||
assert info is not None
|
||||
assert info.duration_seconds == 1234.5
|
||||
assert info.bitrate_kbps == 5000 # bit_rate // 1000
|
||||
|
||||
def test_invalid_numeric_format_fields_skipped(self, tmp_path):
|
||||
f = tmp_path / "x.mkv"
|
||||
f.write_bytes(b"")
|
||||
payload = {
|
||||
"format": {"duration": "garbage", "bit_rate": "also-bad"},
|
||||
"streams": [],
|
||||
}
|
||||
with patch(
|
||||
_PATCH_TARGET,
|
||||
return_value=_ffprobe_result(stdout=json.dumps(payload)),
|
||||
):
|
||||
info = _PROBER.probe(f)
|
||||
assert info is not None
|
||||
assert info.duration_seconds is None
|
||||
assert info.bitrate_kbps is None
|
||||
|
||||
def test_parses_streams(self, tmp_path):
|
||||
f = tmp_path / "x.mkv"
|
||||
f.write_bytes(b"")
|
||||
payload = {
|
||||
"format": {},
|
||||
"streams": [
|
||||
{
|
||||
"index": 0,
|
||||
"codec_type": "video",
|
||||
"codec_name": "h264",
|
||||
"width": 1920,
|
||||
"height": 1080,
|
||||
},
|
||||
{
|
||||
"index": 1,
|
||||
"codec_type": "audio",
|
||||
"codec_name": "ac3",
|
||||
"channels": 6,
|
||||
"channel_layout": "5.1",
|
||||
"tags": {"language": "eng"},
|
||||
"disposition": {"default": 1},
|
||||
},
|
||||
{
|
||||
"index": 2,
|
||||
"codec_type": "audio",
|
||||
"codec_name": "aac",
|
||||
"channels": 2,
|
||||
"tags": {"language": "fra"},
|
||||
},
|
||||
{
|
||||
"index": 3,
|
||||
"codec_type": "subtitle",
|
||||
"codec_name": "subrip",
|
||||
"tags": {"language": "fra"},
|
||||
"disposition": {"forced": 1},
|
||||
},
|
||||
],
|
||||
}
|
||||
with patch(
|
||||
_PATCH_TARGET,
|
||||
return_value=_ffprobe_result(stdout=json.dumps(payload)),
|
||||
):
|
||||
info = _PROBER.probe(f)
|
||||
assert info.video_codec == "h264"
|
||||
assert info.width == 1920 and info.height == 1080
|
||||
assert len(info.audio_tracks) == 2
|
||||
eng = info.audio_tracks[0]
|
||||
assert eng.language == "eng"
|
||||
assert eng.is_default is True
|
||||
assert info.audio_tracks[1].is_default is False
|
||||
assert len(info.subtitle_tracks) == 1
|
||||
assert info.subtitle_tracks[0].is_forced is True
|
||||
|
||||
def test_first_video_stream_wins(self, tmp_path):
|
||||
# The implementation only fills video_codec on the FIRST video stream.
|
||||
f = tmp_path / "x.mkv"
|
||||
f.write_bytes(b"")
|
||||
payload = {
|
||||
"format": {},
|
||||
"streams": [
|
||||
{"codec_type": "video", "codec_name": "h264", "width": 1920},
|
||||
{"codec_type": "video", "codec_name": "hevc", "width": 3840},
|
||||
],
|
||||
}
|
||||
with patch(
|
||||
_PATCH_TARGET,
|
||||
return_value=_ffprobe_result(stdout=json.dumps(payload)),
|
||||
):
|
||||
info = _PROBER.probe(f)
|
||||
assert info.video_codec == "h264"
|
||||
assert info.width == 1920
|
||||
@@ -1,21 +1,19 @@
|
||||
"""Tests for the smaller ``alfred.infrastructure.filesystem`` helpers.
|
||||
|
||||
Covers four siblings of ``FileManager`` that had near-zero coverage:
|
||||
Covers three siblings of ``FileManager`` that had near-zero coverage:
|
||||
|
||||
- ``ffprobe.probe`` — wraps ``ffprobe`` JSON output into a ``MediaInfo``.
|
||||
- ``filesystem_operations.create_folder`` / ``move`` — thin
|
||||
``mkdir`` / ``mv`` wrappers returning dict-shaped responses.
|
||||
- ``organizer.MediaOrganizer`` — computes destination paths for movies
|
||||
and TV episodes; creates folders for them.
|
||||
- ``find_video.find_video_file`` — first-video lookup in a folder.
|
||||
|
||||
External commands (``ffprobe`` / ``mv``) are patched via ``subprocess.run``.
|
||||
(``ffprobe`` coverage now lives in ``test_ffprobe_prober.py`` alongside
|
||||
its adapter.)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from alfred.domain.movies.entities import Movie
|
||||
@@ -27,7 +25,6 @@ from alfred.domain.tv_shows.value_objects import (
|
||||
SeasonNumber,
|
||||
ShowStatus,
|
||||
)
|
||||
from alfred.infrastructure.filesystem import ffprobe
|
||||
from alfred.infrastructure.filesystem.filesystem_operations import (
|
||||
create_folder,
|
||||
move,
|
||||
@@ -38,147 +35,6 @@ from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
|
||||
|
||||
_KB = YamlReleaseKnowledge()
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# ffprobe.probe #
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
|
||||
def _ffprobe_result(returncode=0, stdout="{}", stderr="") -> MagicMock:
|
||||
return MagicMock(returncode=returncode, stdout=stdout, stderr=stderr)
|
||||
|
||||
|
||||
class TestFfprobe:
|
||||
def test_timeout_returns_none(self, tmp_path):
|
||||
f = tmp_path / "x.mkv"
|
||||
f.write_bytes(b"")
|
||||
with patch(
|
||||
"alfred.infrastructure.filesystem.ffprobe.subprocess.run",
|
||||
side_effect=subprocess.TimeoutExpired(cmd="ffprobe", timeout=30),
|
||||
):
|
||||
assert ffprobe.probe(f) is None
|
||||
|
||||
def test_nonzero_returncode_returns_none(self, tmp_path):
|
||||
f = tmp_path / "x.mkv"
|
||||
f.write_bytes(b"")
|
||||
with patch(
|
||||
"alfred.infrastructure.filesystem.ffprobe.subprocess.run",
|
||||
return_value=_ffprobe_result(returncode=1, stderr="not a media file"),
|
||||
):
|
||||
assert ffprobe.probe(f) is None
|
||||
|
||||
def test_invalid_json_returns_none(self, tmp_path):
|
||||
f = tmp_path / "x.mkv"
|
||||
f.write_bytes(b"")
|
||||
with patch(
|
||||
"alfred.infrastructure.filesystem.ffprobe.subprocess.run",
|
||||
return_value=_ffprobe_result(stdout="not json {"),
|
||||
):
|
||||
assert ffprobe.probe(f) is None
|
||||
|
||||
def test_parses_format_duration_and_bitrate(self, tmp_path):
|
||||
f = tmp_path / "x.mkv"
|
||||
f.write_bytes(b"")
|
||||
payload = {
|
||||
"format": {"duration": "1234.5", "bit_rate": "5000000"},
|
||||
"streams": [],
|
||||
}
|
||||
with patch(
|
||||
"alfred.infrastructure.filesystem.ffprobe.subprocess.run",
|
||||
return_value=_ffprobe_result(stdout=json.dumps(payload)),
|
||||
):
|
||||
info = ffprobe.probe(f)
|
||||
assert info is not None
|
||||
assert info.duration_seconds == 1234.5
|
||||
assert info.bitrate_kbps == 5000 # bit_rate // 1000
|
||||
|
||||
def test_invalid_numeric_format_fields_skipped(self, tmp_path):
|
||||
f = tmp_path / "x.mkv"
|
||||
f.write_bytes(b"")
|
||||
payload = {
|
||||
"format": {"duration": "garbage", "bit_rate": "also-bad"},
|
||||
"streams": [],
|
||||
}
|
||||
with patch(
|
||||
"alfred.infrastructure.filesystem.ffprobe.subprocess.run",
|
||||
return_value=_ffprobe_result(stdout=json.dumps(payload)),
|
||||
):
|
||||
info = ffprobe.probe(f)
|
||||
assert info is not None
|
||||
assert info.duration_seconds is None
|
||||
assert info.bitrate_kbps is None
|
||||
|
||||
def test_parses_streams(self, tmp_path):
|
||||
f = tmp_path / "x.mkv"
|
||||
f.write_bytes(b"")
|
||||
payload = {
|
||||
"format": {},
|
||||
"streams": [
|
||||
{
|
||||
"index": 0,
|
||||
"codec_type": "video",
|
||||
"codec_name": "h264",
|
||||
"width": 1920,
|
||||
"height": 1080,
|
||||
},
|
||||
{
|
||||
"index": 1,
|
||||
"codec_type": "audio",
|
||||
"codec_name": "ac3",
|
||||
"channels": 6,
|
||||
"channel_layout": "5.1",
|
||||
"tags": {"language": "eng"},
|
||||
"disposition": {"default": 1},
|
||||
},
|
||||
{
|
||||
"index": 2,
|
||||
"codec_type": "audio",
|
||||
"codec_name": "aac",
|
||||
"channels": 2,
|
||||
"tags": {"language": "fra"},
|
||||
},
|
||||
{
|
||||
"index": 3,
|
||||
"codec_type": "subtitle",
|
||||
"codec_name": "subrip",
|
||||
"tags": {"language": "fra"},
|
||||
"disposition": {"forced": 1},
|
||||
},
|
||||
],
|
||||
}
|
||||
with patch(
|
||||
"alfred.infrastructure.filesystem.ffprobe.subprocess.run",
|
||||
return_value=_ffprobe_result(stdout=json.dumps(payload)),
|
||||
):
|
||||
info = ffprobe.probe(f)
|
||||
assert info.video_codec == "h264"
|
||||
assert info.width == 1920 and info.height == 1080
|
||||
assert len(info.audio_tracks) == 2
|
||||
eng = info.audio_tracks[0]
|
||||
assert eng.language == "eng"
|
||||
assert eng.is_default is True
|
||||
assert info.audio_tracks[1].is_default is False
|
||||
assert len(info.subtitle_tracks) == 1
|
||||
assert info.subtitle_tracks[0].is_forced is True
|
||||
|
||||
def test_first_video_stream_wins(self, tmp_path):
|
||||
# The implementation only fills video_codec on the FIRST video stream.
|
||||
f = tmp_path / "x.mkv"
|
||||
f.write_bytes(b"")
|
||||
payload = {
|
||||
"format": {},
|
||||
"streams": [
|
||||
{"codec_type": "video", "codec_name": "h264", "width": 1920},
|
||||
{"codec_type": "video", "codec_name": "hevc", "width": 3840},
|
||||
],
|
||||
}
|
||||
with patch(
|
||||
"alfred.infrastructure.filesystem.ffprobe.subprocess.run",
|
||||
return_value=_ffprobe_result(stdout=json.dumps(payload)),
|
||||
):
|
||||
info = ffprobe.probe(f)
|
||||
assert info.video_codec == "h264"
|
||||
assert info.width == 1920
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# filesystem_operations #
|
||||
|
||||
Reference in New Issue
Block a user