Compare commits
9 Commits
| Author | SHA1 | Date |
|---|---|---|
| | 02e478a157 | |
| | 3dc73a5214 | |
| | 88f156b7a4 | |
| | 5107cb32c0 | |
| | b7979c0f8b | |
| | 9f1ce94690 | |
| | 5e0ed11672 | |
| | 0246f85ef8 | |
| | e62dc90bd1 | |
+109
@@ -48,6 +48,26 @@ callers).
|
||||
|
||||
### Added
|
||||
|
||||
- **Fullwidth vertical bar `｜` (U+FF5C) is now a recognized release-name
  token separator.** Added to `alfred/knowledge/release/separators.yaml`
  so CJK release names (and the occasional decorative YouTube-style use)
  tokenize cleanly instead of leaving the wide pipe glued onto an
  adjacent token. The tokenizer in
  `alfred/domain/release/parser/pipeline.py` already iterates the
  separator list as plain strings (no regex), so a multi-byte UTF-8
  separator works without any code change.

- **`InspectedResult.recommended_action` property** — derived hint that
  collapses the orchestrator's go / wait / skip decision into a single
  value (``"process"`` / ``"ask_user"`` / ``"skip"``). Centralizes the
  exclusion logic that was previously dispersed across road /
  media_type / main_video checks at each call site. Ordering is part of
  the contract: ``skip`` (no main video, or media_type == ``"other"``)
  wins over ``ask_user`` (media_type == ``"unknown"`` or road ==
  ``"path_of_pain"``) which wins over ``process``. Surfaced through the
  ``analyze_release`` tool so the LLM can route on it directly.
  6 new tests in ``tests/application/test_inspect.py`` cover the four
  branches and the precedence rules.
- **`LanguageRepository` port** in `alfred.domain.shared.ports`. Structural
  Protocol covering `from_iso`, `from_any`, `all`, `__contains__`, `__len__`
  — the surface previously coupled to the concrete `LanguageRegistry`.
|
||||
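To illustrate the fullwidth-separator entry above: a tokenizer that treats separators as plain strings handles a non-ASCII separator exactly like an ASCII one. This is only a minimal sketch. The `tokenize` helper and the inline separator list are hypothetical stand-ins, and the real pipeline and the contents of `separators.yaml` may differ.

```python
def tokenize(name: str, separators: list[str]) -> list[str]:
    """Split ``name`` on every separator, treating each one as a plain string."""
    tokens = [name]
    for sep in separators:
        # str.split operates on code points, so U+FF5C needs no special casing.
        tokens = [part for tok in tokens for part in tok.split(sep)]
    # Drop the empty strings left behind by adjacent separators.
    return [tok for tok in tokens if tok]


# Hypothetical separator list; "\uff5c" is the fullwidth vertical bar.
SEPARATORS = [".", " ", "_", "-", "\uff5c"]

print(tokenize("Show.Name\uff5cS01E02.1080p", SEPARATORS))
```

Because no regular expression is involved, adding a separator is a data change only, which matches the "no code change" claim in the entry.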
@@ -57,6 +77,95 @@ callers).
|
||||
|
||||
### Changed
|
||||
|
||||
- **`Movie` and `Episode` are now frozen dataclasses.** Both entities
  hold their track collections as `tuple[AudioTrack, ...]` and
  `tuple[SubtitleTrack, ...]` instead of mutable lists, and are
  `@dataclass(frozen=True, eq=False)` (identity-based equality
  preserved via `__eq__`/`__hash__`). `__post_init__` coercion uses
  `object.__setattr__` for the `imdb_id` / `title` /
  `season_number` / `episode_number` normalizations. To project
  enrichment results (probe output, file metadata) callers now rebuild
  via `dataclasses.replace(...)`. Pattern aligned with the recent
  `ParsedRelease` freeze. `MediaWithTracks` mixin contract updated to
  `tuple` accordingly. `Season` and `TVShow` remain mutable for now —
  freezing the aggregate root would cascade a full reconstruction on
  every `add_episode`, deferred.
- **`SubtitleCandidate` renamed to `SubtitleScanResult`.** The old name
  conflated "this might become a placed subtitle" with "this is what a
  scan pass produced". The class is the output of a scan/identify pass
  — language/format may still be `None`, confidence reflects how sure
  the classifier is, and `raw_tokens` holds the filename fragments
  under analysis. `SubtitleScanResult` says that directly. Pure rename
  with a refreshed docstring in `alfred/domain/subtitles/entities.py`;
  no behavior change. Touches the domain entity + `__init__` export,
  the matcher / identifier / utils services, the manage_subtitles use
  case, the placer, the metadata store, the shared-media cross-ref
  comment, and the seven test modules that imported the type.

- **`ParsedRelease` is now frozen; enrichment passes return new
  instances.** The VO was mutable so `detect_media_type` and
  `enrich_from_probe` could patch fields in place — a code smell in a
  value object whose identity *is* its content. `ParsedRelease` is now
  `@dataclass(frozen=True)`; `languages` is a `tuple[str, ...]`
  instead of a `list[str]`. `enrich_from_probe` returns a new
  `ParsedRelease` via `dataclasses.replace` (only allocates when at
  least one field actually changed). `inspect_release` rebinds
  `parsed` after both `detect_media_type` (wrapped in `MediaTypeToken`
  to satisfy the strict isinstance check that now also runs on
  replace) and `enrich_from_probe`. Parser pipeline now packs
  `languages` as a tuple in the assemble dict. Callers updated:
  `inspect_release`, `testing/recognize_folders_in_downloads.py`, and
  the enrichment tests (22 call sites + language assertions switched
  to tuple literals).
- **`resolve_destination` use cases take `kb` / `prober` as required
  params; module-level singletons gone.** The four
  `resolve_{season,episode,movie,series}_destination` use cases now
  accept `kb: ReleaseKnowledge` and `prober: MediaProber` as required
  arguments, matching the shape of `inspect_release`. The module-level
  `_KB = YamlReleaseKnowledge()` and `_PROBER = FfprobeMediaProber()`
  singletons that previously lived in
  `alfred/application/filesystem/resolve_destination.py` are removed —
  the application layer no longer reaches into infrastructure. The
  singletons now live at the agent-tools frontier
  (`alfred/agent/tools/filesystem.py`), where the LLM-facing wrappers
  instantiate them once and thread them through. `analyze_release` no
  longer needs the dirty `from ... import _KB` indirection. Tests
  inject their own stubs by keyword (`prober=_StubProber(...)`) instead
  of monkeypatching a module attribute.
- **`ParsePath` enum renamed to `TokenizationRoute`.** The old name
  collided with `pathlib.Path` in code-reading mental models, and was
  one letter from `parse_path` (the field that holds the value) — making
  it harder than it needed to be to spot the type vs the attribute.
  ``TokenizationRoute`` says what it actually captures (DIRECT /
  SANITIZED / AI = how the name reached the tokenizer), and the class
  docstring now spells out the orthogonality with ``Road`` (EASY /
  SHITTY / PATH_OF_PAIN, which captures parser confidence on
  ``ParseReport``). The ``parse_path`` field name stays unchanged —
  string values too — so YAML fixtures, the ``analyze_release`` tool
  spec, and any external consumer are untouched.
- **`enrich_from_probe` codec mappings moved to YAML.** The three
  hard-coded module dicts (`_VIDEO_CODEC_MAP`, `_AUDIO_CODEC_MAP`,
  `_CHANNEL_MAP`) translating ffprobe output to scene tokens
  (`hevc → x265`, `eac3 → EAC3`, `8 → "7.1"`, …) now live in
  `alfred/knowledge/release/probe_mappings.yaml` and are loaded into
  `ReleaseKnowledge.probe_mappings` (new port field, populated by
  `YamlReleaseKnowledge`). `enrich_from_probe` gains a third `kb`
  parameter and reads the maps from there. Aligns with the CLAUDE.md
  rule that lookup tables of domain knowledge belong in YAML, not in
  Python — and opens the door to a future "learn new codec" pass.
  Callers updated: `inspect_release`, `testing/recognize_folders_in_downloads.py`,
  and all 22 sites in `tests/application/test_enrich_from_probe.py`.
- **`ParsedRelease.tech_string` is now a derived `@property`**
  (`alfred/domain/release/value_objects.py`). It computes
  `quality.source.codec` joined by dots on every access, so it stays in
  sync with the underlying fields by construction. The stored field is
  gone from the dataclass, the dict returned by `assemble()` no longer
  carries the key, `parse_release`'s malformed-name fallback drops the
  `tech_string=""` kwarg, and `enrich_from_probe` no longer re-derives
  it after filling `quality`/`source`/`codec`. Closes the
  parser/enrichment double-source-of-truth that `e79ca46` had to fix
  reactively. The fixtures runner now injects `tech_string` alongside
  `is_season_pack` since `asdict()` skips properties.
- **`RuleScope.level` is now an enum (`RuleScopeLevel`).** The set of
  valid levels (global, release_group, movie, show, season, episode)
  was documented only in a docstring comment and validated nowhere.
|
||||
|
||||
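The `tech_string` entry above relies on two behaviors of the standard `dataclasses` module: a property on a frozen dataclass is recomputed from the current fields, and `asdict()` only walks declared fields. A minimal sketch of both follows. `ReleaseSketch` is a hypothetical stand-in that keeps only the three tech fields and is not the project's `ParsedRelease`.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass, replace


@dataclass(frozen=True)
class ReleaseSketch:
    """Hypothetical stand-in for ParsedRelease with only the tech fields."""

    quality: str | None = None
    source: str | None = None
    codec: str | None = None

    @property
    def tech_string(self) -> str:
        # Recomputed on every access from the non-empty parts, in order.
        return ".".join(p for p in (self.quality, self.source, self.codec) if p)


parsed = ReleaseSketch(quality="1080p", source="WEB-DL")
enriched = replace(parsed, codec="x265")

print(parsed.tech_string)
print(enriched.tech_string)
# asdict() only includes dataclass fields, so the property is absent.
print("tech_string" in asdict(enriched))
```

Since the value is never stored, an enrichment pass cannot leave it stale, and any consumer that serializes through `asdict()` has to add the key itself, as the entry notes for the fixtures runner.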
@@ -26,10 +26,15 @@ from alfred.application.filesystem.resolve_destination import (
|
||||
resolve_series_destination as _resolve_series_destination,
|
||||
)
|
||||
from alfred.infrastructure.filesystem import FileManager, create_folder, move
|
||||
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
|
||||
from alfred.infrastructure.metadata import MetadataStore
|
||||
from alfred.infrastructure.persistence import get_memory
|
||||
from alfred.infrastructure.probe import FfprobeMediaProber
|
||||
|
||||
# Agent-tools frontier: this is the legitimate home for the singletons that
|
||||
# back every LLM-exposed wrapper. The use cases below take ``kb`` / ``prober``
|
||||
# as required params; tests inject their own stubs.
|
||||
_KB = YamlReleaseKnowledge()
|
||||
_PROBER = FfprobeMediaProber()
|
||||
|
||||
_LEARNED_ROOT = Path(_alfred_pkg.__file__).parent.parent / "data" / "knowledge"
|
||||
@@ -60,7 +65,13 @@ def resolve_season_destination(
|
||||
 ) -> dict[str, Any]:
     """Thin tool wrapper — semantics live in alfred/agent/tools/specs/resolve_season_destination.yaml."""
     return _resolve_season_destination(
-        release_name, tmdb_title, tmdb_year, confirmed_folder, source_path
+        release_name,
+        tmdb_title,
+        tmdb_year,
+        _KB,
+        _PROBER,
+        confirmed_folder,
+        source_path,
     ).to_dict()
|
||||
|
||||
|
||||
@@ -78,6 +89,8 @@ def resolve_episode_destination(
|
||||
         source_file,
         tmdb_title,
         tmdb_year,
+        _KB,
+        _PROBER,
         tmdb_episode_title,
         confirmed_folder,
     ).to_dict()
|
||||
@@ -91,7 +104,7 @@ def resolve_movie_destination(
|
||||
 ) -> dict[str, Any]:
     """Thin tool wrapper — semantics live in alfred/agent/tools/specs/resolve_movie_destination.yaml."""
     return _resolve_movie_destination(
-        release_name, source_file, tmdb_title, tmdb_year
+        release_name, source_file, tmdb_title, tmdb_year, _KB, _PROBER
     ).to_dict()
|
||||
|
||||
|
||||
@@ -104,7 +117,13 @@ def resolve_series_destination(
|
||||
 ) -> dict[str, Any]:
     """Thin tool wrapper — semantics live in alfred/agent/tools/specs/resolve_series_destination.yaml."""
     return _resolve_series_destination(
-        release_name, tmdb_title, tmdb_year, confirmed_folder, source_path
+        release_name,
+        tmdb_title,
+        tmdb_year,
+        _KB,
+        _PROBER,
+        confirmed_folder,
+        source_path,
     ).to_dict()
|
||||
|
||||
|
||||
@@ -191,7 +210,6 @@ def set_path_for_folder(folder_name: str, path_value: str) -> dict[str, Any]:
|
||||
|
||||
 def analyze_release(release_name: str, source_path: str) -> dict[str, Any]:
     """Thin tool wrapper — semantics live in alfred/agent/tools/specs/analyze_release.yaml."""
-    from alfred.application.filesystem.resolve_destination import _KB  # noqa: PLC0415
     from alfred.application.release import inspect_release  # noqa: PLC0415

     result = inspect_release(release_name, Path(source_path), _KB, _PROBER)
|
||||
@@ -220,6 +238,7 @@ def analyze_release(release_name: str, source_path: str) -> dict[str, Any]:
|
||||
"probe_used": result.probe_used,
|
||||
"confidence": result.report.confidence,
|
||||
"road": result.report.road,
|
||||
"recommended_action": result.recommended_action,
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -82,3 +82,4 @@ returns:
|
||||
probe_used: True when ffprobe successfully enriched the result.
|
||||
confidence: Parser confidence score, 0–100 (higher = more reliable).
|
||||
road: "Parser road: 'easy' (group schema matched), 'shitty' (heuristic but acceptable), or 'path_of_pain' (low confidence — ask the user before auto-routing)."
|
||||
recommended_action: "Orchestrator hint: 'process' (go straight to resolve_*_destination), 'ask_user' (media_type unknown or road=path_of_pain — confirm with the user first), or 'skip' (no main video, or media_type=other — nothing to organize)."
|
||||
|
||||
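The `recommended_action` field described in the spec above is meant to be routed on directly. A caller could dispatch on it roughly as follows. This is a sketch only: the `route` function, the handler strings, and the `release_name` key in the payload are hypothetical, and only the three action values come from the spec.

```python
from __future__ import annotations

from typing import Any, Callable


def route(analysis: dict[str, Any]) -> str:
    """Map the tool's recommended_action onto a hypothetical next step."""
    handlers: dict[str, Callable[[dict[str, Any]], str]] = {
        "process": lambda a: f"resolve destination for {a['release_name']}",
        "ask_user": lambda a: f"ask the user about {a['release_name']}",
        "skip": lambda a: f"nothing to organize in {a['release_name']}",
    }
    action = analysis["recommended_action"]
    if action not in handlers:
        # Fail loudly if the contract ever grows a value this caller ignores.
        raise ValueError(f"unexpected recommended_action: {action!r}")
    return handlers[action](analysis)


print(route({"release_name": "Some.Show.S01", "recommended_action": "ask_user"}))
```

Raising on an unknown value keeps the caller honest if a fourth action is ever added to the contract.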
@@ -4,7 +4,7 @@ import logging
|
||||
 from pathlib import Path

 from alfred.domain.shared.value_objects import ImdbId
-from alfred.domain.subtitles.entities import SubtitleCandidate
+from alfred.domain.subtitles.entities import SubtitleScanResult
 from alfred.domain.subtitles.services.identifier import SubtitleIdentifier
 from alfred.domain.subtitles.services.matcher import SubtitleMatcher
 from alfred.domain.subtitles.services.pattern_detector import PatternDetector
|
||||
@@ -278,7 +278,7 @@ class ManageSubtitlesUseCase:
|
||||
|
||||
|
||||
def _to_unresolved_dto(
|
||||
track: SubtitleCandidate, min_confidence: float = 0.7
|
||||
track: SubtitleScanResult, min_confidence: float = 0.7
|
||||
) -> UnresolvedTrack:
|
||||
reason = "unknown_language" if track.language is None else "low_confidence"
|
||||
return UnresolvedTrack(
|
||||
@@ -291,10 +291,10 @@ def _to_unresolved_dto(
|
||||
|
||||
 def _pair_placed_with_tracks(
     placed: list[PlacedTrack],
-    tracks: list[SubtitleCandidate],
-) -> list[tuple[PlacedTrack, SubtitleCandidate]]:
+    tracks: list[SubtitleScanResult],
+) -> list[tuple[PlacedTrack, SubtitleScanResult]]:
     """
-    Pair each PlacedTrack with its originating SubtitleCandidate by source path.
+    Pair each PlacedTrack with its originating SubtitleScanResult by source path.
     Falls back to positional matching if paths don't align.
     """
     track_by_path = {t.file_path: t for t in tracks if t.file_path}
|
||||
|
||||
@@ -26,34 +26,30 @@ from alfred.application.release import inspect_release
|
||||
from alfred.domain.release import parse_release
|
||||
from alfred.domain.release.ports import ReleaseKnowledge
|
||||
from alfred.domain.release.value_objects import ParsedRelease
|
||||
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
|
||||
from alfred.domain.shared.ports import MediaProber
|
||||
from alfred.infrastructure.persistence import get_memory
|
||||
from alfred.infrastructure.probe import FfprobeMediaProber
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Single module-level knowledge instance. YAML is loaded once at first import.
|
||||
# Tests that need a custom KB can monkeypatch this attribute.
|
||||
_KB: ReleaseKnowledge = YamlReleaseKnowledge()
|
||||
|
||||
# Module-level prober — same singleton style as _KB. Tests that need a custom
|
||||
# adapter can monkeypatch this attribute.
|
||||
_PROBER = FfprobeMediaProber()
|
||||
|
||||
|
||||
-def _resolve_parsed(release_name: str, source_path: str | None) -> ParsedRelease:
+def _resolve_parsed(
+    release_name: str,
+    source_path: str | None,
+    kb: ReleaseKnowledge,
+    prober: MediaProber,
+) -> ParsedRelease:
     """Pick the right entry point depending on whether we have a path.

     When ``source_path`` is provided and points to something that exists,
-    we run the full inspection pipeline so probe data can refresh
-    ``tech_string`` (which feeds every filename builder). Otherwise we
-    fall back to a parse-only path — same behavior as before.
+    we run the full inspection pipeline so probe data can refresh tech
+    fields (which feed every filename builder). Otherwise we fall back
+    to a parse-only path — same behavior as before.
     """
     if source_path:
         path = Path(source_path)
         if path.exists():
-            return inspect_release(release_name, path, _KB, _PROBER).parsed
-    parsed, _ = parse_release(release_name, _KB)
+            return inspect_release(release_name, path, kb, prober).parsed
+    parsed, _ = parse_release(release_name, kb)
     return parsed
|
||||
|
||||
|
||||
@@ -259,6 +255,8 @@ def resolve_season_destination(
|
||||
release_name: str,
|
||||
tmdb_title: str,
|
||||
tmdb_year: int,
|
||||
kb: ReleaseKnowledge,
|
||||
prober: MediaProber,
|
||||
confirmed_folder: str | None = None,
|
||||
source_path: str | None = None,
|
||||
) -> ResolvedSeasonDestination:
|
||||
@@ -280,8 +278,8 @@ def resolve_season_destination(
|
||||
message="TV show library path is not configured.",
|
||||
)
|
||||
|
||||
parsed = _resolve_parsed(release_name, source_path)
|
||||
tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
|
||||
parsed = _resolve_parsed(release_name, source_path, kb, prober)
|
||||
tmdb_title_safe = kb.sanitize_for_fs(tmdb_title)
|
||||
computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year)
|
||||
|
||||
resolved = _resolve_series_folder(
|
||||
@@ -314,6 +312,8 @@ def resolve_episode_destination(
|
||||
source_file: str,
|
||||
tmdb_title: str,
|
||||
tmdb_year: int,
|
||||
kb: ReleaseKnowledge,
|
||||
prober: MediaProber,
|
||||
tmdb_episode_title: str | None = None,
|
||||
confirmed_folder: str | None = None,
|
||||
) -> ResolvedEpisodeDestination:
|
||||
@@ -332,11 +332,11 @@ def resolve_episode_destination(
|
||||
message="TV show library path is not configured.",
|
||||
)
|
||||
|
||||
parsed = _resolve_parsed(release_name, source_file)
|
||||
parsed = _resolve_parsed(release_name, source_file, kb, prober)
|
||||
ext = Path(source_file).suffix
|
||||
tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
|
||||
tmdb_title_safe = kb.sanitize_for_fs(tmdb_title)
|
||||
tmdb_episode_title_safe = (
|
||||
_KB.sanitize_for_fs(tmdb_episode_title) if tmdb_episode_title else None
|
||||
kb.sanitize_for_fs(tmdb_episode_title) if tmdb_episode_title else None
|
||||
)
|
||||
computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year)
|
||||
|
||||
@@ -375,6 +375,8 @@ def resolve_movie_destination(
|
||||
source_file: str,
|
||||
tmdb_title: str,
|
||||
tmdb_year: int,
|
||||
kb: ReleaseKnowledge,
|
||||
prober: MediaProber,
|
||||
) -> ResolvedMovieDestination:
|
||||
"""
|
||||
Compute destination paths for a movie file.
|
||||
@@ -392,9 +394,9 @@ def resolve_movie_destination(
|
||||
message="Movie library path is not configured.",
|
||||
)
|
||||
|
||||
parsed = _resolve_parsed(release_name, source_file)
|
||||
parsed = _resolve_parsed(release_name, source_file, kb, prober)
|
||||
ext = Path(source_file).suffix
|
||||
tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
|
||||
tmdb_title_safe = kb.sanitize_for_fs(tmdb_title)
|
||||
|
||||
folder_name = parsed.movie_folder_name(tmdb_title_safe, tmdb_year)
|
||||
filename = parsed.movie_filename(tmdb_title_safe, tmdb_year, ext)
|
||||
@@ -416,6 +418,8 @@ def resolve_series_destination(
|
||||
release_name: str,
|
||||
tmdb_title: str,
|
||||
tmdb_year: int,
|
||||
kb: ReleaseKnowledge,
|
||||
prober: MediaProber,
|
||||
confirmed_folder: str | None = None,
|
||||
source_path: str | None = None,
|
||||
) -> ResolvedSeriesDestination:
|
||||
@@ -435,8 +439,8 @@ def resolve_series_destination(
|
||||
message="TV show library path is not configured.",
|
||||
)
|
||||
|
||||
parsed = _resolve_parsed(release_name, source_path)
|
||||
tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
|
||||
parsed = _resolve_parsed(release_name, source_path, kb, prober)
|
||||
tmdb_title_safe = kb.sanitize_for_fs(tmdb_title)
|
||||
computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year)
|
||||
|
||||
resolved = _resolve_series_folder(
|
||||
|
||||
@@ -2,61 +2,45 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import replace
|
||||
|
||||
from alfred.domain.release.ports import ReleaseKnowledge
|
||||
from alfred.domain.release.value_objects import ParsedRelease
|
||||
from alfred.domain.shared.media import MediaInfo
|
||||
|
||||
# Map ffprobe codec names to scene-style codec tokens
|
||||
_VIDEO_CODEC_MAP = {
|
||||
"hevc": "x265",
|
||||
"h264": "x264",
|
||||
"h265": "x265",
|
||||
"av1": "AV1",
|
||||
"vp9": "VP9",
|
||||
"mpeg4": "XviD",
|
||||
}
|
||||
|
||||
# Map ffprobe audio codec names to scene-style tokens
|
||||
_AUDIO_CODEC_MAP = {
|
||||
"eac3": "EAC3",
|
||||
"ac3": "AC3",
|
||||
"dts": "DTS",
|
||||
"truehd": "TrueHD",
|
||||
"aac": "AAC",
|
||||
"flac": "FLAC",
|
||||
"opus": "OPUS",
|
||||
"mp3": "MP3",
|
||||
"pcm_s16l": "PCM",
|
||||
"pcm_s24l": "PCM",
|
||||
}
|
||||
|
||||
# Map channel count to standard layout string
|
||||
_CHANNEL_MAP = {
|
||||
8: "7.1",
|
||||
6: "5.1",
|
||||
2: "2.0",
|
||||
1: "1.0",
|
||||
}
|
||||
|
||||
|
||||
-def enrich_from_probe(parsed: ParsedRelease, info: MediaInfo) -> None:
+def enrich_from_probe(
+    parsed: ParsedRelease, info: MediaInfo, kb: ReleaseKnowledge
+) -> ParsedRelease:
     """
-    Fill None fields in parsed using data from ffprobe MediaInfo.
+    Return a new ParsedRelease with None fields filled from ffprobe MediaInfo.

     Only overwrites fields that are currently None — token-level values
-    from the release name always take priority.
-    Mutates parsed in place.
+    from the release name always take priority. ``ParsedRelease`` is
+    frozen; this returns a new instance via :func:`dataclasses.replace`.
+
+    Translation tables (ffprobe codec name → scene token, channel count
+    → layout) live in ``kb.probe_mappings`` (loaded from
+    ``alfred/knowledge/release/probe_mappings.yaml``). When ffprobe
+    reports a value with no mapping entry, the fallback is the uppercase
+    raw value so unknown codecs still surface in a predictable form.
     """
+    mappings = kb.probe_mappings
+    video_codec_map: dict[str, str] = mappings.get("video_codec", {})
+    audio_codec_map: dict[str, str] = mappings.get("audio_codec", {})
+    channel_map: dict[int, str] = mappings.get("audio_channels", {})
+
+    updates: dict[str, object] = {}

     if parsed.quality is None and info.resolution:
-        parsed.quality = info.resolution
+        updates["quality"] = info.resolution

     if parsed.codec is None and info.video_codec:
-        parsed.codec = _VIDEO_CODEC_MAP.get(
+        updates["codec"] = video_codec_map.get(
             info.video_codec.lower(), info.video_codec.upper()
         )

-    if parsed.bit_depth is None and info.video_codec:
-        # ffprobe exposes bit depth via pix_fmt — not in MediaInfo yet, skip for now
-        pass
+    # bit_depth: ffprobe exposes it via pix_fmt — not in MediaInfo yet, skip.

     # Audio — use the default track, fallback to first
     default_track = next((t for t in info.audio_tracks if t.is_default), None)
|
||||
@@ -64,26 +48,27 @@ def enrich_from_probe(parsed: ParsedRelease, info: MediaInfo) -> None:
|
||||
|
||||
     if track:
         if parsed.audio_codec is None and track.codec:
-            parsed.audio_codec = _AUDIO_CODEC_MAP.get(
+            updates["audio_codec"] = audio_codec_map.get(
                 track.codec.lower(), track.codec.upper()
             )

         if parsed.audio_channels is None and track.channels:
-            parsed.audio_channels = _CHANNEL_MAP.get(
+            updates["audio_channels"] = channel_map.get(
                 track.channels, f"{track.channels}ch"
             )

     # Languages — merge ffprobe languages with token-level ones
     # "und" = undetermined, not useful
     if info.audio_languages:
-        existing = set(parsed.languages)
+        existing_upper = {lang.upper() for lang in parsed.languages}
+        new_languages = list(parsed.languages)
         for lang in info.audio_languages:
-            if lang.lower() != "und" and lang.upper() not in existing:
-                parsed.languages.append(lang)
+            if lang.lower() != "und" and lang.upper() not in existing_upper:
+                new_languages.append(lang)
+                existing_upper.add(lang.upper())
+        if len(new_languages) != len(parsed.languages):
+            updates["languages"] = tuple(new_languages)

-    # Re-derive tech_string so filename builders see the enriched
-    # quality/source/codec. Built the same way as in the parser pipeline:
-    # the non-None parts joined by dots, in order.
-    parsed.tech_string = ".".join(
-        p for p in (parsed.quality, parsed.source, parsed.codec) if p
-    )
+    if not updates:
+        return parsed
+    return replace(parsed, **updates)
|
||||
|
||||
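The new `enrich_from_probe` collects changes in a dict and calls `dataclasses.replace` only when something actually changed. The sketch below reduces that pattern to two fields so the behavior is easy to check. `ParsedSketch`, `enrich`, and the two-entry `VIDEO_CODEC_MAP` are hypothetical simplifications, and the real function reads its tables from `kb.probe_mappings`.

```python
from __future__ import annotations

from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ParsedSketch:
    """Hypothetical two-field stand-in for ParsedRelease."""

    codec: str | None = None
    languages: tuple[str, ...] = ()


# Illustrative subset of the ffprobe-to-scene-token table.
VIDEO_CODEC_MAP = {"hevc": "x265", "h264": "x264"}


def enrich(parsed: ParsedSketch, probed_codec: str | None, probed_languages: list[str]) -> ParsedSketch:
    updates: dict[str, object] = {}

    # Token-level values win: only fill the codec when it is still None.
    if parsed.codec is None and probed_codec:
        updates["codec"] = VIDEO_CODEC_MAP.get(probed_codec.lower(), probed_codec.upper())

    # Merge languages case-insensitively, ignoring "und" (undetermined).
    seen = {lang.upper() for lang in parsed.languages}
    merged = list(parsed.languages)
    for lang in probed_languages:
        if lang.lower() != "und" and lang.upper() not in seen:
            merged.append(lang)
            seen.add(lang.upper())
    if len(merged) != len(parsed.languages):
        updates["languages"] = tuple(merged)

    # Return the original object untouched when nothing changed.
    return replace(parsed, **updates) if updates else parsed


base = ParsedSketch(codec="x264", languages=("FRENCH",))
print(enrich(base, "hevc", ["und", "french"]) is base)
print(enrich(ParsedSketch(), "HEVC", ["eng", "ENG", "und"]))
```

Returning the same instance on a no-op lets callers use an identity check to tell whether the probe contributed anything.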
@@ -45,7 +45,7 @@ Design notes:
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, replace
|
||||
from pathlib import Path
|
||||
|
||||
from alfred.application.release.detect_media_type import detect_media_type
|
||||
@@ -53,11 +53,30 @@ from alfred.application.release.enrich_from_probe import enrich_from_probe
|
||||
from alfred.application.release.supported_media import find_main_video
|
||||
from alfred.domain.release.ports import ReleaseKnowledge
|
||||
from alfred.domain.release.services import parse_release
|
||||
from alfred.domain.release.value_objects import ParsedRelease, ParseReport
|
||||
from alfred.domain.release.value_objects import (
|
||||
MediaTypeToken,
|
||||
ParsedRelease,
|
||||
ParseReport,
|
||||
)
|
||||
from alfred.domain.shared.media import MediaInfo
|
||||
from alfred.domain.shared.ports import MediaProber
|
||||
|
||||
|
||||
# Media types for which a probe carries no useful information.
|
||||
_NON_PROBABLE_MEDIA_TYPES = frozenset({"unknown", "other"})
|
||||
|
||||
# Media types for which there's nothing for the organizer to do.
|
||||
# ``other`` covers things like games / ISOs / archives sitting on the
|
||||
# downloads folder. ``unknown`` does NOT belong here — those need a
|
||||
# user decision, not a skip.
|
||||
_SKIPPABLE_MEDIA_TYPES = frozenset({"other"})
|
||||
|
||||
# Roads that signal the parser couldn't reach a confident answer on its
|
||||
# own. ``Road`` values are kept as strings on the report to avoid a
|
||||
# cross-package import here.
|
||||
_ASK_USER_ROADS = frozenset({"path_of_pain"})
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class InspectedResult:
|
||||
"""The full picture of a release: parsed name + filesystem reality.
|
||||
@@ -81,6 +100,10 @@ class InspectedResult:
|
||||
- ``probe_used`` — ``True`` iff ``media_info`` is non-``None`` and
|
||||
``enrich_from_probe`` actually ran. Explicit flag so callers
|
||||
don't have to re-derive the condition.
|
||||
- ``recommended_action`` — derived hint for the orchestrator (see
|
||||
property docstring). Encodes the exclusion / clarification /
|
||||
go-ahead decision in one place so downstream callers don't
|
||||
re-implement the same checks.
|
||||
"""
|
||||
|
||||
parsed: ParsedRelease
|
||||
@@ -90,9 +113,36 @@ class InspectedResult:
|
||||
     media_info: MediaInfo | None
     probe_used: bool

+    @property
+    def recommended_action(self) -> str:
+        """Return one of ``"skip"`` / ``"ask_user"`` / ``"process"``.

-# Media types for which a probe carries no useful information.
-_NON_PROBABLE_MEDIA_TYPES = frozenset({"unknown", "other"})
+        - ``"skip"`` — nothing to organize:
+            * the source has no main video file, **or**
+            * ``media_type`` is ``"other"`` (games / ISOs / archives).
+        - ``"ask_user"`` — a decision is required before any action:
+            * ``media_type`` is ``"unknown"`` (parser couldn't classify), **or**
+            * the parse landed on ``Road.PATH_OF_PAIN``
+              (low-confidence, malformed name, etc.).
+        - ``"process"`` — everything else: a confident parse with a
+          usable media type and a main video on disk. The orchestrator
+          can move straight to the planning step.
+
+        The check ordering matters: ``"skip"`` wins over ``"ask_user"``
+        because if there's no video to organize, no question to the
+        user can change that. ``"ask_user"`` then wins over
+        ``"process"`` because a confident parse alone isn't enough if
+        the type or road still flag uncertainty.
+        """
+        if self.main_video is None:
+            return "skip"
+        if self.parsed.media_type.value in _SKIPPABLE_MEDIA_TYPES:
+            return "skip"
+        if self.parsed.media_type.value == "unknown":
+            return "ask_user"
+        if self.report.road in _ASK_USER_ROADS:
+            return "ask_user"
+        return "process"
|
||||
|
||||
|
||||
def inspect_release(
|
||||
@@ -115,8 +165,11 @@ def inspect_release(
|
||||
|
||||
     # Step 2: refine media_type from the on-disk extension mix.
     # detect_media_type tolerates non-existent paths (returns parsed.media_type
-    # untouched), so no need to guard here.
-    parsed.media_type = detect_media_type(parsed, source_path, kb)
+    # untouched), so no need to guard here. ParsedRelease is frozen — use
+    # dataclasses.replace to rebind with the refined value.
+    refined_media_type = MediaTypeToken(detect_media_type(parsed, source_path, kb))
+    if refined_media_type != parsed.media_type:
+        parsed = replace(parsed, media_type=refined_media_type)
|
||||
|
||||
# Step 3: pick the canonical main video (top-level scan only).
|
||||
main_video = find_main_video(source_path, kb)
|
||||
@@ -127,7 +180,7 @@ def inspect_release(
|
||||
if main_video is not None and parsed.media_type not in _NON_PROBABLE_MEDIA_TYPES:
|
||||
media_info = prober.probe(main_video)
|
||||
if media_info is not None:
|
||||
enrich_from_probe(parsed, media_info)
|
||||
parsed = enrich_from_probe(parsed, media_info, kb)
|
||||
probe_used = True
|
||||
|
||||
return InspectedResult(
|
||||
|
||||
@@ -5,13 +5,13 @@ import os
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from alfred.domain.subtitles.entities import SubtitleCandidate
|
||||
from alfred.domain.subtitles.entities import SubtitleScanResult
|
||||
from alfred.domain.subtitles.value_objects import SubtitleType
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _build_dest_name(track: SubtitleCandidate, video_stem: str) -> str:
|
||||
def _build_dest_name(track: SubtitleScanResult, video_stem: str) -> str:
|
||||
"""
|
||||
Build the destination filename for a subtitle track.
|
||||
|
||||
@@ -41,7 +41,7 @@ class PlacedTrack:
|
||||
@dataclass
|
||||
class PlaceResult:
|
||||
placed: list[PlacedTrack]
|
||||
skipped: list[tuple[SubtitleCandidate, str]] # (track, reason)
|
||||
skipped: list[tuple[SubtitleScanResult, str]] # (track, reason)
|
||||
|
||||
@property
|
||||
def placed_count(self) -> int:
|
||||
@@ -54,7 +54,7 @@ class PlaceResult:
|
||||
|
||||
class SubtitlePlacer:
|
||||
"""
|
||||
Hard-links matched SubtitleCandidate files next to a destination video.
|
||||
Hard-links matched SubtitleScanResult files next to a destination video.
|
||||
|
||||
Uses the same hard-link strategy as FileManager.copy_file:
|
||||
instant, no data duplication, qBittorrent keeps seeding.
|
||||
@@ -64,11 +64,11 @@ class SubtitlePlacer:
|
||||
|
||||
def place(
|
||||
self,
|
||||
tracks: list[SubtitleCandidate],
|
||||
tracks: list[SubtitleScanResult],
|
||||
destination_video: Path,
|
||||
) -> PlaceResult:
|
||||
placed: list[PlacedTrack] = []
|
||||
skipped: list[tuple[SubtitleCandidate, str]] = []
|
||||
skipped: list[tuple[SubtitleScanResult, str]] = []
|
||||
|
||||
dest_dir = destination_video.parent
|
||||
|
||||
|
||||
@@ -8,19 +8,22 @@ from ..shared.value_objects import FilePath, FileSize, ImdbId
|
||||
from .value_objects import MovieTitle, Quality, ReleaseYear
|
||||
|
||||
|
||||
-@dataclass(eq=False)
+@dataclass(frozen=True, eq=False)
 class Movie(MediaWithTracks):
     """
     Movie aggregate root for the movies domain.

     Carries file metadata (path, size) and the tracks discovered by the
-    ffprobe + subtitle scan pipeline. The track lists may be empty when the
+    ffprobe + subtitle scan pipeline. The track tuples may be empty when the
     movie is known but not yet scanned, or when no file is downloaded.

     Track helpers follow the same "C+" contract as ``Episode``: pass a
     ``Language`` for cross-format matching, or a ``str`` for case-insensitive
     direct comparison.

+    Frozen: rebuild via ``dataclasses.replace`` to project enrichment results
+    (audio/subtitle tracks, file metadata) onto a new instance.
+
     Equality is identity-based: two ``Movie`` instances are equal iff they
     share the same ``imdb_id``, regardless of file/track contents. This is
     the DDD aggregate invariant — the aggregate is identified by its root id.
|
||||
@@ -34,15 +37,15 @@ class Movie(MediaWithTracks):
|
||||
file_size: FileSize | None = None
|
||||
tmdb_id: int | None = None
|
||||
added_at: datetime = field(default_factory=datetime.now)
|
||||
audio_tracks: list[AudioTrack] = field(default_factory=list)
|
||||
subtitle_tracks: list[SubtitleTrack] = field(default_factory=list)
|
||||
audio_tracks: tuple[AudioTrack, ...] = field(default_factory=tuple)
|
||||
subtitle_tracks: tuple[SubtitleTrack, ...] = field(default_factory=tuple)
|
||||
|
||||
def __post_init__(self):
|
||||
"""Validate movie entity."""
|
||||
# Ensure ImdbId is actually an ImdbId instance
|
||||
if not isinstance(self.imdb_id, ImdbId):
|
||||
if isinstance(self.imdb_id, str):
|
||||
self.imdb_id = ImdbId(self.imdb_id)
|
||||
object.__setattr__(self, "imdb_id", ImdbId(self.imdb_id))
|
||||
else:
|
||||
raise ValueError(
|
||||
f"imdb_id must be ImdbId or str, got {type(self.imdb_id)}"
|
||||
@@ -51,7 +54,7 @@ class Movie(MediaWithTracks):
|
||||
# Ensure MovieTitle is actually a MovieTitle instance
|
||||
if not isinstance(self.title, MovieTitle):
|
||||
if isinstance(self.title, str):
|
||||
self.title = MovieTitle(self.title)
|
||||
object.__setattr__(self, "title", MovieTitle(self.title))
|
||||
else:
|
||||
raise ValueError(
|
||||
f"title must be MovieTitle or str, got {type(self.title)}"
|
||||
|
||||
@@ -713,9 +713,6 @@ def assemble(
|
||||
if distributor is None:
|
||||
distributor = tok.text.upper()
|
||||
|
||||
tech_parts = [p for p in (quality, source, codec) if p]
|
||||
tech_string = ".".join(tech_parts)
|
||||
|
||||
# Media type heuristic. Doc/concert/integrale tokens win over the
|
||||
# generic tech-based fallback. We look across all tokens (not just
|
||||
# annotated ones) because these markers may be tagged UNKNOWN by the
|
||||
@@ -754,10 +751,9 @@ def assemble(
|
||||
"source": source,
|
||||
"codec": codec,
|
||||
"group": group,
|
||||
"tech_string": tech_string,
|
||||
"media_type": media_type,
|
||||
"site_tag": site_tag,
|
||||
"languages": languages,
|
||||
"languages": tuple(languages),
|
||||
"audio_codec": audio_codec,
|
||||
"audio_channels": audio_channels,
|
||||
"bit_depth": bit_depth,
|
||||
|
||||
@@ -34,7 +34,7 @@ from .tokens import Token, TokenRole
|
||||
class Road(str, Enum):
|
||||
"""How the parser handled a given release name.
|
||||
|
||||
Distinct from :class:`~alfred.domain.release.value_objects.ParsePath`,
|
||||
Distinct from :class:`~alfred.domain.release.value_objects.TokenizationRoute`,
|
||||
which records the tokenization route (DIRECT / SANITIZED / AI). Road
|
||||
is about confidence in the *result*, not the *method*.
|
||||
"""
|
||||
|
||||
@@ -52,6 +52,18 @@ class ReleaseKnowledge(Protocol):
|
||||
|
||||
scoring: dict
|
||||
|
||||
# --- ffprobe → scene-token translation tables (consumed by
|
||||
# ``application.release.enrich_from_probe``). Domain parsing itself
|
||||
# doesn't touch these — exposed on the same KB to keep release
|
||||
# knowledge in a single ownership point.
|
||||
#
|
||||
# Shape:
|
||||
# - ``video_codec``: dict[str, str] ffprobe lower → scene token
|
||||
# - ``audio_codec``: dict[str, str] ffprobe lower → scene token
|
||||
# - ``audio_channels``: dict[int, str] channel count → layout ---
|
||||
|
||||
probe_mappings: dict
|
||||
|
||||
# --- File-extension sets (used by application/infra modules that work
|
||||
# directly with filesystem paths, e.g. media-type detection, video
|
||||
# lookup). Domain parsing itself doesn't touch these. ---
|
||||
|
||||
@@ -21,7 +21,7 @@ from __future__ import annotations
|
||||
from .parser import pipeline as _v2
|
||||
from .parser import scoring as _scoring
|
||||
from .ports import ReleaseKnowledge
|
||||
from .value_objects import MediaTypeToken, ParsedRelease, ParsePath, ParseReport
|
||||
from .value_objects import MediaTypeToken, ParsedRelease, ParseReport, TokenizationRoute
|
||||
|
||||
|
||||
def parse_release(
|
||||
@@ -44,7 +44,7 @@ def parse_release(
|
||||
3. Otherwise run the v2 pipeline: tokenize → annotate (EASY when a
|
||||
group schema is known, SHITTY otherwise) → assemble → score.
|
||||
"""
|
||||
parse_path = ParsePath.DIRECT
|
||||
parse_path = TokenizationRoute.DIRECT
|
||||
|
||||
# Apostrophes inside titles ("Don't", "L'avare") are common and should
|
||||
# not push the release through the AI fallback. Strip them up front so
|
||||
@@ -53,11 +53,11 @@ def parse_release(
|
||||
working_name = name
|
||||
if "'" in working_name:
|
||||
working_name = working_name.replace("'", "")
|
||||
parse_path = ParsePath.SANITIZED
|
||||
parse_path = TokenizationRoute.SANITIZED
|
||||
|
||||
clean, site_tag = _v2.strip_site_tag(working_name)
|
||||
if site_tag is not None:
|
||||
parse_path = ParsePath.SANITIZED
|
||||
parse_path = TokenizationRoute.SANITIZED
|
||||
|
||||
if not _is_well_formed(clean, kb):
|
||||
parsed = ParsedRelease(
|
||||
@@ -73,10 +73,9 @@ def parse_release(
|
||||
source=None,
|
||||
codec=None,
|
||||
group="UNKNOWN",
|
||||
tech_string="",
|
||||
media_type=MediaTypeToken.UNKNOWN,
|
||||
site_tag=site_tag,
|
||||
parse_path=ParsePath.AI,
|
||||
parse_path=TokenizationRoute.AI,
|
||||
)
|
||||
report = ParseReport(
|
||||
confidence=0,
|
||||
|
||||
@@ -15,7 +15,7 @@ calling ``kb.sanitize_for_fs(tmdb_title)`` before invoking the builders.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
|
||||
from ..shared.exceptions import ValidationError
|
||||
@@ -40,9 +40,21 @@ class MediaTypeToken(str, Enum):
     UNKNOWN = "unknown"


-class ParsePath(str, Enum):
-    """How a ``ParsedRelease`` was produced. ``str``-backed for the same
-    reasons as :class:`MediaTypeToken`."""
+class TokenizationRoute(str, Enum):
+    """How a ``ParsedRelease`` was produced.
+
+    Records the **tokenization route**, i.e. whether the release name
+    was tokenized as-is (``DIRECT``), after a sanitization pass like
+    site-tag stripping or apostrophe removal (``SANITIZED``), or whether
+    structural parsing failed and an LLM rebuild is needed (``AI``).
+
+    This is **orthogonal** to :class:`~alfred.domain.release.parser.scoring.Road`
+    (EASY / SHITTY / PATH_OF_PAIN), which captures parser confidence and
+    is recorded on :class:`ParseReport`. Both can vary independently:
+    a SANITIZED name can still land on the EASY road if a group schema
+    matches the tokens after stripping.
+
+    ``str``-backed for the same reasons as :class:`MediaTypeToken`."""

     DIRECT = "direct"
     SANITIZED = "sanitized"
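Review note: a small sketch of what "``str``-backed" buys in practice, and of the two enums varying independently. Only `DIRECT = "direct"` and `SANITIZED = "sanitized"` are visible in this diff; the `"ai"`, `"easy"` and `"shitty"` values below are assumptions made for illustration, while `"path_of_pain"` follows the value quoted in the changelog.

```python
import json
from enum import Enum


class TokenizationRoute(str, Enum):
    DIRECT = "direct"
    SANITIZED = "sanitized"
    AI = "ai"  # assumed value, not shown in the diff


class Road(str, Enum):
    EASY = "easy"  # assumed value
    SHITTY = "shitty"  # assumed value
    PATH_OF_PAIN = "path_of_pain"


# The two axes are independent: a sanitized name can still be an easy parse.
route = TokenizationRoute.SANITIZED
road = Road.EASY

# str-backed members compare equal to plain strings and serialize as strings.
payload = json.dumps({"parse_path": route, "road": road})
```

Because members are real `str` instances, callers that compare against string literals keep working, and round-tripping through `TokenizationRoute("direct")` returns the singleton member.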
@@ -102,13 +114,17 @@ class ParseReport:
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(frozen=True)
|
||||
class ParsedRelease:
|
||||
"""Structured representation of a parsed release name.
|
||||
|
||||
``title_sanitized`` carries the filesystem-safe form of ``title`` (computed
|
||||
by the parser at construction time using the injected knowledge base).
|
||||
Builder methods rely on it being already-sanitized — see module docstring.
|
||||
|
||||
Frozen: enrichment passes (``detect_media_type``, ``enrich_from_probe``)
|
||||
return a **new** ``ParsedRelease`` via ``dataclasses.replace`` rather
|
||||
than mutating in place. ``languages`` is a tuple for the same reason.
|
||||
"""
|
||||
|
||||
raw: str # original release name (untouched)
|
||||
@@ -123,13 +139,12 @@ class ParsedRelease:
|
||||
source: str | None # WEBRip, BluRay, …
|
||||
codec: str | None # x265, HEVC, …
|
||||
group: str # release group, "UNKNOWN" if missing
|
||||
tech_string: str # quality.source.codec joined with dots
|
||||
media_type: MediaTypeToken = MediaTypeToken.UNKNOWN
|
||||
site_tag: str | None = (
|
||||
None # site watermark stripped from name, e.g. "TGx", "OxTorrent.vc"
|
||||
)
|
||||
parse_path: ParsePath = ParsePath.DIRECT
|
||||
languages: list[str] = field(default_factory=list) # ["MULTI", "VFF"], ["FRENCH"], …
|
||||
parse_path: TokenizationRoute = TokenizationRoute.DIRECT
|
||||
languages: tuple[str, ...] = () # ("MULTI", "VFF"), ("FRENCH",), …
|
||||
audio_codec: str | None = None # "DTS-HD.MA", "DDP", "EAC3", …
|
||||
audio_channels: str | None = None # "5.1", "7.1", "2.0", …
|
||||
bit_depth: str | None = None # "10bit", "8bit", …
|
||||
@@ -169,9 +184,9 @@ class ParsedRelease:
|
||||
f"ParsedRelease.media_type must be a MediaTypeToken, "
|
||||
f"got {type(self.media_type).__name__}: {self.media_type!r}"
|
||||
)
|
||||
if not isinstance(self.parse_path, ParsePath):
|
||||
if not isinstance(self.parse_path, TokenizationRoute):
|
||||
raise ValidationError(
|
||||
f"ParsedRelease.parse_path must be a ParsePath, "
|
||||
f"ParsedRelease.parse_path must be a TokenizationRoute, "
|
||||
f"got {type(self.parse_path).__name__}: {self.parse_path!r}"
|
||||
)
|
||||
|
||||
@@ -179,6 +194,15 @@ class ParsedRelease:
     def is_season_pack(self) -> bool:
         return self.season is not None and self.episode is None

+    @property
+    def tech_string(self) -> str:
+        """``quality.source.codec`` joined by dots, skipping ``None`` parts.
+
+        Derived on every access so it stays in sync with the underlying
+        fields: no manual refresh needed after enrichment.
+        """
+        return ".".join(p for p in (self.quality, self.source, self.codec) if p)
+
     def show_folder_name(self, tmdb_title_safe: str, tmdb_year: int) -> str:
         """
         Build the series root folder name.

@@ -3,7 +3,7 @@
|
||||
These are the **container-view** dataclasses, populated from ffprobe output and
|
||||
used across the project to describe the content of a media file.
|
||||
|
||||
Not to be confused with ``alfred.domain.subtitles.entities.SubtitleCandidate``
|
||||
Not to be confused with ``alfred.domain.subtitles.entities.SubtitleScanResult``
|
||||
which models a subtitle being **scanned/matched** (with confidence, raw tokens,
|
||||
file path, etc.). The two coexist by design — they describe the same real-world
|
||||
concept seen from two different bounded contexts.
|
||||
@@ -218,8 +218,8 @@ class MediaWithTracks:
|
||||
|
||||
Hosts must expose two attributes:
|
||||
|
||||
* ``audio_tracks: list[AudioTrack]``
|
||||
* ``subtitle_tracks: list[SubtitleTrack]``
|
||||
* ``audio_tracks: tuple[AudioTrack, ...]``
|
||||
* ``subtitle_tracks: tuple[SubtitleTrack, ...]``
|
||||
|
||||
The helpers follow the "C+" matching contract: pass a :class:`Language`
|
||||
for cross-format matching, or a ``str`` for case-insensitive comparison.
|
||||
@@ -227,8 +227,8 @@ class MediaWithTracks:
|
||||
|
||||
# These attributes are provided by the host entity (Movie, Episode, …).
|
||||
# Declared here only for type-checkers and to make the contract explicit.
|
||||
audio_tracks: list[AudioTrack]
|
||||
subtitle_tracks: list[SubtitleTrack]
|
||||
audio_tracks: tuple[AudioTrack, ...]
|
||||
subtitle_tracks: tuple[SubtitleTrack, ...]
|
||||
|
||||
# ── Audio helpers ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
"""Subtitles domain — subtitle identification, classification and placement."""
|
||||
|
||||
from .aggregates import SubtitleRuleSet
|
||||
from .entities import MediaSubtitleMetadata, SubtitleCandidate
|
||||
from .entities import MediaSubtitleMetadata, SubtitleScanResult
|
||||
from .exceptions import SubtitleNotFound
|
||||
from .services import PatternDetector, SubtitleIdentifier, SubtitleMatcher
|
||||
from .value_objects import (
|
||||
@@ -17,7 +17,7 @@ from .value_objects import (
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"SubtitleCandidate",
|
||||
"SubtitleScanResult",
|
||||
"MediaSubtitleMetadata",
|
||||
"SubtitleRuleSet",
|
||||
"SubtitleIdentifier",
|
||||
|
||||
@@ -12,16 +12,18 @@ from .value_objects import (
|
||||
|
||||
|
||||
@dataclass
|
||||
class SubtitleCandidate:
|
||||
class SubtitleScanResult:
|
||||
"""
|
||||
A subtitle being scanned and matched — either an external file or an embedded stream.
|
||||
A subtitle observed during a scan — either an external file or an embedded stream.
|
||||
|
||||
Unlike ``alfred.domain.shared.media.SubtitleTrack`` (the pure container-view
|
||||
populated from ffprobe), a SubtitleCandidate carries the **flow state** of the
|
||||
subtitle matching pipeline: language/format are typed value objects that may
|
||||
be ``None`` while classification is in progress, ``confidence`` reflects how
|
||||
certain we are, and ``raw_tokens`` holds the filename fragments still under
|
||||
analysis. State evolves: unknown → resolved after user clarification.
|
||||
populated from ffprobe), a ``SubtitleScanResult`` carries the **flow state**
|
||||
of the subtitle matching pipeline: language/format are typed value objects
|
||||
that may be ``None`` while classification is in progress, ``confidence``
|
||||
reflects how certain we are, and ``raw_tokens`` holds the filename fragments
|
||||
still under analysis. State evolves: unknown → resolved after user
|
||||
clarification. The name reflects this — it's the **output of a scan pass**,
|
||||
not a value object.
|
||||
"""
|
||||
|
||||
# Classification (may be None if not yet resolved)
|
||||
@@ -72,7 +74,7 @@ class SubtitleCandidate:
|
||||
if self.is_embedded
|
||||
else str(self.file_path.name if self.file_path else "?")
|
||||
)
|
||||
return f"SubtitleCandidate({lang}, {self.subtitle_type.value}, {fmt}, src={src}, conf={self.confidence:.2f})"
|
||||
return f"SubtitleScanResult({lang}, {self.subtitle_type.value}, {fmt}, src={src}, conf={self.confidence:.2f})"
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -84,14 +86,14 @@ class MediaSubtitleMetadata:
|
||||
|
||||
media_id: ImdbId | None
|
||||
media_type: str # "movie" | "tv_show"
|
||||
embedded_tracks: list[SubtitleCandidate] = field(default_factory=list)
|
||||
external_tracks: list[SubtitleCandidate] = field(default_factory=list)
|
||||
embedded_tracks: list[SubtitleScanResult] = field(default_factory=list)
|
||||
external_tracks: list[SubtitleScanResult] = field(default_factory=list)
|
||||
release_group: str | None = None
|
||||
detected_pattern_id: str | None = None # pattern id from knowledge base
|
||||
pattern_confirmed: bool = False
|
||||
|
||||
@property
|
||||
def all_tracks(self) -> list[SubtitleCandidate]:
|
||||
def all_tracks(self) -> list[SubtitleScanResult]:
|
||||
return self.embedded_tracks + self.external_tracks
|
||||
|
||||
@property
|
||||
@@ -99,5 +101,5 @@ class MediaSubtitleMetadata:
|
||||
return len(self.embedded_tracks) + len(self.external_tracks)
|
||||
|
||||
@property
|
||||
def unresolved_tracks(self) -> list[SubtitleCandidate]:
|
||||
def unresolved_tracks(self) -> list[SubtitleScanResult]:
|
||||
return [t for t in self.external_tracks if t.language is None]
|
||||
|
||||
@@ -7,7 +7,7 @@ from pathlib import Path
|
||||
from ...shared.ports import FilesystemScanner, MediaProber
|
||||
from ..ports import SubtitleKnowledge
|
||||
from ...shared.value_objects import ImdbId
|
||||
from ..entities import MediaSubtitleMetadata, SubtitleCandidate
|
||||
from ..entities import MediaSubtitleMetadata, SubtitleScanResult
|
||||
from ..value_objects import ScanStrategy, SubtitlePattern, SubtitleType
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -94,7 +94,7 @@ class SubtitleIdentifier:
|
||||
# Embedded tracks — via MediaProber
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _scan_embedded(self, video_path: Path) -> list[SubtitleCandidate]:
|
||||
def _scan_embedded(self, video_path: Path) -> list[SubtitleScanResult]:
|
||||
streams = self.prober.list_subtitle_streams(video_path)
|
||||
|
||||
tracks = []
|
||||
@@ -111,7 +111,7 @@ class SubtitleIdentifier:
|
||||
stype = SubtitleType.STANDARD
|
||||
|
||||
tracks.append(
|
||||
SubtitleCandidate(
|
||||
SubtitleScanResult(
|
||||
language=lang,
|
||||
format=None,
|
||||
subtitle_type=stype,
|
||||
@@ -131,7 +131,7 @@ class SubtitleIdentifier:
|
||||
|
||||
def _scan_external(
|
||||
self, video_path: Path, pattern: SubtitlePattern
|
||||
) -> list[SubtitleCandidate]:
|
||||
) -> list[SubtitleScanResult]:
|
||||
strategy = pattern.scan_strategy
|
||||
episode_stem: str | None = None
|
||||
|
||||
@@ -200,7 +200,7 @@ class SubtitleIdentifier:
|
||||
entries: list,
|
||||
pattern: SubtitlePattern,
|
||||
episode_stem: str | None = None,
|
||||
) -> list[SubtitleCandidate]:
|
||||
) -> list[SubtitleScanResult]:
|
||||
tracks = [
|
||||
self._classify_single(entry, episode_stem=episode_stem) for entry in entries
|
||||
]
|
||||
@@ -214,7 +214,7 @@ class SubtitleIdentifier:
|
||||
|
||||
def _classify_single(
|
||||
self, entry, episode_stem: str | None = None
|
||||
) -> SubtitleCandidate:
|
||||
) -> SubtitleScanResult:
|
||||
fmt = self.kb.format_for_extension(entry.suffix)
|
||||
tokens = (
|
||||
_tokenize_suffix(entry.stem, episode_stem)
|
||||
@@ -253,7 +253,7 @@ class SubtitleIdentifier:
|
||||
if entry.suffix.lower() == ".srt":
|
||||
entry_count = _count_entries(self.scanner.read_text(entry.path))
|
||||
|
||||
return SubtitleCandidate(
|
||||
return SubtitleScanResult(
|
||||
language=language,
|
||||
format=fmt,
|
||||
subtitle_type=subtitle_type,
|
||||
@@ -266,8 +266,8 @@ class SubtitleIdentifier:
|
||||
)
|
||||
|
||||
def _disambiguate_by_size(
|
||||
self, tracks: list[SubtitleCandidate]
|
||||
) -> list[SubtitleCandidate]:
|
||||
self, tracks: list[SubtitleScanResult]
|
||||
) -> list[SubtitleScanResult]:
|
||||
"""
|
||||
When multiple tracks share the same language and type is UNKNOWN/STANDARD,
|
||||
the one with the most entries (lines) is SDH, the smallest is FORCED if
|
||||
@@ -277,7 +277,7 @@ class SubtitleIdentifier:
|
||||
"""
|
||||
|
||||
# Group by language code
|
||||
lang_groups: dict[str, list[SubtitleCandidate]] = {}
|
||||
lang_groups: dict[str, list[SubtitleScanResult]] = {}
|
||||
for track in tracks:
|
||||
key = track.language.code if track.language else "__unknown__"
|
||||
lang_groups.setdefault(key, []).append(track)
|
||||
@@ -306,6 +306,6 @@ class SubtitleIdentifier:
|
||||
|
||||
return result
|
||||
|
||||
def _set_type(self, track: SubtitleCandidate, stype: SubtitleType) -> None:
|
||||
def _set_type(self, track: SubtitleScanResult, stype: SubtitleType) -> None:
|
||||
"""Mutate track type in-place."""
|
||||
track.subtitle_type = stype
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
import logging
|
||||
|
||||
from ..entities import SubtitleCandidate
|
||||
from ..entities import SubtitleScanResult
|
||||
from ..value_objects import SubtitleMatchingRules
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -10,7 +10,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
class SubtitleMatcher:
|
||||
"""
|
||||
Filters a list of SubtitleCandidate against effective SubtitleMatchingRules.
|
||||
Filters a list of SubtitleScanResult against effective SubtitleMatchingRules.
|
||||
|
||||
Returns matched tracks (pass all filters, confidence >= min_confidence)
|
||||
and unresolved tracks (need user clarification).
|
||||
@@ -21,14 +21,14 @@ class SubtitleMatcher:
|
||||
|
||||
def match(
|
||||
self,
|
||||
tracks: list[SubtitleCandidate],
|
||||
tracks: list[SubtitleScanResult],
|
||||
rules: SubtitleMatchingRules,
|
||||
) -> tuple[list[SubtitleCandidate], list[SubtitleCandidate]]:
|
||||
) -> tuple[list[SubtitleScanResult], list[SubtitleScanResult]]:
|
||||
"""
|
||||
Returns (matched, unresolved).
|
||||
"""
|
||||
matched: list[SubtitleCandidate] = []
|
||||
unresolved: list[SubtitleCandidate] = []
|
||||
matched: list[SubtitleScanResult] = []
|
||||
unresolved: list[SubtitleScanResult] = []
|
||||
|
||||
for track in tracks:
|
||||
if track.is_embedded:
|
||||
@@ -51,7 +51,7 @@ class SubtitleMatcher:
|
||||
return matched, unresolved
|
||||
|
||||
def _passes_filters(
|
||||
self, track: SubtitleCandidate, rules: SubtitleMatchingRules
|
||||
self, track: SubtitleScanResult, rules: SubtitleMatchingRules
|
||||
) -> bool:
|
||||
# Language filter
|
||||
if rules.preferred_languages:
|
||||
@@ -76,14 +76,14 @@ class SubtitleMatcher:
|
||||
|
||||
def _resolve_conflicts(
|
||||
self,
|
||||
tracks: list[SubtitleCandidate],
|
||||
tracks: list[SubtitleScanResult],
|
||||
rules: SubtitleMatchingRules,
|
||||
) -> list[SubtitleCandidate]:
|
||||
) -> list[SubtitleScanResult]:
|
||||
"""
|
||||
When multiple tracks have same language + type, keep only the best one
|
||||
according to format_priority. If no format_priority applies, keep the first.
|
||||
"""
|
||||
seen: dict[tuple, SubtitleCandidate] = {}
|
||||
seen: dict[tuple, SubtitleScanResult] = {}
|
||||
|
||||
for track in tracks:
|
||||
lang = track.language.code if track.language else None
|
||||
@@ -106,8 +106,8 @@ class SubtitleMatcher:
|
||||
|
||||
def _prefer(
|
||||
self,
|
||||
candidate: SubtitleCandidate,
|
||||
existing: SubtitleCandidate,
|
||||
candidate: SubtitleScanResult,
|
||||
existing: SubtitleScanResult,
|
||||
format_priority: list[str],
|
||||
) -> bool:
|
||||
"""Return True if candidate is preferable to existing."""
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
"""Subtitle service utilities."""
|
||||
|
||||
from ..entities import SubtitleCandidate
|
||||
from ..entities import SubtitleScanResult
|
||||
|
||||
|
||||
def available_subtitles(tracks: list[SubtitleCandidate]) -> list[SubtitleCandidate]:
|
||||
def available_subtitles(tracks: list[SubtitleScanResult]) -> list[SubtitleScanResult]:
|
||||
"""
|
||||
Return the distinct subtitle tracks available, deduped by (language, type).
|
||||
|
||||
@@ -11,7 +11,7 @@ def available_subtitles(tracks: list[SubtitleCandidate]) -> list[SubtitleCandida
|
||||
preferences — e.g. eng, eng.sdh, fra all show up as separate entries.
|
||||
"""
|
||||
seen: set[tuple] = set()
|
||||
result: list[SubtitleCandidate] = []
|
||||
result: list[SubtitleScanResult] = []
|
||||
for track in tracks:
|
||||
lang = track.language.code if track.language else None
|
||||
key = (lang, track.subtitle_type)
|
||||
|
||||
@@ -47,16 +47,19 @@ from .value_objects import (
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
|
||||
@dataclass(eq=False)
|
||||
@dataclass(frozen=True, eq=False)
|
||||
class Episode(MediaWithTracks):
|
||||
"""
|
||||
A single episode of a TV show — leaf of the TVShow aggregate.
|
||||
|
||||
Carries the file metadata (path, size) and the discovered tracks
|
||||
(audio + subtitle). Track lists are populated by the ffprobe + subtitle
|
||||
(audio + subtitle). Track tuples are populated by the ffprobe + subtitle
|
||||
scan pipeline; they may be empty when the episode is known but not yet
|
||||
scanned, or when no file is downloaded yet.
|
||||
|
||||
Frozen: rebuild via ``dataclasses.replace`` to project enrichment results
|
||||
onto a new instance.
|
||||
|
||||
Equality is identity-based within the aggregate: two ``Episode`` instances
|
||||
are equal iff they share the same ``(season_number, episode_number)``,
|
||||
regardless of title/file/track contents. The root TVShow guarantees
|
||||
@@ -68,17 +71,21 @@ class Episode(MediaWithTracks):
|
||||
title: str
|
||||
file_path: FilePath | None = None
|
||||
file_size: FileSize | None = None
|
||||
audio_tracks: list[AudioTrack] = field(default_factory=list)
|
||||
subtitle_tracks: list[SubtitleTrack] = field(default_factory=list)
|
||||
audio_tracks: tuple[AudioTrack, ...] = field(default_factory=tuple)
|
||||
subtitle_tracks: tuple[SubtitleTrack, ...] = field(default_factory=tuple)
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
# Coerce numbers if raw ints were passed
|
||||
if not isinstance(self.season_number, SeasonNumber):
|
||||
if isinstance(self.season_number, int):
|
||||
self.season_number = SeasonNumber(self.season_number)
|
||||
object.__setattr__(
|
||||
self, "season_number", SeasonNumber(self.season_number)
|
||||
)
|
||||
if not isinstance(self.episode_number, EpisodeNumber):
|
||||
if isinstance(self.episode_number, int):
|
||||
self.episode_number = EpisodeNumber(self.episode_number)
|
||||
object.__setattr__(
|
||||
self, "episode_number", EpisodeNumber(self.episode_number)
|
||||
)
|
||||
|
||||
def __eq__(self, other: object) -> bool:
|
||||
if not isinstance(other, Episode):
|
||||
|
||||
@@ -191,6 +191,36 @@ def load_scoring() -> dict:
     }


+def load_probe_mappings() -> dict:
+    """Load ffprobe→scene-token translation tables.
+
+    Returns a dict with three keys:
+
+    - ``video_codec``: ``{ffprobe_codec_lower: scene_token}``
+    - ``audio_codec``: ``{ffprobe_codec_lower: scene_token}``
+    - ``audio_channels``: ``{channel_count_int: layout_str}``
+
+    Channel-count keys are normalized to ``int`` here so the consumer can
+    look up ``track.channels`` directly. Missing sections fall back to
+    empty dicts; the enrichment code degrades to its uppercase-fallback
+    path when a mapping is absent.
+    """
+    raw = _load("probe_mappings.yaml")
+    video_codec = {k.lower(): v for k, v in (raw.get("video_codec") or {}).items()}
+    audio_codec = {k.lower(): v for k, v in (raw.get("audio_codec") or {}).items()}
+    audio_channels: dict[int, str] = {}
+    for k, v in (raw.get("audio_channels") or {}).items():
+        try:
+            audio_channels[int(k)] = v
+        except (TypeError, ValueError):
+            continue
+    return {
+        "video_codec": video_codec,
+        "audio_codec": audio_codec,
+        "audio_channels": audio_channels,
+    }
+
+
 def load_separators() -> list[str]:
     """Single-char token separators used by the release name tokenizer.

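Review note: the normalization done by `load_probe_mappings` can be exercised without touching the YAML file. The sketch below lifts the same logic into a hypothetical pure function, `normalize_probe_mappings`, that takes the already-parsed dict. One deliberate difference, flagged as an assumption: it calls `str(k).lower()` so that a codec key YAML happens to parse as a number does not raise.

```python
def normalize_probe_mappings(raw: dict) -> dict:
    """Hypothetical pure variant of load_probe_mappings for illustration."""
    video_codec = {
        str(k).lower(): v for k, v in (raw.get("video_codec") or {}).items()
    }
    audio_codec = {
        str(k).lower(): v for k, v in (raw.get("audio_codec") or {}).items()
    }
    audio_channels: dict[int, str] = {}
    for k, v in (raw.get("audio_channels") or {}).items():
        try:
            audio_channels[int(k)] = v
        except (TypeError, ValueError):
            continue  # skip keys that are not channel counts
    return {
        "video_codec": video_codec,
        "audio_codec": audio_codec,
        "audio_channels": audio_channels,
    }


raw = {
    "video_codec": {"HEVC": "x265"},
    # "audio_codec" section intentionally missing
    "audio_channels": {"6": "5.1", 8: "7.1", "stereo": "2.0"},
}
mappings = normalize_probe_mappings(raw)
```

A missing section yields an empty dict rather than a `KeyError`, and a malformed channel key is dropped silently, which matches the degrade-gracefully behaviour the docstring describes.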
@@ -29,6 +29,7 @@ from .release import (
|
||||
load_media_type_tokens,
|
||||
load_metadata_extensions,
|
||||
load_non_video_extensions,
|
||||
load_probe_mappings,
|
||||
load_resolutions,
|
||||
load_scoring,
|
||||
load_separators,
|
||||
@@ -89,6 +90,10 @@ class YamlReleaseKnowledge:
|
||||
# Parse-scoring config (weights / penalties / thresholds).
|
||||
self.scoring: dict = load_scoring()
|
||||
|
||||
# ffprobe → scene-token mapping tables (consumed by
|
||||
# ``application.release.enrich_from_probe``).
|
||||
self.probe_mappings: dict = load_probe_mappings()
|
||||
|
||||
# File-extension sets (used by application/infra modules, not by
|
||||
# the parser itself — kept here so there is a single ownership
|
||||
# point for release knowledge).
|
||||
|
||||
@@ -13,7 +13,7 @@ from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from alfred.domain.subtitles.entities import SubtitleCandidate
|
||||
from alfred.domain.subtitles.entities import SubtitleScanResult
|
||||
from alfred.application.subtitles.placer import PlacedTrack
|
||||
from alfred.infrastructure.metadata.store import MetadataStore
|
||||
|
||||
@@ -25,7 +25,7 @@ class SubtitleMetadataStore:
|
||||
Subtitle-pipeline view of the per-release `.alfred/metadata.yaml`.
|
||||
|
||||
Backed by a generic MetadataStore; this class only knows how to build
|
||||
a subtitle_history entry from PlacedTrack/SubtitleCandidate pairs.
|
||||
a subtitle_history entry from PlacedTrack/SubtitleScanResult pairs.
|
||||
"""
|
||||
|
||||
def __init__(self, library_root: Path):
|
||||
@@ -45,7 +45,7 @@ class SubtitleMetadataStore:
|
||||
|
||||
def append_history(
|
||||
self,
|
||||
placed_pairs: list[tuple[PlacedTrack, SubtitleCandidate]],
|
||||
placed_pairs: list[tuple[PlacedTrack, SubtitleScanResult]],
|
||||
season: int | None = None,
|
||||
episode: int | None = None,
|
||||
release_group: str | None = None,
|
||||
|
||||
@@ -0,0 +1,45 @@
+# Translation table: ffprobe output → scene-style release tokens.
+#
+# Consumed by ``alfred.application.release.enrich_from_probe`` when filling
+# missing ParsedRelease fields from a probed MediaInfo. Token-level values
+# from the release name always win; these mappings only fire when the
+# corresponding ParsedRelease field is None.
+#
+# Lookup is case-insensitive on the key side (ffprobe sometimes emits
+# uppercase, sometimes lowercase). When no key matches, the fallback is
+# ``ffprobe_value.upper()`` so unknown codecs still surface in a
+# predictable form (and signal the gap to a future "learn" pass).
+#
+# Each section is a flat dict whose values are the canonical scene tokens
+# Alfred uses everywhere (filename builders, ParsedRelease fields).
+
+# ffprobe video codec name → scene codec token
+video_codec:
+  hevc: x265
+  h264: x264
+  h265: x265
+  av1: AV1
+  vp9: VP9
+  mpeg4: XviD
+
+# ffprobe audio codec name → scene audio token
+audio_codec:
+  eac3: EAC3
+  ac3: AC3
+  dts: DTS
+  truehd: TrueHD
+  aac: AAC
+  flac: FLAC
+  opus: OPUS
+  mp3: MP3
+  pcm_s16le: PCM
+  pcm_s24le: PCM
+
+# Channel count (integer) → standard layout string.
+# Keys are strings here because YAML mappings prefer string keys; the
+# loader normalizes them back to int.
+audio_channels:
+  "8": "7.1"
+  "6": "5.1"
+  "2": "2.0"
+  "1": "1.0"

@@ -21,3 +21,4 @@ separators:
   - "("   # parenthesis-embedded (year, edition): (2020) (Director's Cut)
   - ")"
   - "_"   # underscore-as-space (old usenet, some Asian releases)
+  - "｜"  # fullwidth vertical bar U+FF5C (CJK release names, occasional decorative use)

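Review note: the changelog says the tokenizer iterates the separator list as plain strings, which is why a non-ASCII separator needs no code change. The actual tokenizer in `pipeline.py` is not shown in this diff, so the `tokenize` function below is a hypothetical sketch of that approach and not the project's implementation.

```python
FULLWIDTH_BAR = "\uff5c"  # U+FF5C FULLWIDTH VERTICAL LINE


def tokenize(name: str, separators: list[str]) -> list[str]:
    """Split a release name on each separator in turn, no regex involved."""
    tokens = [name]
    for sep in separators:
        # str.split works on code points, so multi-byte UTF-8 is a non-issue.
        tokens = [piece for tok in tokens for piece in tok.split(sep)]
    return [t for t in tokens if t]  # drop empties from adjacent separators


name = f"Attack.on.Titan{FULLWIDTH_BAR}1080p_WEB"

before = tokenize(name, [".", "_"])
after = tokenize(name, [".", "_", FULLWIDTH_BAR])
```

Without the new separator the wide bar stays glued between `Titan` and `1080p`; with it, the resolution becomes its own token.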
@@ -124,8 +124,16 @@ def dry_run(release_name: str) -> None:
|
||||
from alfred.application.filesystem.resolve_destination import (
|
||||
resolve_season_destination,
|
||||
)
|
||||
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
|
||||
from alfred.infrastructure.probe import FfprobeMediaProber
|
||||
|
||||
result = resolve_season_destination(release_name, tmdb_title, tmdb_year)
|
||||
result = resolve_season_destination(
|
||||
release_name,
|
||||
tmdb_title,
|
||||
tmdb_year,
|
||||
YamlReleaseKnowledge(),
|
||||
FfprobeMediaProber(),
|
||||
)
|
||||
d = result.to_dict()
|
||||
print()
|
||||
print(json.dumps(d, indent=2, ensure_ascii=False))
|
||||
@@ -203,8 +211,16 @@ def do_move(release_name: str, source_folder: str | None = None) -> None:
|
||||
from alfred.application.filesystem.resolve_destination import (
|
||||
resolve_season_destination,
|
||||
)
|
||||
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
|
||||
from alfred.infrastructure.probe import FfprobeMediaProber
|
||||
|
||||
result = resolve_season_destination(release_name, tmdb_title, tmdb_year)
|
||||
result = resolve_season_destination(
|
||||
release_name,
|
||||
tmdb_title,
|
||||
tmdb_year,
|
||||
YamlReleaseKnowledge(),
|
||||
FfprobeMediaProber(),
|
||||
)
|
||||
d = result.to_dict()
|
||||
|
||||
if d["status"] == "needs_clarification":
|
||||
|
||||
@@ -100,12 +100,17 @@ def main() -> None:
         print(c(f"Error: {downloads} does not exist", RED), file=sys.stderr)
         sys.exit(1)

+    from dataclasses import replace
+
     from alfred.application.release.detect_media_type import detect_media_type
     from alfred.application.release.enrich_from_probe import enrich_from_probe
     from alfred.domain.release.services import parse_release
+    from alfred.domain.release.value_objects import MediaTypeToken
     from alfred.infrastructure.filesystem.find_video import find_video_file
+    from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
     from alfred.infrastructure.probe import FfprobeMediaProber

+    _kb = YamlReleaseKnowledge()
     _prober = FfprobeMediaProber()

     entries = sorted(downloads.iterdir(), key=lambda p: p.name.lower())
@@ -123,14 +128,14 @@ def main() -> None:
         name = entry.name

         try:
-            p = parse_release(name)
-            p.media_type = detect_media_type(p, entry)
+            p, _report = parse_release(name, _kb)
+            p = replace(p, media_type=MediaTypeToken(detect_media_type(p, entry, _kb)))
             if p.media_type not in ("unknown", "other"):
                 video_file = find_video_file(entry)
                 if video_file:
                     media_info = _prober.probe(video_file)
                     if media_info:
-                        enrich_from_probe(p, media_info)
+                        p = enrich_from_probe(p, media_info, _kb)
             warnings = _assess(p)
         except Exception as e:
             warnings = [f"parse error: {e}"]
@@ -1,8 +1,8 @@
 """Tests for ``alfred.application.release.enrich_from_probe``.

-The function mutates a ``ParsedRelease`` in place using ffprobe ``MediaInfo``.
-Token-level values from the release name always win — only ``None`` fields
-are filled.
+The function returns a new ``ParsedRelease`` with ``None`` fields filled
+from ffprobe ``MediaInfo``. Token-level values from the release name
+always win — only ``None`` fields are filled.

 Coverage:

@@ -21,6 +21,9 @@ from __future__ import annotations
 from alfred.application.release.enrich_from_probe import enrich_from_probe
 from alfred.domain.release.value_objects import ParsedRelease
 from alfred.domain.shared.media import AudioTrack, MediaInfo, VideoTrack
+from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
+
+_KB = YamlReleaseKnowledge()


 def _info_with_video(*, width=None, height=None, codec=None, **rest) -> MediaInfo:
@@ -46,7 +49,6 @@ def _bare(**overrides) -> ParsedRelease:
         source=None,
         codec=None,
         group="UNKNOWN",
-        tech_string="",
     )
     defaults.update(overrides)
     return ParsedRelease(**defaults)
@@ -60,17 +62,17 @@ def _bare(**overrides) -> ParsedRelease:
 class TestQuality:
     def test_fills_when_none(self):
         p = _bare()
-        enrich_from_probe(p, _info_with_video(width=1920, height=1080))
+        p = enrich_from_probe(p, _info_with_video(width=1920, height=1080), _KB)
         assert p.quality == "1080p"

     def test_does_not_overwrite_existing(self):
         p = _bare(quality="2160p")
-        enrich_from_probe(p, _info_with_video(width=1920, height=1080))
+        p = enrich_from_probe(p, _info_with_video(width=1920, height=1080), _KB)
         assert p.quality == "2160p"

     def test_no_dims_leaves_none(self):
         p = _bare()
-        enrich_from_probe(p, MediaInfo())
+        p = enrich_from_probe(p, MediaInfo(), _KB)
         assert p.quality is None


@@ -82,27 +84,27 @@ class TestQuality:
 class TestVideoCodec:
     def test_hevc_to_x265(self):
         p = _bare()
-        enrich_from_probe(p, _info_with_video(codec="hevc"))
+        p = enrich_from_probe(p, _info_with_video(codec="hevc"), _KB)
         assert p.codec == "x265"

     def test_h264_to_x264(self):
         p = _bare()
-        enrich_from_probe(p, _info_with_video(codec="h264"))
+        p = enrich_from_probe(p, _info_with_video(codec="h264"), _KB)
         assert p.codec == "x264"

     def test_unknown_codec_uppercased(self):
         p = _bare()
-        enrich_from_probe(p, _info_with_video(codec="weird"))
+        p = enrich_from_probe(p, _info_with_video(codec="weird"), _KB)
         assert p.codec == "WEIRD"

     def test_does_not_overwrite_existing(self):
         p = _bare(codec="HEVC")
-        enrich_from_probe(p, _info_with_video(codec="h264"))
+        p = enrich_from_probe(p, _info_with_video(codec="h264"), _KB)
         assert p.codec == "HEVC"

     def test_no_codec_leaves_none(self):
         p = _bare()
-        enrich_from_probe(p, MediaInfo())
+        p = enrich_from_probe(p, MediaInfo(), _KB)
         assert p.codec is None


@@ -120,7 +122,7 @@ class TestAudio:
             ]
         )
         p = _bare()
-        enrich_from_probe(p, info)
+        p = enrich_from_probe(p, info, _KB)
         assert p.audio_codec == "EAC3"
         assert p.audio_channels == "5.1"

@@ -132,32 +134,32 @@ class TestAudio:
             ]
         )
         p = _bare()
-        enrich_from_probe(p, info)
+        p = enrich_from_probe(p, info, _KB)
         assert p.audio_codec == "AC3"
         assert p.audio_channels == "5.1"

     def test_channel_count_unknown_falls_back(self):
         info = MediaInfo(audio_tracks=[AudioTrack(0, "aac", 4, "quad", "eng")])
         p = _bare()
-        enrich_from_probe(p, info)
+        p = enrich_from_probe(p, info, _KB)
         assert p.audio_channels == "4ch"

     def test_unknown_audio_codec_uppercased(self):
         info = MediaInfo(audio_tracks=[AudioTrack(0, "newcodec", 2, "stereo", "eng")])
         p = _bare()
-        enrich_from_probe(p, info)
+        p = enrich_from_probe(p, info, _KB)
         assert p.audio_codec == "NEWCODEC"

     def test_no_audio_tracks(self):
         p = _bare()
-        enrich_from_probe(p, MediaInfo())
+        p = enrich_from_probe(p, MediaInfo(), _KB)
         assert p.audio_codec is None
         assert p.audio_channels is None

     def test_does_not_overwrite_existing_audio_fields(self):
         info = MediaInfo(audio_tracks=[AudioTrack(0, "ac3", 6, "5.1", "eng")])
         p = _bare(audio_codec="DTS-HD.MA", audio_channels="7.1")
-        enrich_from_probe(p, info)
+        p = enrich_from_probe(p, info, _KB)
         assert p.audio_codec == "DTS-HD.MA"
         assert p.audio_channels == "7.1"

@@ -176,8 +178,8 @@ class TestLanguages:
             ]
         )
         p = _bare()
-        enrich_from_probe(p, info)
-        assert p.languages == ["eng", "fre"]
+        p = enrich_from_probe(p, info, _KB)
+        assert p.languages == ("eng", "fre")

     def test_skips_und(self):
         info = MediaInfo(
@@ -187,8 +189,8 @@ class TestLanguages:
             ]
         )
         p = _bare()
-        enrich_from_probe(p, info)
-        assert p.languages == ["eng"]
+        p = enrich_from_probe(p, info, _KB)
+        assert p.languages == ("eng",)

     def test_dedup_against_existing_case_insensitive(self):
         # existing token-level languages are typically upper-case ("FRENCH", "ENG")
@@ -200,16 +202,15 @@ class TestLanguages:
                 AudioTrack(1, "aac", 2, "stereo", "fre"),
             ]
         )
-        p = _bare()
-        p.languages = ["ENG"]
-        enrich_from_probe(p, info)
+        p = _bare(languages=("ENG",))
+        p = enrich_from_probe(p, info, _KB)
         # "eng" → upper "ENG" already present → skipped. "fre" → "FRE" new → kept.
-        assert p.languages == ["ENG", "fre"]
+        assert p.languages == ("ENG", "fre")

     def test_no_audio_tracks_leaves_languages_empty(self):
         p = _bare()
-        enrich_from_probe(p, MediaInfo())
-        assert p.languages == []
+        p = enrich_from_probe(p, MediaInfo(), _KB)
+        assert p.languages == ()


 # --------------------------------------------------------------------------- #
@@ -218,13 +219,14 @@ class TestLanguages:


 class TestTechString:
-    """tech_string drives the filename builders; it must be re-derived
-    whenever quality / source / codec change."""
+    """tech_string is a derived property on ParsedRelease: it always
+    reflects the current quality/source/codec. Enrichment never writes
+    it directly — it stays in sync by construction."""

     def test_rebuilt_from_filled_quality_and_codec(self):
         p = _bare()
-        enrich_from_probe(
-            p, _info_with_video(width=1920, height=1080, codec="hevc")
+        p = enrich_from_probe(
+            p, _info_with_video(width=1920, height=1080, codec="hevc"), _KB
         )
         assert p.quality == "1080p"
         assert p.codec == "x265"
@@ -233,19 +235,19 @@ class TestTechString:
     def test_keeps_existing_source_when_enriching(self):
         # Token-level source must stay; probe fills only None fields.
         p = _bare(source="BluRay")
-        enrich_from_probe(
-            p, _info_with_video(width=1920, height=1080, codec="hevc")
+        p = enrich_from_probe(
+            p, _info_with_video(width=1920, height=1080, codec="hevc"), _KB
         )
         assert p.tech_string == "1080p.BluRay.x265"

     def test_unchanged_when_no_enrichable_video_info(self):
-        # No video info → nothing to fill → tech_string stays as it was.
+        # No video info → nothing to fill → derived tech_string stays as it was.
         p = _bare(quality="2160p", source="WEB-DL", codec="x265")
-        p.tech_string = "2160p.WEB-DL.x265"
-        enrich_from_probe(p, MediaInfo())
+        assert p.tech_string == "2160p.WEB-DL.x265"
+        p = enrich_from_probe(p, MediaInfo(), _KB)
         assert p.tech_string == "2160p.WEB-DL.x265"

     def test_empty_when_nothing_known(self):
         p = _bare()
-        enrich_from_probe(p, MediaInfo())
+        p = enrich_from_probe(p, MediaInfo(), _KB)
         assert p.tech_string == ""
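The tests above pin down three behaviours: enrichment returns a new value instead of mutating, only `None` fields are filled, and `tech_string` is derived rather than stored. A minimal sketch of that pattern with a frozen dataclass follows. `Release` and `fill_missing` are hypothetical stand-ins, not the project's `ParsedRelease` or `enrich_from_probe`, and the knowledge-base argument is left out.

```python
from dataclasses import dataclass, replace
from typing import Optional, Tuple


@dataclass(frozen=True)
class Release:  # hypothetical stand-in for ParsedRelease
    quality: Optional[str] = None
    source: Optional[str] = None
    codec: Optional[str] = None
    languages: Tuple[str, ...] = ()

    @property
    def tech_string(self) -> str:
        # Derived on every access, so it cannot drift out of sync.
        parts = (self.quality, self.source, self.codec)
        return ".".join(p for p in parts if p)


def fill_missing(release, *, quality=None, codec=None, languages=()):
    """Return a new Release; existing (token-level) values always win."""
    seen = {lang.upper() for lang in release.languages}
    merged = list(release.languages)
    for lang in languages:
        # Case-insensitive dedup against what the release name already gave us.
        if lang.upper() not in seen:
            seen.add(lang.upper())
            merged.append(lang)
    return replace(
        release,
        quality=release.quality if release.quality is not None else quality,
        codec=release.codec if release.codec is not None else codec,
        languages=tuple(merged),
    )
```

Callers rebind the result (`p = fill_missing(p, ...)`), which mirrors the `p = enrich_from_probe(p, info, _KB)` change in the hunks above.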
@@ -263,3 +263,94 @@ class TestFrozen:
             pass
         else:  # pragma: no cover
             raise AssertionError("InspectedResult should be frozen")
+
+
+# --------------------------------------------------------------------------- #
+# recommended_action #
+# --------------------------------------------------------------------------- #
+
+
+class TestRecommendedAction:
+    """``recommended_action`` collapses the orchestrator's go / wait /
+    skip decision into a single property. The check ordering is part
+    of the contract (skip wins over ask_user, ask_user wins over
+    process) — see the property docstring."""
+
+    def test_skip_when_no_main_video(self, tmp_path: Path) -> None:
+        # Folder with no video at all → main_video is None → skip.
+        folder = tmp_path / _MOVIE_NAME
+        folder.mkdir()
+        (folder / "readme.txt").write_text("hi")
+
+        result = inspect_release(_MOVIE_NAME, folder, _KB, _RaisingProber())
+
+        assert result.main_video is None
+        assert result.recommended_action == "skip"
+
+    def test_skip_when_media_type_other(self, tmp_path: Path) -> None:
+        # Folder with only non-video files (ISO) → media_type == "other"
+        # AND main_video is None (find_main_video filters by video ext).
+        # Both branches resolve to "skip"; this asserts the contract holds.
+        folder = tmp_path / _MOVIE_NAME
+        folder.mkdir()
+        (folder / "disc.iso").write_bytes(b"")
+
+        result = inspect_release(_MOVIE_NAME, folder, _KB, _RaisingProber())
+
+        assert result.parsed.media_type == "other"
+        assert result.recommended_action == "skip"
+
+    def test_ask_user_when_media_type_unknown(self, tmp_path: Path) -> None:
+        # Mixed video + non-video → detect_media_type returns "unknown".
+        folder = tmp_path / _MOVIE_NAME
+        folder.mkdir()
+        (folder / "movie.mkv").write_bytes(b"")
+        (folder / "extras.iso").write_bytes(b"")
+
+        result = inspect_release(
+            _MOVIE_NAME, folder, _KB, _StubProber(_media_info_1080p_h264())
+        )
+
+        assert result.parsed.media_type == "unknown"
+        assert result.recommended_action == "ask_user"
+
+    def test_ask_user_when_path_of_pain_road(self, tmp_path: Path) -> None:
+        # Malformed name (forbidden chars) → road == "path_of_pain".
+        name = "garbage@#%name"
+        folder = tmp_path / "release"
+        folder.mkdir()
+        (folder / "movie.mkv").write_bytes(b"")
+
+        result = inspect_release(
+            name, folder, _KB, _StubProber(_media_info_1080p_h264())
+        )
+
+        assert result.report.road == "path_of_pain"
+        # main_video is found but the road still flags uncertainty.
+        assert result.main_video is not None
+        assert result.recommended_action == "ask_user"
+
+    def test_process_for_confident_movie(self, tmp_path: Path) -> None:
+        folder = tmp_path / _MOVIE_NAME
+        folder.mkdir()
+        (folder / "movie.mkv").write_bytes(b"")
+
+        result = inspect_release(
+            _MOVIE_NAME, folder, _KB, _StubProber(_media_info_1080p_h264())
+        )
+
+        assert result.parsed.media_type == "movie"
+        assert result.report.road in ("easy", "shitty")
+        assert result.recommended_action == "process"
+
+    def test_process_for_confident_tv_show(self, tmp_path: Path) -> None:
+        folder = tmp_path / _TV_NAME
+        folder.mkdir()
+        (folder / "episode.mkv").write_bytes(b"")
+
+        result = inspect_release(
+            _TV_NAME, folder, _KB, _StubProber(_media_info_1080p_h264())
+        )
+
+        assert result.parsed.media_type == "tv_show"
+        assert result.recommended_action == "process"
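The precedence these tests assert (skip wins over ask_user, which wins over process) can be written as an ordered chain of early returns. The sketch below is a hypothetical free function, not the actual `InspectedResult.recommended_action` property; it takes the three inputs named in the changelog entry as plain arguments.

```python
def recommended_action(*, has_main_video: bool, media_type: str, road: str) -> str:
    """Illustrative decision order; the check ordering is the contract."""
    # 1. Nothing to process at all: skip outranks every other signal.
    if not has_main_video or media_type == "other":
        return "skip"
    # 2. Something is there, but the parse is uncertain: defer to the user.
    if media_type == "unknown" or road == "path_of_pain":
        return "ask_user"
    # 3. Confident parse with a main video.
    return "process"
```

Keeping the checks in one place means a release that is both video-less and on the `path_of_pain` road resolves to `"skip"` everywhere, instead of depending on which call site happens to test which condition first.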
@@ -40,7 +40,7 @@ from alfred.application.filesystem.manage_subtitles import (
     _to_imdb_id,
     _to_unresolved_dto,
 )
-from alfred.domain.subtitles.entities import MediaSubtitleMetadata, SubtitleCandidate
+from alfred.domain.subtitles.entities import MediaSubtitleMetadata, SubtitleScanResult
 from alfred.application.subtitles.placer import PlacedTrack, PlaceResult
 from alfred.domain.subtitles.value_objects import (
     ScanStrategy,
@@ -63,8 +63,8 @@ def _track(
     is_embedded: bool = False,
     raw_tokens: list[str] | None = None,
     file_size_kb: float | None = None,
-) -> SubtitleCandidate:
-    return SubtitleCandidate(
+) -> SubtitleScanResult:
+    return SubtitleScanResult(
         language=lang,
         format=fmt,
         subtitle_type=stype,
@@ -31,13 +31,53 @@ from alfred.application.filesystem.resolve_destination import (
     _Clarification,
     _find_existing_tvshow_folders,
     _resolve_series_folder,
-    resolve_episode_destination,
-    resolve_movie_destination,
-    resolve_season_destination,
-    resolve_series_destination,
+    resolve_episode_destination as _resolve_episode_destination,
+    resolve_movie_destination as _resolve_movie_destination,
+    resolve_season_destination as _resolve_season_destination,
+    resolve_series_destination as _resolve_series_destination,
 )
+from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
 from alfred.infrastructure.persistence import Memory, set_memory

+_KB = YamlReleaseKnowledge()
+
+
+class _NullProber:
+    """Default prober stub — never returns probe data."""
+
+    def list_subtitle_streams(self, video):  # pragma: no cover
+        return []
+
+    def probe(self, video):
+        return None
+
+
+_DEFAULT_PROBER = _NullProber()
+
+
+def resolve_season_destination(*args, prober=None, **kwargs):
+    return _resolve_season_destination(
+        *args, kb=_KB, prober=prober or _DEFAULT_PROBER, **kwargs
+    )
+
+
+def resolve_episode_destination(*args, prober=None, **kwargs):
+    return _resolve_episode_destination(
+        *args, kb=_KB, prober=prober or _DEFAULT_PROBER, **kwargs
+    )
+
+
+def resolve_movie_destination(*args, prober=None, **kwargs):
+    return _resolve_movie_destination(
+        *args, kb=_KB, prober=prober or _DEFAULT_PROBER, **kwargs
+    )
+
+
+def resolve_series_destination(*args, prober=None, **kwargs):
+    return _resolve_series_destination(
+        *args, kb=_KB, prober=prober or _DEFAULT_PROBER, **kwargs
+    )
+
 REL_EPISODE = "Oz.S01E01.1080p.WEBRip.x265-KONTRAST"
 REL_SEASON = "Oz.S03.1080p.WEBRip.x265-KONTRAST"
 REL_MOVIE = "Inception.2010.1080p.BluRay.x265-GROUP"
@@ -365,46 +405,40 @@ class TestProbeEnrichmentWiring:
     should pick up ffprobe data via inspect_release and let the enriched
     tech_string land in the destination name."""

-    def test_movie_picks_up_probe_quality(
-        self, cfg_memory, tmp_path, monkeypatch
-    ):
-        from alfred.application.filesystem import resolve_destination as rd
-
-        monkeypatch.setattr(rd, "_PROBER", _StubProber(_stereo_movie_info()))
+    def test_movie_picks_up_probe_quality(self, cfg_memory, tmp_path):
         # Release name parses to "movie" but is missing the quality token;
         # probe must supply 1080p and refresh tech_string.
         bare_name = "Inception.2010.BluRay.x264-GROUP"
         video = tmp_path / "movie.mkv"
         video.write_bytes(b"")

-        out = resolve_movie_destination(bare_name, str(video), "Inception", 2010)
+        out = resolve_movie_destination(
+            bare_name,
+            str(video),
+            "Inception",
+            2010,
+            prober=_StubProber(_stereo_movie_info()),
+        )

         assert out.status == "ok"
         # tech_string -> "1080p.BluRay.x264" -> "1080p" shows up in names.
         assert "1080p" in out.movie_folder_name
         assert "1080p" in out.filename

-    def test_movie_skips_probe_when_path_missing(self, cfg_memory, monkeypatch):
+    def test_movie_skips_probe_when_path_missing(self, cfg_memory):
         # If the file doesn't exist, no probe runs (the stub would have
         # injected 1080p — its absence proves the skip).
-        from alfred.application.filesystem import resolve_destination as rd
-
-        monkeypatch.setattr(rd, "_PROBER", _StubProber(_stereo_movie_info()))
         out = resolve_movie_destination(
             "Inception.2010.BluRay.x264-GROUP",
             "/nowhere/m.mkv",
             "Inception",
             2010,
+            prober=_StubProber(_stereo_movie_info()),
         )
         assert out.status == "ok"
         assert "1080p" not in out.movie_folder_name

-    def test_season_picks_up_probe_via_source_path(
-        self, cfg_memory, tmp_path, monkeypatch
-    ):
-        from alfred.application.filesystem import resolve_destination as rd
-
-        monkeypatch.setattr(rd, "_PROBER", _StubProber(_stereo_movie_info()))
+    def test_season_picks_up_probe_via_source_path(self, cfg_memory, tmp_path):
         # Season pack name missing quality token; probe must add it.
         bare_name = "Oz.S03.BluRay.x265-KONTRAST"
         release_dir = tmp_path / bare_name
@@ -412,7 +446,11 @@ class TestProbeEnrichmentWiring:
         (release_dir / "episode.mkv").write_bytes(b"")

         out = resolve_season_destination(
-            bare_name, "Oz", 1997, source_path=str(release_dir)
+            bare_name,
+            "Oz",
+            1997,
+            source_path=str(release_dir),
+            prober=_StubProber(_stereo_movie_info()),
         )

         assert out.status == "ok"
@@ -21,7 +21,7 @@ from unittest.mock import patch

 import pytest

-from alfred.domain.subtitles.entities import SubtitleCandidate
+from alfred.domain.subtitles.entities import SubtitleScanResult
 from alfred.application.subtitles.placer import (
     PlacedTrack,
     PlaceResult,
@@ -46,8 +46,8 @@ def _track(
     fmt=SRT,
     stype=SubtitleType.STANDARD,
     is_embedded: bool = False,
-) -> SubtitleCandidate:
-    return SubtitleCandidate(
+) -> SubtitleScanResult:
+    return SubtitleScanResult(
         language=lang,
         format=fmt,
         subtitle_type=stype,
@@ -123,7 +123,6 @@ class TestAssemble:
         assert fields["source"] == "WEBRip"
         assert fields["codec"] == "x265"
         assert fields["group"] == "KONTRAST"
-        assert fields["tech_string"] == "1080p.WEBRip.x265"
         assert fields["media_type"] == "movie"
         assert fields["site_tag"] is None

@@ -150,7 +149,8 @@ class TestAssemble:
         assert fields["season"] == 2
         assert fields["episode"] is None  # season pack
         assert fields["source"] is None  # ELiTE omits it
-        assert fields["tech_string"] == "1080p.x265"
+        assert fields["quality"] == "1080p"
+        assert fields["codec"] == "x265"
         assert fields["group"] == "ELiTE"


@@ -198,7 +198,7 @@ class TestEnrichers:
         assert annotated is not None
         fields = assemble(annotated, tag, name, _KB)

-        assert fields["languages"] == ["FRENCH", "MULTI"]
+        assert fields["languages"] == ("FRENCH", "MULTI")
         assert fields["audio_codec"] == "DTS-HD.MA"
         assert fields["audio_channels"] == "5.1"

@@ -212,5 +212,5 @@ class TestEnrichers:
         assert fields["title"] == "Show"
         assert fields["season"] == 1
         assert fields["episode"] == 5
-        assert fields["languages"] == ["FRENCH"]
+        assert fields["languages"] == ("FRENCH",)
         assert fields["media_type"] == "tv_show"
@@ -22,8 +22,8 @@ from alfred.domain.release.services import parse_release
 from alfred.domain.release.value_objects import (
     MediaTypeToken,
     ParsedRelease,
-    ParsePath,
     ParseReport,
+    TokenizationRoute,
 )
 from alfred.domain.shared.exceptions import ValidationError
 from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
@@ -78,9 +78,8 @@ def _movie(year: int = 2020, **overrides) -> ParsedRelease:
         source="BluRay",
         codec="x264",
         group="GROUP",
-        tech_string="1080p.BluRay.x264",
         media_type=MediaTypeToken.MOVIE,
-        parse_path=ParsePath.DIRECT,
+        parse_path=TokenizationRoute.DIRECT,
     )
     base.update(overrides)
     return ParsedRelease(**base)
@@ -120,9 +119,8 @@ class TestComputeScore:
             source="WEBRip",
             codec="x265",
             group="KONTRAST",
-            tech_string="1080p.WEBRip.x265",
             media_type=MediaTypeToken.TV_SHOW,
-            parse_path=ParsePath.DIRECT,
+            parse_path=TokenizationRoute.DIRECT,
         )
         tokens = [
             Token("Oz", 0, TokenRole.TITLE),
@@ -231,9 +229,8 @@ class TestCollectors:
             source=None,
             codec=None,
             group="UNKNOWN",
-            tech_string="",
             media_type=MediaTypeToken.UNKNOWN,
-            parse_path=ParsePath.DIRECT,
+            parse_path=TokenizationRoute.DIRECT,
         )
         assert set(collect_missing_critical(empty)) == {
             "title",
@@ -264,10 +264,10 @@ class TestParsedReleaseInvariants:
         r = _parse(raw)
         assert r.raw == raw

-    def test_languages_defaults_to_empty_list_not_none(self):
+    def test_languages_defaults_to_empty_tuple_not_none(self):
         r = _parse("Movie.2020.1080p.BluRay.x264-GRP")
-        # __post_init__ ensures languages is a list, never None
-        assert r.languages == []
+        # ``languages`` defaults to an empty tuple (frozen VO).
+        assert r.languages == ()

     def test_tech_string_joined(self):
         r = _parse("Movie.2020.1080p.BluRay.x264-GRP")
@@ -44,8 +44,13 @@ def test_parse_matches_fixture(fixture: ReleaseFixture, tmp_path) -> None:

     parsed, _report = parse_release(fixture.release_name, _KB)
     result = asdict(parsed)
-    # ``is_season_pack`` is a @property — asdict() does not include it.
+    # ``is_season_pack`` and ``tech_string`` are @property values —
+    # ``asdict()`` does not include them.
     result["is_season_pack"] = parsed.is_season_pack
+    result["tech_string"] = parsed.tech_string
+    # ``languages`` is a tuple on the VO; fixtures encode it as a YAML list.
+    # Compare list-to-list so the equality is unambiguous.
+    result["languages"] = list(result.get("languages", ()))

     for field, expected in fixture.expected_parsed.items():
         assert field in result, (
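The comments in this hunk rely on two facts about `dataclasses.asdict`: it only walks declared fields, so `@property` values are absent, and it preserves tuples as tuples, so a tuple field never compares equal to a YAML-loaded list. A small self-contained illustration, using a hypothetical `Parsed` class rather than the project's `ParsedRelease`:

```python
from dataclasses import asdict, dataclass
from typing import Tuple


@dataclass(frozen=True)
class Parsed:  # hypothetical stand-in for ParsedRelease
    quality: str
    codec: str
    languages: Tuple[str, ...] = ()

    @property
    def tech_string(self) -> str:
        return f"{self.quality}.{self.codec}"


parsed = Parsed(quality="1080p", codec="x265", languages=("FRENCH",))
result = asdict(parsed)

# Properties are not dataclass fields, so asdict() leaves them out.
assert "tech_string" not in result
result["tech_string"] = parsed.tech_string

# Tuples stay tuples, and a tuple never equals a list in Python.
assert result["languages"] != ["FRENCH"]
result["languages"] = list(result["languages"])
```

After the two patch-ups, `result` can be compared field by field against fixture data that was loaded from YAML.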
@@ -23,7 +23,7 @@ from unittest.mock import patch
 import pytest

 from alfred.domain.shared.ports import FileEntry
-from alfred.domain.subtitles.entities import SubtitleCandidate
+from alfred.domain.subtitles.entities import SubtitleScanResult
 from alfred.domain.subtitles.services.identifier import (
     SubtitleIdentifier,
     _count_entries,
@@ -310,8 +310,8 @@ class TestSizeDisambiguation:
             detection=TypeDetectionMethod.SIZE_AND_COUNT,
         )

-    def _track(self, lang_code: str, entries: int) -> SubtitleCandidate:
-        return SubtitleCandidate(
+    def _track(self, lang_code: str, entries: int) -> SubtitleScanResult:
+        return SubtitleScanResult(
             language=SubtitleLanguage(code=lang_code, tokens=[lang_code]),
             format=None,
             subtitle_type=SubtitleType.UNKNOWN,
@@ -18,7 +18,7 @@ from __future__ import annotations

 import pytest

-from alfred.domain.subtitles.entities import SubtitleCandidate
+from alfred.domain.subtitles.entities import SubtitleScanResult
 from alfred.domain.subtitles.services.matcher import SubtitleMatcher
 from alfred.domain.subtitles.value_objects import (
     SubtitleFormat,
@@ -40,8 +40,8 @@ def _track(
     stype: SubtitleType = SubtitleType.STANDARD,
     confidence: float = 1.0,
     is_embedded: bool = False,
-) -> SubtitleCandidate:
-    return SubtitleCandidate(
+) -> SubtitleScanResult:
+    return SubtitleScanResult(
         language=lang,
         format=fmt,
         subtitle_type=stype,
@@ -5,9 +5,9 @@ uncovered:

 - ``TestSubtitleFormat`` — extension matching (case-insensitive).
 - ``TestSubtitleLanguage`` — token matching (case-insensitive).
-- ``TestSubtitleCandidateDestName`` — ``destination_name`` property:
+- ``TestSubtitleScanResultDestName`` — ``destination_name`` property:
   standard / SDH / forced naming, error on missing language or format.
-- ``TestSubtitleCandidateRepr`` — debug repr for embedded vs external.
+- ``TestSubtitleScanResultRepr`` — debug repr for embedded vs external.
 - ``TestMediaSubtitleMetadata`` — ``all_tracks`` / ``total_count`` /
   ``unresolved_tracks``.
 - ``TestAvailableSubtitles`` — utility dedup by (lang, type).
@@ -24,7 +24,7 @@ from pathlib import Path
 import pytest

 from alfred.domain.subtitles.aggregates import SubtitleRuleSet
-from alfred.domain.subtitles.entities import MediaSubtitleMetadata, SubtitleCandidate
+from alfred.domain.subtitles.entities import MediaSubtitleMetadata, SubtitleScanResult
 from alfred.domain.subtitles.services.utils import available_subtitles
 from alfred.domain.subtitles.value_objects import (
     RuleScope,
@@ -74,7 +74,7 @@ class TestSubtitleLanguage:


 # --------------------------------------------------------------------------- #
-# SubtitleCandidate #
+# SubtitleScanResult #
 # --------------------------------------------------------------------------- #


@@ -82,50 +82,50 @@ SRT = SubtitleFormat(id="srt", extensions=[".srt"])
 FRA = SubtitleLanguage(code="fra", tokens=["fr", "fre"])


-class TestSubtitleCandidateDestName:
+class TestSubtitleScanResultDestName:
     def test_standard(self):
-        t = SubtitleCandidate(
+        t = SubtitleScanResult(
             language=FRA, format=SRT, subtitle_type=SubtitleType.STANDARD
         )
         assert t.destination_name == "fra.srt"

     def test_sdh(self):
-        t = SubtitleCandidate(language=FRA, format=SRT, subtitle_type=SubtitleType.SDH)
+        t = SubtitleScanResult(language=FRA, format=SRT, subtitle_type=SubtitleType.SDH)
         assert t.destination_name == "fra.sdh.srt"

     def test_forced(self):
-        t = SubtitleCandidate(
+        t = SubtitleScanResult(
             language=FRA, format=SRT, subtitle_type=SubtitleType.FORCED
         )
         assert t.destination_name == "fra.forced.srt"

     def test_unknown_treated_as_standard(self):
-        t = SubtitleCandidate(
+        t = SubtitleScanResult(
             language=FRA, format=SRT, subtitle_type=SubtitleType.UNKNOWN
         )
         # UNKNOWN doesn't add a suffix → same as standard.
         assert t.destination_name == "fra.srt"

     def test_missing_language_raises(self):
-        t = SubtitleCandidate(language=None, format=SRT)
+        t = SubtitleScanResult(language=None, format=SRT)
         with pytest.raises(ValueError, match="language or format missing"):
             t.destination_name

     def test_missing_format_raises(self):
-        t = SubtitleCandidate(language=FRA, format=None)
+        t = SubtitleScanResult(language=FRA, format=None)
         with pytest.raises(ValueError, match="language or format missing"):
             t.destination_name

     def test_extension_dot_stripped(self):
         # Format extension is ".srt" — leading dot must not be duplicated.
-        t = SubtitleCandidate(language=FRA, format=SRT)
+        t = SubtitleScanResult(language=FRA, format=SRT)
         assert t.destination_name.endswith(".srt")
         assert ".." not in t.destination_name


-class TestSubtitleCandidateRepr:
+class TestSubtitleScanResultRepr:
     def test_embedded_repr(self):
-        t = SubtitleCandidate(
+        t = SubtitleScanResult(
             language=FRA, format=None, is_embedded=True, confidence=1.0
         )
         r = repr(t)
@@ -135,14 +135,14 @@ class TestSubtitleCandidateRepr:
     def test_external_repr_uses_filename(self, tmp_path):
         f = tmp_path / "fr.srt"
         f.write_text("")
-        t = SubtitleCandidate(language=FRA, format=SRT, file_path=f, confidence=0.85)
+        t = SubtitleScanResult(language=FRA, format=SRT, file_path=f, confidence=0.85)
         r = repr(t)
         assert "fra" in r
         assert "fr.srt" in r
         assert "0.85" in r

     def test_unresolved_repr(self):
-        t = SubtitleCandidate(language=None, format=None)
+        t = SubtitleScanResult(language=None, format=None)
         r = repr(t)
         assert "?" in r

@@ -160,8 +160,8 @@ class TestMediaSubtitleMetadata:
         assert m.unresolved_tracks == []

     def test_aggregates_embedded_and_external(self):
-        e = SubtitleCandidate(language=FRA, format=None, is_embedded=True)
-        x = SubtitleCandidate(language=FRA, format=SRT, file_path=Path("/x.srt"))
+        e = SubtitleScanResult(language=FRA, format=None, is_embedded=True)
+        x = SubtitleScanResult(language=FRA, format=SRT, file_path=Path("/x.srt"))
         m = MediaSubtitleMetadata(
             media_id=None,
             media_type="movie",
@@ -174,13 +174,13 @@ class TestMediaSubtitleMetadata:
     def test_unresolved_tracks_only_external_with_none_lang(self):
         # An embedded with None language must NOT appear in unresolved_tracks
         # (the property only iterates external_tracks).
-        embedded_unknown = SubtitleCandidate(
+        embedded_unknown = SubtitleScanResult(
             language=None, format=None, is_embedded=True
         )
-        external_known = SubtitleCandidate(
+        external_known = SubtitleScanResult(
             language=FRA, format=SRT, file_path=Path("/a.srt")
         )
-        external_unknown = SubtitleCandidate(
+        external_unknown = SubtitleScanResult(
             language=None, format=SRT, file_path=Path("/b.srt")
         )
         m = MediaSubtitleMetadata(
@@ -201,14 +201,14 @@ class TestAvailableSubtitles:
     def test_dedup_by_lang_and_type(self):
         ENG = SubtitleLanguage(code="eng", tokens=["en"])
         tracks = [
-            SubtitleCandidate(
+            SubtitleScanResult(
                 language=FRA, format=SRT, subtitle_type=SubtitleType.STANDARD
             ),
-            SubtitleCandidate(
+            SubtitleScanResult(
                 language=FRA, format=SRT, subtitle_type=SubtitleType.STANDARD
             ),
-            SubtitleCandidate(language=FRA, format=SRT, subtitle_type=SubtitleType.SDH),
-            SubtitleCandidate(
+            SubtitleScanResult(language=FRA, format=SRT, subtitle_type=SubtitleType.SDH),
+            SubtitleScanResult(
                 language=ENG, format=SRT, subtitle_type=SubtitleType.STANDARD
             ),
         ]
@@ -222,10 +222,10 @@ class TestAvailableSubtitles:

     def test_none_language_treated_as_key(self):
         # Tracks with no language form a single None-keyed bucket.
-        t1 = SubtitleCandidate(
+        t1 = SubtitleScanResult(
             language=None, format=SRT, subtitle_type=SubtitleType.UNKNOWN
         )
-        t2 = SubtitleCandidate(
+        t2 = SubtitleScanResult(
             language=None, format=SRT, subtitle_type=SubtitleType.UNKNOWN
         )
         result = available_subtitles([t1, t2])
@@ -16,7 +16,7 @@ from __future__ import annotations

 from pathlib import Path

-from alfred.domain.subtitles.entities import SubtitleCandidate
+from alfred.domain.subtitles.entities import SubtitleScanResult
 from alfred.application.subtitles.placer import PlacedTrack
 from alfred.domain.subtitles.value_objects import (
     SubtitleFormat,
@@ -32,8 +32,8 @@ ENG = SubtitleLanguage(code="eng", tokens=["en"])

 def _track(
     lang=FRA, *, embedded: bool = False, confidence: float = 0.92
-) -> SubtitleCandidate:
-    return SubtitleCandidate(
+) -> SubtitleScanResult:
+    return SubtitleScanResult(
         language=lang,
         format=SRT,
         subtitle_type=SubtitleType.STANDARD,