refactor(release): purify domain — parse_release(name, kb) + ParsedRelease Option B

Removes the last domain → infrastructure leak in the release parser.

services.py:
- parse_release(name, kb) takes the knowledge as an explicit parameter.
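- Every helper that consults release knowledge (_tokenize, _is_well_formed,
  _extract_tech, _extract_languages, _extract_audio, _extract_video_meta,
  _extract_edition, _extract_title, _infer_media_type) now takes kb.
  Helpers that need no knowledge (_extract_season_episode, _extract_year, …)
  keep their existing signatures.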
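- No more YAML loading from this module, at import time or per call: the
  load_separators import is gone and separators are read from kb.separators.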

value_objects.py — Option B:
- Sanitization happens once at parse time; ParsedRelease now carries
  a title_sanitized: str field alongside title.
- Builder methods (show_folder_name, episode_filename, movie_folder_name,
  movie_filename) become pure: they accept already-sanitized
  tmdb_title_safe / tmdb_episode_title_safe arguments. Callers at the
  use-case boundary sanitize via kb.sanitize_for_fs(...) before passing in.
- All domain-knowledge constants are removed (_RESOLUTIONS, _SOURCES, _CODECS,
  _AUDIO, _VIDEO_META, _EDITIONS, _HDR_EXTRA, _MEDIA_TYPE_TOKENS,
  _LANGUAGE_TOKENS, _FORBIDDEN_CHARS, _*_EXTENSIONS, _WIN_FORBIDDEN_TABLE),
  along with the _sanitize_for_fs helper function. The module is now pure
  DDD: no I/O, no YAML loading, and no imports from alfred.infrastructure.
This commit is contained in:
2026-05-19 22:05:10 +02:00
parent c3a3cb50c9
commit 4a74fff9cc
2 changed files with 91 additions and 112 deletions
+53 -55
View File
@@ -4,31 +4,17 @@ from __future__ import annotations
import re
from alfred.infrastructure.knowledge.release import load_separators
from .value_objects import (
_AUDIO,
_CODECS,
_EDITIONS,
_FORBIDDEN_CHARS,
_HDR_EXTRA,
_LANGUAGE_TOKENS,
_MEDIA_TYPE_TOKENS,
_RESOLUTIONS,
_SOURCES,
_VIDEO_META,
MediaTypeToken,
ParsedRelease,
ParsePath,
)
from .ports import ReleaseKnowledge
from .value_objects import MediaTypeToken, ParsedRelease, ParsePath
def _tokenize(name: str) -> list[str]:
def _tokenize(name: str, kb: ReleaseKnowledge) -> list[str]:
"""Split a release name on the configured separators, dropping empty tokens."""
pattern = "[" + re.escape("".join(load_separators())) + "]+"
pattern = "[" + re.escape("".join(kb.separators)) + "]+"
return [t for t in re.split(pattern, name) if t]
def parse_release(name: str) -> ParsedRelease:
def parse_release(name: str, kb: ReleaseKnowledge) -> ParsedRelease:
"""
Parse a release name and return a ParsedRelease.
@@ -48,11 +34,12 @@ def parse_release(name: str) -> ParsedRelease:
if site_tag is not None:
parse_path = ParsePath.SANITIZED.value
if not _is_well_formed(clean):
if not _is_well_formed(clean, kb):
return ParsedRelease(
raw=name,
normalised=clean,
title=clean,
title_sanitized=kb.sanitize_for_fs(clean),
year=None,
season=None,
episode=None,
@@ -68,21 +55,22 @@ def parse_release(name: str) -> ParsedRelease:
)
name = clean
tokens = _tokenize(name)
tokens = _tokenize(name, kb)
season, episode, episode_end = _extract_season_episode(tokens)
quality, source, codec, group, tech_tokens = _extract_tech(tokens)
languages, lang_tokens = _extract_languages(tokens)
audio_codec, audio_channels, audio_tokens = _extract_audio(tokens)
bit_depth, hdr_format, video_tokens = _extract_video_meta(tokens)
edition, edition_tokens = _extract_edition(tokens)
quality, source, codec, group, tech_tokens = _extract_tech(tokens, kb)
languages, lang_tokens = _extract_languages(tokens, kb)
audio_codec, audio_channels, audio_tokens = _extract_audio(tokens, kb)
bit_depth, hdr_format, video_tokens = _extract_video_meta(tokens, kb)
edition, edition_tokens = _extract_edition(tokens, kb)
title = _extract_title(
tokens,
tech_tokens | lang_tokens | audio_tokens | video_tokens | edition_tokens,
kb,
)
year = _extract_year(tokens, title)
media_type = _infer_media_type(
season, quality, source, codec, year, edition, tokens
season, quality, source, codec, year, edition, tokens, kb
)
tech_parts = [p for p in [quality, source, codec] if p]
@@ -92,6 +80,7 @@ def parse_release(name: str) -> ParsedRelease:
raw=name,
normalised=name,
title=title,
title_sanitized=kb.sanitize_for_fs(title),
year=year,
season=season,
episode=episode,
@@ -121,6 +110,7 @@ def _infer_media_type(
year: int | None,
edition: str | None,
tokens: list[str],
kb: ReleaseKnowledge,
) -> str:
"""
Infer media_type from token-level evidence only (no filesystem access).
@@ -134,9 +124,9 @@ def _infer_media_type(
"""
upper_tokens = {t.upper() for t in tokens}
doc_tokens = {t.upper() for t in _MEDIA_TYPE_TOKENS.get("doc", [])}
concert_tokens = {t.upper() for t in _MEDIA_TYPE_TOKENS.get("concert", [])}
integrale_tokens = {t.upper() for t in _MEDIA_TYPE_TOKENS.get("integrale", [])}
doc_tokens = {t.upper() for t in kb.media_type_tokens.get("doc", [])}
concert_tokens = {t.upper() for t in kb.media_type_tokens.get("concert", [])}
integrale_tokens = {t.upper() for t in kb.media_type_tokens.get("integrale", [])}
if upper_tokens & doc_tokens:
return MediaTypeToken.DOCUMENTARY.value
@@ -154,15 +144,15 @@ def _infer_media_type(
return MediaTypeToken.UNKNOWN.value
def _is_well_formed(name: str) -> bool:
def _is_well_formed(name: str, kb: ReleaseKnowledge) -> bool:
"""Return True if name contains no forbidden characters per scene naming rules.
Characters listed as token separators (spaces, brackets, parens, …) are NOT
considered malforming — the tokenizer handles them. Only truly broken chars
like '@', '#', '!', '%' make a name malformed.
"""
tokenizable = set(load_separators())
return not any(c in name for c in _FORBIDDEN_CHARS if c not in tokenizable)
tokenizable = set(kb.separators)
return not any(c in name for c in kb.forbidden_chars if c not in tokenizable)
def _strip_site_tag(name: str) -> tuple[str, str | None]:
@@ -251,6 +241,7 @@ def _extract_season_episode(
def _extract_tech(
tokens: list[str],
kb: ReleaseKnowledge,
) -> tuple[str | None, str | None, str | None, str, set[str]]:
"""
Extract quality, source, codec, group from tokens.
@@ -270,12 +261,12 @@ def _extract_tech(
for tok in tokens:
tl = tok.lower()
if tl in _RESOLUTIONS:
if tl in kb.resolutions:
quality = tok
tech_tokens.add(tok)
continue
if tl in _SOURCES:
if tl in kb.sources:
source = tok
tech_tokens.add(tok)
continue
@@ -283,18 +274,18 @@ def _extract_tech(
if "-" in tok:
parts = tok.rsplit("-", 1)
# codec-GROUP (highest priority for group)
if parts[0].lower() in _CODECS:
if parts[0].lower() in kb.codecs:
codec = parts[0]
group = parts[1] if parts[1] else "UNKNOWN"
tech_tokens.add(tok)
continue
# source with dash: Web-DL, WEB-DL, etc.
if parts[0].lower() in _SOURCES or tok.lower().replace("-", "") in _SOURCES:
if parts[0].lower() in kb.sources or tok.lower().replace("-", "") in kb.sources:
source = tok
tech_tokens.add(tok)
continue
if tl in _CODECS:
if tl in kb.codecs:
codec = tok
tech_tokens.add(tok)
@@ -304,7 +295,7 @@ def _extract_tech(
if "-" in tok:
parts = tok.rsplit("-", 1)
tl = tok.lower()
if tl in _SOURCES or tok.lower().replace("-", "") in _SOURCES:
if tl in kb.sources or tok.lower().replace("-", "") in kb.sources:
continue
if parts[1]:
group = parts[1]
@@ -318,17 +309,20 @@ def _is_year_token(tok: str) -> bool:
return len(tok) == 4 and tok.isdigit() and 1900 <= int(tok) <= 2099
def _extract_title(tokens: list[str], tech_tokens: set[str]) -> str:
def _extract_title(
tokens: list[str], tech_tokens: set[str], kb: ReleaseKnowledge
) -> str:
"""Extract the title portion: everything before the first season/year/tech token."""
title_parts = []
known_tech = kb.resolutions | kb.sources | kb.codecs
for tok in tokens:
if _parse_season_episode(tok) is not None:
break
if _is_year_token(tok):
break
if tok in tech_tokens or tok.lower() in _RESOLUTIONS | _SOURCES | _CODECS:
if tok in tech_tokens or tok.lower() in known_tech:
break
if "-" in tok and any(p.lower() in _CODECS | _SOURCES for p in tok.split("-")):
if "-" in tok and any(p.lower() in kb.codecs | kb.sources for p in tok.split("-")):
break
title_parts.append(tok)
@@ -376,12 +370,14 @@ def _match_sequences(
# ---------------------------------------------------------------------------
def _extract_languages(tokens: list[str]) -> tuple[list[str], set[str]]:
def _extract_languages(
tokens: list[str], kb: ReleaseKnowledge
) -> tuple[list[str], set[str]]:
"""Extract language tokens. Returns (languages, matched_token_set)."""
languages = []
lang_tokens: set[str] = set()
for tok in tokens:
if tok.upper() in _LANGUAGE_TOKENS:
if tok.upper() in kb.language_tokens:
languages.append(tok.upper())
lang_tokens.add(tok)
return languages, lang_tokens
@@ -393,7 +389,7 @@ def _extract_languages(tokens: list[str]) -> tuple[list[str], set[str]]:
def _extract_audio(
tokens: list[str],
tokens: list[str], kb: ReleaseKnowledge,
) -> tuple[str | None, str | None, set[str]]:
"""
Extract audio codec and channel layout.
@@ -405,12 +401,12 @@ def _extract_audio(
audio_channels: str | None = None
audio_tokens: set[str] = set()
known_codecs = {c.upper() for c in _AUDIO.get("codecs", [])}
known_channels = set(_AUDIO.get("channels", []))
known_codecs = {c.upper() for c in kb.audio.get("codecs", [])}
known_channels = set(kb.audio.get("channels", []))
# Try multi-token sequences first
matched_codec, matched_set = _match_sequences(
tokens, _AUDIO.get("sequences", []), "codec"
tokens, kb.audio.get("sequences", []), "codec"
)
if matched_codec:
audio_codec = matched_codec
@@ -446,7 +442,7 @@ def _extract_audio(
def _extract_video_meta(
tokens: list[str],
tokens: list[str], kb: ReleaseKnowledge,
) -> tuple[str | None, str | None, set[str]]:
"""
Extract bit depth and HDR format.
@@ -457,12 +453,12 @@ def _extract_video_meta(
hdr_format: str | None = None
video_tokens: set[str] = set()
known_hdr = {h.upper() for h in _VIDEO_META.get("hdr", [])} | _HDR_EXTRA
known_depth = {d.lower() for d in _VIDEO_META.get("bit_depth", [])}
known_hdr = {h.upper() for h in kb.video_meta.get("hdr", [])} | kb.hdr_extra
known_depth = {d.lower() for d in kb.video_meta.get("bit_depth", [])}
# Try HDR sequences first
matched_hdr, matched_set = _match_sequences(
tokens, _VIDEO_META.get("sequences", []), "hdr"
tokens, kb.video_meta.get("sequences", []), "hdr"
)
if matched_hdr:
hdr_format = matched_hdr
@@ -486,17 +482,19 @@ def _extract_video_meta(
# ---------------------------------------------------------------------------
def _extract_edition(tokens: list[str]) -> tuple[str | None, set[str]]:
def _extract_edition(
tokens: list[str], kb: ReleaseKnowledge
) -> tuple[str | None, set[str]]:
"""
Extract release edition (UNRATED, EXTENDED, DIRECTORS.CUT, …).
Returns (edition, matched_token_set).
"""
known_tokens = {t.upper() for t in _EDITIONS.get("tokens", [])}
known_tokens = {t.upper() for t in kb.editions.get("tokens", [])}
# Try multi-token sequences first
matched_edition, matched_set = _match_sequences(
tokens, _EDITIONS.get("sequences", []), "edition"
tokens, kb.editions.get("sequences", []), "edition"
)
if matched_edition:
return matched_edition, matched_set
+38 -57
View File
@@ -1,4 +1,17 @@
"""Release domain — value objects and token sets."""
"""Release domain — value objects.
This module is **pure**: no I/O, no YAML loading, no knowledge-base
imports. All knowledge that the parser consumes is injected at runtime
via the ``ReleaseKnowledge`` port (see ``ports/knowledge.py``).
``ParsedRelease`` follows Option B of the snapshot-VO design: filesystem
sanitization is performed once at parse time and stored in
``title_sanitized``. The builder methods (``show_folder_name``,
``episode_filename``, etc.) are therefore pure string-formatting and do
**not** need access to any knowledge base — but they require the caller
to pass already-sanitized TMDB strings. The use case is responsible for
calling ``kb.sanitize_for_fs(tmdb_title)`` before invoking the builders.
"""
from __future__ import annotations
@@ -6,50 +19,6 @@ from dataclasses import dataclass, field
from enum import Enum
from ..shared.exceptions import ValidationError
from alfred.infrastructure.knowledge.release import (
load_audio,
load_codecs,
load_editions,
load_forbidden_chars,
load_hdr_extra,
load_language_tokens,
load_media_type_tokens,
load_metadata_extensions,
load_non_video_extensions,
load_resolutions,
load_sources,
load_sources_extra,
load_subtitle_extensions,
load_video,
load_video_extensions,
load_win_forbidden_chars,
)
# Token sets — loaded once at import time from alfred/knowledge/release/
_RESOLUTIONS: set[str] = load_resolutions()
_SOURCES: set[str] = load_sources() | load_sources_extra()
_CODECS: set[str] = load_codecs()
_VIDEO_EXTENSIONS: set[str] = load_video_extensions()
_NON_VIDEO_EXTENSIONS: set[str] = load_non_video_extensions()
_SUBTITLE_EXTENSIONS: set[str] = load_subtitle_extensions()
# Both metadata and subtitle extensions are ignored when deciding the media
# type of a folder — neither is a conclusive signal for movie/tv/other.
_METADATA_EXTENSIONS: set[str] = load_metadata_extensions() | _SUBTITLE_EXTENSIONS
_FORBIDDEN_CHARS: set[str] = load_forbidden_chars()
_LANGUAGE_TOKENS: set[str] = load_language_tokens()
_AUDIO: dict = load_audio()
_VIDEO_META: dict = load_video()
_EDITIONS: dict = load_editions()
_HDR_EXTRA: set[str] = load_hdr_extra()
_MEDIA_TYPE_TOKENS: dict = load_media_type_tokens()
# Translation table for stripping Windows-forbidden characters
_WIN_FORBIDDEN_TABLE = str.maketrans("", "", "".join(load_win_forbidden_chars()))
def _sanitize_for_fs(text: str) -> str:
"""Remove Windows-forbidden characters from a string."""
return text.translate(_WIN_FORBIDDEN_TABLE)
class MediaTypeToken(str, Enum):
@@ -105,11 +74,17 @@ def _strip_episode_from_normalized(normalized: str) -> str:
@dataclass
class ParsedRelease:
"""Structured representation of a parsed release name."""
"""Structured representation of a parsed release name.
``title_sanitized`` carries the filesystem-safe form of ``title`` (computed
by the parser at construction time using the injected knowledge base).
Builder methods rely on it being already-sanitized — see module docstring.
"""
raw: str # original release name (untouched)
normalised: str # dots instead of spaces
title: str # show/movie title (dots, no year/season/tech)
title_sanitized: str # title with filesystem-forbidden chars stripped
year: int | None # movie year or show start year (from TMDB)
season: int | None # season number (None for movies)
episode: int | None # first episode number (None if season-pack)
@@ -180,14 +155,17 @@ class ParsedRelease:
def is_season_pack(self) -> bool:
return self.season is not None and self.episode is None
def show_folder_name(self, tmdb_title: str, tmdb_year: int) -> str:
def show_folder_name(self, tmdb_title_safe: str, tmdb_year: int) -> str:
"""
Build the series root folder name.
Format: {Title}.{Year}.{Tech}-{Group}
Example: Oz.1997.1080p.WEBRip.x265-KONTRAST
``tmdb_title_safe`` must already be filesystem-safe (the caller is
expected to have run it through ``kb.sanitize_for_fs``).
"""
title_part = _sanitize_for_fs(tmdb_title).replace(" ", ".")
title_part = tmdb_title_safe.replace(" ", ".")
tech = self.tech_string or "Unknown"
return f"{title_part}.{tmdb_year}.{tech}-{self.group}"
@@ -201,42 +179,45 @@ class ParsedRelease:
"""
return _strip_episode_from_normalized(self.normalised)
def episode_filename(self, tmdb_episode_title: str | None, ext: str) -> str:
def episode_filename(self, tmdb_episode_title_safe: str | None, ext: str) -> str:
"""
Build the episode filename.
Format: {Title}.{SxxExx}.{EpisodeTitle}.{Tech}-{Group}.{ext}
Example: Oz.S01E01.The.Routine.1080p.WEBRip.x265-KONTRAST.mkv
If tmdb_episode_title is None, omits the episode title segment.
``tmdb_episode_title_safe`` must already be filesystem-safe; pass
``None`` to omit the episode title segment.
"""
title_part = _sanitize_for_fs(self.title)
title_part = self.title_sanitized
s = f"S{self.season:02d}" if self.season is not None else ""
e = f"E{self.episode:02d}" if self.episode is not None else ""
se = s + e
ep_title = ""
if tmdb_episode_title:
ep_title = "." + _sanitize_for_fs(tmdb_episode_title).replace(" ", ".")
if tmdb_episode_title_safe:
ep_title = "." + tmdb_episode_title_safe.replace(" ", ".")
tech = self.tech_string or "Unknown"
ext_clean = ext.lstrip(".")
return f"{title_part}.{se}{ep_title}.{tech}-{self.group}.{ext_clean}"
def movie_folder_name(self, tmdb_title: str, tmdb_year: int) -> str:
def movie_folder_name(self, tmdb_title_safe: str, tmdb_year: int) -> str:
"""
Build the movie folder name.
Format: {Title}.{Year}.{Tech}-{Group}
Example: Inception.2010.1080p.BluRay.x265-GROUP
"""
return self.show_folder_name(tmdb_title, tmdb_year)
return self.show_folder_name(tmdb_title_safe, tmdb_year)
def movie_filename(self, tmdb_title: str, tmdb_year: int, ext: str) -> str:
def movie_filename(
self, tmdb_title_safe: str, tmdb_year: int, ext: str
) -> str:
"""
Build the movie filename (same as folder name + extension).
Example: Inception.2010.1080p.BluRay.x265-GROUP.mkv
"""
ext_clean = ext.lstrip(".")
return f"{self.movie_folder_name(tmdb_title, tmdb_year)}.{ext_clean}"
return f"{self.movie_folder_name(tmdb_title_safe, tmdb_year)}.{ext_clean}"