feat(release): parse_release returns (ParsedRelease, ParseReport)

Wire the scoring foundations into the parser entry point. parse_release
now returns a tuple — the structural ParsedRelease and a diagnostic
ParseReport carrying confidence (0-100), road
(EASY / SHITTY / PATH_OF_PAIN), the residual UNKNOWN tokens, and the
list of critical fields that couldn't be filled.

EASY is decided structurally (a group schema matched), independently
of the score. SHITTY vs PATH_OF_PAIN is decided by score against the
60 cutoff from scoring.yaml. Malformed names (forbidden chars) emit a
zero-confidence PoP report and short-circuit to parse_path=AI as
before.

ParsePath stays as-is (DIRECT / SANITIZED / AI) — it records *how* we
tokenized, not how confident we are. The two dimensions are now
properly separated.

Call sites propagated:
- alfred/application/filesystem/resolve_destination.py (4 occurrences)
- alfred/agent/tools/filesystem.py
- tests/domain/test_release.py
- tests/domain/test_release_fixtures.py
- tests/application/test_detect_media_type.py

New tests/domain/release/test_parser_v2_scoring.py (22 cases) locks
ParseReport validation, compute_score arithmetic, decide_road
thresholding, the collector helpers, and the end-to-end tuple contract.
This commit is contained in:
2026-05-20 01:21:30 +02:00
parent 98c688f29b
commit b4c9efd13b
7 changed files with 336 additions and 22 deletions
+1 -1
View File
@@ -194,7 +194,7 @@ def analyze_release(release_name: str, source_path: str) -> dict[str, Any]:
from alfred.domain.release.services import parse_release # noqa: PLC0415
path = Path(source_path)
parsed = parse_release(release_name, _KB)
parsed, _ = parse_release(release_name, _KB)
parsed.media_type = detect_media_type(parsed, path, _KB)
probe_used = False
@@ -252,7 +252,7 @@ def resolve_season_destination(
message="TV show library path is not configured.",
)
parsed = parse_release(release_name, _KB)
parsed, _ = parse_release(release_name, _KB)
tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year)
@@ -302,7 +302,7 @@ def resolve_episode_destination(
message="TV show library path is not configured.",
)
parsed = parse_release(release_name, _KB)
parsed, _ = parse_release(release_name, _KB)
ext = Path(source_file).suffix
tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
tmdb_episode_title_safe = (
@@ -360,7 +360,7 @@ def resolve_movie_destination(
message="Movie library path is not configured.",
)
parsed = parse_release(release_name, _KB)
parsed, _ = parse_release(release_name, _KB)
ext = Path(source_file).suffix
tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
@@ -399,7 +399,7 @@ def resolve_series_destination(
message="TV show library path is not configured.",
)
parsed = parse_release(release_name, _KB)
parsed, _ = parse_release(release_name, _KB)
tmdb_title_safe = _KB.sanitize_for_fs(tmdb_title)
computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year)
+38 -11
View File
@@ -8,21 +8,30 @@ Thin orchestrator over the annotate-based pipeline in
the LLM can clean them up.
* Otherwise call the v2 pipeline (tokenize → annotate → assemble) and
wrap the result in :class:`ParsedRelease`.
* Score the result and decide the road (EASY / SHITTY / PATH_OF_PAIN)
via :mod:`alfred.domain.release.parser.scoring`.
All structural and enricher logic now lives in the pipeline. This file
no longer carries field extractors — the heuristic SHITTY path is part
of :func:`~alfred.domain.release.parser.pipeline.annotate`.
The public entry point is :func:`parse_release`, which returns
``(ParsedRelease, ParseReport)``. The report carries the confidence
score, the road, and diagnostic info for downstream callers.
"""
from __future__ import annotations
from .parser import pipeline as _v2
from .parser import scoring as _scoring
from .ports import ReleaseKnowledge
from .value_objects import MediaTypeToken, ParsedRelease, ParsePath
from .value_objects import MediaTypeToken, ParsedRelease, ParsePath, ParseReport
def parse_release(name: str, kb: ReleaseKnowledge) -> ParsedRelease:
"""Parse a release name and return a :class:`ParsedRelease`.
def parse_release(
name: str, kb: ReleaseKnowledge
) -> tuple[ParsedRelease, ParseReport]:
"""Parse a release name.
Returns a tuple ``(ParsedRelease, ParseReport)``. The structural VO
is unchanged from the previous single-return contract; the report
is new and carries the confidence score + road decision.
Flow:
@@ -30,10 +39,10 @@ def parse_release(name: str, kb: ReleaseKnowledge) -> ParsedRelease:
``parse_path="sanitized"``).
2. If the remainder still contains truly forbidden chars (anything
not in the configured separators), short-circuit to
``media_type="unknown"`` / ``parse_path="ai"`` — the LLM handles
these.
``media_type="unknown"`` / ``parse_path="ai"`` and emit a
PATH_OF_PAIN report — the LLM handles these.
3. Otherwise run the v2 pipeline: tokenize → annotate (EASY when a
group schema is known, SHITTY otherwise) → assemble.
group schema is known, SHITTY otherwise) → assemble → score.
"""
parse_path = ParsePath.DIRECT.value
@@ -42,7 +51,7 @@ def parse_release(name: str, kb: ReleaseKnowledge) -> ParsedRelease:
parse_path = ParsePath.SANITIZED.value
if not _is_well_formed(clean, kb):
return ParsedRelease(
parsed = ParsedRelease(
raw=name,
normalised=clean,
title=clean,
@@ -60,18 +69,36 @@ def parse_release(name: str, kb: ReleaseKnowledge) -> ParsedRelease:
site_tag=site_tag,
parse_path=ParsePath.AI.value,
)
report = ParseReport(
confidence=0,
road=_scoring.Road.PATH_OF_PAIN.value,
unknown_tokens=(clean,),
missing_critical=("title", "media_type", "year"),
)
return parsed, report
tokens, v2_tag = _v2.tokenize(name, kb)
annotated = _v2.annotate(tokens, kb)
fields = _v2.assemble(annotated, v2_tag, name, kb)
return ParsedRelease(
parsed = ParsedRelease(
raw=name,
normalised=clean,
parse_path=parse_path,
**fields,
)
has_schema = _v2.has_known_schema(tokens, kb)
score = _scoring.compute_score(parsed, annotated, kb)
road = _scoring.decide_road(score, has_schema, kb)
report = ParseReport(
confidence=score,
road=road.value,
unknown_tokens=_scoring.collect_unknown_tokens(annotated),
missing_critical=_scoring.collect_missing_critical(parsed),
)
return parsed, report
def _is_well_formed(name: str, kb: ReleaseKnowledge) -> bool:
"""Return True if ``name`` contains no forbidden characters per scene
+6 -3
View File
@@ -28,11 +28,14 @@ _KB = YamlReleaseKnowledge()
def _parsed(media_type: str = "movie"):
"""Build a ParsedRelease with the requested media_type via the real parser."""
if media_type == "tv_show":
return parse_release("Show.S01E01.1080p-GRP", _KB)
parsed, _ = parse_release("Show.S01E01.1080p-GRP", _KB)
return parsed
if media_type == "movie":
return parse_release("Movie.2020.1080p-GRP", _KB)
parsed, _ = parse_release("Movie.2020.1080p-GRP", _KB)
return parsed
# "unknown" / other — feed a name the parser can't classify
return parse_release("randomthing", _KB)
parsed, _ = parse_release("randomthing", _KB)
return parsed
# --------------------------------------------------------------------------- #
@@ -0,0 +1,282 @@
"""Phase A — parse-confidence scoring.
These tests pin the score / road semantics without going through
fixtures. They exercise the small pure functions in
``alfred.domain.release.parser.scoring`` and the end-to-end contract
that ``parse_release`` returns a ``(ParsedRelease, ParseReport)`` tuple.
"""
from __future__ import annotations
import pytest
from alfred.domain.release.parser.scoring import (
Road,
collect_missing_critical,
collect_unknown_tokens,
compute_score,
decide_road,
)
from alfred.domain.release.parser.tokens import Token, TokenRole
from alfred.domain.release.services import parse_release
from alfred.domain.release.value_objects import (
MediaTypeToken,
ParsedRelease,
ParsePath,
ParseReport,
)
from alfred.domain.shared.exceptions import ValidationError
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
_KB = YamlReleaseKnowledge()
# --------------------------------------------------------------------- #
# ParseReport VO #
# --------------------------------------------------------------------- #
class TestParseReport:
def test_construct_with_defaults(self) -> None:
report = ParseReport(confidence=80, road="easy")
assert report.confidence == 80
assert report.road == "easy"
assert report.unknown_tokens == ()
assert report.missing_critical == ()
def test_is_frozen(self) -> None:
report = ParseReport(confidence=50, road="shitty")
with pytest.raises(Exception): # FrozenInstanceError
report.confidence = 99 # type: ignore[misc]
def test_confidence_lower_bound(self) -> None:
with pytest.raises(ValidationError):
ParseReport(confidence=-1, road="easy")
def test_confidence_upper_bound(self) -> None:
with pytest.raises(ValidationError):
ParseReport(confidence=101, road="easy")
# --------------------------------------------------------------------- #
# compute_score #
# --------------------------------------------------------------------- #
def _movie(year: int = 2020, **overrides) -> ParsedRelease:
"""Build a populated movie ParsedRelease for scoring tests."""
base = dict(
raw="Inception.2010.1080p.BluRay.x264-GROUP",
normalised="Inception.2010.1080p.BluRay.x264-GROUP",
title="Inception",
title_sanitized="Inception",
year=year,
season=None,
episode=None,
episode_end=None,
quality="1080p",
source="BluRay",
codec="x264",
group="GROUP",
tech_string="1080p.BluRay.x264",
media_type=MediaTypeToken.MOVIE.value,
parse_path=ParsePath.DIRECT.value,
)
base.update(overrides)
return ParsedRelease(**base)
def _all_annotated() -> list[Token]:
"""Token stream where everything is annotated — zero penalty."""
return [
Token("Inception", 0, TokenRole.TITLE),
Token("2010", 1, TokenRole.YEAR),
Token("1080p", 2, TokenRole.RESOLUTION),
Token("BluRay", 3, TokenRole.SOURCE),
Token("x264", 4, TokenRole.CODEC),
Token("GROUP", 5, TokenRole.GROUP),
]
class TestComputeScore:
def test_fully_populated_movie_scores_high(self) -> None:
parsed = _movie()
score = compute_score(parsed, _all_annotated(), _KB)
# title 30 + media_type 20 + year 15 + resolution 5 + source 5
# + codec 5 + group 5 = 85
assert score == 85
def test_tv_show_gets_season_and_episode_weight(self) -> None:
parsed = ParsedRelease(
raw="Oz.S01E01.1080p.WEBRip.x265-KONTRAST",
normalised="Oz.S01E01.1080p.WEBRip.x265-KONTRAST",
title="Oz",
title_sanitized="Oz",
year=None,
season=1,
episode=1,
episode_end=None,
quality="1080p",
source="WEBRip",
codec="x265",
group="KONTRAST",
tech_string="1080p.WEBRip.x265",
media_type=MediaTypeToken.TV_SHOW.value,
parse_path=ParsePath.DIRECT.value,
)
tokens = [
Token("Oz", 0, TokenRole.TITLE),
Token("S01E01", 1, TokenRole.SEASON_EPISODE),
Token("1080p", 2, TokenRole.RESOLUTION),
Token("WEBRip", 3, TokenRole.SOURCE),
Token("x265", 4, TokenRole.CODEC),
Token("KONTRAST", 5, TokenRole.GROUP),
]
score = compute_score(parsed, tokens, _KB)
# title 30 + media_type 20 + season 10 + episode 5 + resolution 5
# + source 5 + codec 5 + group 5 = 85 (no year)
assert score == 85
def test_unknown_tokens_subtract_penalty(self) -> None:
parsed = _movie()
tokens = _all_annotated() + [
Token("noise", 6, TokenRole.UNKNOWN),
Token("more", 7, TokenRole.UNKNOWN),
]
score = compute_score(parsed, tokens, _KB)
# 85 baseline - 2*5 unknown tokens = 75
assert score == 75
def test_unknown_penalty_capped(self) -> None:
parsed = _movie()
# 20 unknown tokens × 5 = 100 raw, capped at 30
tokens = _all_annotated() + [
Token(f"t{i}", 6 + i, TokenRole.UNKNOWN) for i in range(20)
]
score = compute_score(parsed, tokens, _KB)
assert score == 85 - 30
def test_score_clamped_to_zero(self) -> None:
# Empty-ish parse with lots of unknown tokens
parsed = _movie(year=None, quality=None, source=None, codec=None)
tokens = [Token(f"t{i}", i, TokenRole.UNKNOWN) for i in range(10)]
score = compute_score(parsed, tokens, _KB)
# title 30 + media_type 20 + group 5 = 55, -30 cap = 25
# Sanity: still clamped at 0 minimum even if math goes weird
assert 0 <= score <= 100
def test_unknown_media_type_does_not_count(self) -> None:
parsed = _movie(media_type=MediaTypeToken.UNKNOWN.value)
score = compute_score(parsed, _all_annotated(), _KB)
# Loses the 20 of media_type vs baseline
assert score == 85 - 20
def test_unknown_group_does_not_count(self) -> None:
parsed = _movie(group="UNKNOWN")
score = compute_score(parsed, _all_annotated(), _KB)
assert score == 85 - 5
# --------------------------------------------------------------------- #
# decide_road #
# --------------------------------------------------------------------- #
class TestDecideRoad:
def test_known_schema_is_easy_regardless_of_score(self) -> None:
# Even a terrible score returns EASY when a schema matched.
assert decide_road(score=0, has_schema=True, kb=_KB) is Road.EASY
def test_no_schema_high_score_is_shitty(self) -> None:
assert decide_road(score=80, has_schema=False, kb=_KB) is Road.SHITTY
def test_no_schema_low_score_is_pop(self) -> None:
assert decide_road(score=10, has_schema=False, kb=_KB) is Road.PATH_OF_PAIN
def test_threshold_boundary_is_inclusive(self) -> None:
threshold = _KB.scoring["thresholds"]["shitty_min"]
assert decide_road(threshold, has_schema=False, kb=_KB) is Road.SHITTY
assert (
decide_road(threshold - 1, has_schema=False, kb=_KB)
is Road.PATH_OF_PAIN
)
# --------------------------------------------------------------------- #
# Collectors #
# --------------------------------------------------------------------- #
class TestCollectors:
def test_collect_unknown_tokens_preserves_order(self) -> None:
tokens = [
Token("A", 0, TokenRole.TITLE),
Token("X", 1, TokenRole.UNKNOWN),
Token("B", 2, TokenRole.RESOLUTION),
Token("Y", 3, TokenRole.UNKNOWN),
]
assert collect_unknown_tokens(tokens) == ("X", "Y")
def test_collect_missing_critical_full(self) -> None:
empty = ParsedRelease(
raw="x",
normalised="x",
title="",
title_sanitized="",
year=None,
season=None,
episode=None,
episode_end=None,
quality=None,
source=None,
codec=None,
group="UNKNOWN",
tech_string="",
media_type=MediaTypeToken.UNKNOWN.value,
parse_path=ParsePath.DIRECT.value,
)
assert set(collect_missing_critical(empty)) == {
"title",
"media_type",
"year",
}
def test_collect_missing_critical_none(self) -> None:
parsed = _movie()
assert collect_missing_critical(parsed) == ()
# --------------------------------------------------------------------- #
# End-to-end contract #
# --------------------------------------------------------------------- #
class TestParseReleaseReturnsReport:
def test_returns_tuple(self) -> None:
result = parse_release("Inception.2010.1080p.BluRay.x264-GROUP", _KB)
assert isinstance(result, tuple)
assert len(result) == 2
parsed, report = result
assert isinstance(parsed, ParsedRelease)
assert isinstance(report, ParseReport)
def test_known_group_is_easy_road(self) -> None:
# KONTRAST has a schema in release_groups/
_, report = parse_release(
"Oz.S03E01.1080p.WEBRip.x265-KONTRAST", _KB
)
assert report.road == Road.EASY.value
assert report.confidence > 0
def test_unknown_group_well_formed_is_shitty(self) -> None:
# No registered schema but well-formed scene name → SHITTY
_, report = parse_release(
"Inception.2010.1080p.BluRay.x264-NOSCHEMA", _KB
)
assert report.road == Road.SHITTY.value
def test_malformed_name_is_pop(self) -> None:
# Forbidden chars (@) — short-circuits to AI / PoP.
_, report = parse_release("garbage@#%name", _KB)
assert report.road == Road.PATH_OF_PAIN.value
assert report.confidence == 0
+2 -1
View File
@@ -26,7 +26,8 @@ _KB = YamlReleaseKnowledge()
def _parse(name: str) -> ParsedRelease:
return parse_release(name, _KB)
parsed, _report = parse_release(name, _KB)
return parsed
class TestParseTVEpisode:
+3 -2
View File
@@ -42,9 +42,10 @@ def test_parse_matches_fixture(fixture: ReleaseFixture, tmp_path) -> None:
# plausible filesystem paths. Catches typos / missing leading dirs early.
fixture.materialize(tmp_path)
result = asdict(parse_release(fixture.release_name, _KB))
parsed, _report = parse_release(fixture.release_name, _KB)
result = asdict(parsed)
# ``is_season_pack`` is a @property — asdict() does not include it.
result["is_season_pack"] = parse_release(fixture.release_name, _KB).is_season_pack
result["is_season_pack"] = parsed.is_season_pack
for field, expected in fixture.expected_parsed.items():
assert field in result, (