f6eef59fca
Low-risk cleanup items, no functional change to the parser. The philosophy remains: keep the parser simple, the AI handles edge cases. - Extract duplicated 'fs-safe title → dot-folder-name' regex into to_dot_folder_name() in domain/shared/value_objects.py. Used by both MovieTitle.normalized() and TVShow.get_folder_name() (item #5). - ParsedRelease.languages now uses field(default_factory=list) instead of a manual __post_init__ assigning [] via object.__setattr__ (#6). - tv_shows/entities.py module docstring: prepend ASCII ownership tree for quicker visual scan of the aggregate hierarchy (#7). - file_extensions.yaml: split subtitle sidecars (.srt/.sub/.idx/.ass/.ssa) into a dedicated 'subtitle:' category instead of lumping them under 'metadata:'. _METADATA_EXTENSIONS at the value_objects.py level remains the union of both — detect_media_type behavior unchanged. New loader load_subtitle_extensions() exposes the distinct subtitle set for future callers in the subtitles domain (#20). Suite: 1020 passed, 8 skipped.
443 lines
19 KiB
Python
443 lines
19 KiB
Python
"""TV Show domain entities.
|
|
|
|
This module implements the TVShow aggregate following DDD principles.
|
|
|
|
Aggregate ownership::
|
|
|
|
TVShow ← aggregate root (the repo returns this)
|
|
└── seasons: dict[SeasonNumber, Season]
|
|
└── Season
|
|
└── episodes: dict[EpisodeNumber, Episode]
|
|
└── Episode ← file metadata + audio/subtitle tracks
|
|
|
|
Rules:
|
|
|
|
* ``TVShow`` is the aggregate **root** — the only entity exposed by the
|
|
repository.
|
|
* ``Season`` is owned by TVShow. ``Episode`` is owned by Season.
|
|
* Children do not back-reference the root (no ``show_imdb_id`` on
|
|
Season/Episode): they are only ever reached *through* TVShow.
|
|
* Mutation invariants are enforced through aggregate-root methods such as
|
|
``TVShow.add_episode()`` — never reach into ``show.seasons[...].episodes``
|
|
to mutate without going through the root, otherwise invariants are not
|
|
guaranteed.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
|
|
from ..shared.media import AudioTrack, SubtitleTrack, track_lang_matches
|
|
from ..shared.value_objects import (
|
|
FilePath,
|
|
FileSize,
|
|
ImdbId,
|
|
Language,
|
|
to_dot_folder_name,
|
|
)
|
|
from .value_objects import (
|
|
CollectionStatus,
|
|
EpisodeNumber,
|
|
SeasonNumber,
|
|
ShowStatus,
|
|
)
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# Episode
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
|
|
@dataclass
class Episode:
    """
    A single episode of a TV show — leaf of the TVShow aggregate.

    Carries the file metadata (path, size) and the discovered tracks
    (audio + subtitle). Track lists are populated by the ffprobe + subtitle
    scan pipeline; they may be empty when the episode is known but not yet
    scanned, or when no file is downloaded yet.
    """

    season_number: SeasonNumber
    episode_number: EpisodeNumber
    title: str
    file_path: FilePath | None = None
    file_size: FileSize | None = None
    audio_tracks: list[AudioTrack] = field(default_factory=list)
    subtitle_tracks: list[SubtitleTrack] = field(default_factory=list)

    def __post_init__(self) -> None:
        """Wrap raw ``int`` season/episode numbers in their value objects.

        Values that are neither the value object nor an ``int`` are left
        untouched (no error is raised here).
        """
        if not isinstance(self.season_number, SeasonNumber) and isinstance(
            self.season_number, int
        ):
            self.season_number = SeasonNumber(self.season_number)
        if not isinstance(self.episode_number, EpisodeNumber) and isinstance(
            self.episode_number, int
        ):
            self.episode_number = EpisodeNumber(self.episode_number)

    # ── File presence ──────────────────────────────────────────────────────

    def has_file(self) -> bool:
        """True if a file path is set and the file actually exists on disk."""
        return self.file_path is not None and self.file_path.exists()

    def is_downloaded(self) -> bool:
        """Alias of ``has_file()`` — reads better in collection-status contexts."""
        return self.has_file()

    # ── Track helpers ──────────────────────────────────────────────────────

    @staticmethod
    def _unique_languages(
        tracks: list[AudioTrack] | list[SubtitleTrack],
    ) -> list[str]:
        """Return the distinct truthy ``language`` values of *tracks*.

        Order of first appearance is kept; ``dict.fromkeys`` does the
        de-duplication because dicts preserve insertion order.
        """
        return list(dict.fromkeys(t.language for t in tracks if t.language))

    # ── Audio helpers ──────────────────────────────────────────────────────

    def has_audio_in(self, lang: str | Language) -> bool:
        """True if at least one audio track is in the given language."""
        return any(track_lang_matches(t.language, lang) for t in self.audio_tracks)

    def audio_languages(self) -> list[str]:
        """Unique audio languages across all tracks, in track order."""
        return self._unique_languages(self.audio_tracks)

    # ── Subtitle helpers ───────────────────────────────────────────────────

    def has_subtitles_in(self, lang: str | Language) -> bool:
        """True if at least one subtitle track is in the given language."""
        return any(track_lang_matches(t.language, lang) for t in self.subtitle_tracks)

    def has_forced_subs(self) -> bool:
        """True if at least one subtitle track is flagged as forced."""
        return any(t.is_forced for t in self.subtitle_tracks)

    def subtitle_languages(self) -> list[str]:
        """Unique subtitle languages across all tracks, in track order."""
        return self._unique_languages(self.subtitle_tracks)

    # ── Naming ─────────────────────────────────────────────────────────────

    def get_filename(self) -> str:
        """Suggested filename: ``S01E05.Pilot``.

        Every character of the title that is not a word character,
        whitespace or ``-`` is dropped, then each run of whitespace becomes
        a single dot. When nothing is left of the title, only the
        ``SxxEyy`` prefix is returned (no dangling dot).
        """
        prefix = f"S{self.season_number.value:02d}E{self.episode_number.value:02d}"
        stripped = re.sub(r"[^\w\s\-]", "", self.title)
        # split() with no argument collapses whitespace runs and trims both
        # ends, so removed punctuation ("Mr. & Mrs.") cannot leave "..".
        clean_title = ".".join(stripped.split())
        if not clean_title:
            return prefix
        return f"{prefix}.{clean_title}"

    def __str__(self) -> str:
        return f"S{self.season_number.value:02d}E{self.episode_number.value:02d} - {self.title}"

    def __repr__(self) -> str:
        return (
            f"Episode(S{self.season_number.value:02d}E{self.episode_number.value:02d})"
        )
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# Season
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
|
|
@dataclass
class Season:
    """
    A season of a TV show — owned by ``TVShow``.

    Owns its episodes via the ``episodes`` dict keyed by ``EpisodeNumber``.

    Two TMDB-sourced counts shape the collection logic:

    * ``expected_episodes`` — total episodes planned for the season
      (``None`` if unknown).
    * ``aired_episodes`` — episodes **already aired** as of the latest TMDB
      refresh. ``None`` falls back to ``expected_episodes`` (best-effort).

    The split matters: ``is_complete()`` checks owned against aired, so a season
    in the middle of broadcasting can be "complete" today and become "partial"
    later when new episodes air — that is correct behavior.
    """

    season_number: SeasonNumber
    episodes: dict[EpisodeNumber, Episode] = field(default_factory=dict)
    expected_episodes: int | None = None
    aired_episodes: int | None = None
    name: str | None = None

    def __post_init__(self) -> None:
        """Coerce a raw ``int`` season number and validate the two counts.

        Raises:
            ValueError: if either count is negative, or if
                ``aired_episodes`` exceeds ``expected_episodes``.
        """
        if not isinstance(self.season_number, SeasonNumber):
            if isinstance(self.season_number, int):
                self.season_number = SeasonNumber(self.season_number)

        if self.expected_episodes is not None and self.expected_episodes < 0:
            raise ValueError(
                f"expected_episodes must be >= 0, got {self.expected_episodes}"
            )
        if self.aired_episodes is not None and self.aired_episodes < 0:
            raise ValueError(f"aired_episodes must be >= 0, got {self.aired_episodes}")
        if (
            self.expected_episodes is not None
            and self.aired_episodes is not None
            and self.aired_episodes > self.expected_episodes
        ):
            raise ValueError(
                f"aired_episodes ({self.aired_episodes}) cannot exceed "
                f"expected_episodes ({self.expected_episodes})"
            )

    # ── Properties ─────────────────────────────────────────────────────────

    @property
    def episode_count(self) -> int:
        """Number of episodes currently owned in this season."""
        return len(self.episodes)

    # ── Collection state ───────────────────────────────────────────────────

    def _effective_aired(self) -> int | None:
        """``aired_episodes`` if set, else fall back to ``expected_episodes``."""
        return (
            self.aired_episodes
            if self.aired_episodes is not None
            else self.expected_episodes
        )

    def _owned_numbers(self) -> set[int]:
        """Raw integer numbers of the episodes currently in ``episodes``."""
        return {ep.value for ep in self.episodes}

    def is_complete(self) -> bool:
        """
        True if every aired episode is owned.

        The check is made on episode *numbers* ``1..aired`` rather than on a
        bare count, so it always agrees with ``missing_episodes()``: owning
        an episode numbered outside that range cannot mask a gap inside it.

        Returns False (conservative) when the aired count is unknown — without
        knowing how many episodes have aired we cannot claim completeness.
        When no episode has aired yet the season is trivially "complete".
        """
        aired = self._effective_aired()
        if aired is None:
            return False
        owned = self._owned_numbers()
        # An empty range (aired == 0) makes all() return True.
        return all(n in owned for n in range(1, aired + 1))

    def is_fully_aired(self) -> bool:
        """True if all planned episodes have already aired.

        Returns False when either count is unknown.
        """
        if self.expected_episodes is None or self.aired_episodes is None:
            return False
        return self.aired_episodes >= self.expected_episodes

    def missing_episodes(self) -> list[EpisodeNumber]:
        """
        List of episode numbers that have aired but are not owned.

        Episodes beyond ``aired_episodes`` are **not** considered missing
        (they have not aired yet). When the aired count is unknown, returns
        an empty list — we cannot reason about gaps without a target.
        """
        aired = self._effective_aired()
        if aired is None or aired <= 0:
            return []
        present = self._owned_numbers()
        return [EpisodeNumber(n) for n in range(1, aired + 1) if n not in present]

    # ── Mutation (called through the aggregate root) ───────────────────────

    def add_episode(self, episode: Episode) -> None:
        """
        Insert an episode into this season. Replaces any episode with the same
        number — callers wishing to detect conflicts should check beforehand.

        Raises:
            ValueError: if the episode's season number differs from this
                season's number.
        """
        if episode.season_number != self.season_number:
            raise ValueError(
                f"Episode season ({episode.season_number}) does not match season "
                f"({self.season_number})"
            )
        self.episodes[episode.episode_number] = episode

    # ── Naming ─────────────────────────────────────────────────────────────

    def is_special(self) -> bool:
        """Delegate to ``SeasonNumber.is_special()``."""
        return self.season_number.is_special()

    def get_folder_name(self) -> str:
        """``Season 01`` or ``Specials`` for season 0."""
        if self.is_special():
            return "Specials"
        return f"Season {self.season_number.value:02d}"

    def __str__(self) -> str:
        if self.name:
            return f"Season {self.season_number.value}: {self.name}"
        return f"Season {self.season_number.value}"

    def __repr__(self) -> str:
        return (
            f"Season(number={self.season_number.value}, episodes={len(self.episodes)})"
        )
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# TVShow — aggregate root
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
|
|
@dataclass
class TVShow:
    """
    Aggregate root of the TV shows domain.

    The show owns its seasons through ``seasons``, a dict keyed by
    ``SeasonNumber``. Every mutation (adding episodes, attaching seasons)
    is meant to go through the methods below so the aggregate stays
    consistent.

    The show is described along two independent axes:

    * ``status`` (``ShowStatus``) — the production state.
    * ``collection_status()`` — what is owned compared with what has aired.

    Nothing here makes a claim about upcoming/scheduled episodes.
    """

    imdb_id: ImdbId
    title: str
    status: ShowStatus
    seasons: dict[SeasonNumber, Season] = field(default_factory=dict)
    expected_seasons: int | None = None
    tmdb_id: int | None = None

    def __post_init__(self) -> None:
        """Coerce string inputs to value objects and validate the fields.

        Raises:
            ValueError: if ``imdb_id`` or ``status`` is neither the expected
                type nor a ``str``, or if ``expected_seasons`` is negative.
        """
        if not isinstance(self.imdb_id, ImdbId):
            if not isinstance(self.imdb_id, str):
                raise ValueError(
                    f"imdb_id must be ImdbId or str, got {type(self.imdb_id)}"
                )
            self.imdb_id = ImdbId(self.imdb_id)

        if not isinstance(self.status, ShowStatus):
            if not isinstance(self.status, str):
                raise ValueError(
                    f"status must be ShowStatus or str, got {type(self.status)}"
                )
            self.status = ShowStatus.from_string(self.status)

        declared = self.expected_seasons
        if declared is not None and declared < 0:
            raise ValueError(
                f"expected_seasons must be >= 0, got {self.expected_seasons}"
            )

    # ── Production-state queries ───────────────────────────────────────────

    def is_ongoing(self) -> bool:
        """True when ``status`` equals ``ShowStatus.ONGOING``."""
        return self.status == ShowStatus.ONGOING

    def is_ended(self) -> bool:
        """True when ``status`` equals ``ShowStatus.ENDED``."""
        return self.status == ShowStatus.ENDED

    # ── Properties ─────────────────────────────────────────────────────────

    @property
    def seasons_count(self) -> int:
        """How many seasons are attached, including seasons with no episode."""
        return len(self.seasons)

    @property
    def episode_count(self) -> int:
        """Sum of ``episode_count`` over every attached season."""
        return sum(season.episode_count for season in self.seasons.values())

    # ── Mutation — the sole entry point for adding content ─────────────────

    def add_episode(self, episode: Episode) -> None:
        """
        Route an episode to its season, creating that season on first use.

        The season is looked up by ``episode.season_number``; the insertion
        itself is delegated to ``Season.add_episode()``.
        """
        key = episode.season_number
        target = self.seasons.get(key)
        if target is None:
            target = Season(season_number=key)
            self.seasons[key] = target
        target.add_episode(episode)

    def add_season(self, season: Season) -> None:
        """
        Attach a season under its own ``season_number``.

        A season already stored under that number is overwritten.
        """
        self.seasons[season.season_number] = season

    # ── Collection state ───────────────────────────────────────────────────

    def collection_status(self) -> CollectionStatus:
        """
        Summarise the state of the collection for this show.

        * ``EMPTY`` — the total owned episode count is zero
        * ``PARTIAL`` — some attached season reports ``is_complete()`` as
          False, or fewer seasons are attached than ``expected_seasons``
        * ``COMPLETE`` — neither of the above

        A season whose ``is_complete()`` is False for lack of an aired count
        therefore makes a non-empty show PARTIAL.
        """
        if self.episode_count == 0:
            return CollectionStatus.EMPTY

        # Any incomplete season is enough to rule out COMPLETE.
        if not all(season.is_complete() for season in self.seasons.values()):
            return CollectionStatus.PARTIAL

        # Whole seasons may be absent: with fewer seasons attached than
        # expected, the absent ones may have aired episodes.
        declared = self.expected_seasons
        if declared is not None and len(self.seasons) < declared:
            return CollectionStatus.PARTIAL

        return CollectionStatus.COMPLETE

    def is_complete_series(self) -> bool:
        """
        True when the show is ENDED **and** ``collection_status()`` is COMPLETE.
        """
        if not self.is_ended():
            return False
        return self.collection_status() == CollectionStatus.COMPLETE

    def missing_episodes(self) -> list[tuple[SeasonNumber, EpisodeNumber]]:
        """Every ``(season, episode)`` pair reported missing, by season number."""
        ordered = sorted(self.seasons.items(), key=lambda kv: kv[0].value)
        pairs: list[tuple[SeasonNumber, EpisodeNumber]] = []
        for number, season in ordered:
            pairs.extend((number, ep) for ep in season.missing_episodes())
        return pairs

    # ── Naming ─────────────────────────────────────────────────────────────

    def get_folder_name(self) -> str:
        """Dot-separated folder name (e.g. ``Breaking.Bad``)."""
        return to_dot_folder_name(self.title)

    def __str__(self) -> str:
        return f"{self.title} ({self.status.value}, {self.seasons_count} seasons)"

    def __repr__(self) -> str:
        return f"TVShow(imdb_id={self.imdb_id}, title='{self.title}')"
|