757e4045ee
The fields were already typed as MediaTypeToken / ParsePath, but a tolerant __post_init__ coerced raw strings into their enum form. With MediaTypeToken(str, Enum) (and likewise ParsePath), the coercion served no purpose — callers that passed '.value' got the enum back anyway, and callers that passed an unknown string got a ValidationError, just as they do now. Strict mode: the constructor rejects non-enum values directly. The two in-tree builders (parse_release() and the parser pipeline) already produce enum values; all .value call sites have been removed. Drops the unused _VALID_MEDIA_TYPES / _VALID_PARSE_PATHS lookup tables.
768 lines
27 KiB
Python
768 lines
27 KiB
Python
"""Annotate-based pipeline.
|
|
|
|
Three stages:
|
|
|
|
1. :func:`tokenize` — release name → ``list[Token]`` (all UNKNOWN), plus
|
|
a separately-returned site tag (e.g. ``[YTS.MX]``) that is never
|
|
tokenized.
|
|
2. :func:`annotate` — promote each token's :class:`TokenRole` using the
|
|
injected knowledge base. Two sub-passes:
|
|
|
|
a. **Structural** (schema-driven, EASY only). Detects the group at
|
|
the right end, looks up its :class:`GroupSchema`, then matches
|
|
the schema's chunk sequence against the token stream. Between
|
|
two structural chunks, any number of unmatched tokens may
|
|
remain — they are left UNKNOWN for the enricher pass to handle.
|
|
b. **Enrichers** (non-positional). Walks UNKNOWN tokens and tags
|
|
audio / video-meta / edition / language roles. Multi-token
|
|
sequences (``DTS.HD.MA``, ``DV.HDR10``, ``DIRECTORS.CUT``) are
|
|
matched first, single tokens after.
|
|
|
|
3. :func:`assemble` — fold annotated tokens into a
|
|
:class:`~alfred.domain.release.value_objects.ParsedRelease`-compatible
|
|
dict.
|
|
|
|
The pipeline is **pure**: no I/O, no TMDB, no probe. All knowledge
|
|
arrives through ``kb: ReleaseKnowledge``.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from ..ports.knowledge import ReleaseKnowledge
|
|
from ..value_objects import MediaTypeToken
|
|
from .schema import GroupSchema
|
|
from .tokens import Token, TokenRole
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stage 1 — tokenize
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def strip_site_tag(name: str) -> tuple[str, str | None]:
    """Peel a ``[site.tag]`` off the front or the back of ``name``.

    The leading position is tried first, then the trailing one. A tag is
    only accepted when both the tag text and what is left of the name are
    non-empty after stripping whitespace.

    Args:
        name: Raw release name.

    Returns:
        ``(clean_name, tag)`` when a tag was split off, otherwise
        ``(name.strip(), None)``.
    """
    stripped = name.strip()

    # Leading form: "[tag] rest" — the tag ends at the first "]".
    if stripped[:1] == "[":
        inside, closer, after = stripped[1:].partition("]")
        if closer:
            label, rest = inside.strip(), after.strip()
            if label and rest:
                return rest, label

    # Trailing form: "rest [tag]" — the tag starts at the last "[".
    if stripped[-1:] == "]":
        before, opener, inside = stripped[:-1].rpartition("[")
        if opener:
            label, rest = inside.strip(), before.strip()
            if label and rest:
                return rest, label

    return stripped, None
|
|
|
|
|
|
def tokenize(name: str, kb: ReleaseKnowledge) -> tuple[list[Token], str | None]:
    """Split ``name`` into tokens after stripping any site tag.

    String-ops style: every separator listed in ``kb.separators`` is
    replaced with a single NUL sentinel, then the buffer is split on that
    sentinel. NUL is assumed never to appear in a real release name, which
    is what makes it a safe sentinel.

    Args:
        name: Raw release name, possibly carrying a ``[site.tag]``.
        kb: Knowledge base; only ``kb.separators`` is read here.

    Returns:
        ``(tokens, site_tag)``. Tokens are built with ``text`` and a
        0-based ``index``; empty pieces (adjacent separators) are dropped.
        ``site_tag`` is ``None`` when no tag was found.
    """
    clean, site_tag = strip_site_tag(name)

    delim = "\x00"
    buf = clean
    for sep in kb.separators:
        # Skip the sentinel itself, and skip empty separators:
        # ``str.replace("", delim)`` would insert the sentinel between
        # every character and shred the name into one-char tokens.
        if sep and sep != delim:
            buf = buf.replace(sep, delim)

    pieces = [p for p in buf.split(delim) if p]
    tokens = [Token(text=p, index=i) for i, p in enumerate(pieces)]
    return tokens, site_tag
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers shared across passes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _parse_season_episode(text: str) -> tuple[int, int | None, int | None] | None:
|
|
"""Parse a single token as ``SxxExx`` / ``SxxExxExx`` / ``Sxx`` /
|
|
``Sxx-yy`` (season range) / ``NxNN``.
|
|
|
|
Returns ``(season, episode, episode_end)`` or ``None`` if the token
|
|
is not a season/episode marker. For ``Sxx-yy``, returns the first
|
|
season with no episode info — the caller is expected to detect the
|
|
range form and promote ``media_type`` to ``tv_complete`` separately.
|
|
"""
|
|
upper = text.upper()
|
|
|
|
# SxxExx form (and Sxx, Sxx-yy)
|
|
if len(upper) >= 3 and upper[0] == "S" and upper[1:3].isdigit():
|
|
season = int(upper[1:3])
|
|
rest = upper[3:]
|
|
|
|
if not rest:
|
|
return season, None, None
|
|
|
|
# Sxx-yy season-range form: capture the first season, treat as a
|
|
# complete-series marker (no episode info).
|
|
if (
|
|
len(rest) == 3
|
|
and rest[0] == "-"
|
|
and rest[1:3].isdigit()
|
|
):
|
|
return season, None, None
|
|
|
|
episodes: list[int] = []
|
|
while rest.startswith("E") and len(rest) >= 3 and rest[1:3].isdigit():
|
|
episodes.append(int(rest[1:3]))
|
|
rest = rest[3:]
|
|
|
|
if not episodes:
|
|
return None
|
|
# For chained multi-episode markers (E09E10E11), the range is the
|
|
# first → last episode. Intermediate values are implied.
|
|
return season, episodes[0], episodes[-1] if len(episodes) >= 2 else None
|
|
|
|
# NxNN form
|
|
if "X" in upper:
|
|
parts = upper.split("X")
|
|
if len(parts) >= 2 and all(p.isdigit() and p for p in parts):
|
|
season = int(parts[0])
|
|
episode = int(parts[1])
|
|
episode_end = int(parts[2]) if len(parts) >= 3 else None
|
|
return season, episode, episode_end
|
|
|
|
return None
|
|
|
|
|
|
def _is_year(text: str) -> bool:
|
|
"""Return True if ``text`` is a 4-digit year in [1900, 2099]."""
|
|
return len(text) == 4 and text.isdigit() and 1900 <= int(text) <= 2099
|
|
|
|
|
|
def _split_codec_group(text: str, kb: ReleaseKnowledge) -> tuple[str, str] | None:
|
|
"""Split a ``codec-GROUP`` token into ``(codec, group)`` if it fits.
|
|
|
|
Returns ``None`` if the token doesn't match the ``codec-GROUP``
|
|
shape. Handles the empty-group case (``x265-``) as ``(codec, "")``.
|
|
"""
|
|
if "-" not in text:
|
|
return None
|
|
head, _, tail = text.rpartition("-")
|
|
if head.lower() in kb.codecs:
|
|
return head, tail
|
|
return None
|
|
|
|
|
|
def _match_role(text: str, role: TokenRole, kb: ReleaseKnowledge) -> TokenRole | None:
    """Check whether ``text`` qualifies for the structural ``role``.

    Supported roles are YEAR, SEASON_EPISODE, RESOLUTION, SOURCE and
    CODEC; the last three are lower-cased lookups in the matching ``kb``
    bucket. Any other role never matches.

    Returns:
        ``role`` itself on a match, otherwise ``None``.
    """
    folded = text.lower()

    if role is TokenRole.YEAR:
        hit = _is_year(text)
    elif role is TokenRole.SEASON_EPISODE:
        hit = _parse_season_episode(text) is not None
    elif role is TokenRole.RESOLUTION:
        hit = folded in kb.resolutions
    elif role is TokenRole.SOURCE:
        hit = folded in kb.sources
    elif role is TokenRole.CODEC:
        hit = folded in kb.codecs
    else:
        hit = False

    return role if hit else None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stage 2a — group detection
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _detect_group(tokens: list[Token], kb: ReleaseKnowledge) -> tuple[str, int | None]:
|
|
"""Identify the release group by walking tokens right-to-left.
|
|
|
|
Returns ``(group_name, token_index_carrying_group)``. ``index`` is
|
|
``None`` when the group is absent (no trailing ``-`` in the stream).
|
|
"""
|
|
# Priority 1: codec-GROUP shape (clearest signal).
|
|
for tok in reversed(tokens):
|
|
split = _split_codec_group(tok.text, kb)
|
|
if split is not None:
|
|
_, group = split
|
|
return (group or "UNKNOWN"), tok.index
|
|
|
|
# Priority 2: rightmost dash, excluding dashed sources (Web-DL, etc.).
|
|
for tok in reversed(tokens):
|
|
if "-" not in tok.text:
|
|
continue
|
|
head, _, tail = tok.text.rpartition("-")
|
|
if (
|
|
head.lower() in kb.sources
|
|
or tok.text.lower().replace("-", "") in kb.sources
|
|
):
|
|
continue
|
|
if tail:
|
|
return tail, tok.index
|
|
|
|
return "UNKNOWN", None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stage 2b — structural annotation (schema-driven)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _annotate_structural(
    tokens: list[Token],
    kb: ReleaseKnowledge,
    schema: GroupSchema,
    group_token_index: int,
) -> list[Token] | None:
    """Tag structural tokens by walking a known group schema.

    The body is every token before the group token. Leading TITLE chunks
    claim the leftmost tokens up to the first structural boundary; each
    later chunk is matched by scanning forward from the previous match.
    Tokens skipped during the scan keep their role so the enricher pass
    can look at them afterwards.

    Args:
        tokens: Token stream; not mutated (a copy is annotated).
        kb: Knowledge base used for all lookups.
        schema: Schema whose ``chunks`` drive the walk.
        group_token_index: Index of the token carrying the group.

    Returns:
        The annotated copy, or ``None`` as soon as a non-optional chunk
        finds no matching token.
    """
    annotated = list(tokens)

    # The group token is either "codec-GROUP" (tagged CODEC, carrying both
    # values) or "something-GROUP" (tagged GROUP, keeping the prefix).
    carrier = annotated[group_token_index]
    codec_and_group = _split_codec_group(carrier.text, kb)
    codec_already_tagged = codec_and_group is not None
    if codec_already_tagged:
        codec_text, group_text = codec_and_group
        annotated[group_token_index] = carrier.with_role(
            TokenRole.CODEC, codec=codec_text, group=group_text or "UNKNOWN"
        )
    else:
        prefix, _, suffix = carrier.text.rpartition("-")
        annotated[group_token_index] = carrier.with_role(
            TokenRole.GROUP, group=suffix or "UNKNOWN", prefix=prefix
        )

    chunks = schema.chunks
    cursor = 0  # next body position a chunk may match at
    leading_titles = 0  # number of TITLE chunks consumed at the front

    # 1) TITLE chunks at the head of the schema: multi-token, bounded by
    #    the first structural token.
    for chunk in chunks:
        if chunk.role is not TokenRole.TITLE:
            break
        boundary = _find_title_end(annotated, group_token_index, kb)
        for pos in range(cursor, boundary):
            annotated[pos] = annotated[pos].with_role(TokenRole.TITLE)
        cursor = boundary
        leading_titles += 1

    # 2) Everything after the title: one token per chunk, scanning forward.
    for chunk in chunks[leading_titles:]:
        role = chunk.role
        # GROUP is handled above; CODEC too when it rode on the group token.
        if role is TokenRole.GROUP or (
            role is TokenRole.CODEC and codec_already_tagged
        ):
            continue

        hit = _find_chunk(annotated, cursor, group_token_index, role, kb)
        if hit is not None:
            annotated[hit] = annotated[hit].with_role(role)
            cursor = hit + 1
        elif not chunk.optional:
            return None

    return annotated
|
|
|
|
|
|
def _find_title_end(
|
|
tokens: list[Token], body_end: int, kb: ReleaseKnowledge
|
|
) -> int:
|
|
"""Return the exclusive index where the title ends.
|
|
|
|
The title is the leftmost run of tokens whose text does not match
|
|
any structural role (year, season/episode, resolution, source,
|
|
codec). Enricher tokens (audio, HDR, language) are *not* boundaries
|
|
because they can appear in the middle of the structural sequence;
|
|
however, in canonical scene names they don't appear inside the title
|
|
itself, so this heuristic holds in practice.
|
|
"""
|
|
for i in range(body_end):
|
|
text = tokens[i].text
|
|
if _parse_season_episode(text) is not None:
|
|
return i
|
|
if _is_year(text):
|
|
return i
|
|
lower = text.lower()
|
|
if lower in kb.resolutions:
|
|
return i
|
|
if lower in kb.sources:
|
|
return i
|
|
if lower in kb.codecs:
|
|
return i
|
|
# codec-GROUP token (e.g. "x265-KONTRAST") or dashed source (Web-DL).
|
|
if "-" in text:
|
|
head, _, _ = text.rpartition("-")
|
|
if (
|
|
head.lower() in kb.codecs
|
|
or head.lower() in kb.sources
|
|
or text.lower().replace("-", "") in kb.sources
|
|
):
|
|
return i
|
|
return body_end
|
|
|
|
|
|
def _find_chunk(
|
|
tokens: list[Token],
|
|
start: int,
|
|
end: int,
|
|
role: TokenRole,
|
|
kb: ReleaseKnowledge,
|
|
) -> int | None:
|
|
"""Return the first index in ``[start, end)`` whose token matches ``role``.
|
|
|
|
Returns ``None`` if no token in the range matches. Tokens already
|
|
annotated (non-UNKNOWN) are skipped — they belong to another chunk.
|
|
"""
|
|
for i in range(start, end):
|
|
if tokens[i].role is not TokenRole.UNKNOWN:
|
|
continue
|
|
if _match_role(tokens[i].text, role, kb) is not None:
|
|
return i
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stage 2b' — SHITTY annotation (schema-less heuristic)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _annotate_shitty(
    tokens: list[Token],
    kb: ReleaseKnowledge,
    group_index: int | None,
) -> list[Token]:
    """Annotate without a schema, purely from the knowledge-base buckets.

    Used for names that look like scene releases but have no registered
    group schema. Steps, in order:

    1. Tag the group token (``codec-GROUP`` split, or plain GROUP).
    2. Run the enricher pass.
    3. Give each remaining UNKNOWN token the first structural role it
       fits (season/episode, year, resolution, distributor, source,
       codec — in that priority); each role is handed out at most once.
    4. Promote the leftmost run of still-UNKNOWN tokens to TITLE.

    Anything needing more reasoning than a dictionary lookup is left
    UNKNOWN on purpose.

    Args:
        tokens: Token stream; not mutated (a copy is annotated).
        kb: Knowledge base used for all lookups.
        group_index: Index of the group-carrying token, or ``None``.

    Returns:
        The annotated token list.
    """
    working = list(tokens)

    # 1) Group token.
    if group_index is not None:
        carrier = working[group_index]
        parsed = _split_codec_group(carrier.text, kb)
        if parsed is None:
            suffix = carrier.text.rpartition("-")[2]
            working[group_index] = carrier.with_role(
                TokenRole.GROUP, group=suffix or "UNKNOWN"
            )
        else:
            codec_text, group_text = parsed
            working[group_index] = carrier.with_role(
                TokenRole.CODEC, codec=codec_text, group=group_text or "UNKNOWN"
            )

    # 2) Enrichers (audio / video-meta / edition / language).
    working = _annotate_enrichers(working, kb)

    # 3) Structural roles, each assigned at most once.
    def fits(role: TokenRole, raw: str) -> bool:
        if role is TokenRole.SEASON_EPISODE:
            return _parse_season_episode(raw) is not None
        if role is TokenRole.YEAR:
            return _is_year(raw)
        if role is TokenRole.RESOLUTION:
            return raw.lower() in kb.resolutions
        if role is TokenRole.DISTRIBUTOR:
            return raw.upper() in kb.distributors
        if role is TokenRole.SOURCE:
            return raw.lower() in kb.sources
        return raw.lower() in kb.codecs  # TokenRole.CODEC

    # Roles still available, in priority order; a role leaves the list
    # once a token has taken it.
    pending = [
        TokenRole.SEASON_EPISODE,
        TokenRole.YEAR,
        TokenRole.RESOLUTION,
        TokenRole.DISTRIBUTOR,
        TokenRole.SOURCE,
        TokenRole.CODEC,
    ]
    for pos, tok in enumerate(working):
        if tok.role is not TokenRole.UNKNOWN:
            continue
        for role in pending:
            if fits(role, tok.text):
                working[pos] = tok.with_role(role)
                pending.remove(role)
                break  # safe: we stop iterating right after the removal

    # 4) Title = leftmost contiguous UNKNOWN tokens.
    pos = 0
    while pos < len(working) and working[pos].role is TokenRole.UNKNOWN:
        working[pos] = working[pos].with_role(TokenRole.TITLE)
        pos += 1

    return working
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stage 2c — enricher pass (non-positional roles)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _annotate_enrichers(tokens: list[Token], kb: ReleaseKnowledge) -> list[Token]:
    """Give non-positional roles to tokens that are still UNKNOWN.

    Order of work:

    1. Multi-token sequences (audio codec, HDR, edition) so that e.g.
       ``DTS.HD.MA`` wins over a lone ``DTS``. The first token of a match
       carries ``sequence`` (the canonical value); trailing tokens carry
       ``sequence_member`` set to the string ``"True"`` so
       :func:`assemble` only reads the primary.
    2. Channel layouts split across two tokens (``5`` + ``1``).
    3. Single tokens, first hit wins: audio codec, audio channels, HDR,
       bit depth, edition, language, distributor.

    Args:
        tokens: Token stream; not mutated (a copy is annotated).
        kb: Knowledge base supplying the vocabularies.

    Returns:
        The annotated token list.
    """
    tagged = list(tokens)

    # 1) Multi-token sequences first.
    for table, value_key, role in (
        (kb.audio, "codec", TokenRole.AUDIO_CODEC),
        (kb.video_meta, "hdr", TokenRole.HDR),
        (kb.editions, "edition", TokenRole.EDITION),
    ):
        _apply_sequences(tagged, table.get("sequences", []), value_key, role)

    # Single-token vocabularies, normalised once.
    codec_names = {c.upper() for c in kb.audio.get("codecs", [])}
    channel_layouts = set(kb.audio.get("channels", []))
    hdr_names = {h.upper() for h in kb.video_meta.get("hdr", [])} | kb.hdr_extra
    depth_names = {d.lower() for d in kb.video_meta.get("bit_depth", [])}
    edition_names = {t.upper() for t in kb.editions.get("tokens", [])}

    # 2) "5.1" arrives as two tokens because "." is a separator.
    _detect_channel_pairs(tagged, channel_layouts)

    # 3) Single tokens.
    def single_role(raw: str) -> TokenRole | None:
        shout = raw.upper()
        if shout in codec_names:
            return TokenRole.AUDIO_CODEC
        if raw in channel_layouts:  # channels are matched case-sensitively
            return TokenRole.AUDIO_CHANNELS
        if shout in hdr_names:
            return TokenRole.HDR
        if raw.lower() in depth_names:
            return TokenRole.BIT_DEPTH
        if shout in edition_names:
            return TokenRole.EDITION
        if shout in kb.language_tokens:
            return TokenRole.LANGUAGE
        if shout in kb.distributors:
            return TokenRole.DISTRIBUTOR
        return None

    for pos, tok in enumerate(tagged):
        if tok.role is not TokenRole.UNKNOWN:
            continue
        role = single_role(tok.text)
        if role is not None:
            tagged[pos] = tok.with_role(role)

    return tagged
|
|
|
|
|
|
def _apply_sequences(
|
|
tokens: list[Token],
|
|
sequences: list[dict],
|
|
value_key: str,
|
|
role: TokenRole,
|
|
) -> None:
|
|
"""Mark the first occurrence of each sequence in place.
|
|
|
|
Mutates ``tokens`` (replacing entries with new role-tagged Token
|
|
instances). Sequences in the YAML must be ordered most-specific
|
|
first; the first match wins per starting position.
|
|
"""
|
|
if not sequences:
|
|
return
|
|
|
|
upper_texts = [t.text.upper() for t in tokens]
|
|
consumed: set[int] = set()
|
|
|
|
for seq in sequences:
|
|
seq_upper = [s.upper() for s in seq["tokens"]]
|
|
n = len(seq_upper)
|
|
for start in range(len(tokens) - n + 1):
|
|
if any(idx in consumed for idx in range(start, start + n)):
|
|
continue
|
|
if any(
|
|
tokens[start + k].role is not TokenRole.UNKNOWN for k in range(n)
|
|
):
|
|
continue
|
|
if upper_texts[start : start + n] == seq_upper:
|
|
tokens[start] = tokens[start].with_role(
|
|
role, sequence=seq[value_key]
|
|
)
|
|
for k in range(1, n):
|
|
tokens[start + k] = tokens[start + k].with_role(
|
|
role, sequence_member="True"
|
|
)
|
|
consumed.update(range(start, start + n))
|
|
|
|
|
|
def _detect_channel_pairs(
|
|
tokens: list[Token], known_channels: set[str]
|
|
) -> None:
|
|
"""Spot two consecutive numeric tokens that form a channel layout.
|
|
|
|
Example: ``["5", "1-KTH"]`` → joined ``"5.1"`` (after stripping the
|
|
``-GROUP`` suffix on the second). The second token may be the trailing
|
|
codec-GROUP token, in which case it's already tagged CODEC and we
|
|
skip — we'd corrupt its role.
|
|
"""
|
|
for i in range(len(tokens) - 1):
|
|
first = tokens[i]
|
|
second = tokens[i + 1]
|
|
if first.role is not TokenRole.UNKNOWN:
|
|
continue
|
|
# Strip a "-GROUP" suffix on the second token before joining.
|
|
second_text = second.text.split("-")[0]
|
|
candidate = f"{first.text}.{second_text}"
|
|
if candidate not in known_channels:
|
|
continue
|
|
# Only tag the first token (carries the channel value). The
|
|
# second token may legitimately remain UNKNOWN (or be the
|
|
# codec-GROUP token, already tagged CODEC).
|
|
tokens[i] = first.with_role(
|
|
TokenRole.AUDIO_CHANNELS, sequence=candidate
|
|
)
|
|
if second.role is TokenRole.UNKNOWN:
|
|
tokens[i + 1] = second.with_role(
|
|
TokenRole.AUDIO_CHANNELS, sequence_member="True"
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stage 2 entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def annotate(tokens: list[Token], kb: ReleaseKnowledge) -> list[Token]:
    """Assign a role to every token that can be classified.

    Dispatch:

    * EASY — a group is detected and ``kb.group_schema`` knows it: run
      the schema-driven structural walk, then the enricher pass.
    * SHITTY — no group, no schema, or the structural walk gave up on a
      mandatory chunk: run the schema-less heuristic, which performs its
      own enricher pass internally.

    A token list is returned in every case; this layer does not report
    which path was taken.
    """
    group_name, group_index = _detect_group(tokens, kb)

    if group_index is not None:
        schema = kb.group_schema(group_name)
        if schema is not None:
            easy = _annotate_structural(tokens, kb, schema, group_index)
            if easy is not None:
                return _annotate_enrichers(easy, kb)

    # Heuristic fallback; it never aborts.
    return _annotate_shitty(tokens, kb, group_index)
|
|
|
|
|
|
def has_known_schema(tokens: list[Token], kb: ReleaseKnowledge) -> bool:
    """Tell whether a group is detected in ``tokens`` and has a schema.

    This is the precondition :func:`annotate` checks before attempting
    the EASY structural walk; the walk itself may still fall back.
    """
    name, carrier_index = _detect_group(tokens, kb)
    return carrier_index is not None and kb.group_schema(name) is not None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stage 3 — assemble
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def assemble(
    annotated: list[Token],
    site_tag: str | None,
    raw_name: str,
    kb: ReleaseKnowledge,
) -> dict:
    """Collapse annotated tokens into a ``ParsedRelease``-shaped dict.

    A plain dict is returned (not a ``ParsedRelease``) so the caller can
    add further fields (``parse_path``, ``raw``, …) before building the
    value object.

    Field rules, as implemented below:

    * ``title`` — TITLE tokens containing at least one alphanumeric
      character, joined with ``"."``; falls back to the first token's
      text, then to ``raw_name``.
    * ``year`` / ``season`` / ``episode`` / ``quality`` / ``source`` /
      ``codec`` / ``group`` — the last matching token wins.
    * audio / bit depth / HDR / edition / distributor — the first
      matching token wins.
    * ``languages`` — every LANGUAGE token, upper-cased, in order.
    * ``media_type`` — documentary and concert markers first, then
      season ranges and complete-series markers, then TV show, movie,
      unknown.

    Args:
        annotated: Tokens after the annotate stage.
        site_tag: Site tag split off by the tokenizer, passed through.
        raw_name: Original release name, used only as the last-resort title.
        kb: Knowledge base (media-type markers, filesystem sanitizer).
    """
    # --- Title ------------------------------------------------------------
    # Pure-punctuation tokens (a stray "-" from " - " separators) carry no
    # title content and are dropped.
    title_words: list[str] = []
    for tok in annotated:
        if tok.role is TokenRole.TITLE and any(ch.isalnum() for ch in tok.text):
            title_words.append(tok.text)

    if title_words:
        title = ".".join(title_words)
    elif annotated:
        title = annotated[0].text
    else:
        title = raw_name

    # --- Per-role fields --------------------------------------------------
    fields: dict = {
        "year": None,
        "season": None,
        "episode": None,
        "episode_end": None,
        "quality": None,
        "source": None,
        "codec": None,
        "group": "UNKNOWN",
        "audio_codec": None,
        "audio_channels": None,
        "bit_depth": None,
        "hdr_format": None,
        "edition": None,
        "distributor": None,
    }
    languages: list[str] = []
    season_range_seen = False

    def keep_first(key: str, value) -> None:
        # Only fill a slot that is still empty.
        if fields[key] is None:
            fields[key] = value

    for tok in annotated:
        extra = tok.extra
        # Non-primary members of a multi-token sequence carry no value.
        if extra.get("sequence_member") == "True":
            continue

        role, raw = tok.role, tok.text

        if role is TokenRole.YEAR:
            fields["year"] = int(raw)
        elif role is TokenRole.SEASON_EPISODE:
            marker = _parse_season_episode(raw)
            if marker is not None:
                fields["season"], fields["episode"], fields["episode_end"] = marker
            # "Sxx-yy" flags a multi-season pack.
            shout = raw.upper()
            if (
                len(shout) == 6
                and shout[0] == "S"
                and shout[3] == "-"
                and shout[1:3].isdigit()
                and shout[4:6].isdigit()
            ):
                season_range_seen = True
        elif role is TokenRole.RESOLUTION:
            fields["quality"] = raw
        elif role is TokenRole.SOURCE:
            fields["source"] = raw
        elif role is TokenRole.CODEC:
            fields["codec"] = extra.get("codec", raw)
            # A codec-GROUP token also carries the group name.
            if "group" in extra:
                fields["group"] = extra["group"] or "UNKNOWN"
        elif role is TokenRole.GROUP:
            fields["group"] = extra.get("group", raw) or "UNKNOWN"
        elif role is TokenRole.AUDIO_CODEC:
            keep_first("audio_codec", extra.get("sequence", raw))
        elif role is TokenRole.AUDIO_CHANNELS:
            keep_first("audio_channels", extra.get("sequence", raw))
        elif role is TokenRole.BIT_DEPTH:
            keep_first("bit_depth", raw.lower())
        elif role is TokenRole.HDR:
            keep_first("hdr_format", extra.get("sequence", raw.upper()))
        elif role is TokenRole.EDITION:
            keep_first("edition", extra.get("sequence", raw.upper()))
        elif role is TokenRole.LANGUAGE:
            languages.append(raw.upper())
        elif role is TokenRole.DISTRIBUTOR:
            keep_first("distributor", raw.upper())

    year = fields["year"]
    season = fields["season"]
    quality = fields["quality"]
    source = fields["source"]
    codec = fields["codec"]
    edition = fields["edition"]

    tech_string = ".".join(part for part in (quality, source, codec) if part)

    # --- Media type -------------------------------------------------------
    # Marker words are looked up across every token's text, whatever role
    # the token ended up with.
    seen_upper = {tok.text.upper() for tok in annotated}

    def markers(kind: str) -> set[str]:
        return {word.upper() for word in kb.media_type_tokens.get(kind, [])}

    doc_markers = markers("doc")
    concert_markers = markers("concert")
    integrale_markers = markers("integrale")

    if seen_upper & doc_markers:
        media_type = MediaTypeToken.DOCUMENTARY
    elif seen_upper & concert_markers:
        media_type = MediaTypeToken.CONCERT
    elif season_range_seen:
        media_type = MediaTypeToken.TV_COMPLETE
    elif (
        edition in {"COMPLETE", "INTEGRALE", "COLLECTION"}
        or seen_upper & integrale_markers
    ) and season is None:
        media_type = MediaTypeToken.TV_COMPLETE
    elif season is not None:
        media_type = MediaTypeToken.TV_SHOW
    elif quality or source or codec or year:
        media_type = MediaTypeToken.MOVIE
    else:
        media_type = MediaTypeToken.UNKNOWN

    return {
        "title": title,
        "title_sanitized": kb.sanitize_for_fs(title),
        "year": year,
        "season": season,
        "episode": fields["episode"],
        "episode_end": fields["episode_end"],
        "quality": quality,
        "source": source,
        "codec": codec,
        "group": fields["group"],
        "tech_string": tech_string,
        "media_type": media_type,
        "site_tag": site_tag,
        "languages": languages,
        "audio_codec": fields["audio_codec"],
        "audio_channels": fields["audio_channels"],
        "bit_depth": fields["bit_depth"],
        "hdr_format": fields["hdr_format"],
        "edition": edition,
        "distributor": fields["distributor"],
    }
|