"""Annotate-based pipeline. Three stages: 1. :func:`tokenize` — release name → ``list[Token]`` (all UNKNOWN), plus a separately-returned site tag (e.g. ``[YTS.MX]``) that is never tokenized. 2. :func:`annotate` — promote each token's :class:`TokenRole` using the injected knowledge base. Two sub-passes: a. **Structural** (schema-driven, EASY only). Detects the group at the right end, looks up its :class:`GroupSchema`, then matches the schema's chunk sequence against the token stream. Between two structural chunks, any number of unmatched tokens may remain — they are left UNKNOWN for the enricher pass to handle. b. **Enrichers** (non-positional). Walks UNKNOWN tokens and tags audio / video-meta / edition / language roles. Multi-token sequences (``DTS.HD.MA``, ``DV.HDR10``, ``DIRECTORS.CUT``) are matched first, single tokens after. 3. :func:`assemble` — fold annotated tokens into a :class:`~alfred.domain.release.value_objects.ParsedRelease`-compatible dict. The pipeline is **pure**: no I/O, no TMDB, no probe. All knowledge arrives through ``kb: ReleaseKnowledge``. """ from __future__ import annotations from ..ports.knowledge import ReleaseKnowledge from ..value_objects import MediaTypeToken from .schema import GroupSchema from .tokens import Token, TokenRole # --------------------------------------------------------------------------- # Stage 1 — tokenize # --------------------------------------------------------------------------- def strip_site_tag(name: str) -> tuple[str, str | None]: """Split off a ``[site.tag]`` prefix or suffix. Returns ``(clean_name, tag)``. If no tag is found, returns ``(name.strip(), None)``. """ s = name.strip() if s.startswith("["): close = s.find("]") if close != -1: tag = s[1:close].strip() remainder = s[close + 1 :].strip() if tag and remainder: return remainder, tag if s.endswith("]"): open_bracket = s.rfind("[") if open_bracket != -1: tag = s[open_bracket + 1 : -1].strip() remainder = s[:open_bracket].strip() if tag and remainder: return remainder, tag return s, None def tokenize(name: str, kb: ReleaseKnowledge) -> tuple[list[Token], str | None]: """Split ``name`` into tokens after stripping any site tag. String-ops style: replace every configured separator with a single NUL byte then split. NUL cannot legally appear in a release name, so it's a safe sentinel. """ clean, site_tag = strip_site_tag(name) DELIM = "\x00" buf = clean for sep in kb.separators: if sep != DELIM: buf = buf.replace(sep, DELIM) pieces = [p for p in buf.split(DELIM) if p] tokens = [Token(text=p, index=i) for i, p in enumerate(pieces)] return tokens, site_tag # --------------------------------------------------------------------------- # Helpers shared across passes # --------------------------------------------------------------------------- def _parse_season_episode(text: str) -> tuple[int, int | None, int | None] | None: """Parse a single token as ``SxxExx`` / ``SxxExxExx`` / ``Sxx`` / ``Sxx-yy`` (season range) / ``NxNN``. Returns ``(season, episode, episode_end)`` or ``None`` if the token is not a season/episode marker. For ``Sxx-yy``, returns the first season with no episode info — the caller is expected to detect the range form and promote ``media_type`` to ``tv_complete`` separately. """ upper = text.upper() # SxxExx form (and Sxx, Sxx-yy) if len(upper) >= 3 and upper[0] == "S" and upper[1:3].isdigit(): season = int(upper[1:3]) rest = upper[3:] if not rest: return season, None, None # Sxx-yy season-range form: capture the first season, treat as a # complete-series marker (no episode info). if ( len(rest) == 3 and rest[0] == "-" and rest[1:3].isdigit() ): return season, None, None episodes: list[int] = [] while rest.startswith("E") and len(rest) >= 3 and rest[1:3].isdigit(): episodes.append(int(rest[1:3])) rest = rest[3:] if not episodes: return None # For chained multi-episode markers (E09E10E11), the range is the # first → last episode. Intermediate values are implied. return season, episodes[0], episodes[-1] if len(episodes) >= 2 else None # NxNN form if "X" in upper: parts = upper.split("X") if len(parts) >= 2 and all(p.isdigit() and p for p in parts): season = int(parts[0]) episode = int(parts[1]) episode_end = int(parts[2]) if len(parts) >= 3 else None return season, episode, episode_end return None def _is_year(text: str) -> bool: """Return True if ``text`` is a 4-digit year in [1900, 2099].""" return len(text) == 4 and text.isdigit() and 1900 <= int(text) <= 2099 def _split_codec_group(text: str, kb: ReleaseKnowledge) -> tuple[str, str] | None: """Split a ``codec-GROUP`` token into ``(codec, group)`` if it fits. Returns ``None`` if the token doesn't match the ``codec-GROUP`` shape. Handles the empty-group case (``x265-``) as ``(codec, "")``. """ if "-" not in text: return None head, _, tail = text.rpartition("-") if head.lower() in kb.codecs: return head, tail return None def _match_role(text: str, role: TokenRole, kb: ReleaseKnowledge) -> TokenRole | None: """Return ``role`` if ``text`` matches it under ``kb``, else ``None``.""" lower = text.lower() if role is TokenRole.YEAR: return TokenRole.YEAR if _is_year(text) else None if role is TokenRole.SEASON_EPISODE: return ( TokenRole.SEASON_EPISODE if _parse_season_episode(text) is not None else None ) if role is TokenRole.RESOLUTION: return TokenRole.RESOLUTION if lower in kb.resolutions else None if role is TokenRole.SOURCE: return TokenRole.SOURCE if lower in kb.sources else None if role is TokenRole.CODEC: return TokenRole.CODEC if lower in kb.codecs else None return None # --------------------------------------------------------------------------- # Stage 2a — group detection # --------------------------------------------------------------------------- def _detect_group(tokens: list[Token], kb: ReleaseKnowledge) -> tuple[str, int | None]: """Identify the release group by walking tokens right-to-left. Returns ``(group_name, token_index_carrying_group)``. ``index`` is ``None`` when the group is absent (no trailing ``-`` in the stream). """ # Priority 1: codec-GROUP shape (clearest signal). for tok in reversed(tokens): split = _split_codec_group(tok.text, kb) if split is not None: _, group = split return (group or "UNKNOWN"), tok.index # Priority 2: rightmost dash, excluding dashed sources (Web-DL, etc.). for tok in reversed(tokens): if "-" not in tok.text: continue head, _, tail = tok.text.rpartition("-") if ( head.lower() in kb.sources or tok.text.lower().replace("-", "") in kb.sources ): continue if tail: return tail, tok.index return "UNKNOWN", None # --------------------------------------------------------------------------- # Stage 2b — structural annotation (schema-driven) # --------------------------------------------------------------------------- def _annotate_structural( tokens: list[Token], kb: ReleaseKnowledge, schema: GroupSchema, group_token_index: int, ) -> list[Token] | None: """Annotate structural tokens following a known group schema. Walks the schema's chunks against the body (tokens up to the group token). For each chunk, scans forward in the body for a matching token — tokens passed over without match are left UNKNOWN (the enricher pass will handle them). Returns ``None`` if any mandatory chunk fails to find a match. """ result = list(tokens) # The codec-GROUP token carries CODEC + GROUP. Split it now so the # schema walk knows the codec is "pre-consumed" at the end. group_token = result[group_token_index] cg_split = _split_codec_group(group_token.text, kb) codec_pre_consumed = False if cg_split is not None: codec, group = cg_split result[group_token_index] = group_token.with_role( TokenRole.CODEC, codec=codec, group=group or "UNKNOWN" ) codec_pre_consumed = True else: head, _, tail = group_token.text.rpartition("-") result[group_token_index] = group_token.with_role( TokenRole.GROUP, group=tail or "UNKNOWN", prefix=head ) body_end = group_token_index # exclusive tok_idx = 0 chunk_idx = 0 # 1) TITLE — leftmost contiguous tokens up to the first structural # boundary. Title is special because it can be multi-token. while ( chunk_idx < len(schema.chunks) and schema.chunks[chunk_idx].role is TokenRole.TITLE ): title_end = _find_title_end(result, body_end, kb) for i in range(tok_idx, title_end): result[i] = result[i].with_role(TokenRole.TITLE) tok_idx = title_end chunk_idx += 1 # 2) Remaining structural chunks. For each, scan forward in the body # for a matching token; tokens passed over remain UNKNOWN. for chunk in schema.chunks[chunk_idx:]: if chunk.role is TokenRole.GROUP: continue if chunk.role is TokenRole.CODEC and codec_pre_consumed: continue match_idx = _find_chunk(result, tok_idx, body_end, chunk.role, kb) if match_idx is None: if chunk.optional: continue return None result[match_idx] = result[match_idx].with_role(chunk.role) tok_idx = match_idx + 1 return result def _find_title_end( tokens: list[Token], body_end: int, kb: ReleaseKnowledge ) -> int: """Return the exclusive index where the title ends. The title is the leftmost run of tokens whose text does not match any structural role (year, season/episode, resolution, source, codec). Enricher tokens (audio, HDR, language) are *not* boundaries because they can appear in the middle of the structural sequence; however, in canonical scene names they don't appear inside the title itself, so this heuristic holds in practice. """ for i in range(body_end): text = tokens[i].text if _parse_season_episode(text) is not None: return i if _is_year(text): return i lower = text.lower() if lower in kb.resolutions: return i if lower in kb.sources: return i if lower in kb.codecs: return i # codec-GROUP token (e.g. "x265-KONTRAST") or dashed source (Web-DL). if "-" in text: head, _, _ = text.rpartition("-") if ( head.lower() in kb.codecs or head.lower() in kb.sources or text.lower().replace("-", "") in kb.sources ): return i return body_end def _find_chunk( tokens: list[Token], start: int, end: int, role: TokenRole, kb: ReleaseKnowledge, ) -> int | None: """Return the first index in ``[start, end)`` whose token matches ``role``. Returns ``None`` if no token in the range matches. Tokens already annotated (non-UNKNOWN) are skipped — they belong to another chunk. """ for i in range(start, end): if tokens[i].role is not TokenRole.UNKNOWN: continue if _match_role(tokens[i].text, role, kb) is not None: return i return None # --------------------------------------------------------------------------- # Stage 2b' — SHITTY annotation (schema-less heuristic) # --------------------------------------------------------------------------- def _annotate_shitty( tokens: list[Token], kb: ReleaseKnowledge, group_index: int | None, ) -> list[Token]: """Schema-less, dictionary-driven annotation. SHITTY's job is narrow: for releases that *look* like scene names but don't have a registered group schema, tag every token whose text falls into a known YAML bucket (resolutions, codecs, sources, …). Anything we can't classify stays UNKNOWN. The leftmost run of UNKNOWN tokens becomes the title. Done. Anything that requires more reasoning (parenthesized tech blocks, bare-dashed title fragments, year-disguised slug suffixes, …) is PATH OF PAIN territory and stays out of here on purpose. """ result = list(tokens) # 1) Group token — split codec-GROUP or tag GROUP. Same logic as EASY. if group_index is not None: gt = result[group_index] cg_split = _split_codec_group(gt.text, kb) if cg_split is not None: codec, group = cg_split result[group_index] = gt.with_role( TokenRole.CODEC, codec=codec, group=group or "UNKNOWN" ) else: _, _, tail = gt.text.rpartition("-") result[group_index] = gt.with_role( TokenRole.GROUP, group=tail or "UNKNOWN" ) # 2) Enrichers (audio / video-meta / edition / language). result = _annotate_enrichers(result, kb) # 3) Single pass: tag each UNKNOWN token by looking it up in the kb # buckets. First match wins per token, first occurrence wins per # role (we don't overwrite an already-tagged role). matchers: list[tuple[TokenRole, callable]] = [ (TokenRole.SEASON_EPISODE, lambda t: _parse_season_episode(t) is not None), (TokenRole.YEAR, _is_year), (TokenRole.RESOLUTION, lambda t: t.lower() in kb.resolutions), (TokenRole.DISTRIBUTOR, lambda t: t.upper() in kb.distributors), (TokenRole.SOURCE, lambda t: t.lower() in kb.sources), (TokenRole.CODEC, lambda t: t.lower() in kb.codecs), ] seen: set[TokenRole] = set() for i, tok in enumerate(result): if tok.role is not TokenRole.UNKNOWN: continue for role, matches in matchers: if role in seen: continue if matches(tok.text): result[i] = tok.with_role(role) seen.add(role) break # 4) Title = leftmost contiguous UNKNOWN tokens. for i, tok in enumerate(result): if tok.role is not TokenRole.UNKNOWN: break result[i] = tok.with_role(TokenRole.TITLE) return result # --------------------------------------------------------------------------- # Stage 2c — enricher pass (non-positional roles) # --------------------------------------------------------------------------- def _annotate_enrichers(tokens: list[Token], kb: ReleaseKnowledge) -> list[Token]: """Tag the remaining UNKNOWN tokens with non-positional roles. Multi-token sequences are matched first (so ``DTS.HD.MA`` wins over a single-token ``DTS``). For each sequence match, the first token receives the role + ``extra["sequence"]`` (the canonical joined value), and the trailing members are marked with the same role + ``extra["sequence_member"]=True`` so :func:`assemble` extracts the value only from the primary. """ result = list(tokens) # Multi-token sequences first. _apply_sequences( result, kb.audio.get("sequences", []), "codec", TokenRole.AUDIO_CODEC ) _apply_sequences( result, kb.video_meta.get("sequences", []), "hdr", TokenRole.HDR ) _apply_sequences( result, kb.editions.get("sequences", []), "edition", TokenRole.EDITION ) # Single tokens. known_audio_codecs = {c.upper() for c in kb.audio.get("codecs", [])} known_audio_channels = set(kb.audio.get("channels", [])) known_hdr = {h.upper() for h in kb.video_meta.get("hdr", [])} | kb.hdr_extra known_bit_depth = {d.lower() for d in kb.video_meta.get("bit_depth", [])} known_editions = {t.upper() for t in kb.editions.get("tokens", [])} # Channel layouts like "5.1" are tokenized as two tokens ("5", "1") # because "." is a separator. Detect consecutive pairs whose joined # value (without any trailing "-GROUP") is in the channel set. _detect_channel_pairs(result, known_audio_channels) for i, tok in enumerate(result): if tok.role is not TokenRole.UNKNOWN: continue text = tok.text upper = text.upper() lower = text.lower() if upper in known_audio_codecs: result[i] = tok.with_role(TokenRole.AUDIO_CODEC) continue if text in known_audio_channels: result[i] = tok.with_role(TokenRole.AUDIO_CHANNELS) continue if upper in known_hdr: result[i] = tok.with_role(TokenRole.HDR) continue if lower in known_bit_depth: result[i] = tok.with_role(TokenRole.BIT_DEPTH) continue if upper in known_editions: result[i] = tok.with_role(TokenRole.EDITION) continue if upper in kb.language_tokens: result[i] = tok.with_role(TokenRole.LANGUAGE) continue if upper in kb.distributors: result[i] = tok.with_role(TokenRole.DISTRIBUTOR) continue return result def _apply_sequences( tokens: list[Token], sequences: list[dict], value_key: str, role: TokenRole, ) -> None: """Mark the first occurrence of each sequence in place. Mutates ``tokens`` (replacing entries with new role-tagged Token instances). Sequences in the YAML must be ordered most-specific first; the first match wins per starting position. """ if not sequences: return upper_texts = [t.text.upper() for t in tokens] consumed: set[int] = set() for seq in sequences: seq_upper = [s.upper() for s in seq["tokens"]] n = len(seq_upper) for start in range(len(tokens) - n + 1): if any(idx in consumed for idx in range(start, start + n)): continue if any( tokens[start + k].role is not TokenRole.UNKNOWN for k in range(n) ): continue if upper_texts[start : start + n] == seq_upper: tokens[start] = tokens[start].with_role( role, sequence=seq[value_key] ) for k in range(1, n): tokens[start + k] = tokens[start + k].with_role( role, sequence_member="True" ) consumed.update(range(start, start + n)) def _detect_channel_pairs( tokens: list[Token], known_channels: set[str] ) -> None: """Spot two consecutive numeric tokens that form a channel layout. Example: ``["5", "1-KTH"]`` → joined ``"5.1"`` (after stripping the ``-GROUP`` suffix on the second). The second token may be the trailing codec-GROUP token, in which case it's already tagged CODEC and we skip — we'd corrupt its role. """ for i in range(len(tokens) - 1): first = tokens[i] second = tokens[i + 1] if first.role is not TokenRole.UNKNOWN: continue # Strip a "-GROUP" suffix on the second token before joining. second_text = second.text.split("-")[0] candidate = f"{first.text}.{second_text}" if candidate not in known_channels: continue # Only tag the first token (carries the channel value). The # second token may legitimately remain UNKNOWN (or be the # codec-GROUP token, already tagged CODEC). tokens[i] = first.with_role( TokenRole.AUDIO_CHANNELS, sequence=candidate ) if second.role is TokenRole.UNKNOWN: tokens[i + 1] = second.with_role( TokenRole.AUDIO_CHANNELS, sequence_member="True" ) # --------------------------------------------------------------------------- # Stage 2 entry point # --------------------------------------------------------------------------- def annotate(tokens: list[Token], kb: ReleaseKnowledge) -> list[Token]: """Annotate token roles. Dispatch: * If a group is detected AND has a known schema, run the EASY structural walk. If the schema walk aborts on a mandatory chunk mismatch, fall through to SHITTY (the heuristic still does better than giving up). * Otherwise run SHITTY — schema-less, best-effort, never aborts. The enricher pass runs in both cases. The pipeline always returns a populated token list; downstream callers don't need to distinguish EASY vs SHITTY at this layer (the parse_path is decided in the service based on whether a schema matched). """ group_name, group_index = _detect_group(tokens, kb) schema = kb.group_schema(group_name) if group_index is not None else None if schema is not None and group_index is not None: structural = _annotate_structural(tokens, kb, schema, group_index) if structural is not None: return _annotate_enrichers(structural, kb) # SHITTY fallback — heuristic positional pass. ``_annotate_shitty`` # runs its own enricher pass internally (it has to, so the title # scan can skip enricher-tagged tokens). return _annotate_shitty(tokens, kb, group_index) def has_known_schema(tokens: list[Token], kb: ReleaseKnowledge) -> bool: """Return True if ``tokens`` would take the EASY path in :func:`annotate`.""" group_name, group_index = _detect_group(tokens, kb) if group_index is None: return False return kb.group_schema(group_name) is not None # --------------------------------------------------------------------------- # Stage 3 — assemble # --------------------------------------------------------------------------- def assemble( annotated: list[Token], site_tag: str | None, raw_name: str, kb: ReleaseKnowledge, ) -> dict: """Fold annotated tokens into a ``ParsedRelease``-compatible dict. Returns a dict (not a ``ParsedRelease`` instance) so the caller can layer in additional fields (``parse_path``, ``raw``, …) before instantiation. """ # Pure-punctuation tokens (e.g. a stray "-" left by ` - ` separators in # human-friendly release names) carry no title content and would leak # into the joined title as ``"Show.-.Episode"``. Drop them here. title_parts = [ t.text for t in annotated if t.role is TokenRole.TITLE and any(c.isalnum() for c in t.text) ] title = ".".join(title_parts) if title_parts else ( annotated[0].text if annotated else raw_name ) year: int | None = None season: int | None = None episode: int | None = None episode_end: int | None = None quality: str | None = None source: str | None = None codec: str | None = None group = "UNKNOWN" audio_codec: str | None = None audio_channels: str | None = None bit_depth: str | None = None hdr_format: str | None = None edition: str | None = None distributor: str | None = None languages: list[str] = [] is_season_range = False for tok in annotated: # Skip non-primary members of a multi-token sequence. if tok.extra.get("sequence_member") == "True": continue role = tok.role if role is TokenRole.YEAR: year = int(tok.text) elif role is TokenRole.SEASON_EPISODE: parsed = _parse_season_episode(tok.text) if parsed is not None: season, episode, episode_end = parsed # Detect Sxx-yy range form to flag it as a multi-season pack. upper = tok.text.upper() if ( len(upper) == 6 and upper[0] == "S" and upper[1:3].isdigit() and upper[3] == "-" and upper[4:6].isdigit() ): is_season_range = True elif role is TokenRole.RESOLUTION: quality = tok.text elif role is TokenRole.SOURCE: source = tok.text elif role is TokenRole.CODEC: codec = tok.extra.get("codec", tok.text) if "group" in tok.extra: group = tok.extra["group"] or "UNKNOWN" elif role is TokenRole.GROUP: group = tok.extra.get("group", tok.text) or "UNKNOWN" elif role is TokenRole.AUDIO_CODEC: if audio_codec is None: audio_codec = tok.extra.get("sequence", tok.text) elif role is TokenRole.AUDIO_CHANNELS: if audio_channels is None: audio_channels = tok.extra.get("sequence", tok.text) elif role is TokenRole.BIT_DEPTH: if bit_depth is None: bit_depth = tok.text.lower() elif role is TokenRole.HDR: if hdr_format is None: hdr_format = tok.extra.get("sequence", tok.text.upper()) elif role is TokenRole.EDITION: if edition is None: edition = tok.extra.get("sequence", tok.text.upper()) elif role is TokenRole.LANGUAGE: languages.append(tok.text.upper()) elif role is TokenRole.DISTRIBUTOR: if distributor is None: distributor = tok.text.upper() # Media type heuristic. Doc/concert/integrale tokens win over the # generic tech-based fallback. We look across all tokens (not just # annotated ones) because these markers may be tagged UNKNOWN by the # structural pass — only the assemble step cares about them. upper_tokens = {tok.text.upper() for tok in annotated} doc_tokens = {t.upper() for t in kb.media_type_tokens.get("doc", [])} concert_tokens = {t.upper() for t in kb.media_type_tokens.get("concert", [])} integrale_tokens = {t.upper() for t in kb.media_type_tokens.get("integrale", [])} if upper_tokens & doc_tokens: media_type = MediaTypeToken.DOCUMENTARY elif upper_tokens & concert_tokens: media_type = MediaTypeToken.CONCERT elif is_season_range: media_type = MediaTypeToken.TV_COMPLETE elif ( edition in {"COMPLETE", "INTEGRALE", "COLLECTION"} or upper_tokens & integrale_tokens ) and season is None: media_type = MediaTypeToken.TV_COMPLETE elif season is not None: media_type = MediaTypeToken.TV_SHOW elif any((quality, source, codec, year)): media_type = MediaTypeToken.MOVIE else: media_type = MediaTypeToken.UNKNOWN return { "title": title, "title_sanitized": kb.sanitize_for_fs(title), "year": year, "season": season, "episode": episode, "episode_end": episode_end, "quality": quality, "source": source, "codec": codec, "group": group, "media_type": media_type, "site_tag": site_tag, "languages": languages, "audio_codec": audio_codec, "audio_channels": audio_channels, "bit_depth": bit_depth, "hdr_format": hdr_format, "edition": edition, "distributor": distributor, }