Merge branch 'feat/release-parser-scoring'
This commit is contained in:
@@ -17,6 +17,42 @@ callers).
|
||||
|
||||
---
|
||||
|
||||
## [2026-05-20] — Release parser confidence scoring + exclusion
|
||||
|
||||
### Added
|
||||
|
||||
- **Pre-pipeline exclusion helpers** (`alfred/application/release/supported_media.py`):
|
||||
`is_supported_video(path, kb)` (extension-only check against
|
||||
`kb.video_extensions`) and `find_main_video(folder, kb)` (top-level
|
||||
scan, lexicographically-first eligible file, returns `None` when no
|
||||
video qualifies; accepts a bare file as folder for single-file
|
||||
releases). No size threshold, no filename heuristics —
|
||||
PATH_OF_PAIN handles the exotic cases. Foundation for the future
|
||||
`inspect_release` orchestrator.
|
||||
|
||||
- **Release parser — parse-confidence scoring** (`alfred/domain/release/parser/scoring.py`,
|
||||
`alfred/knowledge/release/scoring.yaml`). `parse_release` now returns
|
||||
`(ParsedRelease, ParseReport)`. The new `ParseReport` frozen VO
|
||||
carries a 0–100 `confidence`, a `road` (`"easy"` / `"shitty"` /
|
||||
`"path_of_pain"`), the residual UNKNOWN tokens, and the missing
|
||||
critical fields. EASY is decided structurally (a group schema
|
||||
matched); SHITTY vs PATH_OF_PAIN is decided by score against a
|
||||
YAML-configurable cutoff (default 60). Weights and penalties also
|
||||
live in `scoring.yaml` — title 30, media_type 20, year 15, season
|
||||
10, episode 5, and 5 each for resolution, source, codec and group;
|
||||
a penalty of 5 per UNKNOWN token, capped at 30 in total. `Road` is a new enum, distinct from `ParsePath` (which records
|
||||
the tokenization route, not the confidence tier). `ReleaseKnowledge`
|
||||
port gains a `scoring: dict` field.
|
||||
|
||||
### Changed
|
||||
|
||||
- **`parse_release` signature** is now `(name, kb) → tuple[ParsedRelease,
|
||||
ParseReport]` instead of returning a bare `ParsedRelease`. Call
|
||||
sites updated in `application/filesystem/resolve_destination.py` and
|
||||
`agent/tools/filesystem.py`. Tests updated accordingly.
|
||||
|
||||
---
|
||||
|
||||
## [2026-05-20] — Release parser v2 (EASY + SHITTY)
|
||||
|
||||
### Added
|
||||
|
||||
@@ -194,7 +194,7 @@ def analyze_release(release_name: str, source_path: str) -> dict[str, Any]:
|
||||
from alfred.domain.release.services import parse_release # noqa: PLC0415
|
||||
|
||||
path = Path(source_path)
|
||||
parsed = parse_release(release_name, _KB)
|
||||
parsed, _ = parse_release(release_name, _KB)
|
||||
parsed.media_type = detect_media_type(parsed, path, _KB)
|
||||
|
||||
probe_used = False
|
||||
|
||||
@@ -252,7 +252,7 @@ def resolve_season_destination(
|
||||
message="TV show library path is not configured.",
|
||||
)
|
||||
|
||||
parsed = parse_release(release_name, _KB)
|
||||
parsed, _ = parse_release(release_name, _KB)
|
||||
tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
|
||||
computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year)
|
||||
|
||||
@@ -302,7 +302,7 @@ def resolve_episode_destination(
|
||||
message="TV show library path is not configured.",
|
||||
)
|
||||
|
||||
parsed = parse_release(release_name, _KB)
|
||||
parsed, _ = parse_release(release_name, _KB)
|
||||
ext = Path(source_file).suffix
|
||||
tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
|
||||
tmdb_episode_title_safe = (
|
||||
@@ -360,7 +360,7 @@ def resolve_movie_destination(
|
||||
message="Movie library path is not configured.",
|
||||
)
|
||||
|
||||
parsed = parse_release(release_name, _KB)
|
||||
parsed, _ = parse_release(release_name, _KB)
|
||||
ext = Path(source_file).suffix
|
||||
tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
|
||||
|
||||
@@ -399,7 +399,7 @@ def resolve_series_destination(
|
||||
message="TV show library path is not configured.",
|
||||
)
|
||||
|
||||
parsed = parse_release(release_name, _KB)
|
||||
parsed, _ = parse_release(release_name, _KB)
|
||||
tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
|
||||
computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year)
|
||||
|
||||
|
||||
@@ -0,0 +1,11 @@
|
||||
"""Release application layer — orchestrators sitting between domain
|
||||
parsing and infrastructure I/O.
|
||||
|
||||
Today it exposes the pre-pipeline exclusion helpers
|
||||
(:mod:`supported_media`). Phase C will add the ``inspect_release``
|
||||
orchestrator here.
|
||||
"""
|
||||
|
||||
from .supported_media import find_main_video, is_supported_video
|
||||
|
||||
__all__ = ["find_main_video", "is_supported_video"]
|
||||
@@ -0,0 +1,74 @@
|
||||
"""Pre-pipeline exclusion — decide which files are worth parsing.
|
||||
|
||||
These helpers live one notch above the domain: they touch the
|
||||
filesystem (``Path.iterdir``, ``Path.suffix``) but carry no parsing
|
||||
logic of their own. The goal is to filter out non-video files and pick
|
||||
the canonical "main video" from a release folder *before* anything
|
||||
hits :func:`~alfred.domain.release.parse_release`.
|
||||
|
||||
Design notes (Phase A bis, 2026-05-20):
|
||||
|
||||
- **Extension is the sole eligibility criterion.** A file is supported
|
||||
iff its suffix is in ``kb.video_extensions``. No size threshold, no
|
||||
filename heuristics ("sample", "trailer", …). If a release packs a
|
||||
bloated featurette or names its sample alphabetically before the
|
||||
main feature, that's PATH_OF_PAIN territory — not this layer's job.
|
||||
|
||||
- **Top-level scan only.** ``find_main_video`` does not descend into
|
||||
subdirectories. Releases that wrap the main video in ``Sample/`` or
|
||||
similar are non-scene-standard and handled by the orchestrator
|
||||
upstream.
|
||||
|
||||
- **Lexicographic tie-break.** When several candidates qualify
|
||||
(legitimate for season packs), we return the first by alphabetical
|
||||
order. Deterministic, no size-based ranking.
|
||||
|
||||
- **Direct ``Path`` I/O.** No ``FilesystemScanner`` port — this layer
|
||||
is application, not domain. If isolation becomes necessary for
|
||||
testing scale, we'll introduce a port then.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from alfred.domain.release.ports.knowledge import ReleaseKnowledge
|
||||
|
||||
|
||||
def is_supported_video(path: Path, kb: ReleaseKnowledge) -> bool:
    """Tell whether ``path`` points at a video file worth parsing.

    Eligibility is decided by extension alone: the lower-cased suffix
    has to be a member of ``kb.video_extensions``. Anything that is not
    a regular file on disk (a directory, a path that does not exist, a
    dangling symlink) is rejected before the suffix is looked at.
    """
    # ``and`` short-circuits, so the knowledge base is only consulted
    # for paths that really are regular files.
    return path.is_file() and path.suffix.lower() in kb.video_extensions
|
||||
|
||||
|
||||
def find_main_video(folder: Path, kb: ReleaseKnowledge) -> Path | None:
    """Pick the canonical main video for a release, if any.

    Rules:

    - Only the direct children of ``folder`` are examined; nested
      directories are never entered.
    - A child is a candidate when :func:`is_supported_video` accepts it.
    - Among several candidates the smallest path in sort order is
      returned.
    - ``folder`` may itself be a file (single-file release): it is
      returned unchanged when it is a supported video, ``None``
      otherwise.
    - A ``folder`` that is neither a file nor a directory (for example
      a path that does not exist) yields ``None``.
    """
    # Single-file release: the "folder" is itself the only candidate.
    if folder.is_file():
        if is_supported_video(folder, kb):
            return folder
        return None

    if folder.is_dir():
        eligible = (
            entry for entry in folder.iterdir() if is_supported_video(entry, kb)
        )
        # Smallest path wins; ``default`` covers the no-candidate case.
        return min(eligible, default=None)

    return None
|
||||
@@ -1,6 +1,6 @@
|
||||
"""Release domain — release name parsing and naming conventions."""
|
||||
|
||||
from .services import parse_release
|
||||
from .value_objects import ParsedRelease
|
||||
from .value_objects import ParsedRelease, ParseReport
|
||||
|
||||
__all__ = ["ParsedRelease", "parse_release"]
|
||||
__all__ = ["ParsedRelease", "ParseReport", "parse_release"]
|
||||
|
||||
@@ -0,0 +1,139 @@
|
||||
"""Parse-confidence scoring.
|
||||
|
||||
``parse_release`` returns a :class:`ParseReport` alongside its
|
||||
:class:`ParsedRelease`. The report carries:
|
||||
|
||||
- ``confidence``: integer 0–100 derived from which structural and
|
||||
technical fields got populated, minus a penalty per UNKNOWN token
|
||||
left in the annotated stream.
|
||||
- ``road``: which of the three roads the parse took
|
||||
(:class:`Road.EASY` / :class:`Road.SHITTY` / :class:`Road.PATH_OF_PAIN`).
|
||||
- ``unknown_tokens``: textual residue, useful for diagnostics.
|
||||
- ``missing_critical``: structural fields the score-tally found absent
|
||||
(e.g. ``("year", "media_type")``) — the caller can use this to drive
|
||||
PoP recovery (questions, LLM call).
|
||||
|
||||
All weights, penalties and thresholds come from the injected knowledge
|
||||
base (``kb.scoring``), itself loaded from
|
||||
``alfred/knowledge/release/scoring.yaml``. No magic numbers here.
|
||||
|
||||
The scoring functions are pure — they consume the annotated token list
|
||||
and the resulting :class:`ParsedRelease` and return the report. They are
|
||||
called by ``services.parse_release`` after ``assemble`` has run.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from enum import Enum
|
||||
|
||||
from ..ports.knowledge import ReleaseKnowledge
|
||||
from ..value_objects import ParsedRelease
|
||||
from .tokens import Token, TokenRole
|
||||
|
||||
|
||||
class Road(str, Enum):
    """Confidence tier assigned to a parsed release name.

    Not to be confused with
    :class:`~alfred.domain.release.value_objects.ParsePath`: that one
    tracks the tokenization route (DIRECT / SANITIZED / AI), i.e. the
    *method*. ``Road`` qualifies the *result*.
    """

    # A group schema matched, so the annotation was structural.
    EASY = "easy"
    # No schema: dict-driven annotation whose score reached the threshold.
    SHITTY = "shitty"
    # Score under the threshold: the parse needs outside help.
    PATH_OF_PAIN = "path_of_pain"
