From 4a74fff9ccd4c68af986ea7813ab9f1f632dc0bc Mon Sep 17 00:00:00 2001 From: Francwa Date: Tue, 19 May 2026 22:05:10 +0200 Subject: [PATCH] =?UTF-8?q?refactor(release):=20purify=20domain=20?= =?UTF-8?q?=E2=80=94=20parse=5Frelease(name,=20kb)=20+=20ParsedRelease=20O?= =?UTF-8?q?ption=20B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Removes the last domain → infrastructure leak in the release parser. services.py: - parse_release(name, kb) takes the knowledge as an explicit parameter. - Every helper (_tokenize, _is_well_formed, _extract_tech, _extract_languages, _extract_audio, _extract_video_meta, _extract_edition, _extract_title, _infer_media_type) takes kb. - No more module-level YAML loading. value_objects.py — Option B: - Sanitization happens once at parse time; ParsedRelease now carries a title_sanitized: str field alongside title. - Builder methods (show_folder_name, episode_filename, movie_folder_name, movie_filename) become pure: they accept already-sanitized tmdb_title_safe / tmdb_episode_title_safe arguments. Callers at the use-case boundary sanitize via kb.sanitize_for_fs(...) before passing in. - All domain-knowledge constants removed (_RESOLUTIONS, _SOURCES, _CODECS, _AUDIO, _VIDEO_META, _EDITIONS, _HDR_EXTRA, _MEDIA_TYPE_TOKENS, _LANGUAGE_TOKENS, _FORBIDDEN_CHARS, _*_EXTENSIONS, _WIN_FORBIDDEN_TABLE, _sanitize_for_fs). The module is now pure DDD. --- alfred/domain/release/services.py | 108 ++++++++++++------------- alfred/domain/release/value_objects.py | 95 +++++++++------------- 2 files changed, 91 insertions(+), 112 deletions(-) diff --git a/alfred/domain/release/services.py b/alfred/domain/release/services.py index 5bfd699..c2b943f 100644 --- a/alfred/domain/release/services.py +++ b/alfred/domain/release/services.py @@ -4,31 +4,17 @@ from __future__ import annotations import re -from alfred.infrastructure.knowledge.release import load_separators -from .value_objects import ( - _AUDIO, - _CODECS, - _EDITIONS, - _FORBIDDEN_CHARS, - _HDR_EXTRA, - _LANGUAGE_TOKENS, - _MEDIA_TYPE_TOKENS, - _RESOLUTIONS, - _SOURCES, - _VIDEO_META, - MediaTypeToken, - ParsedRelease, - ParsePath, -) +from .ports import ReleaseKnowledge +from .value_objects import MediaTypeToken, ParsedRelease, ParsePath -def _tokenize(name: str) -> list[str]: +def _tokenize(name: str, kb: ReleaseKnowledge) -> list[str]: """Split a release name on the configured separators, dropping empty tokens.""" - pattern = "[" + re.escape("".join(load_separators())) + "]+" + pattern = "[" + re.escape("".join(kb.separators)) + "]+" return [t for t in re.split(pattern, name) if t] -def parse_release(name: str) -> ParsedRelease: +def parse_release(name: str, kb: ReleaseKnowledge) -> ParsedRelease: """ Parse a release name and return a ParsedRelease. @@ -48,11 +34,12 @@ def parse_release(name: str) -> ParsedRelease: if site_tag is not None: parse_path = ParsePath.SANITIZED.value - if not _is_well_formed(clean): + if not _is_well_formed(clean, kb): return ParsedRelease( raw=name, normalised=clean, title=clean, + title_sanitized=kb.sanitize_for_fs(clean), year=None, season=None, episode=None, @@ -68,21 +55,22 @@ def parse_release(name: str) -> ParsedRelease: ) name = clean - tokens = _tokenize(name) + tokens = _tokenize(name, kb) season, episode, episode_end = _extract_season_episode(tokens) - quality, source, codec, group, tech_tokens = _extract_tech(tokens) - languages, lang_tokens = _extract_languages(tokens) - audio_codec, audio_channels, audio_tokens = _extract_audio(tokens) - bit_depth, hdr_format, video_tokens = _extract_video_meta(tokens) - edition, edition_tokens = _extract_edition(tokens) + quality, source, codec, group, tech_tokens = _extract_tech(tokens, kb) + languages, lang_tokens = _extract_languages(tokens, kb) + audio_codec, audio_channels, audio_tokens = _extract_audio(tokens, kb) + bit_depth, hdr_format, video_tokens = _extract_video_meta(tokens, kb) + edition, edition_tokens = _extract_edition(tokens, kb) title = _extract_title( tokens, tech_tokens | lang_tokens | audio_tokens | video_tokens | edition_tokens, + kb, ) year = _extract_year(tokens, title) media_type = _infer_media_type( - season, quality, source, codec, year, edition, tokens + season, quality, source, codec, year, edition, tokens, kb ) tech_parts = [p for p in [quality, source, codec] if p] @@ -92,6 +80,7 @@ def parse_release(name: str) -> ParsedRelease: raw=name, normalised=name, title=title, + title_sanitized=kb.sanitize_for_fs(title), year=year, season=season, episode=episode, @@ -121,6 +110,7 @@ def _infer_media_type( year: int | None, edition: str | None, tokens: list[str], + kb: ReleaseKnowledge, ) -> str: """ Infer media_type from token-level evidence only (no filesystem access). @@ -134,9 +124,9 @@ def _infer_media_type( """ upper_tokens = {t.upper() for t in tokens} - doc_tokens = {t.upper() for t in _MEDIA_TYPE_TOKENS.get("doc", [])} - concert_tokens = {t.upper() for t in _MEDIA_TYPE_TOKENS.get("concert", [])} - integrale_tokens = {t.upper() for t in _MEDIA_TYPE_TOKENS.get("integrale", [])} + doc_tokens = {t.upper() for t in kb.media_type_tokens.get("doc", [])} + concert_tokens = {t.upper() for t in kb.media_type_tokens.get("concert", [])} + integrale_tokens = {t.upper() for t in kb.media_type_tokens.get("integrale", [])} if upper_tokens & doc_tokens: return MediaTypeToken.DOCUMENTARY.value @@ -154,15 +144,15 @@ def _infer_media_type( return MediaTypeToken.UNKNOWN.value -def _is_well_formed(name: str) -> bool: +def _is_well_formed(name: str, kb: ReleaseKnowledge) -> bool: """Return True if name contains no forbidden characters per scene naming rules. Characters listed as token separators (spaces, brackets, parens, …) are NOT considered malforming — the tokenizer handles them. Only truly broken chars like '@', '#', '!', '%' make a name malformed. """ - tokenizable = set(load_separators()) - return not any(c in name for c in _FORBIDDEN_CHARS if c not in tokenizable) + tokenizable = set(kb.separators) + return not any(c in name for c in kb.forbidden_chars if c not in tokenizable) def _strip_site_tag(name: str) -> tuple[str, str | None]: @@ -251,6 +241,7 @@ def _extract_season_episode( def _extract_tech( tokens: list[str], + kb: ReleaseKnowledge, ) -> tuple[str | None, str | None, str | None, str, set[str]]: """ Extract quality, source, codec, group from tokens. @@ -270,12 +261,12 @@ def _extract_tech( for tok in tokens: tl = tok.lower() - if tl in _RESOLUTIONS: + if tl in kb.resolutions: quality = tok tech_tokens.add(tok) continue - if tl in _SOURCES: + if tl in kb.sources: source = tok tech_tokens.add(tok) continue @@ -283,18 +274,18 @@ def _extract_tech( if "-" in tok: parts = tok.rsplit("-", 1) # codec-GROUP (highest priority for group) - if parts[0].lower() in _CODECS: + if parts[0].lower() in kb.codecs: codec = parts[0] group = parts[1] if parts[1] else "UNKNOWN" tech_tokens.add(tok) continue # source with dash: Web-DL, WEB-DL, etc. - if parts[0].lower() in _SOURCES or tok.lower().replace("-", "") in _SOURCES: + if parts[0].lower() in kb.sources or tok.lower().replace("-", "") in kb.sources: source = tok tech_tokens.add(tok) continue - if tl in _CODECS: + if tl in kb.codecs: codec = tok tech_tokens.add(tok) @@ -304,7 +295,7 @@ def _extract_tech( if "-" in tok: parts = tok.rsplit("-", 1) tl = tok.lower() - if tl in _SOURCES or tok.lower().replace("-", "") in _SOURCES: + if tl in kb.sources or tok.lower().replace("-", "") in kb.sources: continue if parts[1]: group = parts[1] @@ -318,17 +309,20 @@ def _is_year_token(tok: str) -> bool: return len(tok) == 4 and tok.isdigit() and 1900 <= int(tok) <= 2099 -def _extract_title(tokens: list[str], tech_tokens: set[str]) -> str: +def _extract_title( + tokens: list[str], tech_tokens: set[str], kb: ReleaseKnowledge +) -> str: """Extract the title portion: everything before the first season/year/tech token.""" title_parts = [] + known_tech = kb.resolutions | kb.sources | kb.codecs for tok in tokens: if _parse_season_episode(tok) is not None: break if _is_year_token(tok): break - if tok in tech_tokens or tok.lower() in _RESOLUTIONS | _SOURCES | _CODECS: + if tok in tech_tokens or tok.lower() in known_tech: break - if "-" in tok and any(p.lower() in _CODECS | _SOURCES for p in tok.split("-")): + if "-" in tok and any(p.lower() in kb.codecs | kb.sources for p in tok.split("-")): break title_parts.append(tok) @@ -376,12 +370,14 @@ def _match_sequences( # --------------------------------------------------------------------------- -def _extract_languages(tokens: list[str]) -> tuple[list[str], set[str]]: +def _extract_languages( + tokens: list[str], kb: ReleaseKnowledge +) -> tuple[list[str], set[str]]: """Extract language tokens. Returns (languages, matched_token_set).""" languages = [] lang_tokens: set[str] = set() for tok in tokens: - if tok.upper() in _LANGUAGE_TOKENS: + if tok.upper() in kb.language_tokens: languages.append(tok.upper()) lang_tokens.add(tok) return languages, lang_tokens @@ -393,7 +389,7 @@ def _extract_languages(tokens: list[str]) -> tuple[list[str], set[str]]: def _extract_audio( - tokens: list[str], + tokens: list[str], kb: ReleaseKnowledge, ) -> tuple[str | None, str | None, set[str]]: """ Extract audio codec and channel layout. @@ -405,12 +401,12 @@ def _extract_audio( audio_channels: str | None = None audio_tokens: set[str] = set() - known_codecs = {c.upper() for c in _AUDIO.get("codecs", [])} - known_channels = set(_AUDIO.get("channels", [])) + known_codecs = {c.upper() for c in kb.audio.get("codecs", [])} + known_channels = set(kb.audio.get("channels", [])) # Try multi-token sequences first matched_codec, matched_set = _match_sequences( - tokens, _AUDIO.get("sequences", []), "codec" + tokens, kb.audio.get("sequences", []), "codec" ) if matched_codec: audio_codec = matched_codec @@ -446,7 +442,7 @@ def _extract_audio( def _extract_video_meta( - tokens: list[str], + tokens: list[str], kb: ReleaseKnowledge, ) -> tuple[str | None, str | None, set[str]]: """ Extract bit depth and HDR format. @@ -457,12 +453,12 @@ def _extract_video_meta( hdr_format: str | None = None video_tokens: set[str] = set() - known_hdr = {h.upper() for h in _VIDEO_META.get("hdr", [])} | _HDR_EXTRA - known_depth = {d.lower() for d in _VIDEO_META.get("bit_depth", [])} + known_hdr = {h.upper() for h in kb.video_meta.get("hdr", [])} | kb.hdr_extra + known_depth = {d.lower() for d in kb.video_meta.get("bit_depth", [])} # Try HDR sequences first matched_hdr, matched_set = _match_sequences( - tokens, _VIDEO_META.get("sequences", []), "hdr" + tokens, kb.video_meta.get("sequences", []), "hdr" ) if matched_hdr: hdr_format = matched_hdr @@ -486,17 +482,19 @@ def _extract_video_meta( # --------------------------------------------------------------------------- -def _extract_edition(tokens: list[str]) -> tuple[str | None, set[str]]: +def _extract_edition( + tokens: list[str], kb: ReleaseKnowledge +) -> tuple[str | None, set[str]]: """ Extract release edition (UNRATED, EXTENDED, DIRECTORS.CUT, …). Returns (edition, matched_token_set). """ - known_tokens = {t.upper() for t in _EDITIONS.get("tokens", [])} + known_tokens = {t.upper() for t in kb.editions.get("tokens", [])} # Try multi-token sequences first matched_edition, matched_set = _match_sequences( - tokens, _EDITIONS.get("sequences", []), "edition" + tokens, kb.editions.get("sequences", []), "edition" ) if matched_edition: return matched_edition, matched_set diff --git a/alfred/domain/release/value_objects.py b/alfred/domain/release/value_objects.py index 124a693..87329aa 100644 --- a/alfred/domain/release/value_objects.py +++ b/alfred/domain/release/value_objects.py @@ -1,4 +1,17 @@ -"""Release domain — value objects and token sets.""" +"""Release domain — value objects. + +This module is **pure**: no I/O, no YAML loading, no knowledge-base +imports. All knowledge that the parser consumes is injected at runtime +via the ``ReleaseKnowledge`` port (see ``ports/knowledge.py``). + +``ParsedRelease`` follows Option B of the snapshot-VO design: filesystem +sanitization is performed once at parse time and stored in +``title_sanitized``. The builder methods (``show_folder_name``, +``episode_filename``, etc.) are therefore pure string-formatting and do +**not** need access to any knowledge base — but they require the caller +to pass already-sanitized TMDB strings. The use case is responsible for +calling ``kb.sanitize_for_fs(tmdb_title)`` before invoking the builders. +""" from __future__ import annotations @@ -6,50 +19,6 @@ from dataclasses import dataclass, field from enum import Enum from ..shared.exceptions import ValidationError -from alfred.infrastructure.knowledge.release import ( - load_audio, - load_codecs, - load_editions, - load_forbidden_chars, - load_hdr_extra, - load_language_tokens, - load_media_type_tokens, - load_metadata_extensions, - load_non_video_extensions, - load_resolutions, - load_sources, - load_sources_extra, - load_subtitle_extensions, - load_video, - load_video_extensions, - load_win_forbidden_chars, -) - -# Token sets — loaded once at import time from alfred/knowledge/release/ -_RESOLUTIONS: set[str] = load_resolutions() -_SOURCES: set[str] = load_sources() | load_sources_extra() -_CODECS: set[str] = load_codecs() -_VIDEO_EXTENSIONS: set[str] = load_video_extensions() -_NON_VIDEO_EXTENSIONS: set[str] = load_non_video_extensions() -_SUBTITLE_EXTENSIONS: set[str] = load_subtitle_extensions() -# Both metadata and subtitle extensions are ignored when deciding the media -# type of a folder — neither is a conclusive signal for movie/tv/other. -_METADATA_EXTENSIONS: set[str] = load_metadata_extensions() | _SUBTITLE_EXTENSIONS -_FORBIDDEN_CHARS: set[str] = load_forbidden_chars() -_LANGUAGE_TOKENS: set[str] = load_language_tokens() -_AUDIO: dict = load_audio() -_VIDEO_META: dict = load_video() -_EDITIONS: dict = load_editions() -_HDR_EXTRA: set[str] = load_hdr_extra() -_MEDIA_TYPE_TOKENS: dict = load_media_type_tokens() - -# Translation table for stripping Windows-forbidden characters -_WIN_FORBIDDEN_TABLE = str.maketrans("", "", "".join(load_win_forbidden_chars())) - - -def _sanitize_for_fs(text: str) -> str: - """Remove Windows-forbidden characters from a string.""" - return text.translate(_WIN_FORBIDDEN_TABLE) class MediaTypeToken(str, Enum): @@ -105,11 +74,17 @@ def _strip_episode_from_normalized(normalized: str) -> str: @dataclass class ParsedRelease: - """Structured representation of a parsed release name.""" + """Structured representation of a parsed release name. + + ``title_sanitized`` carries the filesystem-safe form of ``title`` (computed + by the parser at construction time using the injected knowledge base). + Builder methods rely on it being already-sanitized — see module docstring. + """ raw: str # original release name (untouched) normalised: str # dots instead of spaces title: str # show/movie title (dots, no year/season/tech) + title_sanitized: str # title with filesystem-forbidden chars stripped year: int | None # movie year or show start year (from TMDB) season: int | None # season number (None for movies) episode: int | None # first episode number (None if season-pack) @@ -180,14 +155,17 @@ class ParsedRelease: def is_season_pack(self) -> bool: return self.season is not None and self.episode is None - def show_folder_name(self, tmdb_title: str, tmdb_year: int) -> str: + def show_folder_name(self, tmdb_title_safe: str, tmdb_year: int) -> str: """ Build the series root folder name. Format: {Title}.{Year}.{Tech}-{Group} Example: Oz.1997.1080p.WEBRip.x265-KONTRAST + + ``tmdb_title_safe`` must already be filesystem-safe (the caller is + expected to have run it through ``kb.sanitize_for_fs``). """ - title_part = _sanitize_for_fs(tmdb_title).replace(" ", ".") + title_part = tmdb_title_safe.replace(" ", ".") tech = self.tech_string or "Unknown" return f"{title_part}.{tmdb_year}.{tech}-{self.group}" @@ -201,42 +179,45 @@ class ParsedRelease: """ return _strip_episode_from_normalized(self.normalised) - def episode_filename(self, tmdb_episode_title: str | None, ext: str) -> str: + def episode_filename(self, tmdb_episode_title_safe: str | None, ext: str) -> str: """ Build the episode filename. Format: {Title}.{SxxExx}.{EpisodeTitle}.{Tech}-{Group}.{ext} Example: Oz.S01E01.The.Routine.1080p.WEBRip.x265-KONTRAST.mkv - If tmdb_episode_title is None, omits the episode title segment. + ``tmdb_episode_title_safe`` must already be filesystem-safe; pass + ``None`` to omit the episode title segment. """ - title_part = _sanitize_for_fs(self.title) + title_part = self.title_sanitized s = f"S{self.season:02d}" if self.season is not None else "" e = f"E{self.episode:02d}" if self.episode is not None else "" se = s + e ep_title = "" - if tmdb_episode_title: - ep_title = "." + _sanitize_for_fs(tmdb_episode_title).replace(" ", ".") + if tmdb_episode_title_safe: + ep_title = "." + tmdb_episode_title_safe.replace(" ", ".") tech = self.tech_string or "Unknown" ext_clean = ext.lstrip(".") return f"{title_part}.{se}{ep_title}.{tech}-{self.group}.{ext_clean}" - def movie_folder_name(self, tmdb_title: str, tmdb_year: int) -> str: + def movie_folder_name(self, tmdb_title_safe: str, tmdb_year: int) -> str: """ Build the movie folder name. Format: {Title}.{Year}.{Tech}-{Group} Example: Inception.2010.1080p.BluRay.x265-GROUP """ - return self.show_folder_name(tmdb_title, tmdb_year) + return self.show_folder_name(tmdb_title_safe, tmdb_year) - def movie_filename(self, tmdb_title: str, tmdb_year: int, ext: str) -> str: + def movie_filename( + self, tmdb_title_safe: str, tmdb_year: int, ext: str + ) -> str: """ Build the movie filename (same as folder name + extension). Example: Inception.2010.1080p.BluRay.x265-GROUP.mkv """ ext_clean = ext.lstrip(".") - return f"{self.movie_folder_name(tmdb_title, tmdb_year)}.{ext_clean}" + return f"{self.movie_folder_name(tmdb_title_safe, tmdb_year)}.{ext_clean}"