diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e9c21e..575e567 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -184,6 +184,47 @@ callers). globally — noisy on parser mappers and orchestrator use-cases where early-return validation is essential complexity. Ignore `PLW0603` for the documented memory singleton (`infrastructure/persistence/context.py`). +- **Release-knowledge DDD purification** (`refactor/domain-release-knowledge`): + the last domain → infrastructure leak (`domain/release/value_objects.py` + loading YAML at import-time) is gone. Achieved via: + - **`ReleaseKnowledge` Protocol port** at + `alfred/domain/release/ports/knowledge.py` declares the read-only query + surface release parsing needs (token sets for resolutions, sources, codecs, + languages, hdr extras; structured dicts for audio, video_meta, editions, + media_type_tokens; separators list; file-extension sets used by + application/infra callers; `sanitize_for_fs(text)` method). + - **`YamlReleaseKnowledge` adapter** at + `alfred/infrastructure/knowledge/release_kb.py` loads every YAML constant + once at construction. Builds a private `str.maketrans` translation + table for filesystem sanitization. + - **`parse_release(name, kb)`** takes the knowledge as an explicit + parameter — no more module-level YAML loading inside the domain. Every + internal helper (`_tokenize`, `_extract_tech`, `_extract_languages`, + `_extract_audio`, `_extract_video_meta`, `_extract_edition`, + `_extract_title`, `_infer_media_type`, `_is_well_formed`) takes `kb`. + - **`ParsedRelease` Option B**: sanitization happens once at parse time + and is stored on a new `title_sanitized: str` field. Builder methods + (`show_folder_name`, `season_folder_name`, `episode_filename`, + `movie_folder_name`, `movie_filename`) are now pure — they accept + already-sanitized `tmdb_title_safe` / `tmdb_episode_title_safe` + arguments. 
Callers at the use-case boundary sanitize TMDB strings + via `kb.sanitize_for_fs(...)` before passing them in. + - **All domain-knowledge constants removed from `value_objects.py`**: + `_RESOLUTIONS`, `_SOURCES`, `_CODECS`, `_AUDIO`, `_VIDEO_META`, + `_EDITIONS`, `_HDR_EXTRA`, `_MEDIA_TYPE_TOKENS`, `_LANGUAGE_TOKENS`, + `_FORBIDDEN_CHARS`, `_VIDEO_EXTENSIONS`, `_NON_VIDEO_EXTENSIONS`, + `_SUBTITLE_EXTENSIONS`, `_METADATA_EXTENSIONS`, `_WIN_FORBIDDEN_TABLE`, + and the `_sanitize_for_fs` helper. The domain module is now pure. + - **Application-layer KB singleton**: `resolve_destination.py` instantiates + a module-level `_KB: ReleaseKnowledge = YamlReleaseKnowledge()` and + threads it through every `parse_release(...)` call. The local + `_sanitize` helper and `_WIN_FORBIDDEN` regex were dropped in favor of + `_KB.sanitize_for_fs(...)`. + - **`detect_media_type(parsed, source_path, kb)` and + `find_video_file(path, kb)`** now take the knowledge explicitly + instead of importing `_*_EXTENSIONS` constants from the domain. + `agent/tools/filesystem.py::analyze_release` imports the application + KB singleton and passes it through. 
--- diff --git a/alfred/agent/tools/filesystem.py b/alfred/agent/tools/filesystem.py index 9abadec..afb156e 100644 --- a/alfred/agent/tools/filesystem.py +++ b/alfred/agent/tools/filesystem.py @@ -190,15 +190,16 @@ def set_path_for_folder(folder_name: str, path_value: str) -> dict[str, Any]: def analyze_release(release_name: str, source_path: str) -> dict[str, Any]: """Thin tool wrapper — semantics live in alfred/agent/tools/specs/analyze_release.yaml.""" + from alfred.application.filesystem.resolve_destination import _KB # noqa: PLC0415 from alfred.domain.release.services import parse_release # noqa: PLC0415 path = Path(source_path) - parsed = parse_release(release_name) - parsed.media_type = detect_media_type(parsed, path) + parsed = parse_release(release_name, _KB) + parsed.media_type = detect_media_type(parsed, path, _KB) probe_used = False if parsed.media_type not in ("unknown", "other"): - video_file = find_video_file(path) + video_file = find_video_file(path, _KB) if video_file: media_info = probe(video_file) if media_info: diff --git a/alfred/application/filesystem/detect_media_type.py b/alfred/application/filesystem/detect_media_type.py index 10c584a..1fbef84 100644 --- a/alfred/application/filesystem/detect_media_type.py +++ b/alfred/application/filesystem/detect_media_type.py @@ -19,15 +19,13 @@ from __future__ import annotations from pathlib import Path -from alfred.domain.release.value_objects import ( - _METADATA_EXTENSIONS, - _NON_VIDEO_EXTENSIONS, - _VIDEO_EXTENSIONS, - ParsedRelease, -) +from alfred.domain.release.ports import ReleaseKnowledge +from alfred.domain.release.value_objects import ParsedRelease -def detect_media_type(parsed: ParsedRelease, source_path: Path) -> str: +def detect_media_type( + parsed: ParsedRelease, source_path: Path, kb: ReleaseKnowledge +) -> str: """ Return a refined media_type string for the given source_path. 
@@ -37,10 +35,10 @@ def detect_media_type(parsed: ParsedRelease, source_path: Path) -> str: extensions = _collect_extensions(source_path) # Metadata extensions (.nfo, .srt, …) are always present alongside releases # and must not influence the type decision. - conclusive = extensions - _METADATA_EXTENSIONS + conclusive = extensions - kb.metadata_extensions - has_video = bool(conclusive & _VIDEO_EXTENSIONS) - has_non_video = bool(conclusive & _NON_VIDEO_EXTENSIONS) + has_video = bool(conclusive & kb.video_extensions) + has_non_video = bool(conclusive & kb.non_video_extensions) if has_video and has_non_video: return "unknown" diff --git a/alfred/application/filesystem/resolve_destination.py b/alfred/application/filesystem/resolve_destination.py index 2a5aeb3..5fc5f44 100644 --- a/alfred/application/filesystem/resolve_destination.py +++ b/alfred/application/filesystem/resolve_destination.py @@ -8,34 +8,39 @@ Four distinct use cases, one per release type: - resolve_series_destination : complete series multi-season pack (folder move) Each returns a dedicated DTO with only the fields that make sense for that type. + +These use cases follow Option B of the snapshot-VO design: ``ParsedRelease`` +arrives with ``title_sanitized`` already computed, and TMDB-supplied strings +are sanitized **at the use-case boundary** (here) before being passed into +``ParsedRelease`` builder methods. The builders themselves perform no I/O and +no sanitization. 
""" from __future__ import annotations import logging -import re from dataclasses import dataclass from pathlib import Path from alfred.domain.release import parse_release +from alfred.domain.release.ports import ReleaseKnowledge +from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge from alfred.infrastructure.persistence import get_memory logger = logging.getLogger(__name__) -_WIN_FORBIDDEN = re.compile(r'[?:*"<>|\\]') - - -def _sanitize(text: str) -> str: - return _WIN_FORBIDDEN.sub("", text) +# Single module-level knowledge instance. YAML is loaded once at first import. +# Tests that need a custom KB can monkeypatch this attribute. +_KB: ReleaseKnowledge = YamlReleaseKnowledge() def _find_existing_tvshow_folders( - tv_root: Path, tmdb_title: str, tmdb_year: int + tv_root: Path, tmdb_title_safe: str, tmdb_year: int ) -> list[str]: """Return folder names in tv_root that match title + year prefix.""" if not tv_root.exists(): return [] - clean_title = _sanitize(tmdb_title).replace(" ", ".") + clean_title = tmdb_title_safe.replace(" ", ".") prefix = f"{clean_title}.{tmdb_year}".lower() return sorted( entry.name @@ -66,6 +71,7 @@ class _Clarification: def _resolve_series_folder( tv_root: Path, tmdb_title: str, + tmdb_title_safe: str, tmdb_year: int, computed_name: str, confirmed_folder: str | None, @@ -80,7 +86,7 @@ def _resolve_series_folder( if confirmed_folder: return confirmed_folder, not (tv_root / confirmed_folder).exists() - existing = _find_existing_tvshow_folders(tv_root, tmdb_title, tmdb_year) + existing = _find_existing_tvshow_folders(tv_root, tmdb_title_safe, tmdb_year) if not existing: return computed_name, True @@ -246,11 +252,12 @@ def resolve_season_destination( message="TV show library path is not configured.", ) - parsed = parse_release(release_name) - computed_name = _sanitize(parsed.show_folder_name(tmdb_title, tmdb_year)) + parsed = parse_release(release_name, _KB) + tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title) + 
computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year) resolved = _resolve_series_folder( - tv_root, tmdb_title, tmdb_year, computed_name, confirmed_folder + tv_root, tmdb_title, tmdb_title_safe, tmdb_year, computed_name, confirmed_folder ) if isinstance(resolved, _Clarification): return ResolvedSeasonDestination( @@ -295,12 +302,16 @@ def resolve_episode_destination( message="TV show library path is not configured.", ) - parsed = parse_release(release_name) + parsed = parse_release(release_name, _KB) ext = Path(source_file).suffix - computed_name = _sanitize(parsed.show_folder_name(tmdb_title, tmdb_year)) + tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title) + tmdb_episode_title_safe = ( + _KB.sanitize_for_fs(tmdb_episode_title) if tmdb_episode_title else None + ) + computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year) resolved = _resolve_series_folder( - tv_root, tmdb_title, tmdb_year, computed_name, confirmed_folder + tv_root, tmdb_title, tmdb_title_safe, tmdb_year, computed_name, confirmed_folder ) if isinstance(resolved, _Clarification): return ResolvedEpisodeDestination( @@ -311,7 +322,7 @@ def resolve_episode_destination( series_folder_name, is_new = resolved season_folder_name = parsed.season_folder_name() - filename = _sanitize(parsed.episode_filename(tmdb_episode_title, ext)) + filename = parsed.episode_filename(tmdb_episode_title_safe, ext) series_path = tv_root / series_folder_name season_path = series_path / season_folder_name @@ -349,11 +360,12 @@ def resolve_movie_destination( message="Movie library path is not configured.", ) - parsed = parse_release(release_name) + parsed = parse_release(release_name, _KB) ext = Path(source_file).suffix + tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title) - folder_name = _sanitize(parsed.movie_folder_name(tmdb_title, tmdb_year)) - filename = _sanitize(parsed.movie_filename(tmdb_title, tmdb_year, ext)) + folder_name = parsed.movie_folder_name(tmdb_title_safe, tmdb_year) + filename = 
parsed.movie_filename(tmdb_title_safe, tmdb_year, ext) folder_path = Path(movies_root) / folder_name file_path = folder_path / filename @@ -387,11 +399,12 @@ def resolve_series_destination( message="TV show library path is not configured.", ) - parsed = parse_release(release_name) - computed_name = _sanitize(parsed.show_folder_name(tmdb_title, tmdb_year)) + parsed = parse_release(release_name, _KB) + tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title) + computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year) resolved = _resolve_series_folder( - tv_root, tmdb_title, tmdb_year, computed_name, confirmed_folder + tv_root, tmdb_title, tmdb_title_safe, tmdb_year, computed_name, confirmed_folder ) if isinstance(resolved, _Clarification): return ResolvedSeriesDestination( diff --git a/alfred/domain/release/ports/__init__.py b/alfred/domain/release/ports/__init__.py new file mode 100644 index 0000000..b4ae2ec --- /dev/null +++ b/alfred/domain/release/ports/__init__.py @@ -0,0 +1,10 @@ +"""Domain ports for the release domain. + +Protocol-based abstractions that decouple ``parse_release`` and +``ParsedRelease`` from any concrete knowledge-base loader. The +infrastructure layer provides the adapter that satisfies this contract. +""" + +from .knowledge import ReleaseKnowledge + +__all__ = ["ReleaseKnowledge"] diff --git a/alfred/domain/release/ports/knowledge.py b/alfred/domain/release/ports/knowledge.py new file mode 100644 index 0000000..272e7ef --- /dev/null +++ b/alfred/domain/release/ports/knowledge.py @@ -0,0 +1,52 @@ +"""ReleaseKnowledge port — the read-only query surface that +``parse_release`` and ``ParsedRelease`` need from the release knowledge +base, expressed as a structural Protocol so the domain never imports any +concrete loader. + +The concrete YAML-backed implementation lives in +``alfred/infrastructure/knowledge/release_kb.py``. Tests can supply any +object that satisfies this shape (e.g. a simple dataclass). 
+""" + +from __future__ import annotations + +from typing import Protocol + + +class ReleaseKnowledge(Protocol): + """Read-only snapshot of release-name parsing knowledge.""" + + # --- Token sets used by the tokenizer / matchers --- + + resolutions: set[str] + sources: set[str] + codecs: set[str] + language_tokens: set[str] + forbidden_chars: set[str] + hdr_extra: set[str] + + # --- Structured knowledge (loaded from YAML as dicts) --- + + audio: dict + video_meta: dict + editions: dict + media_type_tokens: dict + + # --- Tokenizer separators --- + + separators: list[str] + + # --- File-extension sets (used by application/infra modules that work + # directly with filesystem paths, e.g. media-type detection, video + # lookup). Domain parsing itself doesn't touch these. --- + + video_extensions: set[str] + non_video_extensions: set[str] + subtitle_extensions: set[str] + metadata_extensions: set[str] + + # --- Filesystem sanitization (Option B: pre-sanitize at parse time) --- + + def sanitize_for_fs(self, text: str) -> str: + """Strip filesystem-forbidden characters from ``text``.""" + ... 
diff --git a/alfred/domain/release/services.py b/alfred/domain/release/services.py index 5bfd699..c2b943f 100644 --- a/alfred/domain/release/services.py +++ b/alfred/domain/release/services.py @@ -4,31 +4,17 @@ from __future__ import annotations import re -from alfred.infrastructure.knowledge.release import load_separators -from .value_objects import ( - _AUDIO, - _CODECS, - _EDITIONS, - _FORBIDDEN_CHARS, - _HDR_EXTRA, - _LANGUAGE_TOKENS, - _MEDIA_TYPE_TOKENS, - _RESOLUTIONS, - _SOURCES, - _VIDEO_META, - MediaTypeToken, - ParsedRelease, - ParsePath, -) +from .ports import ReleaseKnowledge +from .value_objects import MediaTypeToken, ParsedRelease, ParsePath -def _tokenize(name: str) -> list[str]: +def _tokenize(name: str, kb: ReleaseKnowledge) -> list[str]: """Split a release name on the configured separators, dropping empty tokens.""" - pattern = "[" + re.escape("".join(load_separators())) + "]+" + pattern = "[" + re.escape("".join(kb.separators)) + "]+" return [t for t in re.split(pattern, name) if t] -def parse_release(name: str) -> ParsedRelease: +def parse_release(name: str, kb: ReleaseKnowledge) -> ParsedRelease: """ Parse a release name and return a ParsedRelease. 
@@ -48,11 +34,12 @@ def parse_release(name: str) -> ParsedRelease: if site_tag is not None: parse_path = ParsePath.SANITIZED.value - if not _is_well_formed(clean): + if not _is_well_formed(clean, kb): return ParsedRelease( raw=name, normalised=clean, title=clean, + title_sanitized=kb.sanitize_for_fs(clean), year=None, season=None, episode=None, @@ -68,21 +55,22 @@ def parse_release(name: str) -> ParsedRelease: ) name = clean - tokens = _tokenize(name) + tokens = _tokenize(name, kb) season, episode, episode_end = _extract_season_episode(tokens) - quality, source, codec, group, tech_tokens = _extract_tech(tokens) - languages, lang_tokens = _extract_languages(tokens) - audio_codec, audio_channels, audio_tokens = _extract_audio(tokens) - bit_depth, hdr_format, video_tokens = _extract_video_meta(tokens) - edition, edition_tokens = _extract_edition(tokens) + quality, source, codec, group, tech_tokens = _extract_tech(tokens, kb) + languages, lang_tokens = _extract_languages(tokens, kb) + audio_codec, audio_channels, audio_tokens = _extract_audio(tokens, kb) + bit_depth, hdr_format, video_tokens = _extract_video_meta(tokens, kb) + edition, edition_tokens = _extract_edition(tokens, kb) title = _extract_title( tokens, tech_tokens | lang_tokens | audio_tokens | video_tokens | edition_tokens, + kb, ) year = _extract_year(tokens, title) media_type = _infer_media_type( - season, quality, source, codec, year, edition, tokens + season, quality, source, codec, year, edition, tokens, kb ) tech_parts = [p for p in [quality, source, codec] if p] @@ -92,6 +80,7 @@ def parse_release(name: str) -> ParsedRelease: raw=name, normalised=name, title=title, + title_sanitized=kb.sanitize_for_fs(title), year=year, season=season, episode=episode, @@ -121,6 +110,7 @@ def _infer_media_type( year: int | None, edition: str | None, tokens: list[str], + kb: ReleaseKnowledge, ) -> str: """ Infer media_type from token-level evidence only (no filesystem access). 
@@ -134,9 +124,9 @@ def _infer_media_type( """ upper_tokens = {t.upper() for t in tokens} - doc_tokens = {t.upper() for t in _MEDIA_TYPE_TOKENS.get("doc", [])} - concert_tokens = {t.upper() for t in _MEDIA_TYPE_TOKENS.get("concert", [])} - integrale_tokens = {t.upper() for t in _MEDIA_TYPE_TOKENS.get("integrale", [])} + doc_tokens = {t.upper() for t in kb.media_type_tokens.get("doc", [])} + concert_tokens = {t.upper() for t in kb.media_type_tokens.get("concert", [])} + integrale_tokens = {t.upper() for t in kb.media_type_tokens.get("integrale", [])} if upper_tokens & doc_tokens: return MediaTypeToken.DOCUMENTARY.value @@ -154,15 +144,15 @@ def _infer_media_type( return MediaTypeToken.UNKNOWN.value -def _is_well_formed(name: str) -> bool: +def _is_well_formed(name: str, kb: ReleaseKnowledge) -> bool: """Return True if name contains no forbidden characters per scene naming rules. Characters listed as token separators (spaces, brackets, parens, …) are NOT considered malforming — the tokenizer handles them. Only truly broken chars like '@', '#', '!', '%' make a name malformed. """ - tokenizable = set(load_separators()) - return not any(c in name for c in _FORBIDDEN_CHARS if c not in tokenizable) + tokenizable = set(kb.separators) + return not any(c in name for c in kb.forbidden_chars if c not in tokenizable) def _strip_site_tag(name: str) -> tuple[str, str | None]: @@ -251,6 +241,7 @@ def _extract_season_episode( def _extract_tech( tokens: list[str], + kb: ReleaseKnowledge, ) -> tuple[str | None, str | None, str | None, str, set[str]]: """ Extract quality, source, codec, group from tokens. 
@@ -270,12 +261,12 @@ def _extract_tech( for tok in tokens: tl = tok.lower() - if tl in _RESOLUTIONS: + if tl in kb.resolutions: quality = tok tech_tokens.add(tok) continue - if tl in _SOURCES: + if tl in kb.sources: source = tok tech_tokens.add(tok) continue @@ -283,18 +274,18 @@ def _extract_tech( if "-" in tok: parts = tok.rsplit("-", 1) # codec-GROUP (highest priority for group) - if parts[0].lower() in _CODECS: + if parts[0].lower() in kb.codecs: codec = parts[0] group = parts[1] if parts[1] else "UNKNOWN" tech_tokens.add(tok) continue # source with dash: Web-DL, WEB-DL, etc. - if parts[0].lower() in _SOURCES or tok.lower().replace("-", "") in _SOURCES: + if parts[0].lower() in kb.sources or tok.lower().replace("-", "") in kb.sources: source = tok tech_tokens.add(tok) continue - if tl in _CODECS: + if tl in kb.codecs: codec = tok tech_tokens.add(tok) @@ -304,7 +295,7 @@ def _extract_tech( if "-" in tok: parts = tok.rsplit("-", 1) tl = tok.lower() - if tl in _SOURCES or tok.lower().replace("-", "") in _SOURCES: + if tl in kb.sources or tok.lower().replace("-", "") in kb.sources: continue if parts[1]: group = parts[1] @@ -318,17 +309,20 @@ def _is_year_token(tok: str) -> bool: return len(tok) == 4 and tok.isdigit() and 1900 <= int(tok) <= 2099 -def _extract_title(tokens: list[str], tech_tokens: set[str]) -> str: +def _extract_title( + tokens: list[str], tech_tokens: set[str], kb: ReleaseKnowledge +) -> str: """Extract the title portion: everything before the first season/year/tech token.""" title_parts = [] + known_tech = kb.resolutions | kb.sources | kb.codecs for tok in tokens: if _parse_season_episode(tok) is not None: break if _is_year_token(tok): break - if tok in tech_tokens or tok.lower() in _RESOLUTIONS | _SOURCES | _CODECS: + if tok in tech_tokens or tok.lower() in known_tech: break - if "-" in tok and any(p.lower() in _CODECS | _SOURCES for p in tok.split("-")): + if "-" in tok and any(p.lower() in kb.codecs | kb.sources for p in tok.split("-")): break 
title_parts.append(tok) @@ -376,12 +370,14 @@ def _match_sequences( # --------------------------------------------------------------------------- -def _extract_languages(tokens: list[str]) -> tuple[list[str], set[str]]: +def _extract_languages( + tokens: list[str], kb: ReleaseKnowledge +) -> tuple[list[str], set[str]]: """Extract language tokens. Returns (languages, matched_token_set).""" languages = [] lang_tokens: set[str] = set() for tok in tokens: - if tok.upper() in _LANGUAGE_TOKENS: + if tok.upper() in kb.language_tokens: languages.append(tok.upper()) lang_tokens.add(tok) return languages, lang_tokens @@ -393,7 +389,7 @@ def _extract_languages(tokens: list[str]) -> tuple[list[str], set[str]]: def _extract_audio( - tokens: list[str], + tokens: list[str], kb: ReleaseKnowledge, ) -> tuple[str | None, str | None, set[str]]: """ Extract audio codec and channel layout. @@ -405,12 +401,12 @@ def _extract_audio( audio_channels: str | None = None audio_tokens: set[str] = set() - known_codecs = {c.upper() for c in _AUDIO.get("codecs", [])} - known_channels = set(_AUDIO.get("channels", [])) + known_codecs = {c.upper() for c in kb.audio.get("codecs", [])} + known_channels = set(kb.audio.get("channels", [])) # Try multi-token sequences first matched_codec, matched_set = _match_sequences( - tokens, _AUDIO.get("sequences", []), "codec" + tokens, kb.audio.get("sequences", []), "codec" ) if matched_codec: audio_codec = matched_codec @@ -446,7 +442,7 @@ def _extract_audio( def _extract_video_meta( - tokens: list[str], + tokens: list[str], kb: ReleaseKnowledge, ) -> tuple[str | None, str | None, set[str]]: """ Extract bit depth and HDR format. 
@@ -457,12 +453,12 @@ def _extract_video_meta( hdr_format: str | None = None video_tokens: set[str] = set() - known_hdr = {h.upper() for h in _VIDEO_META.get("hdr", [])} | _HDR_EXTRA - known_depth = {d.lower() for d in _VIDEO_META.get("bit_depth", [])} + known_hdr = {h.upper() for h in kb.video_meta.get("hdr", [])} | kb.hdr_extra + known_depth = {d.lower() for d in kb.video_meta.get("bit_depth", [])} # Try HDR sequences first matched_hdr, matched_set = _match_sequences( - tokens, _VIDEO_META.get("sequences", []), "hdr" + tokens, kb.video_meta.get("sequences", []), "hdr" ) if matched_hdr: hdr_format = matched_hdr @@ -486,17 +482,19 @@ def _extract_video_meta( # --------------------------------------------------------------------------- -def _extract_edition(tokens: list[str]) -> tuple[str | None, set[str]]: +def _extract_edition( + tokens: list[str], kb: ReleaseKnowledge +) -> tuple[str | None, set[str]]: """ Extract release edition (UNRATED, EXTENDED, DIRECTORS.CUT, …). Returns (edition, matched_token_set). """ - known_tokens = {t.upper() for t in _EDITIONS.get("tokens", [])} + known_tokens = {t.upper() for t in kb.editions.get("tokens", [])} # Try multi-token sequences first matched_edition, matched_set = _match_sequences( - tokens, _EDITIONS.get("sequences", []), "edition" + tokens, kb.editions.get("sequences", []), "edition" ) if matched_edition: return matched_edition, matched_set diff --git a/alfred/domain/release/value_objects.py b/alfred/domain/release/value_objects.py index 124a693..87329aa 100644 --- a/alfred/domain/release/value_objects.py +++ b/alfred/domain/release/value_objects.py @@ -1,4 +1,17 @@ -"""Release domain — value objects and token sets.""" +"""Release domain — value objects. + +This module is **pure**: no I/O, no YAML loading, no knowledge-base +imports. All knowledge that the parser consumes is injected at runtime +via the ``ReleaseKnowledge`` port (see ``ports/knowledge.py``). 
+ +``ParsedRelease`` follows Option B of the snapshot-VO design: filesystem +sanitization is performed once at parse time and stored in +``title_sanitized``. The builder methods (``show_folder_name``, +``episode_filename``, etc.) are therefore pure string-formatting and do +**not** need access to any knowledge base — but they require the caller +to pass already-sanitized TMDB strings. The use case is responsible for +calling ``kb.sanitize_for_fs(tmdb_title)`` before invoking the builders. +""" from __future__ import annotations @@ -6,50 +19,6 @@ from dataclasses import dataclass, field from enum import Enum from ..shared.exceptions import ValidationError -from alfred.infrastructure.knowledge.release import ( - load_audio, - load_codecs, - load_editions, - load_forbidden_chars, - load_hdr_extra, - load_language_tokens, - load_media_type_tokens, - load_metadata_extensions, - load_non_video_extensions, - load_resolutions, - load_sources, - load_sources_extra, - load_subtitle_extensions, - load_video, - load_video_extensions, - load_win_forbidden_chars, -) - -# Token sets — loaded once at import time from alfred/knowledge/release/ -_RESOLUTIONS: set[str] = load_resolutions() -_SOURCES: set[str] = load_sources() | load_sources_extra() -_CODECS: set[str] = load_codecs() -_VIDEO_EXTENSIONS: set[str] = load_video_extensions() -_NON_VIDEO_EXTENSIONS: set[str] = load_non_video_extensions() -_SUBTITLE_EXTENSIONS: set[str] = load_subtitle_extensions() -# Both metadata and subtitle extensions are ignored when deciding the media -# type of a folder — neither is a conclusive signal for movie/tv/other. 
-_METADATA_EXTENSIONS: set[str] = load_metadata_extensions() | _SUBTITLE_EXTENSIONS -_FORBIDDEN_CHARS: set[str] = load_forbidden_chars() -_LANGUAGE_TOKENS: set[str] = load_language_tokens() -_AUDIO: dict = load_audio() -_VIDEO_META: dict = load_video() -_EDITIONS: dict = load_editions() -_HDR_EXTRA: set[str] = load_hdr_extra() -_MEDIA_TYPE_TOKENS: dict = load_media_type_tokens() - -# Translation table for stripping Windows-forbidden characters -_WIN_FORBIDDEN_TABLE = str.maketrans("", "", "".join(load_win_forbidden_chars())) - - -def _sanitize_for_fs(text: str) -> str: - """Remove Windows-forbidden characters from a string.""" - return text.translate(_WIN_FORBIDDEN_TABLE) class MediaTypeToken(str, Enum): @@ -105,11 +74,17 @@ def _strip_episode_from_normalized(normalized: str) -> str: @dataclass class ParsedRelease: - """Structured representation of a parsed release name.""" + """Structured representation of a parsed release name. + + ``title_sanitized`` carries the filesystem-safe form of ``title`` (computed + by the parser at construction time using the injected knowledge base). + Builder methods rely on it being already-sanitized — see module docstring. + """ raw: str # original release name (untouched) normalised: str # dots instead of spaces title: str # show/movie title (dots, no year/season/tech) + title_sanitized: str # title with filesystem-forbidden chars stripped year: int | None # movie year or show start year (from TMDB) season: int | None # season number (None for movies) episode: int | None # first episode number (None if season-pack) @@ -180,14 +155,17 @@ class ParsedRelease: def is_season_pack(self) -> bool: return self.season is not None and self.episode is None - def show_folder_name(self, tmdb_title: str, tmdb_year: int) -> str: + def show_folder_name(self, tmdb_title_safe: str, tmdb_year: int) -> str: """ Build the series root folder name. 
Format: {Title}.{Year}.{Tech}-{Group} Example: Oz.1997.1080p.WEBRip.x265-KONTRAST + + ``tmdb_title_safe`` must already be filesystem-safe (the caller is + expected to have run it through ``kb.sanitize_for_fs``). """ - title_part = _sanitize_for_fs(tmdb_title).replace(" ", ".") + title_part = tmdb_title_safe.replace(" ", ".") tech = self.tech_string or "Unknown" return f"{title_part}.{tmdb_year}.{tech}-{self.group}" @@ -201,42 +179,45 @@ class ParsedRelease: """ return _strip_episode_from_normalized(self.normalised) - def episode_filename(self, tmdb_episode_title: str | None, ext: str) -> str: + def episode_filename(self, tmdb_episode_title_safe: str | None, ext: str) -> str: """ Build the episode filename. Format: {Title}.{SxxExx}.{EpisodeTitle}.{Tech}-{Group}.{ext} Example: Oz.S01E01.The.Routine.1080p.WEBRip.x265-KONTRAST.mkv - If tmdb_episode_title is None, omits the episode title segment. + ``tmdb_episode_title_safe`` must already be filesystem-safe; pass + ``None`` to omit the episode title segment. """ - title_part = _sanitize_for_fs(self.title) + title_part = self.title_sanitized s = f"S{self.season:02d}" if self.season is not None else "" e = f"E{self.episode:02d}" if self.episode is not None else "" se = s + e ep_title = "" - if tmdb_episode_title: - ep_title = "." + _sanitize_for_fs(tmdb_episode_title).replace(" ", ".") + if tmdb_episode_title_safe: + ep_title = "." + tmdb_episode_title_safe.replace(" ", ".") tech = self.tech_string or "Unknown" ext_clean = ext.lstrip(".") return f"{title_part}.{se}{ep_title}.{tech}-{self.group}.{ext_clean}" - def movie_folder_name(self, tmdb_title: str, tmdb_year: int) -> str: + def movie_folder_name(self, tmdb_title_safe: str, tmdb_year: int) -> str: """ Build the movie folder name. 
 Format: {Title}.{Year}.{Tech}-{Group} Example: Inception.2010.1080p.BluRay.x265-GROUP """ - return self.show_folder_name(tmdb_title, tmdb_year) + return self.show_folder_name(tmdb_title_safe, tmdb_year) - def movie_filename(self, tmdb_title: str, tmdb_year: int, ext: str) -> str: + def movie_filename( + self, tmdb_title_safe: str, tmdb_year: int, ext: str + ) -> str: """ Build the movie filename (same as folder name + extension). Example: Inception.2010.1080p.BluRay.x265-GROUP.mkv """ ext_clean = ext.lstrip(".") - return f"{self.movie_folder_name(tmdb_title, tmdb_year)}.{ext_clean}" + return f"{self.movie_folder_name(tmdb_title_safe, tmdb_year)}.{ext_clean}" diff --git a/alfred/infrastructure/filesystem/find_video.py b/alfred/infrastructure/filesystem/find_video.py index e91a290..9260331 100644 --- a/alfred/infrastructure/filesystem/find_video.py +++ b/alfred/infrastructure/filesystem/find_video.py @@ -4,10 +4,10 @@ from __future__ import annotations from pathlib import Path -from alfred.domain.release.value_objects import _VIDEO_EXTENSIONS +from alfred.domain.release.ports import ReleaseKnowledge -def find_video_file(path: Path) -> Path | None: +def find_video_file(path: Path, kb: ReleaseKnowledge) -> Path | None: """ Return the first video file found at path. @@ -15,11 +15,12 @@ def find_video_file(path: Path) -> Path | None: - If path is a folder — scan recursively, return the first video found (sorted by name for determinism, picks S01E01 before S01E02 etc.). 
 """ + video_exts = kb.video_extensions if path.is_file(): - return path if path.suffix.lower() in _VIDEO_EXTENSIONS else None + return path if path.suffix.lower() in video_exts else None for candidate in sorted(path.rglob("*")): - if candidate.is_file() and candidate.suffix.lower() in _VIDEO_EXTENSIONS: + if candidate.is_file() and candidate.suffix.lower() in video_exts: return candidate return None diff --git a/alfred/infrastructure/knowledge/release_kb.py b/alfred/infrastructure/knowledge/release_kb.py new file mode 100644 index 0000000..5d4a790 --- /dev/null +++ b/alfred/infrastructure/knowledge/release_kb.py @@ -0,0 +1,83 @@ +"""YamlReleaseKnowledge — concrete adapter for the ``ReleaseKnowledge`` +domain port. + +Loads every release-knowledge YAML once at construction time and exposes +the parsed snapshots as plain attributes. The application layer builds a +single instance at boot and passes it down to ``parse_release`` and the +filesystem helpers; ``ParsedRelease`` builders take pre-sanitized strings. + +The file-extension sets (``video_extensions``, ``non_video_extensions``, +``subtitle_extensions``, ``metadata_extensions``) are declared on the +domain port but unused by the parser — they are consumed by +application/infra modules that handle filesystem-level concerns. +""" + +from __future__ import annotations + +from .release import ( + load_audio, + load_codecs, + load_editions, + load_forbidden_chars, + load_hdr_extra, + load_language_tokens, + load_media_type_tokens, + load_metadata_extensions, + load_non_video_extensions, + load_resolutions, + load_separators, + load_sources, + load_sources_extra, + load_subtitle_extensions, + load_video, + load_video_extensions, + load_win_forbidden_chars, +) + + +class YamlReleaseKnowledge: + """Single object holding every parsed-release knowledge constant. + + Built once at application boot. Read-only at runtime — call sites + treat it as a snapshot. 
To pick up newly learned tokens without a + restart, build a fresh instance and swap it in at the call sites. 
+ """ + + def __init__(self) -> None: + # Domain-port surface + self.resolutions: set[str] = load_resolutions() + self.sources: set[str] = load_sources() | load_sources_extra() + self.codecs: set[str] = load_codecs() + self.language_tokens: set[str] = load_language_tokens() + self.forbidden_chars: set[str] = load_forbidden_chars() + self.hdr_extra: set[str] = load_hdr_extra() + + self.audio: dict = load_audio() + self.video_meta: dict = load_video() + self.editions: dict = load_editions() + self.media_type_tokens: dict = load_media_type_tokens() + + self.separators: list[str] = load_separators() + + # File-extension sets (used by application/infra modules, not by + # the parser itself — kept here so there is a single ownership + # point for release knowledge). + self.video_extensions: set[str] = load_video_extensions() + self.non_video_extensions: set[str] = load_non_video_extensions() + self.subtitle_extensions: set[str] = load_subtitle_extensions() + # Metadata + subtitle extensions are both ignored when deciding + # the media type of a folder (neither is a conclusive signal for + # movie/tv/other), so we expose the union under the historical + # name. + self.metadata_extensions: set[str] = ( + load_metadata_extensions() | self.subtitle_extensions + ) + + # Translation table for stripping Windows-forbidden chars. 
+ self._win_forbidden_table = str.maketrans( + "", "", "".join(load_win_forbidden_chars()) + ) + + def sanitize_for_fs(self, text: str) -> str: + """Strip Windows-forbidden characters from ``text``.""" + return text.translate(self._win_forbidden_table) diff --git a/tests/application/test_detect_media_type.py b/tests/application/test_detect_media_type.py index 2041f0a..260aa0d 100644 --- a/tests/application/test_detect_media_type.py +++ b/tests/application/test_detect_media_type.py @@ -20,16 +20,19 @@ import pytest from alfred.application.filesystem.detect_media_type import detect_media_type from alfred.domain.release.services import parse_release +from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge + +_KB = YamlReleaseKnowledge() def _parsed(media_type: str = "movie"): """Build a ParsedRelease with the requested media_type via the real parser.""" if media_type == "tv_show": - return parse_release("Show.S01E01.1080p-GRP") + return parse_release("Show.S01E01.1080p-GRP", _KB) if media_type == "movie": - return parse_release("Movie.2020.1080p-GRP") + return parse_release("Movie.2020.1080p-GRP", _KB) # "unknown" / other — feed a name the parser can't classify - return parse_release("randomthing") + return parse_release("randomthing", _KB) # --------------------------------------------------------------------------- # @@ -41,30 +44,30 @@ class TestFile: def test_video_file_preserves_parsed_type(self, tmp_path: Path): f = tmp_path / "x.mkv" f.write_bytes(b"") - assert detect_media_type(_parsed("movie"), f) == "movie" + assert detect_media_type(_parsed("movie"), f, _KB) == "movie" def test_video_file_preserves_tv_type(self, tmp_path: Path): f = tmp_path / "ep.mp4" f.write_bytes(b"") - assert detect_media_type(_parsed("tv_show"), f) == "tv_show" + assert detect_media_type(_parsed("tv_show"), f, _KB) == "tv_show" def test_non_video_file_returns_other(self, tmp_path: Path): f = tmp_path / "x.iso" f.write_bytes(b"") - assert 
detect_media_type(_parsed("movie"), f) == "other" + assert detect_media_type(_parsed("movie"), f, _KB) == "other" @pytest.mark.parametrize("ext", [".rar", ".zip", ".7z", ".exe", ".dmg"]) def test_various_non_video_extensions(self, tmp_path: Path, ext): f = tmp_path / f"x{ext}" f.write_bytes(b"") - assert detect_media_type(_parsed("movie"), f) == "other" + assert detect_media_type(_parsed("movie"), f, _KB) == "other" def test_metadata_only_file_keeps_parsed_type(self, tmp_path: Path): # Metadata extension is stripped from conclusive set — no video, no # non-video → falls through to parsed.media_type. f = tmp_path / "x.nfo" f.write_bytes(b"") - assert detect_media_type(_parsed("movie"), f) == "movie" + assert detect_media_type(_parsed("movie"), f, _KB) == "movie" # --------------------------------------------------------------------------- # @@ -75,27 +78,27 @@ class TestFile: class TestFolder: def test_folder_with_video_keeps_parsed_type(self, tmp_path: Path): (tmp_path / "main.mkv").write_bytes(b"") - assert detect_media_type(_parsed("movie"), tmp_path) == "movie" + assert detect_media_type(_parsed("movie"), tmp_path, _KB) == "movie" def test_folder_only_non_video_returns_other(self, tmp_path: Path): (tmp_path / "disc.iso").write_bytes(b"") (tmp_path / "part.rar").write_bytes(b"") - assert detect_media_type(_parsed("movie"), tmp_path) == "other" + assert detect_media_type(_parsed("movie"), tmp_path, _KB) == "other" def test_folder_mixed_returns_unknown(self, tmp_path: Path): (tmp_path / "main.mkv").write_bytes(b"") (tmp_path / "extras.iso").write_bytes(b"") - assert detect_media_type(_parsed("movie"), tmp_path) == "unknown" + assert detect_media_type(_parsed("movie"), tmp_path, _KB) == "unknown" def test_empty_folder_keeps_parsed_type(self, tmp_path: Path): - assert detect_media_type(_parsed("tv_show"), tmp_path) == "tv_show" + assert detect_media_type(_parsed("tv_show"), tmp_path, _KB) == "tv_show" def test_folder_only_metadata_keeps_parsed_type(self, tmp_path: 
Path): (tmp_path / "info.nfo").write_bytes(b"") (tmp_path / "cover.jpg").write_bytes(b"") (tmp_path / "subs.srt").write_bytes(b"") # All metadata → conclusive set empty → falls through. - assert detect_media_type(_parsed("movie"), tmp_path) == "movie" + assert detect_media_type(_parsed("movie"), tmp_path, _KB) == "movie" # --------------------------------------------------------------------------- # @@ -109,18 +112,18 @@ class TestMetadataIgnored: (tmp_path / "info.nfo").write_bytes(b"") (tmp_path / "cover.jpg").write_bytes(b"") (tmp_path / "subs.srt").write_bytes(b"") - assert detect_media_type(_parsed("movie"), tmp_path) == "movie" + assert detect_media_type(_parsed("movie"), tmp_path, _KB) == "movie" def test_non_video_plus_metadata_still_other(self, tmp_path: Path): (tmp_path / "disc.iso").write_bytes(b"") (tmp_path / "info.nfo").write_bytes(b"") - assert detect_media_type(_parsed("movie"), tmp_path) == "other" + assert detect_media_type(_parsed("movie"), tmp_path, _KB) == "other" def test_case_insensitive_extensions(self, tmp_path: Path): # Suffix is lowercased before classification. f = tmp_path / "X.MKV" f.write_bytes(b"") - assert detect_media_type(_parsed("movie"), f) == "movie" + assert detect_media_type(_parsed("movie"), f, _KB) == "movie" # --------------------------------------------------------------------------- # @@ -132,11 +135,11 @@ class TestMissing: def test_nonexistent_path_keeps_parsed_type(self, tmp_path: Path): missing = tmp_path / "does_not_exist.mkv" # Doesn't exist → empty extension set → falls through. 
- assert detect_media_type(_parsed("movie"), missing) == "movie" + assert detect_media_type(_parsed("movie"), missing, _KB) == "movie" def test_nonexistent_folder_keeps_parsed_type(self, tmp_path: Path): missing = tmp_path / "ghost" - assert detect_media_type(_parsed("tv_show"), missing) == "tv_show" + assert detect_media_type(_parsed("tv_show"), missing, _KB) == "tv_show" def test_subfolder_not_recursed(self, tmp_path: Path): # _collect_extensions scans only the first level — files inside @@ -145,4 +148,4 @@ class TestMissing: sub.mkdir() (sub / "deep.mkv").write_bytes(b"") # Top level has no files at all → empty → falls through to parsed type. - assert detect_media_type(_parsed("movie"), tmp_path) == "movie" + assert detect_media_type(_parsed("movie"), tmp_path, _KB) == "movie" diff --git a/tests/application/test_enrich_from_probe.py b/tests/application/test_enrich_from_probe.py index d40e21b..c192912 100644 --- a/tests/application/test_enrich_from_probe.py +++ b/tests/application/test_enrich_from_probe.py @@ -37,6 +37,7 @@ def _bare(**overrides) -> ParsedRelease: raw="X", normalised="X", title="X", + title_sanitized="X", year=None, season=None, episode=None, diff --git a/tests/application/test_resolve_destination.py b/tests/application/test_resolve_destination.py index ff7bc0c..1c67359 100644 --- a/tests/application/test_resolve_destination.py +++ b/tests/application/test_resolve_destination.py @@ -9,7 +9,6 @@ Four use cases compute library paths from a release name + TMDB metadata: Coverage: -- ``TestSanitize`` — Windows-forbidden chars stripped. - ``TestFindExistingTvshowFolders`` — empty root, prefix match (case + space → dot). - ``TestResolveSeriesFolderInternal`` — confirmed_folder, no existing, single match, ambiguous → _Clarification. 
@@ -32,7 +31,6 @@ from alfred.application.filesystem.resolve_destination import ( _Clarification, _find_existing_tvshow_folders, _resolve_series_folder, - _sanitize, resolve_episode_destination, resolve_movie_destination, resolve_season_destination, @@ -51,15 +49,6 @@ REL_SERIES = "Oz.Complete.Series.1080p.WEBRip.x265-KONTRAST" # --------------------------------------------------------------------------- # -class TestSanitize: - def test_passthrough_safe_chars(self): - assert _sanitize("Oz.1997.1080p-GRP") == "Oz.1997.1080p-GRP" - - def test_strips_windows_forbidden(self): - # ? : * " < > | \ - assert _sanitize('a?b:c*d"eg|h\\i') == "abcdefghi" - - # --------------------------------------------------------------------------- # # _find_existing_tvshow_folders # # --------------------------------------------------------------------------- # @@ -107,6 +96,7 @@ class TestResolveSeriesFolderInternal: out = _resolve_series_folder( tmp_path, "Oz", + "Oz", 1997, "Oz.1997.WEBRip-KONTRAST", confirmed_folder="Oz.1997.X-GRP", @@ -117,6 +107,7 @@ class TestResolveSeriesFolderInternal: out = _resolve_series_folder( tmp_path, "Oz", + "Oz", 1997, "Oz.1997.WEBRip-KONTRAST", confirmed_folder="Oz.1997.New-X", @@ -125,21 +116,21 @@ class TestResolveSeriesFolderInternal: def test_no_existing_returns_computed_as_new(self, tmp_path): out = _resolve_series_folder( - tmp_path, "Oz", 1997, "Oz.1997.WEBRip-KONTRAST", None + tmp_path, "Oz", "Oz", 1997, "Oz.1997.WEBRip-KONTRAST", None ) assert out == ("Oz.1997.WEBRip-KONTRAST", True) def test_single_existing_matching_computed_returns_existing(self, tmp_path): (tmp_path / "Oz.1997.WEBRip-KONTRAST").mkdir() out = _resolve_series_folder( - tmp_path, "Oz", 1997, "Oz.1997.WEBRip-KONTRAST", None + tmp_path, "Oz", "Oz", 1997, "Oz.1997.WEBRip-KONTRAST", None ) assert out == ("Oz.1997.WEBRip-KONTRAST", False) def test_single_existing_different_name_returns_clarification(self, tmp_path): (tmp_path / "Oz.1997.BluRay-OTHER").mkdir() out = 
_resolve_series_folder( - tmp_path, "Oz", 1997, "Oz.1997.WEBRip-KONTRAST", None + tmp_path, "Oz", "Oz", 1997, "Oz.1997.WEBRip-KONTRAST", None ) assert isinstance(out, _Clarification) assert "Oz" in out.question @@ -149,7 +140,7 @@ class TestResolveSeriesFolderInternal: def test_multiple_existing_returns_clarification(self, tmp_path): (tmp_path / "Oz.1997.A-GRP").mkdir() (tmp_path / "Oz.1997.B-GRP").mkdir() - out = _resolve_series_folder(tmp_path, "Oz", 1997, "Oz.1997.A-GRP", None) + out = _resolve_series_folder(tmp_path, "Oz", "Oz", 1997, "Oz.1997.A-GRP", None) assert isinstance(out, _Clarification) # Computed already in existing → not duplicated. assert out.options.count("Oz.1997.A-GRP") == 1 diff --git a/tests/domain/test_release.py b/tests/domain/test_release.py index d3005cb..f44a6cf 100644 --- a/tests/domain/test_release.py +++ b/tests/domain/test_release.py @@ -20,13 +20,20 @@ import pytest from alfred.domain.release.services import parse_release from alfred.domain.release.value_objects import ParsedRelease +from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge + +_KB = YamlReleaseKnowledge() + + +def _parse(name: str) -> ParsedRelease: + return parse_release(name, _KB) class TestParseTVEpisode: """Single-episode TV releases.""" def test_basic_tv_episode(self): - r = parse_release("Oz.S03E01.1080p.WEBRip.x265-KONTRAST") + r = _parse("Oz.S03E01.1080p.WEBRip.x265-KONTRAST") assert r.title == "Oz" assert r.season == 3 assert r.episode == 1 @@ -40,27 +47,27 @@ class TestParseTVEpisode: assert r.is_season_pack is False def test_multi_episode(self): - r = parse_release("Archer.S14E09E10.1080p.WEB.x265-GRP") + r = _parse("Archer.S14E09E10.1080p.WEB.x265-GRP") assert r.season == 14 assert r.episode == 9 assert r.episode_end == 10 def test_nxnn_alt_form(self): # Alt season/episode form: 1x05 instead of S01E05. 
- r = parse_release("Some.Show.1x05.720p.HDTV.x264-GRP") + r = _parse("Some.Show.1x05.720p.HDTV.x264-GRP") assert r.season == 1 assert r.episode == 5 assert r.episode_end is None assert r.media_type == "tv_show" def test_nxnnxnn_multi_episode_alt_form(self): - r = parse_release("Some.Show.2x07x08.1080p.WEB.x265-GRP") + r = _parse("Some.Show.2x07x08.1080p.WEB.x265-GRP") assert r.season == 2 assert r.episode == 7 assert r.episode_end == 8 def test_season_pack(self): - r = parse_release("Oz.S03.1080p.WEBRip.x265-KONTRAST") + r = _parse("Oz.S03.1080p.WEBRip.x265-KONTRAST") assert r.season == 3 assert r.episode is None assert r.is_season_pack is True @@ -71,7 +78,7 @@ class TestParseMovie: """Movie releases.""" def test_basic_movie(self): - r = parse_release("Inception.2010.1080p.BluRay.x264-GROUP") + r = _parse("Inception.2010.1080p.BluRay.x264-GROUP") assert r.title == "Inception" assert r.year == 2010 assert r.season is None @@ -83,13 +90,13 @@ class TestParseMovie: assert r.media_type == "movie" def test_movie_multi_word_title(self): - r = parse_release("The.Dark.Knight.2008.2160p.UHD.BluRay.x265-TERMINAL") + r = _parse("The.Dark.Knight.2008.2160p.UHD.BluRay.x265-TERMINAL") assert r.title == "The.Dark.Knight" assert r.year == 2008 assert r.quality == "2160p" def test_movie_without_year_still_movie_if_tech_present(self): - r = parse_release("UntitledFilm.1080p.WEBRip.x264-GRP") + r = _parse("UntitledFilm.1080p.WEBRip.x264-GRP") # No season, no year, but tech markers → still movie assert r.media_type == "movie" assert r.year is None @@ -99,39 +106,39 @@ class TestParseEdgeCases: """Site tags, malformed names, and unknown media types.""" def test_site_tag_prefix_stripped(self): - r = parse_release("[ OxTorrent.vc ] The.Title.S01E01.1080p.WEB.x265-GRP") + r = _parse("[ OxTorrent.vc ] The.Title.S01E01.1080p.WEB.x265-GRP") assert r.site_tag == "OxTorrent.vc" assert r.parse_path == "sanitized" assert r.season == 1 assert r.episode == 1 def 
test_site_tag_suffix_stripped(self): - r = parse_release("The.Title.S01E01.1080p.WEB.x265-NTb[TGx]") + r = _parse("The.Title.S01E01.1080p.WEB.x265-NTb[TGx]") assert r.site_tag == "TGx" # Suffix-tagged names are well-formed (only [] in tag → after strip clean) assert r.season == 1 def test_irrecoverably_malformed(self): # @ is a forbidden char and not stripped by _sanitize → stays malformed - r = parse_release("foo@bar@baz") + r = _parse("foo@bar@baz") assert r.media_type == "unknown" assert r.parse_path == "ai" assert r.group == "UNKNOWN" def test_empty_unknown_when_no_evidence(self): - r = parse_release("Some.Random.Title") + r = _parse("Some.Random.Title") # No season, no year, no tech markers → unknown assert r.media_type == "unknown" def test_missing_group_defaults_to_unknown(self): - r = parse_release("Movie.2020.1080p.WEBRip.x265") + r = _parse("Movie.2020.1080p.WEBRip.x265") # No "-GROUP" suffix → group = "UNKNOWN" assert r.group == "UNKNOWN" def test_yts_bracket_release(self): # YTS-style: spaces, parens for year, multiple bracketed tech tokens. # The tokenizer must handle ' ', '(', ')', '[', ']' transparently. - r = parse_release("The Father (2020) [1080p] [WEBRip] [5.1] [YTS.MX]") + r = _parse("The Father (2020) [1080p] [WEBRip] [5.1] [YTS.MX]") assert r.title == "The.Father" assert r.year == 2020 assert r.quality == "1080p" @@ -141,7 +148,7 @@ class TestParseEdgeCases: def test_human_friendly_spaces(self): # Spaces as separators (no brackets). - r = parse_release("Inception 2010 1080p BluRay x264-GROUP") + r = _parse("Inception 2010 1080p BluRay x264-GROUP") assert r.title == "Inception" assert r.year == 2010 assert r.quality == "1080p" @@ -151,7 +158,7 @@ class TestParseEdgeCases: def test_underscore_separators(self): # Old usenet style: underscores between tokens. 
- r = parse_release("Some_Show_S01E01_1080p_WEB_x265-GRP") + r = _parse("Some_Show_S01E01_1080p_WEB_x265-GRP") assert r.season == 1 assert r.episode == 1 assert r.quality == "1080p" @@ -162,15 +169,15 @@ class TestParseAudioVideoEdition: """Audio, video metadata, edition extraction.""" def test_audio_codec_and_channels(self): - r = parse_release("Movie.2020.1080p.BluRay.DTS.5.1.x264-GRP") + r = _parse("Movie.2020.1080p.BluRay.DTS.5.1.x264-GRP") assert r.audio_channels == "5.1" def test_language_token(self): - r = parse_release("Movie.2020.MULTI.1080p.WEBRip.x265-GRP") + r = _parse("Movie.2020.MULTI.1080p.WEBRip.x265-GRP") assert "MULTI" in r.languages def test_edition_token(self): - r = parse_release("Movie.2020.UNRATED.1080p.BluRay.x264-GRP") + r = _parse("Movie.2020.UNRATED.1080p.BluRay.x264-GRP") assert r.edition == "UNRATED" @@ -178,19 +185,21 @@ class TestParsedReleaseFolderNames: """Helpers that build filesystem-safe folder/filenames.""" def _parsed_tv(self) -> ParsedRelease: - return parse_release("Oz.S03E01.1080p.WEBRip.x265-KONTRAST") + return _parse("Oz.S03E01.1080p.WEBRip.x265-KONTRAST") def _parsed_movie(self) -> ParsedRelease: - return parse_release("Inception.2010.1080p.BluRay.x264-GROUP") + return _parse("Inception.2010.1080p.BluRay.x264-GROUP") def test_show_folder_name(self): r = self._parsed_tv() assert r.show_folder_name("Oz", 1997) == "Oz.1997.1080p.WEBRip.x265-KONTRAST" - def test_show_folder_name_strips_windows_chars(self): + def test_show_folder_name_uses_already_safe_title(self): + # Option B: callers sanitize at the use-case boundary via + # kb.sanitize_for_fs(...) before passing the title in. r = self._parsed_tv() - # Colons and question marks are Windows-forbidden — must be stripped. - result = r.show_folder_name("Oz: The Series?", 1997) + safe = _KB.sanitize_for_fs("Oz: The Series?") + result = r.show_folder_name(safe, 1997) assert ":" not in result assert "?" 
not in result @@ -202,7 +211,7 @@ class TestParsedReleaseFolderNames: assert "E01" not in result def test_season_folder_name_multi_episode(self): - r = parse_release("Archer.S14E09E10E11.1080p.WEB.x265-GRP") + r = _parse("Archer.S14E09E10E11.1080p.WEB.x265-GRP") result = r.season_folder_name() assert "S14" in result assert "E09" not in result @@ -251,21 +260,21 @@ class TestParsedReleaseInvariants: def test_raw_is_preserved(self): raw = "Oz.S03E01.1080p.WEBRip.x265-KONTRAST" - r = parse_release(raw) + r = _parse(raw) assert r.raw == raw def test_languages_defaults_to_empty_list_not_none(self): - r = parse_release("Movie.2020.1080p.BluRay.x264-GRP") + r = _parse("Movie.2020.1080p.BluRay.x264-GRP") # __post_init__ ensures languages is a list, never None assert r.languages == [] def test_tech_string_joined(self): - r = parse_release("Movie.2020.1080p.BluRay.x264-GRP") + r = _parse("Movie.2020.1080p.BluRay.x264-GRP") assert r.tech_string == "1080p.BluRay.x264" def test_tech_string_partial(self): # Codec-only release (no quality/source): tech_string == codec - r = parse_release("Show.S01E01.x265-GRP") + r = _parse("Show.S01E01.x265-GRP") assert r.tech_string == "x265" assert r.codec == "x265" assert r.quality is None @@ -280,4 +289,4 @@ class TestParsedReleaseInvariants: ], ) def test_media_type_inference(self, name, expected_type): - assert parse_release(name).media_type == expected_type + assert _parse(name).media_type == expected_type diff --git a/tests/domain/test_release_fixtures.py b/tests/domain/test_release_fixtures.py index dd7d0dd..31f3fff 100644 --- a/tests/domain/test_release_fixtures.py +++ b/tests/domain/test_release_fixtures.py @@ -19,8 +19,10 @@ from dataclasses import asdict import pytest from alfred.domain.release.services import parse_release +from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge from tests.fixtures.releases.conftest import ReleaseFixture, discover_fixtures +_KB = YamlReleaseKnowledge() FIXTURES = 
discover_fixtures() @@ -34,9 +36,9 @@ def test_parse_matches_fixture(fixture: ReleaseFixture, tmp_path) -> None: # plausible filesystem paths. Catches typos / missing leading dirs early. fixture.materialize(tmp_path) - result = asdict(parse_release(fixture.release_name)) + result = asdict(parse_release(fixture.release_name, _KB)) # ``is_season_pack`` is a @property — asdict() does not include it. - result["is_season_pack"] = parse_release(fixture.release_name).is_season_pack + result["is_season_pack"] = parse_release(fixture.release_name, _KB).is_season_pack for field, expected in fixture.expected_parsed.items(): assert field in result, ( diff --git a/tests/infrastructure/test_filesystem_extras.py b/tests/infrastructure/test_filesystem_extras.py index 29ad832..6988489 100644 --- a/tests/infrastructure/test_filesystem_extras.py +++ b/tests/infrastructure/test_filesystem_extras.py @@ -34,6 +34,9 @@ from alfred.infrastructure.filesystem.filesystem_operations import ( ) from alfred.infrastructure.filesystem.find_video import find_video_file from alfred.infrastructure.filesystem.organizer import MediaOrganizer +from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge + +_KB = YamlReleaseKnowledge() # --------------------------------------------------------------------------- # # ffprobe.probe # @@ -263,35 +266,35 @@ class TestFindVideo: def test_returns_file_directly_when_video(self, tmp_path): f = tmp_path / "Movie.mkv" f.write_bytes(b"") - assert find_video_file(f) == f + assert find_video_file(f, _KB) == f def test_returns_none_when_file_is_not_video(self, tmp_path): f = tmp_path / "notes.txt" f.write_text("x") - assert find_video_file(f) is None + assert find_video_file(f, _KB) is None def test_returns_none_when_folder_has_no_video(self, tmp_path): (tmp_path / "a.txt").write_text("x") - assert find_video_file(tmp_path) is None + assert find_video_file(tmp_path, _KB) is None def test_returns_first_sorted_video(self, tmp_path): (tmp_path / 
"B.mkv").write_bytes(b"") (tmp_path / "A.mkv").write_bytes(b"") (tmp_path / "C.mkv").write_bytes(b"") - found = find_video_file(tmp_path) + found = find_video_file(tmp_path, _KB) assert found.name == "A.mkv" def test_recurses_into_subfolders(self, tmp_path): sub = tmp_path / "sub" sub.mkdir() (sub / "X.mkv").write_bytes(b"") - found = find_video_file(tmp_path) + found = find_video_file(tmp_path, _KB) assert found is not None and found.name == "X.mkv" def test_case_insensitive_extension(self, tmp_path): f = tmp_path / "Movie.MKV" f.write_bytes(b"") - assert find_video_file(f) == f + assert find_video_file(f, _KB) == f # --------------------------------------------------------------------------- #