|
||||
|
||||
|
||||
# Critical structural fields — their absence drives the
|
||||
# ``missing_critical`` list in the report.
|
||||
_CRITICAL_FIELDS: tuple[str, ...] = ("title", "media_type", "year")
|
||||
|
||||
|
||||
def _is_tv_shaped(parsed: ParsedRelease) -> bool:
    """Report whether the parse *looks* like TV, i.e. carries a season.

    Season and episode weights are only counted for such releases.
    """
    season = parsed.season
    return season is not None
|
||||
|
||||
|
||||
def compute_score(
    parsed: ParsedRelease,
    annotated: list[Token],
    kb: ReleaseKnowledge,
) -> int:
    """Derive the 0–100 confidence score of a parse.

    Every field that got populated earns the weight registered under
    its name in ``kb.scoring["weights"]`` (a weight missing from the
    mapping counts as 0). Season and episode are only rewarded for
    TV-shaped parses, and a ``group`` equal to ``"UNKNOWN"`` is handled
    like an empty one.

    Each token of ``annotated`` still carrying the UNKNOWN role then
    costs ``penalties["unknown_token"]`` points, the total deduction
    being limited to ``penalties["max_unknown_penalty"]``.

    The outcome is clamped to the ``[0, 100]`` interval.
    """
    weights = kb.scoring["weights"]
    penalties = kb.scoring["penalties"]

    tv_shaped = _is_tv_shaped(parsed)
    media_type = parsed.media_type

    # (weight key, "was the field populated?") in tally order.
    earned = [
        ("title", bool(parsed.title)),
        ("media_type", bool(media_type) and media_type.value != "unknown"),
        ("year", parsed.year is not None),
        ("season", tv_shaped and parsed.season is not None),
        ("episode", tv_shaped and parsed.episode is not None),
        ("resolution", bool(parsed.quality)),
        ("source", bool(parsed.source)),
        ("codec", bool(parsed.codec)),
        ("group", bool(parsed.group) and parsed.group != "UNKNOWN"),
    ]
    total = sum(weights.get(key, 0) for key, present in earned if present)

    # Residual UNKNOWN tokens pull the score down, up to the configured cap.
    leftovers = [tok for tok in annotated if tok.role is TokenRole.UNKNOWN]
    deduction = min(
        len(leftovers) * penalties.get("unknown_token", 0),
        penalties.get("max_unknown_penalty", 0),
    )

    return max(0, min(100, total - deduction))
