feat(release): v2 enricher pass for audio/video-meta/edition/language
The EASY pipeline now extracts the full ParsedRelease surface from
known-group releases, not just the structural backbone. Behavior is
unchanged for releases that don't carry these tokens.
Pipeline (parser/pipeline.py):
- Structural walk (renamed _annotate_structural): no longer requires
body to be fully consumed. Tokens passed over between schema chunks
remain UNKNOWN so the enricher pass can claim them.
- _find_chunk(): scans forward in the body for the next token matching
a given role, skipping already-annotated tokens. Lets optional and
mandatory chunks both tolerate intercalated enricher tokens.
- _annotate_enrichers(): new non-positional pass. Walks UNKNOWN tokens
and tags AUDIO_CODEC / AUDIO_CHANNELS / BIT_DEPTH / HDR / EDITION /
LANGUAGE. Multi-token sequences from kb.audio / kb.video_meta /
kb.editions are matched first (longest-first ordering preserved from
the YAML), single tokens after.
- _apply_sequences(): mutates the token list, tagging the first token
of a matched sequence with extra['sequence']=<canonical value> and
trailing members with extra['sequence_member']='True' so assemble
skips them.
- _detect_channel_pairs(): handles the '5.1' / '7.1' case where the
'.' separator splits the layout into two tokens. Strips a trailing
'-GROUP' suffix on the second before joining.
Assemble:
- New fields populated: languages (list), audio_codec, audio_channels,
bit_depth, hdr_format, edition. Each role-handler skips
sequence_member tokens.
- media_type heuristic extended: edition in {COMPLETE, INTEGRALE,
COLLECTION} + no season → tv_complete (mirrors legacy).
Tests:
- 4 new TestEnrichers cases covering bit_depth+audio_codec+channels,
HDR sequence + edition sequence + TrueHD.Atmos + 7.1, multi-language
with DTS-HD.MA sequence, TV episode with single language.
- All 14 v2 tests + 30 fixture tests still green. Suite: 1011 passed,
8 skipped.
Refs: project_release_parser_v2_specs (memory)
This commit is contained in:
@@ -43,6 +43,19 @@ callers).
|
|||||||
annotation (movie, TV episode, season pack with optional source),
|
annotation (movie, TV episode, season pack with optional source),
|
||||||
and field assembly.
|
and field assembly.
|
||||||
|
|
||||||
|
- **Release parser v2 — enricher pass** completes the EASY pipeline.
|
||||||
|
The structural schema walk now tolerates non-positional tokens
|
||||||
|
between chunks (instead of aborting on leftover tokens), and a second
|
||||||
|
pass tags them with audio / video-meta / edition / language roles.
|
||||||
|
Multi-token sequences from `audio.yaml`, `video.yaml`, `editions.yaml`
|
||||||
|
(e.g. `DTS.HD.MA`, `DV.HDR10`, `TrueHD.Atmos`, `DIRECTORS.CUT`) are
|
||||||
|
matched before single tokens. Channel layouts like `5.1` and `7.1`
|
||||||
|
(split into two tokens by the `.` separator) are detected as
|
||||||
|
consecutive pairs. Sequence members carry an `extra["sequence_member"]`
|
||||||
|
marker so `assemble` extracts the canonical value only from the
|
||||||
|
primary token. KONTRAST releases with audio / HDR / edition / language
|
||||||
|
metadata now produce a fully populated `ParsedRelease`.
|
||||||
|
|
||||||
- **Real-world release fixtures** under `tests/fixtures/releases/{easy,shitty,path_of_pain}/`,
|
- **Real-world release fixtures** under `tests/fixtures/releases/{easy,shitty,path_of_pain}/`,
|
||||||
each documenting an expected `ParsedRelease` plus the future `routing`
|
each documenting an expected `ParsedRelease` plus the future `routing`
|
||||||
(library / torrents / seed_hardlinks) for the upcoming `organize_media`
|
(library / torrents / seed_hardlinks) for the upcoming `organize_media`
|
||||||
|
|||||||
@@ -6,13 +6,21 @@ Three stages:
|
|||||||
a separately-returned site tag (e.g. ``[YTS.MX]``) that is never
|
a separately-returned site tag (e.g. ``[YTS.MX]``) that is never
|
||||||
tokenized.
|
tokenized.
|
||||||
2. :func:`annotate` — promote each token's :class:`TokenRole` using the
|
2. :func:`annotate` — promote each token's :class:`TokenRole` using the
|
||||||
injected knowledge base. Group detection is right-to-left; if the
|
injected knowledge base. Two sub-passes:
|
||||||
group has a registered :class:`GroupSchema` we run :func:`_annotate_easy`
|
|
||||||
(schema-driven, lockstep walk); otherwise we return the tokens with
|
a. **Structural** (schema-driven, EASY only). Detects the group at
|
||||||
only the group annotated and the caller falls back to SHITTY in
|
the right end, looks up its :class:`GroupSchema`, then matches
|
||||||
:func:`_legacy_assemble` (see :mod:`..services`).
|
the schema's chunk sequence against the token stream. Between
|
||||||
|
two structural chunks, any number of unmatched tokens may
|
||||||
|
remain — they are left UNKNOWN for the enricher pass to handle.
|
||||||
|
b. **Enrichers** (non-positional). Walks UNKNOWN tokens and tags
|
||||||
|
audio / video-meta / edition / language roles. Multi-token
|
||||||
|
sequences (``DTS.HD.MA``, ``DV.HDR10``, ``DIRECTORS.CUT``) are
|
||||||
|
matched first, single tokens after.
|
||||||
|
|
||||||
3. :func:`assemble` — fold annotated tokens into a
|
3. :func:`assemble` — fold annotated tokens into a
|
||||||
:class:`~alfred.domain.release.value_objects.ParsedRelease`.
|
:class:`~alfred.domain.release.value_objects.ParsedRelease`-compatible
|
||||||
|
dict.
|
||||||
|
|
||||||
The pipeline is **pure**: no I/O, no TMDB, no probe. All knowledge
|
The pipeline is **pure**: no I/O, no TMDB, no probe. All knowledge
|
||||||
arrives through ``kb: ReleaseKnowledge``.
|
arrives through ``kb: ReleaseKnowledge``.
|
||||||
@@ -78,7 +86,7 @@ def tokenize(name: str, kb: ReleaseKnowledge) -> tuple[list[Token], str | None]:
|
|||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Stage 2 — annotate
|
# Helpers shared across passes
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
@@ -138,157 +146,8 @@ def _split_codec_group(text: str, kb: ReleaseKnowledge) -> tuple[str, str] | Non
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def _detect_group(tokens: list[Token], kb: ReleaseKnowledge) -> tuple[str, int | None]:
|
|
||||||
"""Identify the release group by walking tokens right-to-left.
|
|
||||||
|
|
||||||
Returns ``(group_name, token_index_carrying_group)`` — the index is
|
|
||||||
``None`` when the group is missing entirely (no trailing ``-`` token
|
|
||||||
in the stream).
|
|
||||||
|
|
||||||
Priority:
|
|
||||||
1. Rightmost token of shape ``codec-GROUP`` (clearest signal).
|
|
||||||
2. Rightmost token containing ``-`` whose head is *not* a known
|
|
||||||
source token (Web-DL etc. shouldn't be confused with a group).
|
|
||||||
"""
|
|
||||||
# Priority 1: codec-GROUP
|
|
||||||
for tok in reversed(tokens):
|
|
||||||
split = _split_codec_group(tok.text, kb)
|
|
||||||
if split is not None:
|
|
||||||
_, group = split
|
|
||||||
return (group or "UNKNOWN"), tok.index
|
|
||||||
|
|
||||||
# Priority 2: rightmost dash, excluding known dashed sources
|
|
||||||
for tok in reversed(tokens):
|
|
||||||
if "-" not in tok.text:
|
|
||||||
continue
|
|
||||||
head, _, tail = tok.text.rpartition("-")
|
|
||||||
# Skip dashed-source tokens like "Web-DL"
|
|
||||||
if (
|
|
||||||
head.lower() in kb.sources
|
|
||||||
or tok.text.lower().replace("-", "") in kb.sources
|
|
||||||
):
|
|
||||||
continue
|
|
||||||
if tail:
|
|
||||||
return tail, tok.index
|
|
||||||
|
|
||||||
return "UNKNOWN", None
|
|
||||||
|
|
||||||
|
|
||||||
def _annotate_easy(
|
|
||||||
tokens: list[Token],
|
|
||||||
kb: ReleaseKnowledge,
|
|
||||||
schema: GroupSchema,
|
|
||||||
group_token_index: int,
|
|
||||||
) -> list[Token] | None:
|
|
||||||
"""Annotate tokens following a known group schema (EASY path).
|
|
||||||
|
|
||||||
Returns the new token list on success, or ``None`` if the schema
|
|
||||||
walk fails — a mandatory chunk that doesn't match aborts EASY and
|
|
||||||
lets the caller fall back to SHITTY without crashing.
|
|
||||||
"""
|
|
||||||
result = list(tokens)
|
|
||||||
|
|
||||||
# The codec-GROUP token is special: it carries TWO roles (CODEC +
|
|
||||||
# GROUP). We split it conceptually and tag it as CODEC here; the
|
|
||||||
# group itself is propagated via ``extra["group"]`` so the assemble
|
|
||||||
# step can recover both pieces from one token. When we do this,
|
|
||||||
# ``codec_pre_consumed`` is True so the schema walk knows to skip
|
|
||||||
# the CODEC chunk (it has nothing left to match in the body).
|
|
||||||
group_token = result[group_token_index]
|
|
||||||
cg_split = _split_codec_group(group_token.text, kb)
|
|
||||||
codec_pre_consumed = False
|
|
||||||
if cg_split is not None:
|
|
||||||
codec, group = cg_split
|
|
||||||
result[group_token_index] = group_token.with_role(
|
|
||||||
TokenRole.CODEC, codec=codec, group=group or "UNKNOWN"
|
|
||||||
)
|
|
||||||
codec_pre_consumed = True
|
|
||||||
else:
|
|
||||||
# Group on a non-codec token (e.g. release without codec).
|
|
||||||
head, _, tail = group_token.text.rpartition("-")
|
|
||||||
result[group_token_index] = group_token.with_role(
|
|
||||||
TokenRole.GROUP, group=tail or "UNKNOWN", prefix=head
|
|
||||||
)
|
|
||||||
|
|
||||||
# Walk the schema left-to-right against tokens [0 .. group_token_index].
|
|
||||||
# The codec-GROUP token at `group_token_index` already consumed CODEC
|
|
||||||
# + GROUP, so we walk up to (not including) it.
|
|
||||||
body = result[:group_token_index]
|
|
||||||
chunk_idx = 0
|
|
||||||
tok_idx = 0
|
|
||||||
|
|
||||||
# 1) TITLE — special: consume contiguous UNKNOWN tokens until we hit
|
|
||||||
# a token whose text matches a non-title role.
|
|
||||||
while chunk_idx < len(schema.chunks) and schema.chunks[chunk_idx].role is TokenRole.TITLE:
|
|
||||||
title_end = _find_title_end(body, kb)
|
|
||||||
# All body tokens up to title_end are title parts.
|
|
||||||
for i in range(tok_idx, title_end):
|
|
||||||
result[i] = body[i].with_role(TokenRole.TITLE)
|
|
||||||
tok_idx = title_end
|
|
||||||
chunk_idx += 1
|
|
||||||
|
|
||||||
# 2) Remaining chunks. CODEC and GROUP that were pre-consumed by the
|
|
||||||
# codec-GROUP token at the end of the stream are skipped here.
|
|
||||||
for chunk in schema.chunks[chunk_idx:]:
|
|
||||||
if chunk.role is TokenRole.GROUP:
|
|
||||||
# Handled above via the trailing token.
|
|
||||||
continue
|
|
||||||
if chunk.role is TokenRole.CODEC and codec_pre_consumed:
|
|
||||||
# Already attached to the trailing token's extras.
|
|
||||||
continue
|
|
||||||
|
|
||||||
if tok_idx >= len(body):
|
|
||||||
if chunk.optional:
|
|
||||||
continue
|
|
||||||
return None
|
|
||||||
|
|
||||||
tok = body[tok_idx]
|
|
||||||
matched_role = _match_role(tok.text, chunk.role, kb)
|
|
||||||
|
|
||||||
if matched_role is None:
|
|
||||||
if chunk.optional:
|
|
||||||
continue
|
|
||||||
return None
|
|
||||||
|
|
||||||
result[tok_idx] = tok.with_role(matched_role)
|
|
||||||
tok_idx += 1
|
|
||||||
|
|
||||||
# Body must be fully consumed for EASY to succeed. Leftover tokens
|
|
||||||
# would mean we missed a chunk (e.g. extra audio/HDR tokens not in
|
|
||||||
# the schema yet) — fall back to SHITTY rather than silently dropping.
|
|
||||||
if tok_idx < len(body):
|
|
||||||
return None
|
|
||||||
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
def _find_title_end(body: list[Token], kb: ReleaseKnowledge) -> int:
|
|
||||||
"""Return the exclusive index where the title ends.
|
|
||||||
|
|
||||||
The title is the leftmost run of tokens that don't match any known
|
|
||||||
structural/technical role. Stops at the first token that does.
|
|
||||||
"""
|
|
||||||
for i, tok in enumerate(body):
|
|
||||||
if _parse_season_episode(tok.text) is not None:
|
|
||||||
return i
|
|
||||||
if _is_year(tok.text):
|
|
||||||
return i
|
|
||||||
if tok.text.lower() in kb.resolutions:
|
|
||||||
return i
|
|
||||||
if tok.text.lower() in kb.sources:
|
|
||||||
return i
|
|
||||||
if tok.text.lower() in kb.codecs:
|
|
||||||
return i
|
|
||||||
return len(body)
|
|
||||||
|
|
||||||
|
|
||||||
def _match_role(text: str, role: TokenRole, kb: ReleaseKnowledge) -> TokenRole | None:
|
def _match_role(text: str, role: TokenRole, kb: ReleaseKnowledge) -> TokenRole | None:
|
||||||
"""Return ``role`` if ``text`` matches it under ``kb``, else ``None``.
|
"""Return ``role`` if ``text`` matches it under ``kb``, else ``None``."""
|
||||||
|
|
||||||
Used by the schema walk: each chunk requests a specific role, and
|
|
||||||
this checks whether the current token can play it. Optional chunks
|
|
||||||
that don't match are silently skipped.
|
|
||||||
"""
|
|
||||||
lower = text.lower()
|
lower = text.lower()
|
||||||
|
|
||||||
if role is TokenRole.YEAR:
|
if role is TokenRole.YEAR:
|
||||||
@@ -313,12 +172,314 @@ def _match_role(text: str, role: TokenRole, kb: ReleaseKnowledge) -> TokenRole |
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Stage 2a — group detection
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def _detect_group(tokens: list[Token], kb: ReleaseKnowledge) -> tuple[str, int | None]:
|
||||||
|
"""Identify the release group by walking tokens right-to-left.
|
||||||
|
|
||||||
|
Returns ``(group_name, token_index_carrying_group)``. ``index`` is
|
||||||
|
``None`` when the group is absent (no trailing ``-`` in the stream).
|
||||||
|
"""
|
||||||
|
# Priority 1: codec-GROUP shape (clearest signal).
|
||||||
|
for tok in reversed(tokens):
|
||||||
|
split = _split_codec_group(tok.text, kb)
|
||||||
|
if split is not None:
|
||||||
|
_, group = split
|
||||||
|
return (group or "UNKNOWN"), tok.index
|
||||||
|
|
||||||
|
# Priority 2: rightmost dash, excluding dashed sources (Web-DL, etc.).
|
||||||
|
for tok in reversed(tokens):
|
||||||
|
if "-" not in tok.text:
|
||||||
|
continue
|
||||||
|
head, _, tail = tok.text.rpartition("-")
|
||||||
|
if (
|
||||||
|
head.lower() in kb.sources
|
||||||
|
or tok.text.lower().replace("-", "") in kb.sources
|
||||||
|
):
|
||||||
|
continue
|
||||||
|
if tail:
|
||||||
|
return tail, tok.index
|
||||||
|
|
||||||
|
return "UNKNOWN", None
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Stage 2b — structural annotation (schema-driven)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def _annotate_structural(
|
||||||
|
tokens: list[Token],
|
||||||
|
kb: ReleaseKnowledge,
|
||||||
|
schema: GroupSchema,
|
||||||
|
group_token_index: int,
|
||||||
|
) -> list[Token] | None:
|
||||||
|
"""Annotate structural tokens following a known group schema.
|
||||||
|
|
||||||
|
Walks the schema's chunks against the body (tokens up to the group
|
||||||
|
token). For each chunk, scans forward in the body for a matching
|
||||||
|
token — tokens passed over without match are left UNKNOWN (the
|
||||||
|
enricher pass will handle them).
|
||||||
|
|
||||||
|
Returns ``None`` if any mandatory chunk fails to find a match.
|
||||||
|
"""
|
||||||
|
result = list(tokens)
|
||||||
|
|
||||||
|
# The codec-GROUP token carries CODEC + GROUP. Split it now so the
|
||||||
|
# schema walk knows the codec is "pre-consumed" at the end.
|
||||||
|
group_token = result[group_token_index]
|
||||||
|
cg_split = _split_codec_group(group_token.text, kb)
|
||||||
|
codec_pre_consumed = False
|
||||||
|
if cg_split is not None:
|
||||||
|
codec, group = cg_split
|
||||||
|
result[group_token_index] = group_token.with_role(
|
||||||
|
TokenRole.CODEC, codec=codec, group=group or "UNKNOWN"
|
||||||
|
)
|
||||||
|
codec_pre_consumed = True
|
||||||
|
else:
|
||||||
|
head, _, tail = group_token.text.rpartition("-")
|
||||||
|
result[group_token_index] = group_token.with_role(
|
||||||
|
TokenRole.GROUP, group=tail or "UNKNOWN", prefix=head
|
||||||
|
)
|
||||||
|
|
||||||
|
body_end = group_token_index # exclusive
|
||||||
|
tok_idx = 0
|
||||||
|
chunk_idx = 0
|
||||||
|
|
||||||
|
# 1) TITLE — leftmost contiguous tokens up to the first structural
|
||||||
|
# boundary. Title is special because it can be multi-token.
|
||||||
|
while (
|
||||||
|
chunk_idx < len(schema.chunks)
|
||||||
|
and schema.chunks[chunk_idx].role is TokenRole.TITLE
|
||||||
|
):
|
||||||
|
title_end = _find_title_end(result, body_end, kb)
|
||||||
|
for i in range(tok_idx, title_end):
|
||||||
|
result[i] = result[i].with_role(TokenRole.TITLE)
|
||||||
|
tok_idx = title_end
|
||||||
|
chunk_idx += 1
|
||||||
|
|
||||||
|
# 2) Remaining structural chunks. For each, scan forward in the body
|
||||||
|
# for a matching token; tokens passed over remain UNKNOWN.
|
||||||
|
for chunk in schema.chunks[chunk_idx:]:
|
||||||
|
if chunk.role is TokenRole.GROUP:
|
||||||
|
continue
|
||||||
|
if chunk.role is TokenRole.CODEC and codec_pre_consumed:
|
||||||
|
continue
|
||||||
|
|
||||||
|
match_idx = _find_chunk(result, tok_idx, body_end, chunk.role, kb)
|
||||||
|
if match_idx is None:
|
||||||
|
if chunk.optional:
|
||||||
|
continue
|
||||||
|
return None
|
||||||
|
|
||||||
|
result[match_idx] = result[match_idx].with_role(chunk.role)
|
||||||
|
tok_idx = match_idx + 1
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def _find_title_end(
|
||||||
|
tokens: list[Token], body_end: int, kb: ReleaseKnowledge
|
||||||
|
) -> int:
|
||||||
|
"""Return the exclusive index where the title ends.
|
||||||
|
|
||||||
|
The title is the leftmost run of tokens whose text does not match
|
||||||
|
any structural role (year, season/episode, resolution, source,
|
||||||
|
codec). Enricher tokens (audio, HDR, language) are *not* boundaries
|
||||||
|
because they can appear in the middle of the structural sequence;
|
||||||
|
however, in canonical scene names they don't appear inside the title
|
||||||
|
itself, so this heuristic holds in practice.
|
||||||
|
"""
|
||||||
|
for i in range(body_end):
|
||||||
|
text = tokens[i].text
|
||||||
|
if _parse_season_episode(text) is not None:
|
||||||
|
return i
|
||||||
|
if _is_year(text):
|
||||||
|
return i
|
||||||
|
lower = text.lower()
|
||||||
|
if lower in kb.resolutions:
|
||||||
|
return i
|
||||||
|
if lower in kb.sources:
|
||||||
|
return i
|
||||||
|
if lower in kb.codecs:
|
||||||
|
return i
|
||||||
|
return body_end
|
||||||
|
|
||||||
|
|
||||||
|
def _find_chunk(
|
||||||
|
tokens: list[Token],
|
||||||
|
start: int,
|
||||||
|
end: int,
|
||||||
|
role: TokenRole,
|
||||||
|
kb: ReleaseKnowledge,
|
||||||
|
) -> int | None:
|
||||||
|
"""Return the first index in ``[start, end)`` whose token matches ``role``.
|
||||||
|
|
||||||
|
Returns ``None`` if no token in the range matches. Tokens already
|
||||||
|
annotated (non-UNKNOWN) are skipped — they belong to another chunk.
|
||||||
|
"""
|
||||||
|
for i in range(start, end):
|
||||||
|
if tokens[i].role is not TokenRole.UNKNOWN:
|
||||||
|
continue
|
||||||
|
if _match_role(tokens[i].text, role, kb) is not None:
|
||||||
|
return i
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Stage 2c — enricher pass (non-positional roles)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def _annotate_enrichers(tokens: list[Token], kb: ReleaseKnowledge) -> list[Token]:
|
||||||
|
"""Tag the remaining UNKNOWN tokens with non-positional roles.
|
||||||
|
|
||||||
|
Multi-token sequences are matched first (so ``DTS.HD.MA`` wins over
|
||||||
|
a single-token ``DTS``). For each sequence match, the first token
|
||||||
|
receives the role + ``extra["sequence"]`` (the canonical joined
|
||||||
|
value), and the trailing members are marked with the same role +
|
||||||
|
``extra["sequence_member"]=True`` so :func:`assemble` extracts the
|
||||||
|
value only from the primary.
|
||||||
|
"""
|
||||||
|
result = list(tokens)
|
||||||
|
|
||||||
|
# Multi-token sequences first.
|
||||||
|
_apply_sequences(
|
||||||
|
result, kb.audio.get("sequences", []), "codec", TokenRole.AUDIO_CODEC
|
||||||
|
)
|
||||||
|
_apply_sequences(
|
||||||
|
result, kb.video_meta.get("sequences", []), "hdr", TokenRole.HDR
|
||||||
|
)
|
||||||
|
_apply_sequences(
|
||||||
|
result, kb.editions.get("sequences", []), "edition", TokenRole.EDITION
|
||||||
|
)
|
||||||
|
|
||||||
|
# Single tokens.
|
||||||
|
known_audio_codecs = {c.upper() for c in kb.audio.get("codecs", [])}
|
||||||
|
known_audio_channels = set(kb.audio.get("channels", []))
|
||||||
|
known_hdr = {h.upper() for h in kb.video_meta.get("hdr", [])} | kb.hdr_extra
|
||||||
|
known_bit_depth = {d.lower() for d in kb.video_meta.get("bit_depth", [])}
|
||||||
|
known_editions = {t.upper() for t in kb.editions.get("tokens", [])}
|
||||||
|
|
||||||
|
# Channel layouts like "5.1" are tokenized as two tokens ("5", "1")
|
||||||
|
# because "." is a separator. Detect consecutive pairs whose joined
|
||||||
|
# value (without any trailing "-GROUP") is in the channel set.
|
||||||
|
_detect_channel_pairs(result, known_audio_channels)
|
||||||
|
|
||||||
|
for i, tok in enumerate(result):
|
||||||
|
if tok.role is not TokenRole.UNKNOWN:
|
||||||
|
continue
|
||||||
|
text = tok.text
|
||||||
|
upper = text.upper()
|
||||||
|
lower = text.lower()
|
||||||
|
|
||||||
|
if upper in known_audio_codecs:
|
||||||
|
result[i] = tok.with_role(TokenRole.AUDIO_CODEC)
|
||||||
|
continue
|
||||||
|
if text in known_audio_channels:
|
||||||
|
result[i] = tok.with_role(TokenRole.AUDIO_CHANNELS)
|
||||||
|
continue
|
||||||
|
if upper in known_hdr:
|
||||||
|
result[i] = tok.with_role(TokenRole.HDR)
|
||||||
|
continue
|
||||||
|
if lower in known_bit_depth:
|
||||||
|
result[i] = tok.with_role(TokenRole.BIT_DEPTH)
|
||||||
|
continue
|
||||||
|
if upper in known_editions:
|
||||||
|
result[i] = tok.with_role(TokenRole.EDITION)
|
||||||
|
continue
|
||||||
|
if upper in kb.language_tokens:
|
||||||
|
result[i] = tok.with_role(TokenRole.LANGUAGE)
|
||||||
|
continue
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def _apply_sequences(
|
||||||
|
tokens: list[Token],
|
||||||
|
sequences: list[dict],
|
||||||
|
value_key: str,
|
||||||
|
role: TokenRole,
|
||||||
|
) -> None:
|
||||||
|
"""Mark the first occurrence of each sequence in place.
|
||||||
|
|
||||||
|
Mutates ``tokens`` (replacing entries with new role-tagged Token
|
||||||
|
instances). Sequences in the YAML must be ordered most-specific
|
||||||
|
first; the first match wins per starting position.
|
||||||
|
"""
|
||||||
|
if not sequences:
|
||||||
|
return
|
||||||
|
|
||||||
|
upper_texts = [t.text.upper() for t in tokens]
|
||||||
|
consumed: set[int] = set()
|
||||||
|
|
||||||
|
for seq in sequences:
|
||||||
|
seq_upper = [s.upper() for s in seq["tokens"]]
|
||||||
|
n = len(seq_upper)
|
||||||
|
for start in range(len(tokens) - n + 1):
|
||||||
|
if any(idx in consumed for idx in range(start, start + n)):
|
||||||
|
continue
|
||||||
|
if any(
|
||||||
|
tokens[start + k].role is not TokenRole.UNKNOWN for k in range(n)
|
||||||
|
):
|
||||||
|
continue
|
||||||
|
if upper_texts[start : start + n] == seq_upper:
|
||||||
|
tokens[start] = tokens[start].with_role(
|
||||||
|
role, sequence=seq[value_key]
|
||||||
|
)
|
||||||
|
for k in range(1, n):
|
||||||
|
tokens[start + k] = tokens[start + k].with_role(
|
||||||
|
role, sequence_member="True"
|
||||||
|
)
|
||||||
|
consumed.update(range(start, start + n))
|
||||||
|
|
||||||
|
|
||||||
|
def _detect_channel_pairs(
|
||||||
|
tokens: list[Token], known_channels: set[str]
|
||||||
|
) -> None:
|
||||||
|
"""Spot two consecutive numeric tokens that form a channel layout.
|
||||||
|
|
||||||
|
Example: ``["5", "1-KTH"]`` → joined ``"5.1"`` (after stripping the
|
||||||
|
``-GROUP`` suffix on the second). The second token may be the trailing
|
||||||
|
codec-GROUP token, in which case it's already tagged CODEC and we
|
||||||
|
skip — we'd corrupt its role.
|
||||||
|
"""
|
||||||
|
for i in range(len(tokens) - 1):
|
||||||
|
first = tokens[i]
|
||||||
|
second = tokens[i + 1]
|
||||||
|
if first.role is not TokenRole.UNKNOWN:
|
||||||
|
continue
|
||||||
|
# Strip a "-GROUP" suffix on the second token before joining.
|
||||||
|
second_text = second.text.split("-")[0]
|
||||||
|
candidate = f"{first.text}.{second_text}"
|
||||||
|
if candidate not in known_channels:
|
||||||
|
continue
|
||||||
|
# Only tag the first token (carries the channel value). The
|
||||||
|
# second token may legitimately remain UNKNOWN (or be the
|
||||||
|
# codec-GROUP token, already tagged CODEC).
|
||||||
|
tokens[i] = first.with_role(
|
||||||
|
TokenRole.AUDIO_CHANNELS, sequence=candidate
|
||||||
|
)
|
||||||
|
if second.role is TokenRole.UNKNOWN:
|
||||||
|
tokens[i + 1] = second.with_role(
|
||||||
|
TokenRole.AUDIO_CHANNELS, sequence_member="True"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Stage 2 entry point
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
def annotate(tokens: list[Token], kb: ReleaseKnowledge) -> list[Token] | None:
|
def annotate(tokens: list[Token], kb: ReleaseKnowledge) -> list[Token] | None:
|
||||||
"""Annotate token roles. Returns ``None`` when the EASY path fails.
|
"""Annotate token roles. Returns ``None`` when the EASY path fails.
|
||||||
|
|
||||||
A ``None`` return means: the group is unknown, OR the schema walk
|
A ``None`` return means: the group is unknown, OR the schema walk
|
||||||
aborted on a mandatory mismatch. The caller (``services.parse_release``)
|
aborted on a mandatory mismatch. The caller falls back to the legacy
|
||||||
falls back to the legacy SHITTY heuristic in that case.
|
SHITTY heuristic in that case.
|
||||||
"""
|
"""
|
||||||
group_name, group_index = _detect_group(tokens, kb)
|
group_name, group_index = _detect_group(tokens, kb)
|
||||||
if group_index is None:
|
if group_index is None:
|
||||||
@@ -328,7 +489,11 @@ def annotate(tokens: list[Token], kb: ReleaseKnowledge) -> list[Token] | None:
|
|||||||
if schema is None:
|
if schema is None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
return _annotate_easy(tokens, kb, schema, group_index)
|
structural = _annotate_structural(tokens, kb, schema, group_index)
|
||||||
|
if structural is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return _annotate_enrichers(structural, kb)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -345,9 +510,8 @@ def assemble(
|
|||||||
"""Fold annotated tokens into a ``ParsedRelease``-compatible dict.
|
"""Fold annotated tokens into a ``ParsedRelease``-compatible dict.
|
||||||
|
|
||||||
Returns a dict (not a ``ParsedRelease`` instance) so the caller can
|
Returns a dict (not a ``ParsedRelease`` instance) so the caller can
|
||||||
layer in additional fields (``parse_path``, etc.) before instantiation.
|
layer in additional fields (``parse_path``, ``raw``, …) before
|
||||||
The dict's keys mirror the :class:`ParsedRelease` constructor
|
instantiation.
|
||||||
arguments.
|
|
||||||
"""
|
"""
|
||||||
title_parts = [t.text for t in annotated if t.role is TokenRole.TITLE]
|
title_parts = [t.text for t in annotated if t.role is TokenRole.TITLE]
|
||||||
title = ".".join(title_parts) if title_parts else (
|
title = ".".join(title_parts) if title_parts else (
|
||||||
@@ -362,33 +526,62 @@ def assemble(
|
|||||||
source: str | None = None
|
source: str | None = None
|
||||||
codec: str | None = None
|
codec: str | None = None
|
||||||
group = "UNKNOWN"
|
group = "UNKNOWN"
|
||||||
|
audio_codec: str | None = None
|
||||||
|
audio_channels: str | None = None
|
||||||
|
bit_depth: str | None = None
|
||||||
|
hdr_format: str | None = None
|
||||||
|
edition: str | None = None
|
||||||
|
languages: list[str] = []
|
||||||
|
|
||||||
for tok in annotated:
|
for tok in annotated:
|
||||||
if tok.role is TokenRole.YEAR:
|
# Skip non-primary members of a multi-token sequence.
|
||||||
|
if tok.extra.get("sequence_member") == "True":
|
||||||
|
continue
|
||||||
|
|
||||||
|
role = tok.role
|
||||||
|
if role is TokenRole.YEAR:
|
||||||
year = int(tok.text)
|
year = int(tok.text)
|
||||||
elif tok.role is TokenRole.SEASON_EPISODE:
|
elif role is TokenRole.SEASON_EPISODE:
|
||||||
parsed = _parse_season_episode(tok.text)
|
parsed = _parse_season_episode(tok.text)
|
||||||
if parsed is not None:
|
if parsed is not None:
|
||||||
season, episode, episode_end = parsed
|
season, episode, episode_end = parsed
|
||||||
elif tok.role is TokenRole.RESOLUTION:
|
elif role is TokenRole.RESOLUTION:
|
||||||
quality = tok.text
|
quality = tok.text
|
||||||
elif tok.role is TokenRole.SOURCE:
|
elif role is TokenRole.SOURCE:
|
||||||
source = tok.text
|
source = tok.text
|
||||||
elif tok.role is TokenRole.CODEC:
|
elif role is TokenRole.CODEC:
|
||||||
# CODEC token may also carry the group (codec-GROUP shape).
|
|
||||||
codec = tok.extra.get("codec", tok.text)
|
codec = tok.extra.get("codec", tok.text)
|
||||||
if "group" in tok.extra:
|
if "group" in tok.extra:
|
||||||
group = tok.extra["group"] or "UNKNOWN"
|
group = tok.extra["group"] or "UNKNOWN"
|
||||||
elif tok.role is TokenRole.GROUP:
|
elif role is TokenRole.GROUP:
|
||||||
group = tok.extra.get("group", tok.text) or "UNKNOWN"
|
group = tok.extra.get("group", tok.text) or "UNKNOWN"
|
||||||
|
elif role is TokenRole.AUDIO_CODEC:
|
||||||
|
if audio_codec is None:
|
||||||
|
audio_codec = tok.extra.get("sequence", tok.text)
|
||||||
|
elif role is TokenRole.AUDIO_CHANNELS:
|
||||||
|
if audio_channels is None:
|
||||||
|
audio_channels = tok.extra.get("sequence", tok.text)
|
||||||
|
elif role is TokenRole.BIT_DEPTH:
|
||||||
|
if bit_depth is None:
|
||||||
|
bit_depth = tok.text.lower()
|
||||||
|
elif role is TokenRole.HDR:
|
||||||
|
if hdr_format is None:
|
||||||
|
hdr_format = tok.extra.get("sequence", tok.text.upper())
|
||||||
|
elif role is TokenRole.EDITION:
|
||||||
|
if edition is None:
|
||||||
|
edition = tok.extra.get("sequence", tok.text.upper())
|
||||||
|
elif role is TokenRole.LANGUAGE:
|
||||||
|
languages.append(tok.text.upper())
|
||||||
|
|
||||||
tech_parts = [p for p in (quality, source, codec) if p]
|
tech_parts = [p for p in (quality, source, codec) if p]
|
||||||
tech_string = ".".join(tech_parts)
|
tech_string = ".".join(tech_parts)
|
||||||
|
|
||||||
# Media type: TV if a season was parsed, otherwise movie if we have
|
# Media type heuristic — same rules as the legacy parser, minus the
|
||||||
# at least one tech marker, else unknown.
|
# documentary/concert/integrale specials (handled by SHITTY for now).
|
||||||
if season is not None:
|
if season is not None:
|
||||||
media_type = "tv_show"
|
media_type = "tv_show"
|
||||||
|
elif edition in {"COMPLETE", "INTEGRALE", "COLLECTION"}:
|
||||||
|
media_type = "tv_complete"
|
||||||
elif any((quality, source, codec, year)):
|
elif any((quality, source, codec, year)):
|
||||||
media_type = "movie"
|
media_type = "movie"
|
||||||
else:
|
else:
|
||||||
@@ -408,4 +601,10 @@ def assemble(
|
|||||||
"tech_string": tech_string,
|
"tech_string": tech_string,
|
||||||
"media_type": media_type,
|
"media_type": media_type,
|
||||||
"site_tag": site_tag,
|
"site_tag": site_tag,
|
||||||
|
"languages": languages,
|
||||||
|
"audio_codec": audio_codec,
|
||||||
|
"audio_channels": audio_channels,
|
||||||
|
"bit_depth": bit_depth,
|
||||||
|
"hdr_format": hdr_format,
|
||||||
|
"edition": edition,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -140,3 +140,65 @@ class TestAssemble:
|
|||||||
assert fields["source"] is None # ELiTE omits it
|
assert fields["source"] is None # ELiTE omits it
|
||||||
assert fields["tech_string"] == "1080p.x265"
|
assert fields["tech_string"] == "1080p.x265"
|
||||||
assert fields["group"] == "ELiTE"
|
assert fields["group"] == "ELiTE"
|
||||||
|
|
||||||
|
|
||||||
|
class TestEnrichers:
|
||||||
|
"""Non-positional roles populated alongside the structural walk.
|
||||||
|
|
||||||
|
These releases would have failed the v2 EASY path before the enricher
|
||||||
|
pass landed (leftover unknown tokens would force a fallback). They
|
||||||
|
now succeed in v2 with rich metadata.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def test_bit_depth_and_audio(self) -> None:
|
||||||
|
name = "Back.in.Action.2025.1080p.WEBRip.10bit.DDP.5.1.x265-KONTRAST"
|
||||||
|
tokens, tag = tokenize(name, _KB)
|
||||||
|
annotated = annotate(tokens, _KB)
|
||||||
|
assert annotated is not None
|
||||||
|
fields = assemble(annotated, tag, name, _KB)
|
||||||
|
|
||||||
|
assert fields["title"] == "Back.in.Action"
|
||||||
|
assert fields["bit_depth"] == "10bit"
|
||||||
|
assert fields["audio_codec"] == "DDP"
|
||||||
|
assert fields["audio_channels"] == "5.1"
|
||||||
|
|
||||||
|
def test_hdr_sequence(self) -> None:
|
||||||
|
# DV.HDR10 sequence + TrueHD.Atmos sequence + 7.1 channels +
|
||||||
|
# DIRECTORS.CUT edition all in one release.
|
||||||
|
name = (
|
||||||
|
"Some.Movie.2024.DIRECTORS.CUT.2160p.BluRay.DV.HDR10."
|
||||||
|
"TrueHD.Atmos.7.1.x265-KONTRAST"
|
||||||
|
)
|
||||||
|
tokens, tag = tokenize(name, _KB)
|
||||||
|
annotated = annotate(tokens, _KB)
|
||||||
|
assert annotated is not None
|
||||||
|
fields = assemble(annotated, tag, name, _KB)
|
||||||
|
|
||||||
|
assert fields["edition"] == "DIRECTORS.CUT"
|
||||||
|
assert fields["hdr_format"] == "DV.HDR10"
|
||||||
|
assert fields["audio_codec"] == "TrueHD.Atmos"
|
||||||
|
assert fields["audio_channels"] == "7.1"
|
||||||
|
|
||||||
|
def test_multiple_languages(self) -> None:
|
||||||
|
name = "Movie.2020.FRENCH.MULTI.1080p.WEBRip.DTS.HD.MA.5.1.x265-KONTRAST"
|
||||||
|
tokens, tag = tokenize(name, _KB)
|
||||||
|
annotated = annotate(tokens, _KB)
|
||||||
|
assert annotated is not None
|
||||||
|
fields = assemble(annotated, tag, name, _KB)
|
||||||
|
|
||||||
|
assert fields["languages"] == ["FRENCH", "MULTI"]
|
||||||
|
assert fields["audio_codec"] == "DTS-HD.MA"
|
||||||
|
assert fields["audio_channels"] == "5.1"
|
||||||
|
|
||||||
|
def test_tv_with_language(self) -> None:
|
||||||
|
name = "Show.S01E05.FRENCH.1080p.WEBRip.x265-KONTRAST"
|
||||||
|
tokens, tag = tokenize(name, _KB)
|
||||||
|
annotated = annotate(tokens, _KB)
|
||||||
|
assert annotated is not None
|
||||||
|
fields = assemble(annotated, tag, name, _KB)
|
||||||
|
|
||||||
|
assert fields["title"] == "Show"
|
||||||
|
assert fields["season"] == 1
|
||||||
|
assert fields["episode"] == 5
|
||||||
|
assert fields["languages"] == ["FRENCH"]
|
||||||
|
assert fields["media_type"] == "tv_show"
|
||||||
|
|||||||
Reference in New Issue
Block a user