Merge branch 'refactor/domain-release-knowledge'

Final DDD purification of the release parser. Domain layer no longer
imports anything from infrastructure, no YAML at import time, and
ParsedRelease's filesystem-builders are pure (Option B).

- ReleaseKnowledge Protocol port + YamlReleaseKnowledge adapter
- parse_release(name, kb) explicit injection
- ParsedRelease.title_sanitized field; builders accept already-safe strings
- Callers (resolve_destination, detect_media_type, find_video,
  analyze_release) thread the kb through
- 987 tests pass
This commit is contained in:
2026-05-19 22:05:36 +02:00
16 changed files with 410 additions and 223 deletions
+41
View File
@@ -184,6 +184,47 @@ callers).
globally — noisy on parser mappers and orchestrator use-cases where early-return
validation is essential complexity. Ignore `PLW0603` for the documented memory
singleton (`infrastructure/persistence/context.py`).
- **Release-knowledge DDD purification** (`refactor/domain-release-knowledge`):
the last domain → infrastructure leak (`domain/release/value_objects.py`
loading YAML at import-time) is gone. Achieved via:
- **`ReleaseKnowledge` Protocol port** at
`alfred/domain/release/ports/knowledge.py` declares the read-only query
surface release parsing needs (token sets for resolutions, sources, codecs,
languages, hdr extras; structured dicts for audio, video_meta, editions,
media_type_tokens; separators list; file-extension sets used by
application/infra callers; `sanitize_for_fs(text)` method).
- **`YamlReleaseKnowledge` adapter** at
`alfred/infrastructure/knowledge/release_kb.py` loads every YAML constant
once at construction. Builds an immutable `str.maketrans` translation
table for filesystem sanitization.
- **`parse_release(name, kb)`** takes the knowledge as an explicit
parameter — no more module-level YAML loading inside the domain. Every
internal helper (`_tokenize`, `_extract_tech`, `_extract_languages`,
`_extract_audio`, `_extract_video_meta`, `_extract_edition`,
`_extract_title`, `_infer_media_type`, `_is_well_formed`) takes `kb`.
- **`ParsedRelease` Option B**: sanitization happens once at parse time
and is stored on a new `title_sanitized: str` field. Builder methods
(`show_folder_name`, `season_folder_name`, `episode_filename`,
`movie_folder_name`, `movie_filename`) are now pure — they accept
already-sanitized `tmdb_title_safe` / `tmdb_episode_title_safe`
arguments. Callers at the use-case boundary sanitize TMDB strings
via `kb.sanitize_for_fs(...)` before passing them in.
- **All domain-knowledge constants removed from `value_objects.py`**:
`_RESOLUTIONS`, `_SOURCES`, `_CODECS`, `_AUDIO`, `_VIDEO_META`,
`_EDITIONS`, `_HDR_EXTRA`, `_MEDIA_TYPE_TOKENS`, `_LANGUAGE_TOKENS`,
`_FORBIDDEN_CHARS`, `_VIDEO_EXTENSIONS`, `_NON_VIDEO_EXTENSIONS`,
`_SUBTITLE_EXTENSIONS`, `_METADATA_EXTENSIONS`, `_WIN_FORBIDDEN_TABLE`,
and the `_sanitize_for_fs` helper. The domain module is now pure.
- **Application-layer KB singleton**: `resolve_destination.py` instantiates
a module-level `_KB: ReleaseKnowledge = YamlReleaseKnowledge()` and
threads it through every `parse_release(...)` call. The local
`_sanitize` helper and `_WIN_FORBIDDEN` regex were dropped in favor of
`_KB.sanitize_for_fs(...)`.
- **`detect_media_type(parsed, source_path, kb)` and
`find_video_file(path, kb)`** now take the knowledge explicitly
instead of importing `_*_EXTENSIONS` constants from the domain.
`agent/tools/filesystem.py::analyze_release` imports the application
KB singleton and passes it through.
---
+4 -3
View File
@@ -190,15 +190,16 @@ def set_path_for_folder(folder_name: str, path_value: str) -> dict[str, Any]:
def analyze_release(release_name: str, source_path: str) -> dict[str, Any]:
"""Thin tool wrapper — semantics live in alfred/agent/tools/specs/analyze_release.yaml."""
from alfred.application.filesystem.resolve_destination import _KB # noqa: PLC0415
from alfred.domain.release.services import parse_release # noqa: PLC0415
path = Path(source_path)
parsed = parse_release(release_name)
parsed.media_type = detect_media_type(parsed, path)
parsed = parse_release(release_name, _KB)
parsed.media_type = detect_media_type(parsed, path, _KB)
probe_used = False
if parsed.media_type not in ("unknown", "other"):
video_file = find_video_file(path)
video_file = find_video_file(path, _KB)
if video_file:
media_info = probe(video_file)
if media_info:
@@ -19,15 +19,13 @@ from __future__ import annotations
from pathlib import Path
from alfred.domain.release.value_objects import (
_METADATA_EXTENSIONS,
_NON_VIDEO_EXTENSIONS,
_VIDEO_EXTENSIONS,
ParsedRelease,
)
from alfred.domain.release.ports import ReleaseKnowledge
from alfred.domain.release.value_objects import ParsedRelease
def detect_media_type(parsed: ParsedRelease, source_path: Path) -> str:
def detect_media_type(
parsed: ParsedRelease, source_path: Path, kb: ReleaseKnowledge
) -> str:
"""
Return a refined media_type string for the given source_path.
@@ -37,10 +35,10 @@ def detect_media_type(parsed: ParsedRelease, source_path: Path) -> str:
extensions = _collect_extensions(source_path)
# Metadata extensions (.nfo, .srt, …) are always present alongside releases
# and must not influence the type decision.
conclusive = extensions - _METADATA_EXTENSIONS
conclusive = extensions - kb.metadata_extensions
has_video = bool(conclusive & _VIDEO_EXTENSIONS)
has_non_video = bool(conclusive & _NON_VIDEO_EXTENSIONS)
has_video = bool(conclusive & kb.video_extensions)
has_non_video = bool(conclusive & kb.non_video_extensions)
if has_video and has_non_video:
return "unknown"
@@ -8,34 +8,39 @@ Four distinct use cases, one per release type:
- resolve_series_destination : complete series multi-season pack (folder move)
Each returns a dedicated DTO with only the fields that make sense for that type.
These use cases follow Option B of the snapshot-VO design: ``ParsedRelease``
arrives with ``title_sanitized`` already computed, and TMDB-supplied strings
are sanitized **at the use-case boundary** (here) before being passed into
``ParsedRelease`` builder methods. The builders themselves perform no I/O and
no sanitization.
"""
from __future__ import annotations
import logging
import re
from dataclasses import dataclass
from pathlib import Path
from alfred.domain.release import parse_release
from alfred.domain.release.ports import ReleaseKnowledge
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
from alfred.infrastructure.persistence import get_memory
logger = logging.getLogger(__name__)
_WIN_FORBIDDEN = re.compile(r'[?:*"<>|\\]')
def _sanitize(text: str) -> str:
return _WIN_FORBIDDEN.sub("", text)
# Single module-level knowledge instance. YAML is loaded once at first import.
# Tests that need a custom KB can monkeypatch this attribute.
_KB: ReleaseKnowledge = YamlReleaseKnowledge()
def _find_existing_tvshow_folders(
tv_root: Path, tmdb_title: str, tmdb_year: int
tv_root: Path, tmdb_title_safe: str, tmdb_year: int
) -> list[str]:
"""Return folder names in tv_root that match title + year prefix."""
if not tv_root.exists():
return []
clean_title = _sanitize(tmdb_title).replace(" ", ".")
clean_title = tmdb_title_safe.replace(" ", ".")
prefix = f"{clean_title}.{tmdb_year}".lower()
return sorted(
entry.name
@@ -66,6 +71,7 @@ class _Clarification:
def _resolve_series_folder(
tv_root: Path,
tmdb_title: str,
tmdb_title_safe: str,
tmdb_year: int,
computed_name: str,
confirmed_folder: str | None,
@@ -80,7 +86,7 @@ def _resolve_series_folder(
if confirmed_folder:
return confirmed_folder, not (tv_root / confirmed_folder).exists()
existing = _find_existing_tvshow_folders(tv_root, tmdb_title, tmdb_year)
existing = _find_existing_tvshow_folders(tv_root, tmdb_title_safe, tmdb_year)
if not existing:
return computed_name, True
@@ -246,11 +252,12 @@ def resolve_season_destination(
message="TV show library path is not configured.",
)
parsed = parse_release(release_name)
computed_name = _sanitize(parsed.show_folder_name(tmdb_title, tmdb_year))
parsed = parse_release(release_name, _KB)
tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year)
resolved = _resolve_series_folder(
tv_root, tmdb_title, tmdb_year, computed_name, confirmed_folder
tv_root, tmdb_title, tmdb_title_safe, tmdb_year, computed_name, confirmed_folder
)
if isinstance(resolved, _Clarification):
return ResolvedSeasonDestination(
@@ -295,12 +302,16 @@ def resolve_episode_destination(
message="TV show library path is not configured.",
)
parsed = parse_release(release_name)
parsed = parse_release(release_name, _KB)
ext = Path(source_file).suffix
computed_name = _sanitize(parsed.show_folder_name(tmdb_title, tmdb_year))
tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
tmdb_episode_title_safe = (
_KB.sanitize_for_fs(tmdb_episode_title) if tmdb_episode_title else None
)
computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year)
resolved = _resolve_series_folder(
tv_root, tmdb_title, tmdb_year, computed_name, confirmed_folder
tv_root, tmdb_title, tmdb_title_safe, tmdb_year, computed_name, confirmed_folder
)
if isinstance(resolved, _Clarification):
return ResolvedEpisodeDestination(
@@ -311,7 +322,7 @@ def resolve_episode_destination(
series_folder_name, is_new = resolved
season_folder_name = parsed.season_folder_name()
filename = _sanitize(parsed.episode_filename(tmdb_episode_title, ext))
filename = parsed.episode_filename(tmdb_episode_title_safe, ext)
series_path = tv_root / series_folder_name
season_path = series_path / season_folder_name
@@ -349,11 +360,12 @@ def resolve_movie_destination(
message="Movie library path is not configured.",
)
parsed = parse_release(release_name)
parsed = parse_release(release_name, _KB)
ext = Path(source_file).suffix
tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
folder_name = _sanitize(parsed.movie_folder_name(tmdb_title, tmdb_year))
filename = _sanitize(parsed.movie_filename(tmdb_title, tmdb_year, ext))
folder_name = parsed.movie_folder_name(tmdb_title_safe, tmdb_year)
filename = parsed.movie_filename(tmdb_title_safe, tmdb_year, ext)
folder_path = Path(movies_root) / folder_name
file_path = folder_path / filename
@@ -387,11 +399,12 @@ def resolve_series_destination(
message="TV show library path is not configured.",
)
parsed = parse_release(release_name)
computed_name = _sanitize(parsed.show_folder_name(tmdb_title, tmdb_year))
parsed = parse_release(release_name, _KB)
tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year)
resolved = _resolve_series_folder(
tv_root, tmdb_title, tmdb_year, computed_name, confirmed_folder
tv_root, tmdb_title, tmdb_title_safe, tmdb_year, computed_name, confirmed_folder
)
if isinstance(resolved, _Clarification):
return ResolvedSeriesDestination(
+10
View File
@@ -0,0 +1,10 @@
"""Domain ports for the release domain.
Protocol-based abstractions that decouple ``parse_release`` and
``ParsedRelease`` from any concrete knowledge-base loader. The
infrastructure layer provides the adapter that satisfies this contract.
"""
from .knowledge import ReleaseKnowledge
__all__ = ["ReleaseKnowledge"]
+52
View File
@@ -0,0 +1,52 @@
"""ReleaseKnowledge port — the read-only query surface that
``parse_release`` and ``ParsedRelease`` need from the release knowledge
base, expressed as a structural Protocol so the domain never imports any
concrete loader.
The concrete YAML-backed implementation lives in
``alfred/infrastructure/knowledge/release_kb.py``. Tests can supply any
object that satisfies this shape (e.g. a simple dataclass).
"""
from __future__ import annotations
from typing import Protocol
class ReleaseKnowledge(Protocol):
"""Read-only snapshot of release-name parsing knowledge."""
# --- Token sets used by the tokenizer / matchers ---
resolutions: set[str]
sources: set[str]
codecs: set[str]
language_tokens: set[str]
forbidden_chars: set[str]
hdr_extra: set[str]
# --- Structured knowledge (loaded from YAML as dicts) ---
audio: dict
video_meta: dict
editions: dict
media_type_tokens: dict
# --- Tokenizer separators ---
separators: list[str]
# --- File-extension sets (used by application/infra modules that work
# directly with filesystem paths, e.g. media-type detection, video
# lookup). Domain parsing itself doesn't touch these. ---
video_extensions: set[str]
non_video_extensions: set[str]
subtitle_extensions: set[str]
metadata_extensions: set[str]
# --- Filesystem sanitization (Option B: pre-sanitize at parse time) ---
def sanitize_for_fs(self, text: str) -> str:
"""Strip filesystem-forbidden characters from ``text``."""
...
+53 -55
View File
@@ -4,31 +4,17 @@ from __future__ import annotations
import re
from alfred.infrastructure.knowledge.release import load_separators
from .value_objects import (
_AUDIO,
_CODECS,
_EDITIONS,
_FORBIDDEN_CHARS,
_HDR_EXTRA,
_LANGUAGE_TOKENS,
_MEDIA_TYPE_TOKENS,
_RESOLUTIONS,
_SOURCES,
_VIDEO_META,
MediaTypeToken,
ParsedRelease,
ParsePath,
)
from .ports import ReleaseKnowledge
from .value_objects import MediaTypeToken, ParsedRelease, ParsePath
def _tokenize(name: str) -> list[str]:
def _tokenize(name: str, kb: ReleaseKnowledge) -> list[str]:
"""Split a release name on the configured separators, dropping empty tokens."""
pattern = "[" + re.escape("".join(load_separators())) + "]+"
pattern = "[" + re.escape("".join(kb.separators)) + "]+"
return [t for t in re.split(pattern, name) if t]
def parse_release(name: str) -> ParsedRelease:
def parse_release(name: str, kb: ReleaseKnowledge) -> ParsedRelease:
"""
Parse a release name and return a ParsedRelease.
@@ -48,11 +34,12 @@ def parse_release(name: str) -> ParsedRelease:
if site_tag is not None:
parse_path = ParsePath.SANITIZED.value
if not _is_well_formed(clean):
if not _is_well_formed(clean, kb):
return ParsedRelease(
raw=name,
normalised=clean,
title=clean,
title_sanitized=kb.sanitize_for_fs(clean),
year=None,
season=None,
episode=None,
@@ -68,21 +55,22 @@ def parse_release(name: str) -> ParsedRelease:
)
name = clean
tokens = _tokenize(name)
tokens = _tokenize(name, kb)
season, episode, episode_end = _extract_season_episode(tokens)
quality, source, codec, group, tech_tokens = _extract_tech(tokens)
languages, lang_tokens = _extract_languages(tokens)
audio_codec, audio_channels, audio_tokens = _extract_audio(tokens)
bit_depth, hdr_format, video_tokens = _extract_video_meta(tokens)
edition, edition_tokens = _extract_edition(tokens)
quality, source, codec, group, tech_tokens = _extract_tech(tokens, kb)
languages, lang_tokens = _extract_languages(tokens, kb)
audio_codec, audio_channels, audio_tokens = _extract_audio(tokens, kb)
bit_depth, hdr_format, video_tokens = _extract_video_meta(tokens, kb)
edition, edition_tokens = _extract_edition(tokens, kb)
title = _extract_title(
tokens,
tech_tokens | lang_tokens | audio_tokens | video_tokens | edition_tokens,
kb,
)
year = _extract_year(tokens, title)
media_type = _infer_media_type(
season, quality, source, codec, year, edition, tokens
season, quality, source, codec, year, edition, tokens, kb
)
tech_parts = [p for p in [quality, source, codec] if p]
@@ -92,6 +80,7 @@ def parse_release(name: str) -> ParsedRelease:
raw=name,
normalised=name,
title=title,
title_sanitized=kb.sanitize_for_fs(title),
year=year,
season=season,
episode=episode,
@@ -121,6 +110,7 @@ def _infer_media_type(
year: int | None,
edition: str | None,
tokens: list[str],
kb: ReleaseKnowledge,
) -> str:
"""
Infer media_type from token-level evidence only (no filesystem access).
@@ -134,9 +124,9 @@ def _infer_media_type(
"""
upper_tokens = {t.upper() for t in tokens}
doc_tokens = {t.upper() for t in _MEDIA_TYPE_TOKENS.get("doc", [])}
concert_tokens = {t.upper() for t in _MEDIA_TYPE_TOKENS.get("concert", [])}
integrale_tokens = {t.upper() for t in _MEDIA_TYPE_TOKENS.get("integrale", [])}
doc_tokens = {t.upper() for t in kb.media_type_tokens.get("doc", [])}
concert_tokens = {t.upper() for t in kb.media_type_tokens.get("concert", [])}
integrale_tokens = {t.upper() for t in kb.media_type_tokens.get("integrale", [])}
if upper_tokens & doc_tokens:
return MediaTypeToken.DOCUMENTARY.value
@@ -154,15 +144,15 @@ def _infer_media_type(
return MediaTypeToken.UNKNOWN.value
def _is_well_formed(name: str) -> bool:
def _is_well_formed(name: str, kb: ReleaseKnowledge) -> bool:
"""Return True if name contains no forbidden characters per scene naming rules.
Characters listed as token separators (spaces, brackets, parens, …) are NOT
considered malforming — the tokenizer handles them. Only truly broken chars
like '@', '#', '!', '%' make a name malformed.
"""
tokenizable = set(load_separators())
return not any(c in name for c in _FORBIDDEN_CHARS if c not in tokenizable)
tokenizable = set(kb.separators)
return not any(c in name for c in kb.forbidden_chars if c not in tokenizable)
def _strip_site_tag(name: str) -> tuple[str, str | None]:
@@ -251,6 +241,7 @@ def _extract_season_episode(
def _extract_tech(
tokens: list[str],
kb: ReleaseKnowledge,
) -> tuple[str | None, str | None, str | None, str, set[str]]:
"""
Extract quality, source, codec, group from tokens.
@@ -270,12 +261,12 @@ def _extract_tech(
for tok in tokens:
tl = tok.lower()
if tl in _RESOLUTIONS:
if tl in kb.resolutions:
quality = tok
tech_tokens.add(tok)
continue
if tl in _SOURCES:
if tl in kb.sources:
source = tok
tech_tokens.add(tok)
continue
@@ -283,18 +274,18 @@ def _extract_tech(
if "-" in tok:
parts = tok.rsplit("-", 1)
# codec-GROUP (highest priority for group)
if parts[0].lower() in _CODECS:
if parts[0].lower() in kb.codecs:
codec = parts[0]
group = parts[1] if parts[1] else "UNKNOWN"
tech_tokens.add(tok)
continue
# source with dash: Web-DL, WEB-DL, etc.
if parts[0].lower() in _SOURCES or tok.lower().replace("-", "") in _SOURCES:
if parts[0].lower() in kb.sources or tok.lower().replace("-", "") in kb.sources:
source = tok
tech_tokens.add(tok)
continue
if tl in _CODECS:
if tl in kb.codecs:
codec = tok
tech_tokens.add(tok)
@@ -304,7 +295,7 @@ def _extract_tech(
if "-" in tok:
parts = tok.rsplit("-", 1)
tl = tok.lower()
if tl in _SOURCES or tok.lower().replace("-", "") in _SOURCES:
if tl in kb.sources or tok.lower().replace("-", "") in kb.sources:
continue
if parts[1]:
group = parts[1]
@@ -318,17 +309,20 @@ def _is_year_token(tok: str) -> bool:
return len(tok) == 4 and tok.isdigit() and 1900 <= int(tok) <= 2099
def _extract_title(tokens: list[str], tech_tokens: set[str]) -> str:
def _extract_title(
tokens: list[str], tech_tokens: set[str], kb: ReleaseKnowledge
) -> str:
"""Extract the title portion: everything before the first season/year/tech token."""
title_parts = []
known_tech = kb.resolutions | kb.sources | kb.codecs
for tok in tokens:
if _parse_season_episode(tok) is not None:
break
if _is_year_token(tok):
break
if tok in tech_tokens or tok.lower() in _RESOLUTIONS | _SOURCES | _CODECS:
if tok in tech_tokens or tok.lower() in known_tech:
break
if "-" in tok and any(p.lower() in _CODECS | _SOURCES for p in tok.split("-")):
if "-" in tok and any(p.lower() in kb.codecs | kb.sources for p in tok.split("-")):
break
title_parts.append(tok)
@@ -376,12 +370,14 @@ def _match_sequences(
# ---------------------------------------------------------------------------
def _extract_languages(tokens: list[str]) -> tuple[list[str], set[str]]:
def _extract_languages(
tokens: list[str], kb: ReleaseKnowledge
) -> tuple[list[str], set[str]]:
"""Extract language tokens. Returns (languages, matched_token_set)."""
languages = []
lang_tokens: set[str] = set()
for tok in tokens:
if tok.upper() in _LANGUAGE_TOKENS:
if tok.upper() in kb.language_tokens:
languages.append(tok.upper())
lang_tokens.add(tok)
return languages, lang_tokens
@@ -393,7 +389,7 @@ def _extract_languages(tokens: list[str]) -> tuple[list[str], set[str]]:
def _extract_audio(
tokens: list[str],
tokens: list[str], kb: ReleaseKnowledge,
) -> tuple[str | None, str | None, set[str]]:
"""
Extract audio codec and channel layout.
@@ -405,12 +401,12 @@ def _extract_audio(
audio_channels: str | None = None
audio_tokens: set[str] = set()
known_codecs = {c.upper() for c in _AUDIO.get("codecs", [])}
known_channels = set(_AUDIO.get("channels", []))
known_codecs = {c.upper() for c in kb.audio.get("codecs", [])}
known_channels = set(kb.audio.get("channels", []))
# Try multi-token sequences first
matched_codec, matched_set = _match_sequences(
tokens, _AUDIO.get("sequences", []), "codec"
tokens, kb.audio.get("sequences", []), "codec"
)
if matched_codec:
audio_codec = matched_codec
@@ -446,7 +442,7 @@ def _extract_audio(
def _extract_video_meta(
tokens: list[str],
tokens: list[str], kb: ReleaseKnowledge,
) -> tuple[str | None, str | None, set[str]]:
"""
Extract bit depth and HDR format.
@@ -457,12 +453,12 @@ def _extract_video_meta(
hdr_format: str | None = None
video_tokens: set[str] = set()
known_hdr = {h.upper() for h in _VIDEO_META.get("hdr", [])} | _HDR_EXTRA
known_depth = {d.lower() for d in _VIDEO_META.get("bit_depth", [])}
known_hdr = {h.upper() for h in kb.video_meta.get("hdr", [])} | kb.hdr_extra
known_depth = {d.lower() for d in kb.video_meta.get("bit_depth", [])}
# Try HDR sequences first
matched_hdr, matched_set = _match_sequences(
tokens, _VIDEO_META.get("sequences", []), "hdr"
tokens, kb.video_meta.get("sequences", []), "hdr"
)
if matched_hdr:
hdr_format = matched_hdr
@@ -486,17 +482,19 @@ def _extract_video_meta(
# ---------------------------------------------------------------------------
def _extract_edition(tokens: list[str]) -> tuple[str | None, set[str]]:
def _extract_edition(
tokens: list[str], kb: ReleaseKnowledge
) -> tuple[str | None, set[str]]:
"""
Extract release edition (UNRATED, EXTENDED, DIRECTORS.CUT, …).
Returns (edition, matched_token_set).
"""
known_tokens = {t.upper() for t in _EDITIONS.get("tokens", [])}
known_tokens = {t.upper() for t in kb.editions.get("tokens", [])}
# Try multi-token sequences first
matched_edition, matched_set = _match_sequences(
tokens, _EDITIONS.get("sequences", []), "edition"
tokens, kb.editions.get("sequences", []), "edition"
)
if matched_edition:
return matched_edition, matched_set
+38 -57
View File
@@ -1,4 +1,17 @@
"""Release domain — value objects and token sets."""
"""Release domain — value objects.
This module is **pure**: no I/O, no YAML loading, no knowledge-base
imports. All knowledge that the parser consumes is injected at runtime
via the ``ReleaseKnowledge`` port (see ``ports/knowledge.py``).
``ParsedRelease`` follows Option B of the snapshot-VO design: filesystem
sanitization is performed once at parse time and stored in
``title_sanitized``. The builder methods (``show_folder_name``,
``episode_filename``, etc.) are therefore pure string-formatting and do
**not** need access to any knowledge base — but they require the caller
to pass already-sanitized TMDB strings. The use case is responsible for
calling ``kb.sanitize_for_fs(tmdb_title)`` before invoking the builders.
"""
from __future__ import annotations
@@ -6,50 +19,6 @@ from dataclasses import dataclass, field
from enum import Enum
from ..shared.exceptions import ValidationError
from alfred.infrastructure.knowledge.release import (
load_audio,
load_codecs,
load_editions,
load_forbidden_chars,
load_hdr_extra,
load_language_tokens,
load_media_type_tokens,
load_metadata_extensions,
load_non_video_extensions,
load_resolutions,
load_sources,
load_sources_extra,
load_subtitle_extensions,
load_video,
load_video_extensions,
load_win_forbidden_chars,
)
# Token sets — loaded once at import time from alfred/knowledge/release/
_RESOLUTIONS: set[str] = load_resolutions()
_SOURCES: set[str] = load_sources() | load_sources_extra()
_CODECS: set[str] = load_codecs()
_VIDEO_EXTENSIONS: set[str] = load_video_extensions()
_NON_VIDEO_EXTENSIONS: set[str] = load_non_video_extensions()
_SUBTITLE_EXTENSIONS: set[str] = load_subtitle_extensions()
# Both metadata and subtitle extensions are ignored when deciding the media
# type of a folder — neither is a conclusive signal for movie/tv/other.
_METADATA_EXTENSIONS: set[str] = load_metadata_extensions() | _SUBTITLE_EXTENSIONS
_FORBIDDEN_CHARS: set[str] = load_forbidden_chars()
_LANGUAGE_TOKENS: set[str] = load_language_tokens()
_AUDIO: dict = load_audio()
_VIDEO_META: dict = load_video()
_EDITIONS: dict = load_editions()
_HDR_EXTRA: set[str] = load_hdr_extra()
_MEDIA_TYPE_TOKENS: dict = load_media_type_tokens()
# Translation table for stripping Windows-forbidden characters
_WIN_FORBIDDEN_TABLE = str.maketrans("", "", "".join(load_win_forbidden_chars()))
def _sanitize_for_fs(text: str) -> str:
"""Remove Windows-forbidden characters from a string."""
return text.translate(_WIN_FORBIDDEN_TABLE)
class MediaTypeToken(str, Enum):
@@ -105,11 +74,17 @@ def _strip_episode_from_normalized(normalized: str) -> str:
@dataclass
class ParsedRelease:
"""Structured representation of a parsed release name."""
"""Structured representation of a parsed release name.
``title_sanitized`` carries the filesystem-safe form of ``title`` (computed
by the parser at construction time using the injected knowledge base).
Builder methods rely on it being already-sanitized — see module docstring.
"""
raw: str # original release name (untouched)
normalised: str # dots instead of spaces
title: str # show/movie title (dots, no year/season/tech)
title_sanitized: str # title with filesystem-forbidden chars stripped
year: int | None # movie year or show start year (from TMDB)
season: int | None # season number (None for movies)
episode: int | None # first episode number (None if season-pack)
@@ -180,14 +155,17 @@ class ParsedRelease:
def is_season_pack(self) -> bool:
return self.season is not None and self.episode is None
def show_folder_name(self, tmdb_title: str, tmdb_year: int) -> str:
def show_folder_name(self, tmdb_title_safe: str, tmdb_year: int) -> str:
"""
Build the series root folder name.
Format: {Title}.{Year}.{Tech}-{Group}
Example: Oz.1997.1080p.WEBRip.x265-KONTRAST
``tmdb_title_safe`` must already be filesystem-safe (the caller is
expected to have run it through ``kb.sanitize_for_fs``).
"""
title_part = _sanitize_for_fs(tmdb_title).replace(" ", ".")
title_part = tmdb_title_safe.replace(" ", ".")
tech = self.tech_string or "Unknown"
return f"{title_part}.{tmdb_year}.{tech}-{self.group}"
@@ -201,42 +179,45 @@ class ParsedRelease:
"""
return _strip_episode_from_normalized(self.normalised)
def episode_filename(self, tmdb_episode_title: str | None, ext: str) -> str:
def episode_filename(self, tmdb_episode_title_safe: str | None, ext: str) -> str:
"""
Build the episode filename.
Format: {Title}.{SxxExx}.{EpisodeTitle}.{Tech}-{Group}.{ext}
Example: Oz.S01E01.The.Routine.1080p.WEBRip.x265-KONTRAST.mkv
If tmdb_episode_title is None, omits the episode title segment.
``tmdb_episode_title_safe`` must already be filesystem-safe; pass
``None`` to omit the episode title segment.
"""
title_part = _sanitize_for_fs(self.title)
title_part = self.title_sanitized
s = f"S{self.season:02d}" if self.season is not None else ""
e = f"E{self.episode:02d}" if self.episode is not None else ""
se = s + e
ep_title = ""
if tmdb_episode_title:
ep_title = "." + _sanitize_for_fs(tmdb_episode_title).replace(" ", ".")
if tmdb_episode_title_safe:
ep_title = "." + tmdb_episode_title_safe.replace(" ", ".")
tech = self.tech_string or "Unknown"
ext_clean = ext.lstrip(".")
return f"{title_part}.{se}{ep_title}.{tech}-{self.group}.{ext_clean}"
def movie_folder_name(self, tmdb_title: str, tmdb_year: int) -> str:
def movie_folder_name(self, tmdb_title_safe: str, tmdb_year: int) -> str:
"""
Build the movie folder name.
Format: {Title}.{Year}.{Tech}-{Group}
Example: Inception.2010.1080p.BluRay.x265-GROUP
"""
return self.show_folder_name(tmdb_title, tmdb_year)
return self.show_folder_name(tmdb_title_safe, tmdb_year)
def movie_filename(self, tmdb_title: str, tmdb_year: int, ext: str) -> str:
def movie_filename(
self, tmdb_title_safe: str, tmdb_year: int, ext: str
) -> str:
"""
Build the movie filename (same as folder name + extension).
Example: Inception.2010.1080p.BluRay.x265-GROUP.mkv
"""
ext_clean = ext.lstrip(".")
return f"{self.movie_folder_name(tmdb_title, tmdb_year)}.{ext_clean}"
return f"{self.movie_folder_name(tmdb_title_safe, tmdb_year)}.{ext_clean}"
@@ -4,10 +4,10 @@ from __future__ import annotations
from pathlib import Path
from alfred.domain.release.value_objects import _VIDEO_EXTENSIONS
from alfred.domain.release.ports import ReleaseKnowledge
def find_video_file(path: Path) -> Path | None:
def find_video_file(path: Path, kb: ReleaseKnowledge) -> Path | None:
"""
Return the first video file found at path.
@@ -15,11 +15,12 @@ def find_video_file(path: Path) -> Path | None:
- If path is a folder — scan recursively, return the first video found
(sorted by name for determinism, picks S01E01 before S01E02 etc.).
"""
video_exts = kb.video_extensions
if path.is_file():
return path if path.suffix.lower() in _VIDEO_EXTENSIONS else None
return path if path.suffix.lower() in video_exts else None
for candidate in sorted(path.rglob("*")):
if candidate.is_file() and candidate.suffix.lower() in _VIDEO_EXTENSIONS:
if candidate.is_file() and candidate.suffix.lower() in video_exts:
return candidate
return None
@@ -0,0 +1,83 @@
"""YamlReleaseKnowledge — concrete adapter for the ``ReleaseKnowledge``
domain port.
Loads every release-knowledge YAML once at construction time and exposes
the parsed snapshots as plain attributes. The application layer builds a
single instance at boot and passes it down to ``parse_release`` and to
``ParsedRelease`` builder methods.
A few extras (``video_extensions``, ``non_video_extensions``,
``subtitle_extensions``, ``metadata_extensions``) are not part of the
domain port — they are consumed by application/infra modules that handle
filesystem-level concerns.
"""
from __future__ import annotations
from .release import (
load_audio,
load_codecs,
load_editions,
load_forbidden_chars,
load_hdr_extra,
load_language_tokens,
load_media_type_tokens,
load_metadata_extensions,
load_non_video_extensions,
load_resolutions,
load_separators,
load_sources,
load_sources_extra,
load_subtitle_extensions,
load_video,
load_video_extensions,
load_win_forbidden_chars,
)
class YamlReleaseKnowledge:
"""Single object holding every parsed-release knowledge constant.
Built once at application boot. Read-only at runtime — call sites
treat it as a snapshot. To pick up newly learned tokens without a
restart, build a fresh instance and swap it in at the call sites.
"""
def __init__(self) -> None:
# Domain-port surface
self.resolutions: set[str] = load_resolutions()
self.sources: set[str] = load_sources() | load_sources_extra()
self.codecs: set[str] = load_codecs()
self.language_tokens: set[str] = load_language_tokens()
self.forbidden_chars: set[str] = load_forbidden_chars()
self.hdr_extra: set[str] = load_hdr_extra()
self.audio: dict = load_audio()
self.video_meta: dict = load_video()
self.editions: dict = load_editions()
self.media_type_tokens: dict = load_media_type_tokens()
self.separators: list[str] = load_separators()
# File-extension sets (used by application/infra modules, not by
# the parser itself — kept here so there is a single ownership
# point for release knowledge).
self.video_extensions: set[str] = load_video_extensions()
self.non_video_extensions: set[str] = load_non_video_extensions()
self.subtitle_extensions: set[str] = load_subtitle_extensions()
# Metadata + subtitle extensions are both ignored when deciding
# the media type of a folder (neither is a conclusive signal for
# movie/tv/other), so we expose the union under the historical
# name.
self.metadata_extensions: set[str] = (
load_metadata_extensions() | self.subtitle_extensions
)
# Translation table for stripping Windows-forbidden chars.
self._win_forbidden_table = str.maketrans(
"", "", "".join(load_win_forbidden_chars())
)
def sanitize_for_fs(self, text: str) -> str:
"""Strip Windows-forbidden characters from ``text``."""
return text.translate(self._win_forbidden_table)
+22 -19
View File
@@ -20,16 +20,19 @@ import pytest
from alfred.application.filesystem.detect_media_type import detect_media_type
from alfred.domain.release.services import parse_release
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
_KB = YamlReleaseKnowledge()
def _parsed(media_type: str = "movie"):
"""Build a ParsedRelease with the requested media_type via the real parser."""
if media_type == "tv_show":
return parse_release("Show.S01E01.1080p-GRP")
return parse_release("Show.S01E01.1080p-GRP", _KB)
if media_type == "movie":
return parse_release("Movie.2020.1080p-GRP")
return parse_release("Movie.2020.1080p-GRP", _KB)
# "unknown" / other — feed a name the parser can't classify
return parse_release("randomthing")
return parse_release("randomthing", _KB)
# --------------------------------------------------------------------------- #
@@ -41,30 +44,30 @@ class TestFile:
def test_video_file_preserves_parsed_type(self, tmp_path: Path):
f = tmp_path / "x.mkv"
f.write_bytes(b"")
assert detect_media_type(_parsed("movie"), f) == "movie"
assert detect_media_type(_parsed("movie"), f, _KB) == "movie"
def test_video_file_preserves_tv_type(self, tmp_path: Path):
f = tmp_path / "ep.mp4"
f.write_bytes(b"")
assert detect_media_type(_parsed("tv_show"), f) == "tv_show"
assert detect_media_type(_parsed("tv_show"), f, _KB) == "tv_show"
def test_non_video_file_returns_other(self, tmp_path: Path):
f = tmp_path / "x.iso"
f.write_bytes(b"")
assert detect_media_type(_parsed("movie"), f) == "other"
assert detect_media_type(_parsed("movie"), f, _KB) == "other"
@pytest.mark.parametrize("ext", [".rar", ".zip", ".7z", ".exe", ".dmg"])
def test_various_non_video_extensions(self, tmp_path: Path, ext):
f = tmp_path / f"x{ext}"
f.write_bytes(b"")
assert detect_media_type(_parsed("movie"), f) == "other"
assert detect_media_type(_parsed("movie"), f, _KB) == "other"
def test_metadata_only_file_keeps_parsed_type(self, tmp_path: Path):
# Metadata extension is stripped from conclusive set — no video, no
# non-video → falls through to parsed.media_type.
f = tmp_path / "x.nfo"
f.write_bytes(b"")
assert detect_media_type(_parsed("movie"), f) == "movie"
assert detect_media_type(_parsed("movie"), f, _KB) == "movie"
# --------------------------------------------------------------------------- #
@@ -75,27 +78,27 @@ class TestFile:
class TestFolder:
def test_folder_with_video_keeps_parsed_type(self, tmp_path: Path):
(tmp_path / "main.mkv").write_bytes(b"")
assert detect_media_type(_parsed("movie"), tmp_path) == "movie"
assert detect_media_type(_parsed("movie"), tmp_path, _KB) == "movie"
def test_folder_only_non_video_returns_other(self, tmp_path: Path):
(tmp_path / "disc.iso").write_bytes(b"")
(tmp_path / "part.rar").write_bytes(b"")
assert detect_media_type(_parsed("movie"), tmp_path) == "other"
assert detect_media_type(_parsed("movie"), tmp_path, _KB) == "other"
def test_folder_mixed_returns_unknown(self, tmp_path: Path):
(tmp_path / "main.mkv").write_bytes(b"")
(tmp_path / "extras.iso").write_bytes(b"")
assert detect_media_type(_parsed("movie"), tmp_path) == "unknown"
assert detect_media_type(_parsed("movie"), tmp_path, _KB) == "unknown"
def test_empty_folder_keeps_parsed_type(self, tmp_path: Path):
assert detect_media_type(_parsed("tv_show"), tmp_path) == "tv_show"
assert detect_media_type(_parsed("tv_show"), tmp_path, _KB) == "tv_show"
def test_folder_only_metadata_keeps_parsed_type(self, tmp_path: Path):
(tmp_path / "info.nfo").write_bytes(b"")
(tmp_path / "cover.jpg").write_bytes(b"")
(tmp_path / "subs.srt").write_bytes(b"")
# All metadata → conclusive set empty → falls through.
assert detect_media_type(_parsed("movie"), tmp_path) == "movie"
assert detect_media_type(_parsed("movie"), tmp_path, _KB) == "movie"
# --------------------------------------------------------------------------- #
@@ -109,18 +112,18 @@ class TestMetadataIgnored:
(tmp_path / "info.nfo").write_bytes(b"")
(tmp_path / "cover.jpg").write_bytes(b"")
(tmp_path / "subs.srt").write_bytes(b"")
assert detect_media_type(_parsed("movie"), tmp_path) == "movie"
assert detect_media_type(_parsed("movie"), tmp_path, _KB) == "movie"
def test_non_video_plus_metadata_still_other(self, tmp_path: Path):
(tmp_path / "disc.iso").write_bytes(b"")
(tmp_path / "info.nfo").write_bytes(b"")
assert detect_media_type(_parsed("movie"), tmp_path) == "other"
assert detect_media_type(_parsed("movie"), tmp_path, _KB) == "other"
def test_case_insensitive_extensions(self, tmp_path: Path):
# Suffix is lowercased before classification.
f = tmp_path / "X.MKV"
f.write_bytes(b"")
assert detect_media_type(_parsed("movie"), f) == "movie"
assert detect_media_type(_parsed("movie"), f, _KB) == "movie"
# --------------------------------------------------------------------------- #
@@ -132,11 +135,11 @@ class TestMissing:
def test_nonexistent_path_keeps_parsed_type(self, tmp_path: Path):
missing = tmp_path / "does_not_exist.mkv"
# Doesn't exist → empty extension set → falls through.
assert detect_media_type(_parsed("movie"), missing) == "movie"
assert detect_media_type(_parsed("movie"), missing, _KB) == "movie"
def test_nonexistent_folder_keeps_parsed_type(self, tmp_path: Path):
missing = tmp_path / "ghost"
assert detect_media_type(_parsed("tv_show"), missing) == "tv_show"
assert detect_media_type(_parsed("tv_show"), missing, _KB) == "tv_show"
def test_subfolder_not_recursed(self, tmp_path: Path):
# _collect_extensions scans only the first level — files inside
@@ -145,4 +148,4 @@ class TestMissing:
sub.mkdir()
(sub / "deep.mkv").write_bytes(b"")
# Top level has no files at all → empty → falls through to parsed type.
assert detect_media_type(_parsed("movie"), tmp_path) == "movie"
assert detect_media_type(_parsed("movie"), tmp_path, _KB) == "movie"
@@ -37,6 +37,7 @@ def _bare(**overrides) -> ParsedRelease:
raw="X",
normalised="X",
title="X",
title_sanitized="X",
year=None,
season=None,
episode=None,
+6 -15
View File
@@ -9,7 +9,6 @@ Four use cases compute library paths from a release name + TMDB metadata:
Coverage:
- ``TestSanitize`` — Windows-forbidden chars stripped.
- ``TestFindExistingTvshowFolders`` — empty root, prefix match (case + space → dot).
- ``TestResolveSeriesFolderInternal`` — confirmed_folder, no existing, single match,
ambiguous → _Clarification.
@@ -32,7 +31,6 @@ from alfred.application.filesystem.resolve_destination import (
_Clarification,
_find_existing_tvshow_folders,
_resolve_series_folder,
_sanitize,
resolve_episode_destination,
resolve_movie_destination,
resolve_season_destination,
@@ -51,15 +49,6 @@ REL_SERIES = "Oz.Complete.Series.1080p.WEBRip.x265-KONTRAST"
# --------------------------------------------------------------------------- #
class TestSanitize:
def test_passthrough_safe_chars(self):
assert _sanitize("Oz.1997.1080p-GRP") == "Oz.1997.1080p-GRP"
def test_strips_windows_forbidden(self):
# ? : * " < > | \
assert _sanitize('a?b:c*d"e<f>g|h\\i') == "abcdefghi"
# --------------------------------------------------------------------------- #
# _find_existing_tvshow_folders #
# --------------------------------------------------------------------------- #
@@ -107,6 +96,7 @@ class TestResolveSeriesFolderInternal:
out = _resolve_series_folder(
tmp_path,
"Oz",
"Oz",
1997,
"Oz.1997.WEBRip-KONTRAST",
confirmed_folder="Oz.1997.X-GRP",
@@ -117,6 +107,7 @@ class TestResolveSeriesFolderInternal:
out = _resolve_series_folder(
tmp_path,
"Oz",
"Oz",
1997,
"Oz.1997.WEBRip-KONTRAST",
confirmed_folder="Oz.1997.New-X",
@@ -125,21 +116,21 @@ class TestResolveSeriesFolderInternal:
def test_no_existing_returns_computed_as_new(self, tmp_path):
out = _resolve_series_folder(
tmp_path, "Oz", 1997, "Oz.1997.WEBRip-KONTRAST", None
tmp_path, "Oz", "Oz", 1997, "Oz.1997.WEBRip-KONTRAST", None
)
assert out == ("Oz.1997.WEBRip-KONTRAST", True)
def test_single_existing_matching_computed_returns_existing(self, tmp_path):
(tmp_path / "Oz.1997.WEBRip-KONTRAST").mkdir()
out = _resolve_series_folder(
tmp_path, "Oz", 1997, "Oz.1997.WEBRip-KONTRAST", None
tmp_path, "Oz", "Oz", 1997, "Oz.1997.WEBRip-KONTRAST", None
)
assert out == ("Oz.1997.WEBRip-KONTRAST", False)
def test_single_existing_different_name_returns_clarification(self, tmp_path):
(tmp_path / "Oz.1997.BluRay-OTHER").mkdir()
out = _resolve_series_folder(
tmp_path, "Oz", 1997, "Oz.1997.WEBRip-KONTRAST", None
tmp_path, "Oz", "Oz", 1997, "Oz.1997.WEBRip-KONTRAST", None
)
assert isinstance(out, _Clarification)
assert "Oz" in out.question
@@ -149,7 +140,7 @@ class TestResolveSeriesFolderInternal:
def test_multiple_existing_returns_clarification(self, tmp_path):
(tmp_path / "Oz.1997.A-GRP").mkdir()
(tmp_path / "Oz.1997.B-GRP").mkdir()
out = _resolve_series_folder(tmp_path, "Oz", 1997, "Oz.1997.A-GRP", None)
out = _resolve_series_folder(tmp_path, "Oz", "Oz", 1997, "Oz.1997.A-GRP", None)
assert isinstance(out, _Clarification)
# Computed already in existing → not duplicated.
assert out.options.count("Oz.1997.A-GRP") == 1
+39 -30
View File
@@ -20,13 +20,20 @@ import pytest
from alfred.domain.release.services import parse_release
from alfred.domain.release.value_objects import ParsedRelease
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
_KB = YamlReleaseKnowledge()
def _parse(name: str) -> ParsedRelease:
return parse_release(name, _KB)
class TestParseTVEpisode:
"""Single-episode TV releases."""
def test_basic_tv_episode(self):
r = parse_release("Oz.S03E01.1080p.WEBRip.x265-KONTRAST")
r = _parse("Oz.S03E01.1080p.WEBRip.x265-KONTRAST")
assert r.title == "Oz"
assert r.season == 3
assert r.episode == 1
@@ -40,27 +47,27 @@ class TestParseTVEpisode:
assert r.is_season_pack is False
def test_multi_episode(self):
r = parse_release("Archer.S14E09E10.1080p.WEB.x265-GRP")
r = _parse("Archer.S14E09E10.1080p.WEB.x265-GRP")
assert r.season == 14
assert r.episode == 9
assert r.episode_end == 10
def test_nxnn_alt_form(self):
# Alt season/episode form: 1x05 instead of S01E05.
r = parse_release("Some.Show.1x05.720p.HDTV.x264-GRP")
r = _parse("Some.Show.1x05.720p.HDTV.x264-GRP")
assert r.season == 1
assert r.episode == 5
assert r.episode_end is None
assert r.media_type == "tv_show"
def test_nxnnxnn_multi_episode_alt_form(self):
r = parse_release("Some.Show.2x07x08.1080p.WEB.x265-GRP")
r = _parse("Some.Show.2x07x08.1080p.WEB.x265-GRP")
assert r.season == 2
assert r.episode == 7
assert r.episode_end == 8
def test_season_pack(self):
r = parse_release("Oz.S03.1080p.WEBRip.x265-KONTRAST")
r = _parse("Oz.S03.1080p.WEBRip.x265-KONTRAST")
assert r.season == 3
assert r.episode is None
assert r.is_season_pack is True
@@ -71,7 +78,7 @@ class TestParseMovie:
"""Movie releases."""
def test_basic_movie(self):
r = parse_release("Inception.2010.1080p.BluRay.x264-GROUP")
r = _parse("Inception.2010.1080p.BluRay.x264-GROUP")
assert r.title == "Inception"
assert r.year == 2010
assert r.season is None
@@ -83,13 +90,13 @@ class TestParseMovie:
assert r.media_type == "movie"
def test_movie_multi_word_title(self):
r = parse_release("The.Dark.Knight.2008.2160p.UHD.BluRay.x265-TERMINAL")
r = _parse("The.Dark.Knight.2008.2160p.UHD.BluRay.x265-TERMINAL")
assert r.title == "The.Dark.Knight"
assert r.year == 2008
assert r.quality == "2160p"
def test_movie_without_year_still_movie_if_tech_present(self):
r = parse_release("UntitledFilm.1080p.WEBRip.x264-GRP")
r = _parse("UntitledFilm.1080p.WEBRip.x264-GRP")
# No season, no year, but tech markers → still movie
assert r.media_type == "movie"
assert r.year is None
@@ -99,39 +106,39 @@ class TestParseEdgeCases:
"""Site tags, malformed names, and unknown media types."""
def test_site_tag_prefix_stripped(self):
r = parse_release("[ OxTorrent.vc ] The.Title.S01E01.1080p.WEB.x265-GRP")
r = _parse("[ OxTorrent.vc ] The.Title.S01E01.1080p.WEB.x265-GRP")
assert r.site_tag == "OxTorrent.vc"
assert r.parse_path == "sanitized"
assert r.season == 1
assert r.episode == 1
def test_site_tag_suffix_stripped(self):
r = parse_release("The.Title.S01E01.1080p.WEB.x265-NTb[TGx]")
r = _parse("The.Title.S01E01.1080p.WEB.x265-NTb[TGx]")
assert r.site_tag == "TGx"
# Suffix-tagged names are well-formed (only [] in tag → after strip clean)
assert r.season == 1
def test_irrecoverably_malformed(self):
# @ is a forbidden char and not stripped by _sanitize → stays malformed
r = parse_release("foo@bar@baz")
r = _parse("foo@bar@baz")
assert r.media_type == "unknown"
assert r.parse_path == "ai"
assert r.group == "UNKNOWN"
def test_empty_unknown_when_no_evidence(self):
r = parse_release("Some.Random.Title")
r = _parse("Some.Random.Title")
# No season, no year, no tech markers → unknown
assert r.media_type == "unknown"
def test_missing_group_defaults_to_unknown(self):
r = parse_release("Movie.2020.1080p.WEBRip.x265")
r = _parse("Movie.2020.1080p.WEBRip.x265")
# No "-GROUP" suffix → group = "UNKNOWN"
assert r.group == "UNKNOWN"
def test_yts_bracket_release(self):
# YTS-style: spaces, parens for year, multiple bracketed tech tokens.
# The tokenizer must handle ' ', '(', ')', '[', ']' transparently.
r = parse_release("The Father (2020) [1080p] [WEBRip] [5.1] [YTS.MX]")
r = _parse("The Father (2020) [1080p] [WEBRip] [5.1] [YTS.MX]")
assert r.title == "The.Father"
assert r.year == 2020
assert r.quality == "1080p"
@@ -141,7 +148,7 @@ class TestParseEdgeCases:
def test_human_friendly_spaces(self):
# Spaces as separators (no brackets).
r = parse_release("Inception 2010 1080p BluRay x264-GROUP")
r = _parse("Inception 2010 1080p BluRay x264-GROUP")
assert r.title == "Inception"
assert r.year == 2010
assert r.quality == "1080p"
@@ -151,7 +158,7 @@ class TestParseEdgeCases:
def test_underscore_separators(self):
# Old usenet style: underscores between tokens.
r = parse_release("Some_Show_S01E01_1080p_WEB_x265-GRP")
r = _parse("Some_Show_S01E01_1080p_WEB_x265-GRP")
assert r.season == 1
assert r.episode == 1
assert r.quality == "1080p"
@@ -162,15 +169,15 @@ class TestParseAudioVideoEdition:
"""Audio, video metadata, edition extraction."""
def test_audio_codec_and_channels(self):
r = parse_release("Movie.2020.1080p.BluRay.DTS.5.1.x264-GRP")
r = _parse("Movie.2020.1080p.BluRay.DTS.5.1.x264-GRP")
assert r.audio_channels == "5.1"
def test_language_token(self):
r = parse_release("Movie.2020.MULTI.1080p.WEBRip.x265-GRP")
r = _parse("Movie.2020.MULTI.1080p.WEBRip.x265-GRP")
assert "MULTI" in r.languages
def test_edition_token(self):
r = parse_release("Movie.2020.UNRATED.1080p.BluRay.x264-GRP")
r = _parse("Movie.2020.UNRATED.1080p.BluRay.x264-GRP")
assert r.edition == "UNRATED"
@@ -178,19 +185,21 @@ class TestParsedReleaseFolderNames:
"""Helpers that build filesystem-safe folder/filenames."""
def _parsed_tv(self) -> ParsedRelease:
return parse_release("Oz.S03E01.1080p.WEBRip.x265-KONTRAST")
return _parse("Oz.S03E01.1080p.WEBRip.x265-KONTRAST")
def _parsed_movie(self) -> ParsedRelease:
return parse_release("Inception.2010.1080p.BluRay.x264-GROUP")
return _parse("Inception.2010.1080p.BluRay.x264-GROUP")
def test_show_folder_name(self):
r = self._parsed_tv()
assert r.show_folder_name("Oz", 1997) == "Oz.1997.1080p.WEBRip.x265-KONTRAST"
def test_show_folder_name_strips_windows_chars(self):
def test_show_folder_name_uses_already_safe_title(self):
# Option B: callers sanitize at the use-case boundary via
# kb.sanitize_for_fs(...) before passing the title in.
r = self._parsed_tv()
# Colons and question marks are Windows-forbidden — must be stripped.
result = r.show_folder_name("Oz: The Series?", 1997)
safe = _KB.sanitize_for_fs("Oz: The Series?")
result = r.show_folder_name(safe, 1997)
assert ":" not in result
assert "?" not in result
@@ -202,7 +211,7 @@ class TestParsedReleaseFolderNames:
assert "E01" not in result
def test_season_folder_name_multi_episode(self):
r = parse_release("Archer.S14E09E10E11.1080p.WEB.x265-GRP")
r = _parse("Archer.S14E09E10E11.1080p.WEB.x265-GRP")
result = r.season_folder_name()
assert "S14" in result
assert "E09" not in result
@@ -251,21 +260,21 @@ class TestParsedReleaseInvariants:
def test_raw_is_preserved(self):
raw = "Oz.S03E01.1080p.WEBRip.x265-KONTRAST"
r = parse_release(raw)
r = _parse(raw)
assert r.raw == raw
def test_languages_defaults_to_empty_list_not_none(self):
r = parse_release("Movie.2020.1080p.BluRay.x264-GRP")
r = _parse("Movie.2020.1080p.BluRay.x264-GRP")
# __post_init__ ensures languages is a list, never None
assert r.languages == []
def test_tech_string_joined(self):
r = parse_release("Movie.2020.1080p.BluRay.x264-GRP")
r = _parse("Movie.2020.1080p.BluRay.x264-GRP")
assert r.tech_string == "1080p.BluRay.x264"
def test_tech_string_partial(self):
# Codec-only release (no quality/source): tech_string == codec
r = parse_release("Show.S01E01.x265-GRP")
r = _parse("Show.S01E01.x265-GRP")
assert r.tech_string == "x265"
assert r.codec == "x265"
assert r.quality is None
@@ -280,4 +289,4 @@ class TestParsedReleaseInvariants:
],
)
def test_media_type_inference(self, name, expected_type):
assert parse_release(name).media_type == expected_type
assert _parse(name).media_type == expected_type
+4 -2
View File
@@ -19,8 +19,10 @@ from dataclasses import asdict
import pytest
from alfred.domain.release.services import parse_release
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
from tests.fixtures.releases.conftest import ReleaseFixture, discover_fixtures
_KB = YamlReleaseKnowledge()
FIXTURES = discover_fixtures()
@@ -34,9 +36,9 @@ def test_parse_matches_fixture(fixture: ReleaseFixture, tmp_path) -> None:
# plausible filesystem paths. Catches typos / missing leading dirs early.
fixture.materialize(tmp_path)
result = asdict(parse_release(fixture.release_name))
result = asdict(parse_release(fixture.release_name, _KB))
# ``is_season_pack`` is a @property — asdict() does not include it.
result["is_season_pack"] = parse_release(fixture.release_name).is_season_pack
result["is_season_pack"] = parse_release(fixture.release_name, _KB).is_season_pack
for field, expected in fixture.expected_parsed.items():
assert field in result, (
@@ -34,6 +34,9 @@ from alfred.infrastructure.filesystem.filesystem_operations import (
)
from alfred.infrastructure.filesystem.find_video import find_video_file
from alfred.infrastructure.filesystem.organizer import MediaOrganizer
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
_KB = YamlReleaseKnowledge()
# --------------------------------------------------------------------------- #
# ffprobe.probe #
@@ -263,35 +266,35 @@ class TestFindVideo:
def test_returns_file_directly_when_video(self, tmp_path):
f = tmp_path / "Movie.mkv"
f.write_bytes(b"")
assert find_video_file(f) == f
assert find_video_file(f, _KB) == f
def test_returns_none_when_file_is_not_video(self, tmp_path):
f = tmp_path / "notes.txt"
f.write_text("x")
assert find_video_file(f) is None
assert find_video_file(f, _KB) is None
def test_returns_none_when_folder_has_no_video(self, tmp_path):
(tmp_path / "a.txt").write_text("x")
assert find_video_file(tmp_path) is None
assert find_video_file(tmp_path, _KB) is None
def test_returns_first_sorted_video(self, tmp_path):
(tmp_path / "B.mkv").write_bytes(b"")
(tmp_path / "A.mkv").write_bytes(b"")
(tmp_path / "C.mkv").write_bytes(b"")
found = find_video_file(tmp_path)
found = find_video_file(tmp_path, _KB)
assert found.name == "A.mkv"
def test_recurses_into_subfolders(self, tmp_path):
sub = tmp_path / "sub"
sub.mkdir()
(sub / "X.mkv").write_bytes(b"")
found = find_video_file(tmp_path)
found = find_video_file(tmp_path, _KB)
assert found is not None and found.name == "X.mkv"
def test_case_insensitive_extension(self, tmp_path):
f = tmp_path / "Movie.MKV"
f.write_bytes(b"")
assert find_video_file(f) == f
assert find_video_file(f, _KB) == f
# --------------------------------------------------------------------------- #