|
||||
|
||||
|
||||
def collect_unknown_tokens(annotated: list[Token]) -> tuple[str, ...]:
    """Gather, in stream order, the text of each token whose role is UNKNOWN."""
    residue: list[str] = []
    for token in annotated:
        if token.role is TokenRole.UNKNOWN:
            residue.append(token.text)
    return tuple(residue)
|
||||
|
||||
|
||||
def collect_missing_critical(parsed: ParsedRelease) -> tuple[str, ...]:
    """List the critical structural fields the parse left unfilled.

    The result keeps a fixed order (``title``, ``media_type``,
    ``year``). A media type whose value is ``"unknown"`` counts as
    missing, exactly like an absent one.
    """
    media_type = parsed.media_type
    absent = {
        "title": not parsed.title,
        "media_type": not media_type or media_type.value == "unknown",
        "year": parsed.year is None,
    }
    return tuple(name for name, is_absent in absent.items() if is_absent)
|
||||
|
||||
|
||||
def decide_road(
    score: int,
    has_schema: bool,
    kb: ReleaseKnowledge,
) -> Road:
    """Classify a parse as EASY, SHITTY or PATH_OF_PAIN.

    A matched group schema settles the question on its own: the road is
    EASY whatever the score says. Without a schema, the score is
    compared with ``kb.scoring["thresholds"]["shitty_min"]`` (60 when
    the key is absent): reaching it means SHITTY, falling short means
    PATH_OF_PAIN.
    """
    if not has_schema:
        cutoff = kb.scoring["thresholds"].get("shitty_min", 60)
        return Road.SHITTY if score >= cutoff else Road.PATH_OF_PAIN
    # Structural match: the score has no veto here.
    return Road.EASY
