Files
alfred/alfred/domain/release/parser/pipeline.py
T
francwa b7979c0f8b refactor(release): freeze ParsedRelease + enrich_from_probe returns new instance
ParsedRelease is now @dataclass(frozen=True). The enrichment passes that
used to patch fields in place now produce new instances:

- enrich_from_probe(parsed, info, kb) returns a new ParsedRelease via
  dataclasses.replace (no allocation when no field changed).
- inspect_release rebinds 'parsed' after detect_media_type (wrapped in
  MediaTypeToken — the strict isinstance check now also runs on
  replace) and after enrich_from_probe.

languages becomes a tuple[str, ...] so the VO is properly immutable.
Parser pipeline packs languages as a tuple in the assemble dict.

Callers updated: inspect_release, testing/recognize_folders_in_downloads.py.
Tests updated: 22 enrich_from_probe call sites rebound, language
assertions switched to tuple literals, test_release_fixtures normalizes
result['languages'] back to list for YAML-fixture comparison.

Suite: 1077 passed.
2026-05-21 07:51:49 +02:00

764 lines
27 KiB
Python

"""Annotate-based pipeline.
Three stages:
1. :func:`tokenize` — release name → ``list[Token]`` (all UNKNOWN), plus
a separately-returned site tag (e.g. ``[YTS.MX]``) that is never
tokenized.
2. :func:`annotate` — promote each token's :class:`TokenRole` using the
injected knowledge base. Two sub-passes:
a. **Structural** (schema-driven, EASY only). Detects the group at
the right end, looks up its :class:`GroupSchema`, then matches
the schema's chunk sequence against the token stream. Between
two structural chunks, any number of unmatched tokens may
remain — they are left UNKNOWN for the enricher pass to handle.
b. **Enrichers** (non-positional). Walks UNKNOWN tokens and tags
audio / video-meta / edition / language roles. Multi-token
sequences (``DTS.HD.MA``, ``DV.HDR10``, ``DIRECTORS.CUT``) are
matched first, single tokens after.
3. :func:`assemble` — fold annotated tokens into a
:class:`~alfred.domain.release.value_objects.ParsedRelease`-compatible
dict.
The pipeline is **pure**: no I/O, no TMDB, no probe. All knowledge
arrives through ``kb: ReleaseKnowledge``.
"""
from __future__ import annotations
from ..ports.knowledge import ReleaseKnowledge
from ..value_objects import MediaTypeToken
from .schema import GroupSchema
from .tokens import Token, TokenRole
# ---------------------------------------------------------------------------
# Stage 1 — tokenize
# ---------------------------------------------------------------------------
def strip_site_tag(name: str) -> tuple[str, str | None]:
"""Split off a ``[site.tag]`` prefix or suffix.
Returns ``(clean_name, tag)``. If no tag is found, returns
``(name.strip(), None)``.
"""
s = name.strip()
if s.startswith("["):
close = s.find("]")
if close != -1:
tag = s[1:close].strip()
remainder = s[close + 1 :].strip()
if tag and remainder:
return remainder, tag
if s.endswith("]"):
open_bracket = s.rfind("[")
if open_bracket != -1:
tag = s[open_bracket + 1 : -1].strip()
remainder = s[:open_bracket].strip()
if tag and remainder:
return remainder, tag
return s, None
def tokenize(name: str, kb: ReleaseKnowledge) -> tuple[list[Token], str | None]:
"""Split ``name`` into tokens after stripping any site tag.
String-ops style: replace every configured separator with a single
NUL byte then split. NUL cannot legally appear in a release name, so
it's a safe sentinel.
"""
clean, site_tag = strip_site_tag(name)
DELIM = "\x00"
buf = clean
for sep in kb.separators:
if sep != DELIM:
buf = buf.replace(sep, DELIM)
pieces = [p for p in buf.split(DELIM) if p]
tokens = [Token(text=p, index=i) for i, p in enumerate(pieces)]
return tokens, site_tag
# ---------------------------------------------------------------------------
# Helpers shared across passes
# ---------------------------------------------------------------------------
def _parse_season_episode(text: str) -> tuple[int, int | None, int | None] | None:
"""Parse a single token as ``SxxExx`` / ``SxxExxExx`` / ``Sxx`` /
``Sxx-yy`` (season range) / ``NxNN``.
Returns ``(season, episode, episode_end)`` or ``None`` if the token
is not a season/episode marker. For ``Sxx-yy``, returns the first
season with no episode info — the caller is expected to detect the
range form and promote ``media_type`` to ``tv_complete`` separately.
"""
upper = text.upper()
# SxxExx form (and Sxx, Sxx-yy)
if len(upper) >= 3 and upper[0] == "S" and upper[1:3].isdigit():
season = int(upper[1:3])
rest = upper[3:]
if not rest:
return season, None, None
# Sxx-yy season-range form: capture the first season, treat as a
# complete-series marker (no episode info).
if (
len(rest) == 3
and rest[0] == "-"
and rest[1:3].isdigit()
):
return season, None, None
episodes: list[int] = []
while rest.startswith("E") and len(rest) >= 3 and rest[1:3].isdigit():
episodes.append(int(rest[1:3]))
rest = rest[3:]
if not episodes:
return None
# For chained multi-episode markers (E09E10E11), the range is the
# first → last episode. Intermediate values are implied.
return season, episodes[0], episodes[-1] if len(episodes) >= 2 else None
# NxNN form
if "X" in upper:
parts = upper.split("X")
if len(parts) >= 2 and all(p.isdigit() and p for p in parts):
season = int(parts[0])
episode = int(parts[1])
episode_end = int(parts[2]) if len(parts) >= 3 else None
return season, episode, episode_end
return None
def _is_year(text: str) -> bool:
"""Return True if ``text`` is a 4-digit year in [1900, 2099]."""
return len(text) == 4 and text.isdigit() and 1900 <= int(text) <= 2099
def _split_codec_group(text: str, kb: ReleaseKnowledge) -> tuple[str, str] | None:
"""Split a ``codec-GROUP`` token into ``(codec, group)`` if it fits.
Returns ``None`` if the token doesn't match the ``codec-GROUP``
shape. Handles the empty-group case (``x265-``) as ``(codec, "")``.
"""
if "-" not in text:
return None
head, _, tail = text.rpartition("-")
if head.lower() in kb.codecs:
return head, tail
return None
def _match_role(text: str, role: TokenRole, kb: ReleaseKnowledge) -> TokenRole | None:
"""Return ``role`` if ``text`` matches it under ``kb``, else ``None``."""
lower = text.lower()
if role is TokenRole.YEAR:
return TokenRole.YEAR if _is_year(text) else None
if role is TokenRole.SEASON_EPISODE:
return (
TokenRole.SEASON_EPISODE
if _parse_season_episode(text) is not None
else None
)
if role is TokenRole.RESOLUTION:
return TokenRole.RESOLUTION if lower in kb.resolutions else None
if role is TokenRole.SOURCE:
return TokenRole.SOURCE if lower in kb.sources else None
if role is TokenRole.CODEC:
return TokenRole.CODEC if lower in kb.codecs else None
return None
# ---------------------------------------------------------------------------
# Stage 2a — group detection
# ---------------------------------------------------------------------------
def _detect_group(tokens: list[Token], kb: ReleaseKnowledge) -> tuple[str, int | None]:
"""Identify the release group by walking tokens right-to-left.
Returns ``(group_name, token_index_carrying_group)``. ``index`` is
``None`` when the group is absent (no trailing ``-`` in the stream).
"""
# Priority 1: codec-GROUP shape (clearest signal).
for tok in reversed(tokens):
split = _split_codec_group(tok.text, kb)
if split is not None:
_, group = split
return (group or "UNKNOWN"), tok.index
# Priority 2: rightmost dash, excluding dashed sources (Web-DL, etc.).
for tok in reversed(tokens):
if "-" not in tok.text:
continue
head, _, tail = tok.text.rpartition("-")
if (
head.lower() in kb.sources
or tok.text.lower().replace("-", "") in kb.sources
):
continue
if tail:
return tail, tok.index
return "UNKNOWN", None
# ---------------------------------------------------------------------------
# Stage 2b — structural annotation (schema-driven)
# ---------------------------------------------------------------------------
def _annotate_structural(
tokens: list[Token],
kb: ReleaseKnowledge,
schema: GroupSchema,
group_token_index: int,
) -> list[Token] | None:
"""Annotate structural tokens following a known group schema.
Walks the schema's chunks against the body (tokens up to the group
token). For each chunk, scans forward in the body for a matching
token — tokens passed over without match are left UNKNOWN (the
enricher pass will handle them).
Returns ``None`` if any mandatory chunk fails to find a match.
"""
result = list(tokens)
# The codec-GROUP token carries CODEC + GROUP. Split it now so the
# schema walk knows the codec is "pre-consumed" at the end.
group_token = result[group_token_index]
cg_split = _split_codec_group(group_token.text, kb)
codec_pre_consumed = False
if cg_split is not None:
codec, group = cg_split
result[group_token_index] = group_token.with_role(
TokenRole.CODEC, codec=codec, group=group or "UNKNOWN"
)
codec_pre_consumed = True
else:
head, _, tail = group_token.text.rpartition("-")
result[group_token_index] = group_token.with_role(
TokenRole.GROUP, group=tail or "UNKNOWN", prefix=head
)
body_end = group_token_index # exclusive
tok_idx = 0
chunk_idx = 0
# 1) TITLE — leftmost contiguous tokens up to the first structural
# boundary. Title is special because it can be multi-token.
while (
chunk_idx < len(schema.chunks)
and schema.chunks[chunk_idx].role is TokenRole.TITLE
):
title_end = _find_title_end(result, body_end, kb)
for i in range(tok_idx, title_end):
result[i] = result[i].with_role(TokenRole.TITLE)
tok_idx = title_end
chunk_idx += 1
# 2) Remaining structural chunks. For each, scan forward in the body
# for a matching token; tokens passed over remain UNKNOWN.
for chunk in schema.chunks[chunk_idx:]:
if chunk.role is TokenRole.GROUP:
continue
if chunk.role is TokenRole.CODEC and codec_pre_consumed:
continue
match_idx = _find_chunk(result, tok_idx, body_end, chunk.role, kb)
if match_idx is None:
if chunk.optional:
continue
return None
result[match_idx] = result[match_idx].with_role(chunk.role)
tok_idx = match_idx + 1
return result
def _find_title_end(
tokens: list[Token], body_end: int, kb: ReleaseKnowledge
) -> int:
"""Return the exclusive index where the title ends.
The title is the leftmost run of tokens whose text does not match
any structural role (year, season/episode, resolution, source,
codec). Enricher tokens (audio, HDR, language) are *not* boundaries
because they can appear in the middle of the structural sequence;
however, in canonical scene names they don't appear inside the title
itself, so this heuristic holds in practice.
"""
for i in range(body_end):
text = tokens[i].text
if _parse_season_episode(text) is not None:
return i
if _is_year(text):
return i
lower = text.lower()
if lower in kb.resolutions:
return i
if lower in kb.sources:
return i
if lower in kb.codecs:
return i
# codec-GROUP token (e.g. "x265-KONTRAST") or dashed source (Web-DL).
if "-" in text:
head, _, _ = text.rpartition("-")
if (
head.lower() in kb.codecs
or head.lower() in kb.sources
or text.lower().replace("-", "") in kb.sources
):
return i
return body_end
def _find_chunk(
tokens: list[Token],
start: int,
end: int,
role: TokenRole,
kb: ReleaseKnowledge,
) -> int | None:
"""Return the first index in ``[start, end)`` whose token matches ``role``.
Returns ``None`` if no token in the range matches. Tokens already
annotated (non-UNKNOWN) are skipped — they belong to another chunk.
"""
for i in range(start, end):
if tokens[i].role is not TokenRole.UNKNOWN:
continue
if _match_role(tokens[i].text, role, kb) is not None:
return i
return None
# ---------------------------------------------------------------------------
# Stage 2b' — SHITTY annotation (schema-less heuristic)
# ---------------------------------------------------------------------------
def _annotate_shitty(
tokens: list[Token],
kb: ReleaseKnowledge,
group_index: int | None,
) -> list[Token]:
"""Schema-less, dictionary-driven annotation.
SHITTY's job is narrow: for releases that *look* like scene names
but don't have a registered group schema, tag every token whose text
falls into a known YAML bucket (resolutions, codecs, sources, …).
Anything we can't classify stays UNKNOWN. The leftmost run of
UNKNOWN tokens becomes the title. Done.
Anything that requires more reasoning (parenthesized tech blocks,
bare-dashed title fragments, year-disguised slug suffixes, …) is
PATH OF PAIN territory and stays out of here on purpose.
"""
result = list(tokens)
# 1) Group token — split codec-GROUP or tag GROUP. Same logic as EASY.
if group_index is not None:
gt = result[group_index]
cg_split = _split_codec_group(gt.text, kb)
if cg_split is not None:
codec, group = cg_split
result[group_index] = gt.with_role(
TokenRole.CODEC, codec=codec, group=group or "UNKNOWN"
)
else:
_, _, tail = gt.text.rpartition("-")
result[group_index] = gt.with_role(
TokenRole.GROUP, group=tail or "UNKNOWN"
)
# 2) Enrichers (audio / video-meta / edition / language).
result = _annotate_enrichers(result, kb)
# 3) Single pass: tag each UNKNOWN token by looking it up in the kb
# buckets. First match wins per token, first occurrence wins per
# role (we don't overwrite an already-tagged role).
matchers: list[tuple[TokenRole, callable]] = [
(TokenRole.SEASON_EPISODE, lambda t: _parse_season_episode(t) is not None),
(TokenRole.YEAR, _is_year),
(TokenRole.RESOLUTION, lambda t: t.lower() in kb.resolutions),
(TokenRole.DISTRIBUTOR, lambda t: t.upper() in kb.distributors),
(TokenRole.SOURCE, lambda t: t.lower() in kb.sources),
(TokenRole.CODEC, lambda t: t.lower() in kb.codecs),
]
seen: set[TokenRole] = set()
for i, tok in enumerate(result):
if tok.role is not TokenRole.UNKNOWN:
continue
for role, matches in matchers:
if role in seen:
continue
if matches(tok.text):
result[i] = tok.with_role(role)
seen.add(role)
break
# 4) Title = leftmost contiguous UNKNOWN tokens.
for i, tok in enumerate(result):
if tok.role is not TokenRole.UNKNOWN:
break
result[i] = tok.with_role(TokenRole.TITLE)
return result
# ---------------------------------------------------------------------------
# Stage 2c — enricher pass (non-positional roles)
# ---------------------------------------------------------------------------
def _annotate_enrichers(tokens: list[Token], kb: ReleaseKnowledge) -> list[Token]:
"""Tag the remaining UNKNOWN tokens with non-positional roles.
Multi-token sequences are matched first (so ``DTS.HD.MA`` wins over
a single-token ``DTS``). For each sequence match, the first token
receives the role + ``extra["sequence"]`` (the canonical joined
value), and the trailing members are marked with the same role +
``extra["sequence_member"]=True`` so :func:`assemble` extracts the
value only from the primary.
"""
result = list(tokens)
# Multi-token sequences first.
_apply_sequences(
result, kb.audio.get("sequences", []), "codec", TokenRole.AUDIO_CODEC
)
_apply_sequences(
result, kb.video_meta.get("sequences", []), "hdr", TokenRole.HDR
)
_apply_sequences(
result, kb.editions.get("sequences", []), "edition", TokenRole.EDITION
)
# Single tokens.
known_audio_codecs = {c.upper() for c in kb.audio.get("codecs", [])}
known_audio_channels = set(kb.audio.get("channels", []))
known_hdr = {h.upper() for h in kb.video_meta.get("hdr", [])} | kb.hdr_extra
known_bit_depth = {d.lower() for d in kb.video_meta.get("bit_depth", [])}
known_editions = {t.upper() for t in kb.editions.get("tokens", [])}
# Channel layouts like "5.1" are tokenized as two tokens ("5", "1")
# because "." is a separator. Detect consecutive pairs whose joined
# value (without any trailing "-GROUP") is in the channel set.
_detect_channel_pairs(result, known_audio_channels)
for i, tok in enumerate(result):
if tok.role is not TokenRole.UNKNOWN:
continue
text = tok.text
upper = text.upper()
lower = text.lower()
if upper in known_audio_codecs:
result[i] = tok.with_role(TokenRole.AUDIO_CODEC)
continue
if text in known_audio_channels:
result[i] = tok.with_role(TokenRole.AUDIO_CHANNELS)
continue
if upper in known_hdr:
result[i] = tok.with_role(TokenRole.HDR)
continue
if lower in known_bit_depth:
result[i] = tok.with_role(TokenRole.BIT_DEPTH)
continue
if upper in known_editions:
result[i] = tok.with_role(TokenRole.EDITION)
continue
if upper in kb.language_tokens:
result[i] = tok.with_role(TokenRole.LANGUAGE)
continue
if upper in kb.distributors:
result[i] = tok.with_role(TokenRole.DISTRIBUTOR)
continue
return result
def _apply_sequences(
tokens: list[Token],
sequences: list[dict],
value_key: str,
role: TokenRole,
) -> None:
"""Mark the first occurrence of each sequence in place.
Mutates ``tokens`` (replacing entries with new role-tagged Token
instances). Sequences in the YAML must be ordered most-specific
first; the first match wins per starting position.
"""
if not sequences:
return
upper_texts = [t.text.upper() for t in tokens]
consumed: set[int] = set()
for seq in sequences:
seq_upper = [s.upper() for s in seq["tokens"]]
n = len(seq_upper)
for start in range(len(tokens) - n + 1):
if any(idx in consumed for idx in range(start, start + n)):
continue
if any(
tokens[start + k].role is not TokenRole.UNKNOWN for k in range(n)
):
continue
if upper_texts[start : start + n] == seq_upper:
tokens[start] = tokens[start].with_role(
role, sequence=seq[value_key]
)
for k in range(1, n):
tokens[start + k] = tokens[start + k].with_role(
role, sequence_member="True"
)
consumed.update(range(start, start + n))
def _detect_channel_pairs(
tokens: list[Token], known_channels: set[str]
) -> None:
"""Spot two consecutive numeric tokens that form a channel layout.
Example: ``["5", "1-KTH"]`` → joined ``"5.1"`` (after stripping the
``-GROUP`` suffix on the second). The second token may be the trailing
codec-GROUP token, in which case it's already tagged CODEC and we
skip — we'd corrupt its role.
"""
for i in range(len(tokens) - 1):
first = tokens[i]
second = tokens[i + 1]
if first.role is not TokenRole.UNKNOWN:
continue
# Strip a "-GROUP" suffix on the second token before joining.
second_text = second.text.split("-")[0]
candidate = f"{first.text}.{second_text}"
if candidate not in known_channels:
continue
# Only tag the first token (carries the channel value). The
# second token may legitimately remain UNKNOWN (or be the
# codec-GROUP token, already tagged CODEC).
tokens[i] = first.with_role(
TokenRole.AUDIO_CHANNELS, sequence=candidate
)
if second.role is TokenRole.UNKNOWN:
tokens[i + 1] = second.with_role(
TokenRole.AUDIO_CHANNELS, sequence_member="True"
)
# ---------------------------------------------------------------------------
# Stage 2 entry point
# ---------------------------------------------------------------------------
def annotate(tokens: list[Token], kb: ReleaseKnowledge) -> list[Token]:
"""Annotate token roles.
Dispatch:
* If a group is detected AND has a known schema, run the EASY
structural walk. If the schema walk aborts on a mandatory chunk
mismatch, fall through to SHITTY (the heuristic still does better
than giving up).
* Otherwise run SHITTY — schema-less, best-effort, never aborts.
The enricher pass runs in both cases. The pipeline always returns a
populated token list; downstream callers don't need to distinguish
EASY vs SHITTY at this layer (the parse_path is decided in the
service based on whether a schema matched).
"""
group_name, group_index = _detect_group(tokens, kb)
schema = kb.group_schema(group_name) if group_index is not None else None
if schema is not None and group_index is not None:
structural = _annotate_structural(tokens, kb, schema, group_index)
if structural is not None:
return _annotate_enrichers(structural, kb)
# SHITTY fallback — heuristic positional pass. ``_annotate_shitty``
# runs its own enricher pass internally (it has to, so the title
# scan can skip enricher-tagged tokens).
return _annotate_shitty(tokens, kb, group_index)
def has_known_schema(tokens: list[Token], kb: ReleaseKnowledge) -> bool:
"""Return True if ``tokens`` would take the EASY path in :func:`annotate`."""
group_name, group_index = _detect_group(tokens, kb)
if group_index is None:
return False
return kb.group_schema(group_name) is not None
# ---------------------------------------------------------------------------
# Stage 3 — assemble
# ---------------------------------------------------------------------------
def assemble(
annotated: list[Token],
site_tag: str | None,
raw_name: str,
kb: ReleaseKnowledge,
) -> dict:
"""Fold annotated tokens into a ``ParsedRelease``-compatible dict.
Returns a dict (not a ``ParsedRelease`` instance) so the caller can
layer in additional fields (``parse_path``, ``raw``, …) before
instantiation.
"""
# Pure-punctuation tokens (e.g. a stray "-" left by ` - ` separators in
# human-friendly release names) carry no title content and would leak
# into the joined title as ``"Show.-.Episode"``. Drop them here.
title_parts = [
t.text
for t in annotated
if t.role is TokenRole.TITLE and any(c.isalnum() for c in t.text)
]
title = ".".join(title_parts) if title_parts else (
annotated[0].text if annotated else raw_name
)
year: int | None = None
season: int | None = None
episode: int | None = None
episode_end: int | None = None
quality: str | None = None
source: str | None = None
codec: str | None = None
group = "UNKNOWN"
audio_codec: str | None = None
audio_channels: str | None = None
bit_depth: str | None = None
hdr_format: str | None = None
edition: str | None = None
distributor: str | None = None
languages: list[str] = []
is_season_range = False
for tok in annotated:
# Skip non-primary members of a multi-token sequence.
if tok.extra.get("sequence_member") == "True":
continue
role = tok.role
if role is TokenRole.YEAR:
year = int(tok.text)
elif role is TokenRole.SEASON_EPISODE:
parsed = _parse_season_episode(tok.text)
if parsed is not None:
season, episode, episode_end = parsed
# Detect Sxx-yy range form to flag it as a multi-season pack.
upper = tok.text.upper()
if (
len(upper) == 6
and upper[0] == "S"
and upper[1:3].isdigit()
and upper[3] == "-"
and upper[4:6].isdigit()
):
is_season_range = True
elif role is TokenRole.RESOLUTION:
quality = tok.text
elif role is TokenRole.SOURCE:
source = tok.text
elif role is TokenRole.CODEC:
codec = tok.extra.get("codec", tok.text)
if "group" in tok.extra:
group = tok.extra["group"] or "UNKNOWN"
elif role is TokenRole.GROUP:
group = tok.extra.get("group", tok.text) or "UNKNOWN"
elif role is TokenRole.AUDIO_CODEC:
if audio_codec is None:
audio_codec = tok.extra.get("sequence", tok.text)
elif role is TokenRole.AUDIO_CHANNELS:
if audio_channels is None:
audio_channels = tok.extra.get("sequence", tok.text)
elif role is TokenRole.BIT_DEPTH:
if bit_depth is None:
bit_depth = tok.text.lower()
elif role is TokenRole.HDR:
if hdr_format is None:
hdr_format = tok.extra.get("sequence", tok.text.upper())
elif role is TokenRole.EDITION:
if edition is None:
edition = tok.extra.get("sequence", tok.text.upper())
elif role is TokenRole.LANGUAGE:
languages.append(tok.text.upper())
elif role is TokenRole.DISTRIBUTOR:
if distributor is None:
distributor = tok.text.upper()
# Media type heuristic. Doc/concert/integrale tokens win over the
# generic tech-based fallback. We look across all tokens (not just
# annotated ones) because these markers may be tagged UNKNOWN by the
# structural pass — only the assemble step cares about them.
upper_tokens = {tok.text.upper() for tok in annotated}
doc_tokens = {t.upper() for t in kb.media_type_tokens.get("doc", [])}
concert_tokens = {t.upper() for t in kb.media_type_tokens.get("concert", [])}
integrale_tokens = {t.upper() for t in kb.media_type_tokens.get("integrale", [])}
if upper_tokens & doc_tokens:
media_type = MediaTypeToken.DOCUMENTARY
elif upper_tokens & concert_tokens:
media_type = MediaTypeToken.CONCERT
elif is_season_range:
media_type = MediaTypeToken.TV_COMPLETE
elif (
edition in {"COMPLETE", "INTEGRALE", "COLLECTION"}
or upper_tokens & integrale_tokens
) and season is None:
media_type = MediaTypeToken.TV_COMPLETE
elif season is not None:
media_type = MediaTypeToken.TV_SHOW
elif any((quality, source, codec, year)):
media_type = MediaTypeToken.MOVIE
else:
media_type = MediaTypeToken.UNKNOWN
return {
"title": title,
"title_sanitized": kb.sanitize_for_fs(title),
"year": year,
"season": season,
"episode": episode,
"episode_end": episode_end,
"quality": quality,
"source": source,
"codec": codec,
"group": group,
"media_type": media_type,
"site_tag": site_tag,
"languages": tuple(languages),
"audio_codec": audio_codec,
"audio_channels": audio_channels,
"bit_depth": bit_depth,
"hdr_format": hdr_format,
"edition": edition,
"distributor": distributor,
}