"""TV Show domain services - Business logic.""" import logging import re from ..shared.value_objects import ImdbId from .entities import TVShow from .exceptions import ( TVShowAlreadyExists, TVShowNotFound, ) from .repositories import EpisodeRepository, SeasonRepository, TVShowRepository logger = logging.getLogger(__name__) class TVShowService: """ Domain service for TV show-related business logic. This service contains business rules that don't naturally fit within a single entity. """ def __init__( self, show_repository: TVShowRepository, season_repository: SeasonRepository | None = None, episode_repository: EpisodeRepository | None = None, ): """ Initialize TV show service. Args: show_repository: TV show repository for persistence season_repository: Optional season repository episode_repository: Optional episode repository """ self.show_repository = show_repository self.season_repository = season_repository self.episode_repository = episode_repository def track_show(self, show: TVShow) -> None: """ Start tracking a TV show. Args: show: TVShow entity to track Raises: TVShowAlreadyExists: If show is already being tracked """ if self.show_repository.exists(show.imdb_id): raise TVShowAlreadyExists( f"TV show with IMDb ID {show.imdb_id} is already tracked" ) self.show_repository.save(show) logger.info(f"Started tracking TV show: {show.title} ({show.imdb_id})") def get_show(self, imdb_id: ImdbId) -> TVShow: """ Get a TV show by IMDb ID. Args: imdb_id: IMDb ID of the show Returns: TVShow entity Raises: TVShowNotFound: If show not found """ show = self.show_repository.find_by_imdb_id(imdb_id) if not show: raise TVShowNotFound(f"TV show with IMDb ID {imdb_id} not found") return show def get_all_shows(self) -> list[TVShow]: """ Get all tracked TV shows. Returns: List of all TV shows """ return self.show_repository.find_all() def get_ongoing_shows(self) -> list[TVShow]: """ Get all ongoing TV shows. Returns: List of ongoing TV shows """ all_shows = self.show_repository.find_all() return [show for show in all_shows if show.is_ongoing()] def get_ended_shows(self) -> list[TVShow]: """ Get all ended TV shows. Returns: List of ended TV shows """ all_shows = self.show_repository.find_all() return [show for show in all_shows if show.is_ended()] def update_show(self, show: TVShow) -> None: """ Update an existing TV show. Args: show: TVShow entity with updated data Raises: TVShowNotFound: If show doesn't exist """ if not self.show_repository.exists(show.imdb_id): raise TVShowNotFound(f"TV show with IMDb ID {show.imdb_id} not found") self.show_repository.save(show) logger.info(f"Updated TV show: {show.title} ({show.imdb_id})") def untrack_show(self, imdb_id: ImdbId) -> None: """ Stop tracking a TV show. Args: imdb_id: IMDb ID of the show to untrack Raises: TVShowNotFound: If show not found """ if not self.show_repository.delete(imdb_id): raise TVShowNotFound(f"TV show with IMDb ID {imdb_id} not found") logger.info(f"Stopped tracking TV show with IMDb ID: {imdb_id}") def parse_episode_from_filename(self, filename: str) -> tuple[int, int] | None: """ Parse season and episode numbers from filename. Supports formats: - S01E05 - 1x05 - Season 1 Episode 5 Args: filename: Filename to parse Returns: Tuple of (season, episode) if found, None otherwise """ filename_lower = filename.lower() # Pattern 1: S01E05 pattern1 = r"s(\d{1,2})e(\d{1,2})" match = re.search(pattern1, filename_lower) if match: return (int(match.group(1)), int(match.group(2))) # Pattern 2: 1x05 pattern2 = r"(\d{1,2})x(\d{1,2})" match = re.search(pattern2, filename_lower) if match: return (int(match.group(1)), int(match.group(2))) # Pattern 3: Season 1 Episode 5 pattern3 = r"season\s*(\d{1,2})\s*episode\s*(\d{1,2})" match = re.search(pattern3, filename_lower) if match: return (int(match.group(1)), int(match.group(2))) return None def validate_episode_file(self, filename: str) -> bool: """ Validate that a file is a valid episode file. Args: filename: Filename to validate Returns: True if valid episode file, False otherwise """ # Check file extension valid_extensions = {".mkv", ".mp4", ".avi", ".mov", ".wmv", ".flv", ".webm"} extension = filename[filename.rfind(".") :].lower() if "." in filename else "" if extension not in valid_extensions: logger.warning(f"Invalid file extension: {extension}") return False # Check if we can parse episode info episode_info = self.parse_episode_from_filename(filename) if not episode_info: logger.warning(f"Could not parse episode info from filename: {filename}") return False return True def find_next_episode( self, show: TVShow, last_season: int, last_episode: int ) -> tuple[int, int] | None: """ Find the next episode to download for a show. Args: show: TVShow entity last_season: Last downloaded season number last_episode: Last downloaded episode number Returns: Tuple of (season, episode) for next episode, or None if show is complete """ # If show has ended and we've watched all seasons, no next episode if show.is_ended() and last_season >= show.seasons_count: return None # Simple logic: next episode in same season, or first episode of next season # This could be enhanced with actual episode counts per season next_episode = last_episode + 1 next_season = last_season # Assume max 50 episodes per season (could be improved with actual data) if next_episode > 50: next_season += 1 next_episode = 1 # Don't go beyond known seasons if next_season > show.seasons_count: return None return (next_season, next_episode)