|
||||
@@ -40,6 +40,18 @@ class ReleaseKnowledge(Protocol):
|
||||
|
||||
separators: list[str]
|
||||
|
||||
# --- Parse scoring (Phase A) ---
|
||||
#
|
||||
# ``scoring`` is a dict with three keys:
|
||||
# - ``weights``: dict[field_name, int] field weight contribution
|
||||
# - ``penalties``: {"unknown_token": int, "max_unknown_penalty": int}
|
||||
# - ``thresholds``: {"shitty_min": int} SHITTY vs PATH_OF_PAIN cutoff
|
||||
#
|
||||
# Concrete values come from ``alfred/knowledge/release/scoring.yaml``.
|
||||
# The loader fills in safe defaults so this dict is always populated.
|
||||
|
||||
scoring: dict
|
||||
|
||||
# --- File-extension sets (used by application/infra modules that work
|
||||
# directly with filesystem paths, e.g. media-type detection, video
|
||||
# lookup). Domain parsing itself doesn't touch these. ---
|
||||
|
||||
@@ -8,21 +8,30 @@ Thin orchestrator over the annotate-based pipeline in
|
||||
the LLM can clean them up.
|
||||
* Otherwise call the v2 pipeline (tokenize → annotate → assemble) and
|
||||
wrap the result in :class:`ParsedRelease`.
|
||||
* Score the result and decide the road (EASY / SHITTY / PATH_OF_PAIN)
|
||||
via :mod:`alfred.domain.release.parser.scoring`.
|
||||
|
||||
All structural and enricher logic now lives in the pipeline. This file
|
||||
no longer carries field extractors — the heuristic SHITTY path is part
|
||||
of :func:`~alfred.domain.release.parser.pipeline.annotate`.
|
||||
The public entry point is :func:`parse_release`, which returns
|
||||
``(ParsedRelease, ParseReport)``. The report carries the confidence
|
||||
score, the road, and diagnostic info for downstream callers.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from .parser import pipeline as _v2
|
||||
from .parser import scoring as _scoring
|
||||
from .ports import ReleaseKnowledge
|
||||
from .value_objects import MediaTypeToken, ParsedRelease, ParsePath
|
||||
from .value_objects import MediaTypeToken, ParsedRelease, ParsePath, ParseReport
|
||||
|
||||
|
||||
def parse_release(name: str, kb: ReleaseKnowledge) -> ParsedRelease:
|
||||
"""Parse a release name and return a :class:`ParsedRelease`.
|
||||
def parse_release(
|
||||
name: str, kb: ReleaseKnowledge
|
||||
) -> tuple[ParsedRelease, ParseReport]:
|
||||
"""Parse a release name.
|
||||
|
||||
Returns a tuple ``(ParsedRelease, ParseReport)``. The structural VO
|
||||
is unchanged from the previous single-return contract; the report
|
||||
is new and carries the confidence score + road decision.
|
||||
|
||||
Flow:
|
||||
|
||||
@@ -30,10 +39,10 @@ def parse_release(name: str, kb: ReleaseKnowledge) -> ParsedRelease:
|
||||
``parse_path="sanitized"``).
|
||||
2. If the remainder still contains truly forbidden chars (anything
|
||||
not in the configured separators), short-circuit to
|
||||
``media_type="unknown"`` / ``parse_path="ai"`` — the LLM handles
|
||||
these.
|
||||
``media_type="unknown"`` / ``parse_path="ai"`` and emit a
|
||||
PATH_OF_PAIN report — the LLM handles these.
|
||||
3. Otherwise run the v2 pipeline: tokenize → annotate (EASY when a
|
||||
group schema is known, SHITTY otherwise) → assemble.
|
||||
group schema is known, SHITTY otherwise) → assemble → score.
|
||||
"""
|
||||
parse_path = ParsePath.DIRECT.value
|
||||
|
||||
@@ -42,7 +51,7 @@ def parse_release(name: str, kb: ReleaseKnowledge) -> ParsedRelease:
|
||||
parse_path = ParsePath.SANITIZED.value
|
||||
|
||||
if not _is_well_formed(clean, kb):
|
||||
return ParsedRelease(
|
||||
parsed = ParsedRelease(
|
||||
raw=name,
|
||||
normalised=clean,
|
||||
title=clean,
|
||||
@@ -60,18 +69,36 @@ def parse_release(name: str, kb: ReleaseKnowledge) -> ParsedRelease:
|
||||
site_tag=site_tag,
|
||||
parse_path=ParsePath.AI.value,
|
||||
)
|
||||
report = ParseReport(
|
||||
confidence=0,
|
||||
road=_scoring.Road.PATH_OF_PAIN.value,
|
||||
unknown_tokens=(clean,),
|
||||
missing_critical=("title", "media_type", "year"),
|
||||
)
|
||||
return parsed, report
|
||||
|
||||
tokens, v2_tag = _v2.tokenize(name, kb)
|
||||
annotated = _v2.annotate(tokens, kb)
|
||||
fields = _v2.assemble(annotated, v2_tag, name, kb)
|
||||
|
||||
return ParsedRelease(
|
||||
parsed = ParsedRelease(
|
||||
raw=name,
|
||||
normalised=clean,
|
||||
parse_path=parse_path,
|
||||
**fields,
|
||||
)
|
||||
|
||||
has_schema = _v2.has_known_schema(tokens, kb)
|
||||
score = _scoring.compute_score(parsed, annotated, kb)
|
||||
road = _scoring.decide_road(score, has_schema, kb)
|
||||
report = ParseReport(
|
||||
confidence=score,
|
||||
road=road.value,
|
||||
unknown_tokens=_scoring.collect_unknown_tokens(annotated),
|
||||
missing_critical=_scoring.collect_missing_critical(parsed),
|
||||
)
|
||||
return parsed, report
|
||||
|
||||
|
||||
def _is_well_formed(name: str, kb: ReleaseKnowledge) -> bool:
|
||||
"""Return True if ``name`` contains no forbidden characters per scene
|
||||
|
||||
@@ -72,6 +72,40 @@ def _strip_episode_from_normalized(normalized: str) -> str:
|
||||
return ".".join(result)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class ParseReport:
    """Quality report returned next to a :class:`ParsedRelease`.

    ``parse_release`` hands back ``(ParsedRelease, ParseReport)``. The
    report says *how much* the parser trusts its result and *which
    road* led to it. Keeping it out of ``ParsedRelease`` leaves the
    structural VO free of meta-concerns about its own quality.

    Fields:

    - ``confidence``: integer 0–100 (see :func:`parser.scoring.compute_score`).
    - ``road``: ``"easy"`` / ``"shitty"`` / ``"path_of_pain"``. This is
      the confidence tier, whereas ``ParsedRelease.parse_path``
      describes the tokenization route.
    - ``unknown_tokens``: tokens still tagged UNKNOWN once annotation
      finished, in order of appearance.
    - ``missing_critical``: names of the critical structural fields the
      parser could not fill (subset of ``{"title", "media_type", "year"}``).
    """

    confidence: int
    road: str  # one of the parser.scoring.Road values
    unknown_tokens: tuple[str, ...] = ()
    missing_critical: tuple[str, ...] = ()

    def __post_init__(self) -> None:
        # Accept values on the documented 0–100 scale, reject the rest.
        if 0 <= self.confidence <= 100:
            return
        raise ValidationError(
            f"ParseReport.confidence out of range: {self.confidence}"
        )
|
||||
|
||||
|
||||
@dataclass
|
||||
class ParsedRelease:
|
||||
"""Structured representation of a parsed release name.
|
||||
|
||||
@@ -160,6 +160,37 @@ def load_group_schemas() -> dict:
|
||||
return result
|
||||
|
||||
|
||||
def load_scoring() -> dict:
    """Load the parse-scoring config.

    Returns a dict with three top-level keys: ``weights``, ``penalties``,
    ``thresholds``. Each section starts from baked-in defaults and is
    overlaid with whatever ``scoring.yaml`` provides, so a partial YAML
    only de-tunes the parser instead of breaking it.
    """
    # NOTE(review): assumes ``_load`` can hand back ``None`` for an empty
    # document (as YAML loaders commonly do) — confirm. The ``or {}``
    # keeps the ``.get`` calls below safe in that case.
    raw = _load("scoring.yaml") or {}

    config = {
        "weights": {
            "title": 30,
            "media_type": 20,
            "year": 15,
            "season": 10,
            "episode": 5,
            "resolution": 5,
            "source": 5,
            "codec": 5,
            "group": 5,
        },
        "penalties": {"unknown_token": 5, "max_unknown_penalty": 30},
        "thresholds": {"shitty_min": 60},
    }

    for section, defaults in config.items():
        # A section present but left empty in the YAML comes back falsy;
        # treat it like a missing one.
        defaults.update(raw.get(section, {}) or {})

    return config
|
||||
|
||||
|
||||
def load_separators() -> list[str]:
|
||||
"""Single-char token separators used by the release name tokenizer.
|
||||
|
||||
|
||||
@@ -30,6 +30,7 @@ from .release import (
|
||||
load_metadata_extensions,
|
||||
load_non_video_extensions,
|
||||
load_resolutions,
|
||||
load_scoring,
|
||||
load_separators,
|
||||
load_sources,
|
||||
load_sources_extra,
|
||||
@@ -85,6 +86,9 @@ class YamlReleaseKnowledge:
|
||||
|
||||
self.separators: list[str] = load_separators()
|
||||
|
||||
# Parse-scoring config (weights / penalties / thresholds).
|
||||
self.scoring: dict = load_scoring()
|
||||
|
||||
# File-extension sets (used by application/infra modules, not by
|
||||
# the parser itself — kept here so there is a single ownership
|
||||
# point for release knowledge).
|
||||
|
||||
@@ -0,0 +1,42 @@
|
||||
# Release parse scoring.
|
||||
#
|
||||
# `parse_release` returns a `ParseReport` alongside the `ParsedRelease`.
|
||||
# The report carries a 0-100 confidence score computed from the annotated
|
||||
# tokens, plus the road decision (EASY / SHITTY / PATH_OF_PAIN).
|
||||
#
|
||||
# Why YAML: the weights and the SHITTY/PoP cutoff are tuning knobs we
|
||||
# expect to iterate on as fixtures grow. Keeping them in code would
|
||||
# mean a commit per tweak; here the user can adjust without touching
|
||||
# Python.
|
||||
#
|
||||
# Weights are awarded when the corresponding ParsedRelease field is
|
||||
# populated (non-None, non-"UNKNOWN" for group). Season and episode
|
||||
# only contribute when the parse looks like TV (season is not None).
|
||||
|
||||
weights:
|
||||
title: 30 # structural pivot — without it nothing else matters
|
||||
media_type: 20 # movie / tv_show / tv_complete / …
|
||||
year: 15
|
||||
season: 10 # only counted for TV-shaped releases
|
||||
episode: 5
|
||||
resolution: 5
|
||||
source: 5
|
||||
codec: 5
|
||||
group: 5 # "UNKNOWN" yields 0
|
||||
|
||||
# Penalty applied per UNKNOWN token left in the annotated stream.
|
||||
# Capped at `max_unknown_penalty` to keep a long-tail of garbage from
|
||||
# pushing every release into PoP.
|
||||
penalties:
|
||||
unknown_token: 5
|
||||
max_unknown_penalty: 30
|
||||
|
||||
# Decision thresholds.
|
||||
#
|
||||
# EASY is decided structurally (a known group schema matched) — it does
|
||||
# not look at the score. SHITTY vs PATH_OF_PAIN is decided here:
|
||||
#
|
||||
# score >= shitty_min → SHITTY (best-effort parse usable)
|
||||
# score < shitty_min → PATH_OF_PAIN (needs user / LLM help)
|
||||
thresholds:
|
||||
shitty_min: 60
|
||||
@@ -28,11 +28,14 @@ _KB = YamlReleaseKnowledge()
|
||||
def _parsed(media_type: str = "movie"):
|
||||
"""Build a ParsedRelease with the requested media_type via the real parser."""
|
||||
if media_type == "tv_show":
|
||||
return parse_release("Show.S01E01.1080p-GRP", _KB)
|
||||
parsed, _ = parse_release("Show.S01E01.1080p-GRP", _KB)
|
||||
return parsed
|
||||
if media_type == "movie":
|
||||
return parse_release("Movie.2020.1080p-GRP", _KB)
|
||||
parsed, _ = parse_release("Movie.2020.1080p-GRP", _KB)
|
||||
return parsed
|
||||
# "unknown" / other — feed a name the parser can't classify
|
||||
return parse_release("randomthing", _KB)
|
||||
parsed, _ = parse_release("randomthing", _KB)
|
||||
return parsed
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
@@ -0,0 +1,130 @@
|
||||
"""Tests for the pre-pipeline exclusion helpers (Phase A bis)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from alfred.application.release.supported_media import (
|
||||
find_main_video,
|
||||
is_supported_video,
|
||||
)
|
||||
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
|
||||
|
||||
_KB = YamlReleaseKnowledge()
|
||||
|
||||
|
||||
# --------------------------------------------------------------------- #
|
||||
# is_supported_video #
|
||||
# --------------------------------------------------------------------- #
|
||||
|
||||
|
||||
class TestIsSupportedVideo:
    """Extension-based eligibility checks for ``is_supported_video``."""

    def test_mkv_is_supported(self, tmp_path: Path) -> None:
        video = tmp_path / "movie.mkv"
        video.touch()
        assert is_supported_video(video, _KB) is True

    def test_mp4_is_supported(self, tmp_path: Path) -> None:
        video = tmp_path / "movie.mp4"
        video.touch()
        assert is_supported_video(video, _KB) is True

    def test_uppercase_extension_is_supported(self, tmp_path: Path) -> None:
        # Suffix case varies across file systems; the helper lowercases it.
        video = tmp_path / "movie.MKV"
        video.touch()
        assert is_supported_video(video, _KB) is True

    def test_srt_is_not_video(self, tmp_path: Path) -> None:
        subtitle = tmp_path / "movie.srt"
        subtitle.touch()
        assert is_supported_video(subtitle, _KB) is False

    def test_nfo_is_not_video(self, tmp_path: Path) -> None:
        info = tmp_path / "movie.nfo"
        info.touch()
        assert is_supported_video(info, _KB) is False

    def test_no_extension_is_not_video(self, tmp_path: Path) -> None:
        readme = tmp_path / "README"
        readme.touch()
        assert is_supported_video(readme, _KB) is False

    def test_directory_is_not_video(self, tmp_path: Path) -> None:
        # A video-looking suffix must not make a directory eligible.
        folder = tmp_path / "subdir.mkv"
        folder.mkdir()
        assert is_supported_video(folder, _KB) is False

    def test_nonexistent_path_is_not_video(self, tmp_path: Path) -> None:
        missing = tmp_path / "ghost.mkv"
        assert is_supported_video(missing, _KB) is False
|
||||
|
||||
|
||||
# --------------------------------------------------------------------- #
|
||||
# find_main_video #
|
||||
# --------------------------------------------------------------------- #
|
||||
|
||||
|
||||
class TestFindMainVideo:
    """Selection rules of ``find_main_video`` on real temp directories."""

    def test_single_video_file_in_folder(self, tmp_path: Path) -> None:
        only_video = tmp_path / "Movie.2020.mkv"
        only_video.touch()
        assert find_main_video(tmp_path, _KB) == only_video

    def test_returns_lexicographically_first_among_multiple(
        self, tmp_path: Path
    ) -> None:
        # Season packs hold several videos; the first by name is expected.
        second = tmp_path / "Show.S01E02.mkv"
        first = tmp_path / "Show.S01E01.mkv"
        # Created out of order on purpose.
        second.touch()
        first.touch()
        assert find_main_video(tmp_path, _KB) == first

    def test_skips_non_video_files(self, tmp_path: Path) -> None:
        # .nfo and .srt sort after .mkv, so only filtering keeps them out.
        for sidecar in ("Movie.nfo", "Movie.srt"):
            (tmp_path / sidecar).touch()
        video = tmp_path / "Movie.mkv"
        video.touch()
        assert find_main_video(tmp_path, _KB) == video

    def test_ignores_subdirectories(self, tmp_path: Path) -> None:
        # Content of a Sample/ subdir must never be considered.
        nested = tmp_path / "Sample"
        nested.mkdir()
        (nested / "sample.mkv").touch()
        top_level = tmp_path / "Movie.mkv"
        top_level.touch()
        assert find_main_video(tmp_path, _KB) == top_level

    def test_only_subdirectory_with_video_returns_none(
        self, tmp_path: Path
    ) -> None:
        # The sole video lives one level down, so nothing qualifies.
        nested = tmp_path / "Sample"
        nested.mkdir()
        (nested / "video.mkv").touch()
        assert find_main_video(tmp_path, _KB) is None

    def test_empty_folder_returns_none(self, tmp_path: Path) -> None:
        assert find_main_video(tmp_path, _KB) is None

    def test_nonexistent_folder_returns_none(self, tmp_path: Path) -> None:
        missing = tmp_path / "ghost"
        assert find_main_video(missing, _KB) is None

    def test_single_file_release_passed_as_folder_arg(
        self, tmp_path: Path
    ) -> None:
        # A bare .mkv without an enclosing folder is a valid release.
        bare_video = tmp_path / "Movie.2020.1080p.mkv"
        bare_video.touch()
        assert find_main_video(bare_video, _KB) == bare_video

    def test_single_file_non_video_passed_as_folder_arg(
        self, tmp_path: Path
    ) -> None:
        bare_info = tmp_path / "README.nfo"
        bare_info.touch()
        assert find_main_video(bare_info, _KB) is None
|
||||
@@ -0,0 +1,282 @@
|
||||
"""Phase A — parse-confidence scoring.
|
||||
|
||||
These tests pin the score / road semantics without going through
|
||||
fixtures. They exercise the small pure functions in
|
||||
``alfred.domain.release.parser.scoring`` and the end-to-end contract
|
||||
that ``parse_release`` returns a ``(ParsedRelease, ParseReport)`` tuple.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from alfred.domain.release.parser.scoring import (
|
||||
Road,
|
||||
collect_missing_critical,
|
||||
collect_unknown_tokens,
|
||||
compute_score,
|
||||
decide_road,
|
||||
)
|
||||
from alfred.domain.release.parser.tokens import Token, TokenRole
|
||||
from alfred.domain.release.services import parse_release
|
||||
from alfred.domain.release.value_objects import (
|
||||
MediaTypeToken,
|
||||
ParsedRelease,
|
||||
ParsePath,
|
||||
ParseReport,
|
||||
)
|
||||
from alfred.domain.shared.exceptions import ValidationError
|
||||
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
|
||||
|
||||
_KB = YamlReleaseKnowledge()
|
||||
|
||||
|
||||
# --------------------------------------------------------------------- #
|
||||
# ParseReport VO #
|
||||
# --------------------------------------------------------------------- #
|
||||
|
||||
|
||||
class TestParseReport:
    """Contract tests for the ``ParseReport`` value object."""

    def test_construct_with_defaults(self) -> None:
        report = ParseReport(confidence=80, road="easy")
        assert report.confidence == 80
        assert report.road == "easy"
        assert report.unknown_tokens == ()
        assert report.missing_critical == ()

    def test_is_frozen(self) -> None:
        # Imported locally so the expected error can be pinned precisely;
        # a bare ``Exception`` would also pass on unrelated failures.
        from dataclasses import FrozenInstanceError

        report = ParseReport(confidence=50, road="shitty")
        with pytest.raises(FrozenInstanceError):
            report.confidence = 99  # type: ignore[misc]

    def test_confidence_lower_bound(self) -> None:
        with pytest.raises(ValidationError):
            ParseReport(confidence=-1, road="easy")

    def test_confidence_upper_bound(self) -> None:
        with pytest.raises(ValidationError):
            ParseReport(confidence=101, road="easy")
|
||||
|
||||
|
||||
# --------------------------------------------------------------------- #
|
||||
# compute_score #
|
||||
# --------------------------------------------------------------------- #
|
||||
|
||||
|
||||
def _movie(year: int | None = 2020, **overrides) -> ParsedRelease:
    """Build a populated movie ParsedRelease for scoring tests.

    Args:
        year: Value for the ``year`` field. ``None`` is allowed so tests
            can model a release without a year.
        **overrides: Field values that replace the defaults below; applied
            last, so they win over every default (including ``year``).

    Returns:
        A ``ParsedRelease`` built from the defaults merged with ``overrides``.
    """
    # NOTE(review): the default year (2020) differs from the 2010 embedded
    # in ``raw`` / ``normalised`` — confirm whether that is intentional.
    fields = {
        "raw": "Inception.2010.1080p.BluRay.x264-GROUP",
        "normalised": "Inception.2010.1080p.BluRay.x264-GROUP",
        "title": "Inception",
        "title_sanitized": "Inception",
        "year": year,
        "season": None,
        "episode": None,
        "episode_end": None,
        "quality": "1080p",
        "source": "BluRay",
        "codec": "x264",
        "group": "GROUP",
        "tech_string": "1080p.BluRay.x264",
        "media_type": MediaTypeToken.MOVIE.value,
        "parse_path": ParsePath.DIRECT.value,
    }
    fields.update(overrides)
    return ParsedRelease(**fields)
|
||||
|
||||
|
||||
def _all_annotated() -> list[Token]:
    """Return a six-token stream in which no token has the UNKNOWN role."""
    annotated = (
        ("Inception", TokenRole.TITLE),
        ("2010", TokenRole.YEAR),
        ("1080p", TokenRole.RESOLUTION),
        ("BluRay", TokenRole.SOURCE),
        ("x264", TokenRole.CODEC),
        ("GROUP", TokenRole.GROUP),
    )
    # The second Token argument is the running position, starting at 0.
    return [
        Token(text, position, role)
        for position, (text, role) in enumerate(annotated)
    ]
|
||||
|
||||
|
||||
class TestComputeScore:
    """Tests for ``compute_score``: field weights, UNKNOWN penalty and clamping."""

    def test_fully_populated_movie_scores_high(self) -> None:
        parsed = _movie()
        score = compute_score(parsed, _all_annotated(), _KB)
        # title 30 + media_type 20 + year 15 + resolution 5 + source 5
        # + codec 5 + group 5 = 85
        assert score == 85

    def test_tv_show_gets_season_and_episode_weight(self) -> None:
        parsed = ParsedRelease(
            raw="Oz.S01E01.1080p.WEBRip.x265-KONTRAST",
            normalised="Oz.S01E01.1080p.WEBRip.x265-KONTRAST",
            title="Oz",
            title_sanitized="Oz",
            year=None,
            season=1,
            episode=1,
            episode_end=None,
            quality="1080p",
            source="WEBRip",
            codec="x265",
            group="KONTRAST",
            tech_string="1080p.WEBRip.x265",
            media_type=MediaTypeToken.TV_SHOW.value,
            parse_path=ParsePath.DIRECT.value,
        )
        tokens = [
            Token("Oz", 0, TokenRole.TITLE),
            Token("S01E01", 1, TokenRole.SEASON_EPISODE),
            Token("1080p", 2, TokenRole.RESOLUTION),
            Token("WEBRip", 3, TokenRole.SOURCE),
            Token("x265", 4, TokenRole.CODEC),
            Token("KONTRAST", 5, TokenRole.GROUP),
        ]
        score = compute_score(parsed, tokens, _KB)
        # title 30 + media_type 20 + season 10 + episode 5 + resolution 5
        # + source 5 + codec 5 + group 5 = 85 (no year)
        assert score == 85

    def test_unknown_tokens_subtract_penalty(self) -> None:
        parsed = _movie()
        tokens = _all_annotated() + [
            Token("noise", 6, TokenRole.UNKNOWN),
            Token("more", 7, TokenRole.UNKNOWN),
        ]
        score = compute_score(parsed, tokens, _KB)
        # 85 baseline - 2*5 unknown tokens = 75
        assert score == 75

    def test_unknown_penalty_capped(self) -> None:
        parsed = _movie()
        # 20 unknown tokens × 5 = 100 raw, capped at 30
        tokens = _all_annotated() + [
            Token(f"t{i}", 6 + i, TokenRole.UNKNOWN) for i in range(20)
        ]
        score = compute_score(parsed, tokens, _KB)
        assert score == 85 - 30

    def test_score_clamped_to_zero(self) -> None:
        noise = [Token(f"t{i}", i, TokenRole.UNKNOWN) for i in range(10)]

        # Sparse parse: title 30 + media_type 20 + group 5 = 55, minus the
        # capped penalty of 30 = 25. Pinned exactly so a regression in the
        # weights or the cap is caught.
        sparse = _movie(year=None, quality=None, source=None, codec=None)
        assert compute_score(sparse, noise, _KB) == 25

        # Nothing scorable is left, so the capped penalty (30) is at least
        # as large as any positive weight that could remain. The result
        # must be floored at 0 rather than going negative.
        empty = _movie(
            title="",
            title_sanitized="",
            year=None,
            quality=None,
            source=None,
            codec=None,
            group="UNKNOWN",
            tech_string="",
            media_type=MediaTypeToken.UNKNOWN.value,
        )
        assert compute_score(empty, noise, _KB) == 0

    def test_unknown_media_type_does_not_count(self) -> None:
        parsed = _movie(media_type=MediaTypeToken.UNKNOWN.value)
        score = compute_score(parsed, _all_annotated(), _KB)
        # Loses the 20 of media_type vs baseline
        assert score == 85 - 20

    def test_unknown_group_does_not_count(self) -> None:
        parsed = _movie(group="UNKNOWN")
        score = compute_score(parsed, _all_annotated(), _KB)
        # Loses the 5 of group vs baseline
        assert score == 85 - 5
|
||||
|
||||
|
||||
# --------------------------------------------------------------------- #
|
||||
# decide_road #
|
||||
# --------------------------------------------------------------------- #
|
||||
|
||||
|
||||
class TestDecideRoad:
    """Tests for ``decide_road``: schema match first, then the score cutoff."""

    def test_known_schema_is_easy_regardless_of_score(self) -> None:
        # A matched schema yields EASY even with the worst possible score.
        road = decide_road(score=0, has_schema=True, kb=_KB)
        assert road is Road.EASY

    def test_no_schema_high_score_is_shitty(self) -> None:
        road = decide_road(score=80, has_schema=False, kb=_KB)
        assert road is Road.SHITTY

    def test_no_schema_low_score_is_pop(self) -> None:
        road = decide_road(score=10, has_schema=False, kb=_KB)
        assert road is Road.PATH_OF_PAIN

    def test_threshold_boundary_is_inclusive(self) -> None:
        # The cutoff is read from the knowledge base so the test follows
        # whatever value is configured.
        cutoff = _KB.scoring["thresholds"]["shitty_min"]
        at_cutoff = decide_road(cutoff, has_schema=False, kb=_KB)
        just_below = decide_road(cutoff - 1, has_schema=False, kb=_KB)
        assert at_cutoff is Road.SHITTY
        assert just_below is Road.PATH_OF_PAIN
|
||||
|
||||
|
||||
# --------------------------------------------------------------------- #
|
||||
# Collectors #
|
||||
# --------------------------------------------------------------------- #
|
||||
|
||||
|
||||
class TestCollectors:
    """Tests for ``collect_unknown_tokens`` and ``collect_missing_critical``."""

    def test_collect_unknown_tokens_preserves_order(self) -> None:
        # UNKNOWN tokens are interleaved with annotated ones; the expected
        # result lists them in stream order.
        layout = (
            ("A", TokenRole.TITLE),
            ("X", TokenRole.UNKNOWN),
            ("B", TokenRole.RESOLUTION),
            ("Y", TokenRole.UNKNOWN),
        )
        stream = [
            Token(text, position, role)
            for position, (text, role) in enumerate(layout)
        ]
        assert collect_unknown_tokens(stream) == ("X", "Y")

    def test_collect_missing_critical_full(self) -> None:
        # A parse with no title, no year and an UNKNOWN media type.
        blank_fields = {
            "raw": "x",
            "normalised": "x",
            "title": "",
            "title_sanitized": "",
            "year": None,
            "season": None,
            "episode": None,
            "episode_end": None,
            "quality": None,
            "source": None,
            "codec": None,
            "group": "UNKNOWN",
            "tech_string": "",
            "media_type": MediaTypeToken.UNKNOWN.value,
            "parse_path": ParsePath.DIRECT.value,
        }
        blank = ParsedRelease(**blank_fields)
        missing = set(collect_missing_critical(blank))
        # Compared as a set: the order of the reported fields is not asserted.
        assert missing == {"title", "media_type", "year"}

    def test_collect_missing_critical_none(self) -> None:
        complete = _movie()
        assert collect_missing_critical(complete) == ()
|
||||
|
||||
|
||||
# --------------------------------------------------------------------- #
|
||||
# End-to-end contract #
|
||||
# --------------------------------------------------------------------- #
|
||||
|
||||
|
||||
class TestParseReleaseReturnsReport:
    """End-to-end checks that ``parse_release`` returns ``(ParsedRelease, ParseReport)``."""

    def test_returns_tuple(self) -> None:
        outcome = parse_release("Inception.2010.1080p.BluRay.x264-GROUP", _KB)
        assert isinstance(outcome, tuple)
        assert len(outcome) == 2
        release, report = outcome
        assert isinstance(release, ParsedRelease)
        assert isinstance(report, ParseReport)

    def test_known_group_is_easy_road(self) -> None:
        # KONTRAST has a schema in release_groups/
        name = "Oz.S03E01.1080p.WEBRip.x265-KONTRAST"
        _, report = parse_release(name, _KB)
        assert report.road == Road.EASY.value
        assert report.confidence > 0

    def test_unknown_group_well_formed_is_shitty(self) -> None:
        # No registered schema but well-formed scene name → SHITTY
        name = "Inception.2010.1080p.BluRay.x264-NOSCHEMA"
        _, report = parse_release(name, _KB)
        assert report.road == Road.SHITTY.value

    def test_malformed_name_is_pop(self) -> None:
        # Forbidden chars (@) — short-circuits to AI / PoP.
        name = "garbage@#%name"
        _, report = parse_release(name, _KB)
        assert report.road == Road.PATH_OF_PAIN.value
        assert report.confidence == 0
|
||||
@@ -26,7 +26,8 @@ _KB = YamlReleaseKnowledge()
|
||||
|
||||
|
||||
def _parse(name: str) -> ParsedRelease:
    """Parse ``name`` against the shared knowledge base and return only the release.

    ``parse_release`` returns a ``(ParsedRelease, ParseReport)`` pair; the
    report is discarded here because callers of this helper only need the
    parsed release.
    """
    parsed, _report = parse_release(name, _KB)
    return parsed
|
||||
|
||||
|
||||
class TestParseTVEpisode:
|
||||
|
||||
@@ -42,9 +42,10 @@ def test_parse_matches_fixture(fixture: ReleaseFixture, tmp_path) -> None:
|
||||
# plausible filesystem paths. Catches typos / missing leading dirs early.
|
||||
fixture.materialize(tmp_path)
|
||||
|
||||
result = asdict(parse_release(fixture.release_name, _KB))
|
||||
parsed, _report = parse_release(fixture.release_name, _KB)
|
||||
result = asdict(parsed)
|
||||
# ``is_season_pack`` is a @property — asdict() does not include it.
|
||||
result["is_season_pack"] = parse_release(fixture.release_name, _KB).is_season_pack
|
||||
result["is_season_pack"] = parsed.is_season_pack
|
||||
|
||||
for field, expected in fixture.expected_parsed.items():
|
||||
assert field in result, (
|
||||
|
||||
Reference in New Issue
Block a user