diff --git a/alfred/application/filesystem/dto.py b/alfred/application/filesystem/dto.py index 3f238bc..968e163 100644 --- a/alfred/application/filesystem/dto.py +++ b/alfred/application/filesystem/dto.py @@ -53,31 +53,6 @@ class MoveMediaResponse: } -@dataclass -class SetFolderPathResponse: - """Response from setting a folder path.""" - - status: str - folder_name: str | None = None - path: str | None = None - error: str | None = None - message: str | None = None - - def to_dict(self): - """Convert to dict for agent compatibility.""" - result = {"status": self.status} - - if self.error: - result["error"] = self.error - result["message"] = self.message - else: - if self.folder_name: - result["folder_name"] = self.folder_name - if self.path: - result["path"] = self.path - - return result - @dataclass class PlacedSubtitle: @@ -186,10 +161,10 @@ class ListFolderResponse: """Response from listing a folder.""" status: str - folder_type: str | None = None - path: str | None = None - entries: list[str] | None = None - count: int | None = None + folder_type: str | None = None # SHOULD BE A PROPERTY + path: str | None = None # NOT NONE - Should be path + entries: list[str] | None = None # NOT NONE - Empty list of path + count: int | None = None # USELESS error: str | None = None message: str | None = None diff --git a/alfred/application/filesystem/resolve_destination.py b/alfred/application/filesystem/resolve_destination.py index b823b26..d258e65 100644 --- a/alfred/application/filesystem/resolve_destination.py +++ b/alfred/application/filesystem/resolve_destination.py @@ -24,7 +24,7 @@ from pathlib import Path from alfred.application.release import inspect_release from alfred.domain.release import parse_release -from alfred.domain.release.ports import ReleaseKnowledge +from alfred.domain.releases.ports import ReleaseKnowledge from alfred.domain.release.value_objects import ParsedRelease from alfred.domain.shared.ports import MediaProber from alfred.infrastructure.persistence import get_memory diff --git a/alfred/application/filesystem/set_folder_path.py b/alfred/application/filesystem/set_folder_path.py deleted file mode 100644 index 277a35f..0000000 --- a/alfred/application/filesystem/set_folder_path.py +++ /dev/null @@ -1,50 +0,0 @@ -"""Set folder path use case.""" - -import logging - -from alfred.infrastructure.filesystem import FileManager - -from .dto import SetFolderPathResponse - -logger = logging.getLogger(__name__) - - -class SetFolderPathUseCase: - """ - Use case for setting a folder path in configuration. - - This orchestrates the FileManager to set folder paths. - """ - - def __init__(self, file_manager: FileManager): - """ - Initialize use case. - - Args: - file_manager: FileManager instance - """ - self.file_manager = file_manager - - def execute(self, folder_name: str, path_value: str) -> SetFolderPathResponse: - """ - Set a folder path in configuration. - - Args: - folder_name: Name of folder to set (download, tvshow, movie, torrent) - path_value: Absolute path to the folder - - Returns: - SetFolderPathResponse with success or error information - """ - result = self.file_manager.set_folder_path(folder_name, path_value) - - if result.get("status") == "ok": - return SetFolderPathResponse( - status="ok", - folder_name=result.get("folder_name"), - path=result.get("path"), - ) - else: - return SetFolderPathResponse( - status="error", error=result.get("error"), message=result.get("message") - ) diff --git a/alfred/application/movies/dto.py b/alfred/application/movies/dto.py index 2a8fe7c..e5aa722 100644 --- a/alfred/application/movies/dto.py +++ b/alfred/application/movies/dto.py @@ -12,9 +12,7 @@ class SearchMovieResponse: title: str | None = None media_type: str | None = None tmdb_id: int | None = None - overview: str | None = None release_date: str | None = None - vote_average: float | None = None error: str | None = None message: str | None = None @@ -34,11 +32,7 @@ class SearchMovieResponse: result["media_type"] = self.media_type if self.tmdb_id: result["tmdb_id"] = self.tmdb_id - if self.overview: - result["overview"] = self.overview if self.release_date: result["release_date"] = self.release_date - if self.vote_average: - result["vote_average"] = self.vote_average return result diff --git a/alfred/application/movies/rescan.py b/alfred/application/movies/rescan.py deleted file mode 100644 index 448ac92..0000000 --- a/alfred/application/movies/rescan.py +++ /dev/null @@ -1,122 +0,0 @@ -"""``rescan_movie`` — rebuild a MovieRelease from disk and persist it. - -The orchestrator locates the main video inside a movie folder, runs -``inspect_release`` on it (same single-source-of-truth as the TV -rescan flow), and assembles the result into a frozen -:class:`MovieRelease` written to the per-movie v2 ``.alfred`` sidecar. - -Folder convention ------------------ - -Movies are **one folder, one main file** in Alfred's library layout: - - movies/ - Inception (2010)/ - Inception.2010.1080p.BluRay.x264-GROUP.mkv - optional.srt - optional.nfo - -``find_video_file`` is responsible for picking the main video -(recursive walk, deterministic ordering). Adjacent subtitles / nfos -are ignored by this orchestrator — only embedded subtitle tracks are -captured (same scope as TV rescan). - -TMDB ----- - -``rescan_movie`` does **not** call TMDB. Identity (``tmdb_id``, -optional ``imdb_id``) is supplied by the caller; the library index -auto-heals from the new sidecar on its next read. A subsequent TMDB -sync (Phase 5) layers identity facts (``name``, ``release_year``) on -top of the on-disk truth. -""" - -from __future__ import annotations - -import logging -from datetime import UTC, datetime -from pathlib import Path - -from alfred.application.release.inspect import inspect_release -from alfred.domain.release.ports import ReleaseKnowledge -from alfred.domain.releases.entities import MovieRelease, TrackProfile -from alfred.domain.shared.ports import MediaProber -from alfred.domain.shared.value_objects import FilePath, ImdbId, TmdbId -from alfred.infrastructure.filesystem.find_video import find_video_file -from alfred.infrastructure.persistence.dot_alfred.v2.repository import ( - DotAlfredMovieReleaseRepository, -) - -_LOG = logging.getLogger(__name__) - - -class MovieRescanFailed(RuntimeError): - """Raised when ``rescan_movie`` cannot produce a release. - - The orchestrator surfaces a single explicit failure mode — no main - video found inside ``movie_dir``. All other adapter-level errors - (probe failure, parser low-confidence) degrade gracefully into a - sidecar with empty / partial fields, since the file is on disk - regardless. - """ - - -def rescan_movie( - movie_dir: Path, - *, - tmdb_id: TmdbId, - imdb_id: ImdbId | None = None, - movie_repo: DotAlfredMovieReleaseRepository, - prober: MediaProber, - kb: ReleaseKnowledge, -) -> MovieRelease: - """Rebuild and persist the :class:`MovieRelease` for ``movie_dir``. - - ``movie_dir.name`` is used as both the sidecar location (relative - to the movie library root) and the ``folder`` field on the - aggregate. - - Args: - movie_dir: absolute path to the movie folder under the movie - library root. - tmdb_id: TMDB primary key (required, no coercion). - imdb_id: optional secondary anchor. - movie_repo: v2 per-movie ``.alfred`` repository. - prober: ffprobe adapter (or stub). - kb: release knowledge base (video extensions, codecs, …). - - Returns: - The rebuilt :class:`MovieRelease` (also written to disk). - - Raises: - MovieRescanFailed: if no video file can be located inside - ``movie_dir``. ``added_at`` is fresh - (``datetime.now(UTC)``) on every rescan — the v2 sidecar - records when the release was last reconciled with disk, - not when the file appeared on the filesystem. - """ - main_video = find_video_file(movie_dir, kb) - if main_video is None: - raise MovieRescanFailed( - f"no video file found in {movie_dir}" - ) - - result = inspect_release(main_video.name, main_video, kb, prober) - media_info = result.media_info - audio_tracks = media_info.audio_tracks if media_info else () - subtitle_tracks = media_info.subtitle_tracks if media_info else () - - rel_path = main_video.relative_to(movie_dir) - release = MovieRelease( - tmdb_id=tmdb_id, - imdb_id=imdb_id, - folder=movie_dir.name, - file_path=FilePath(str(rel_path)), - added_at=datetime.now(UTC), - tracks=TrackProfile( - audio_tracks=audio_tracks, - subtitle_tracks=subtitle_tracks, - ), - ) - movie_repo.save(release) - return release diff --git a/alfred/application/movies/search_movie.py b/alfred/application/movies/search_movie.py index 652d418..758e1d0 100644 --- a/alfred/application/movies/search_movie.py +++ b/alfred/application/movies/search_movie.py @@ -44,7 +44,7 @@ class SearchMovieUseCase: # Use the TMDB client to search for media result = self.tmdb_client.search_media(media_title) - # Check if IMDb ID was found + # Check if IMDb ID was found # FUCK THIS if result.imdb_id: logger.info(f"IMDb ID found for '{media_title}': {result.imdb_id}") return SearchMovieResponse( @@ -53,9 +53,7 @@ class SearchMovieUseCase: title=result.title, media_type=result.media_type, tmdb_id=result.tmdb_id, - overview=result.overview, release_date=result.release_date, - vote_average=result.vote_average, ) else: logger.warning(f"No IMDb ID available for '{media_title}'") diff --git a/alfred/application/movies/sync.py b/alfred/application/movies/sync.py deleted file mode 100644 index 994d053..0000000 --- a/alfred/application/movies/sync.py +++ /dev/null @@ -1,122 +0,0 @@ -"""``sync_movie`` — refresh TMDB-cached fields on the movies library index. - -Parallel to :func:`alfred.application.tv_shows.sync.sync_show`. See -that module for the TTL / placeholder / force-flag policy — the -movies flow is structurally identical, differing only in the DTO -shape (movies have no seasons) and the placeholder marker (movies -use ``name == ""`` rather than ``status == "unknown"``). -""" - -from __future__ import annotations - -import logging -from collections.abc import Callable -from datetime import UTC, datetime -from pathlib import Path - -from alfred.application.exceptions import MovieNotFoundInLibrary -from alfred.domain.shared.value_objects import TmdbId -from alfred.infrastructure.api.tmdb.client import TMDBClient -from alfred.infrastructure.persistence.dot_alfred.v2.repository import ( - DotAlfredMovieLibraryIndex, - DotAlfredMovieReleaseRepository, -) -from alfred.infrastructure.persistence.dot_alfred.v2.sidecar_root import ( - MovieIndexEntry, -) - -_LOG = logging.getLogger(__name__) - -# Placeholder signature written by the movie library index's -# auto-heal path: ``MovieIndexEntry.name == metadata.path`` (the -# heal copies the folder name into ``name`` because it has no TMDB -# title to write). The sidecar schema requires ``name`` to be -# non-empty, so we cannot use ``name == ""`` as the marker — the -# folder-name equality is the next best signature. -# See ``DotAlfredMovieLibraryIndex._build_from_releases``. - - -def sync_movie( - library_root: Path, - *, - tmdb_id: TmdbId, - index: DotAlfredMovieLibraryIndex, - release_repo: DotAlfredMovieReleaseRepository, - tmdb_client: TMDBClient, - ttl_days: int, - now: Callable[[], datetime] = lambda: datetime.now(UTC), - force: bool = False, -) -> MovieIndexEntry: - """Refresh TMDB-cached fields for ``tmdb_id`` on the movie index. - - Args: - library_root: movies library root (informational). - tmdb_id: movie identifier. - index: library-root index to read and upsert into. - release_repo: per-movie sidecar repository. - tmdb_client: TMDB HTTP client. - ttl_days: max age (days) for an already-synced entry before - it is considered stale. Sourced from - :attr:`Settings.tmdb_cache_ttl_days`. - now: clock injection for deterministic tests. - force: bypass TTL gate; placeholders always refresh. - - Returns: - The fresh :class:`MovieIndexEntry`. - - Raises: - MovieNotFoundInLibrary: when no on-disk movie carries - ``tmdb_id`` (no per-movie sidecar and no index entry). - TMDBAPIError: re-raised from the client. - """ - del library_root # documented for symmetry with sync_show - current_time = now() - existing = index.find_by_tmdb_id(tmdb_id) - - if existing is not None and not force and not _needs_refresh( - existing, ttl_days=ttl_days, now=current_time - ): - return existing - - info = tmdb_client.get_movie_info(tmdb_id.value) - release = release_repo.load_by_tmdb_id(tmdb_id) - - if release is None and existing is None: - raise MovieNotFoundInLibrary( - f"no on-disk movie carries tmdb_id={tmdb_id.value}" - ) - - if release is None: - # Index entry exists but per-movie sidecar is gone or corrupt. - # We cannot upsert because index.upsert(release=...) is the - # only path that knows the imdb_id and folder anchor for - # movies (it's all carried on the release). Warn and skip — - # let the caller rescan to repopulate the per-movie sidecar. - _LOG.warning( - "sync_movie: per-movie sidecar missing for tmdb_id=%s; " - "skipping index upsert (anchor=%s) — rescan to repair", - tmdb_id.value, - existing.metadata.path, - ) - return existing - - index.upsert( - release, - name=info.title, - release_year=info.release_year, - path=release.folder, - fetched_at=current_time, - ) - refreshed = index.find_by_tmdb_id(tmdb_id) - assert refreshed is not None, "upsert did not persist entry" - return refreshed - - -def _needs_refresh( - entry: MovieIndexEntry, *, ttl_days: int, now: datetime -) -> bool: - """True if ``entry`` is a placeholder or older than ``ttl_days``.""" - if entry.name == entry.metadata.path: - return True - age = now - entry.metadata.fetched_at - return age.days >= ttl_days diff --git a/alfred/application/release/detect_media_type.py b/alfred/application/release/detect_media_type.py index 1fbef84..dc18f9b 100644 --- a/alfred/application/release/detect_media_type.py +++ b/alfred/application/release/detect_media_type.py @@ -19,7 +19,7 @@ from __future__ import annotations from pathlib import Path -from alfred.domain.release.ports import ReleaseKnowledge +from alfred.domain.releases.ports import ReleaseKnowledge from alfred.domain.release.value_objects import ParsedRelease diff --git a/alfred/application/release/enrich_from_probe.py b/alfred/application/release/enrich_from_probe.py index 15e81c2..9835435 100644 --- a/alfred/application/release/enrich_from_probe.py +++ b/alfred/application/release/enrich_from_probe.py @@ -4,7 +4,7 @@ from __future__ import annotations from dataclasses import replace -from alfred.domain.release.ports import ReleaseKnowledge +from alfred.domain.releases.ports import ReleaseKnowledge from alfred.domain.release.value_objects import ParsedRelease from alfred.domain.shared.media import MediaInfo diff --git a/alfred/application/release/inspect.py b/alfred/application/release/inspect.py index bb58cc5..71e3b7e 100644 --- a/alfred/application/release/inspect.py +++ b/alfred/application/release/inspect.py @@ -51,8 +51,8 @@ from pathlib import Path from alfred.application.release.detect_media_type import detect_media_type from alfred.application.release.enrich_from_probe import enrich_from_probe from alfred.application.release.supported_media import find_main_video -from alfred.domain.release.ports import ReleaseKnowledge -from alfred.domain.release.services import parse_release +from alfred.domain.releases.ports import ReleaseKnowledge +from alfred.domain.releases.parser.services import parse_release from alfred.domain.release.value_objects import ( MediaTypeToken, ParsedRelease, diff --git a/alfred/application/release/supported_media.py b/alfred/application/release/supported_media.py index aa1a59b..b4df37e 100644 --- a/alfred/application/release/supported_media.py +++ b/alfred/application/release/supported_media.py @@ -32,7 +32,7 @@ from __future__ import annotations from pathlib import Path -from alfred.domain.release.ports.knowledge import ReleaseKnowledge +from alfred.domain.releases.ports.knowledge import ReleaseKnowledge def is_supported_video(path: Path, kb: ReleaseKnowledge) -> bool: diff --git a/alfred/application/tv_shows/rescan.py b/alfred/application/tv_shows/rescan.py deleted file mode 100644 index 230db39..0000000 --- a/alfred/application/tv_shows/rescan.py +++ /dev/null @@ -1,204 +0,0 @@ -"""``rescan_show`` — rebuild a SeriesRelease from disk and persist it. - -The orchestrator walks the show folder, runs the existing release -pipeline (``inspect_release``) on every video file, and assembles the -result into a frozen :class:`SeriesRelease` written to the per-show -v2 ``.alfred`` sidecar. - -Why reuse ``inspect_release``? -------------------------------- - -The "fresh download" flow already parses release names, picks a main -video, runs ffprobe and refines media type. We want exactly the same -intelligence applied to library content — running it again here keeps -a single source of truth for parsing / probing rules. The orchestrator -just translates per-file :class:`InspectedResult` into release -aggregate construction. - -PACK vs EPISODIC ----------------- - -Classification is done by the walker, by inspecting the season -folder's filesystem structure (flat videos → PACK, subfolders → -EPISODIC). See :mod:`alfred.application.tv_shows.walker`. The -orchestrator trusts ``season_folder.mode`` and never re-derives. - -Files whose parser yields ``season is None`` or ``episode is None`` -are logged and skipped — a real PACK or EPISODIC file always carries -both. Mixed-season folders (two different ``Sxx`` numbers in the -same directory) are skipped with a warning. - -TMDB ----- - -``rescan_show`` does **not** call TMDB. It writes the release -sidecar; the library index is updated transparently by its auto-heal -path on the next read. A subsequent TMDB sync (Phase 5) layers -identity / season cache facts on top of the on-disk truth. - -Out of scope (tracked as tech debt): - -* Adjacent ``.srt`` files — only embedded subtitle tracks are - captured. -* Multi-episode files — ``ParsedRelease`` has no ``episode_end`` - field yet. -""" - -from __future__ import annotations - -import logging -from pathlib import Path - -from alfred.application.release.inspect import inspect_release -from alfred.application.tv_shows.walker import SeasonFolder, walk_show -from alfred.domain.release.ports import ReleaseKnowledge -from alfred.domain.releases.entities import ( - EpisodeRelease, - SeasonRelease, - SeriesRelease, - TrackProfile, -) -from alfred.domain.releases.value_objects import EpisodeRange, ReleaseMode -from alfred.domain.shared.media import MediaInfo -from alfred.domain.shared.ports import FilesystemScanner, MediaProber -from alfred.domain.shared.value_objects import FilePath, ImdbId, TmdbId -from alfred.domain.tv_shows.value_objects import EpisodeNumber, SeasonNumber -from alfred.infrastructure.persistence.dot_alfred.v2.repository import ( - DotAlfredSeriesReleaseRepository, -) - -_LOG = logging.getLogger(__name__) - - -def rescan_show( - show_root: Path, - *, - tmdb_id: TmdbId, - imdb_id: ImdbId | None = None, - series_repo: DotAlfredSeriesReleaseRepository, - scanner: FilesystemScanner, - prober: MediaProber, - kb: ReleaseKnowledge, -) -> SeriesRelease: - """Rebuild and persist the :class:`SeriesRelease` for ``show_root``. - - The show's folder name (``show_root.name``) is used as the sidecar - location relative to the library root. TMDB identity comes from the - caller — the orchestrator does not call TMDB. - - Returns the rebuilt frozen aggregate (also written to disk by - ``series_repo.save``). - """ - tree = walk_show(show_root, scanner=scanner, kb=kb) - seasons: list[SeasonRelease] = [] - for season_folder in tree.season_folders: - season = _ingest_season(season_folder, show_root, kb, prober) - if season is not None: - seasons.append(season) - release = SeriesRelease( - tmdb_id=tmdb_id, - imdb_id=imdb_id, - seasons=tuple(seasons), - ) - series_repo.save(release, show_folder=show_root.name) - return release - - -# --------------------------------------------------------------------------- # -# Per-season ingestion # -# --------------------------------------------------------------------------- # - - -def _ingest_season( - season_folder: SeasonFolder, - show_root: Path, - kb: ReleaseKnowledge, - prober: MediaProber, -) -> SeasonRelease | None: - if season_folder.mode is None: - # Walker already logged the reason (empty / malformed mix / - # multi-video subfolder). Just skip. - return None - - if not season_folder.video_files: - _LOG.warning( - "rescan_show: season folder %s contains no video file — skipping", - season_folder.season_dir, - ) - return None - - # Inspect every video to extract season + episode numbers. - inspected = [] - for video_path in season_folder.video_files: - result = inspect_release(video_path.name, video_path, kb, prober) - inspected.append((video_path, result)) - - season_numbers = { - r.parsed.season for _, r in inspected if r.parsed.season is not None - } - if not season_numbers: - _LOG.warning( - "rescan_show: no season number parsed in %s — skipping", - season_folder.season_dir, - ) - return None - if len(season_numbers) > 1: - _LOG.warning( - "rescan_show: mixed season numbers %s in %s — skipping", - sorted(season_numbers), - season_folder.season_dir, - ) - return None - season_number = SeasonNumber(season_numbers.pop()) - folder_name = season_folder.season_dir.name - - episodes: list[EpisodeRelease] = [] - for video_path, result in inspected: - if result.parsed.episode is None: - _LOG.warning( - "rescan_show: no episode number parsed for %s — skipping", - video_path, - ) - continue - episodes.append( - _make_episode_release( - episode_number=EpisodeNumber(result.parsed.episode), - video_path=video_path, - show_root=show_root, - media_info=result.media_info, - ) - ) - - if not episodes: - _LOG.warning( - "rescan_show: no parseable episodes in %s — skipping", - season_folder.season_dir, - ) - return None - - return SeasonRelease( - season_number=season_number, - folder=folder_name, - mode=season_folder.mode, - episodes=tuple(episodes), - ) - - -def _make_episode_release( - *, - episode_number: EpisodeNumber, - video_path: Path, - show_root: Path, - media_info: MediaInfo | None, -) -> EpisodeRelease: - rel_path = video_path.relative_to(show_root) - audio_tracks = media_info.audio_tracks if media_info else () - subtitle_tracks = media_info.subtitle_tracks if media_info else () - return EpisodeRelease( - episodes=EpisodeRange(start=episode_number, end=episode_number), - file_path=FilePath(str(rel_path)), - tracks=TrackProfile( - audio_tracks=audio_tracks, - subtitle_tracks=subtitle_tracks, - ), - ) diff --git a/alfred/application/tv_shows/sync.py b/alfred/application/tv_shows/sync.py deleted file mode 100644 index a34aadf..0000000 --- a/alfred/application/tv_shows/sync.py +++ /dev/null @@ -1,146 +0,0 @@ -"""``sync_show`` — refresh TMDB-cached fields on the TV library index. - -The orchestrator hits TMDB for one show, combines the response with -the on-disk release (if any), and upserts the result into the -library-root index. It is the only place that calls -:meth:`TMDBClient.get_tv_show_info`; the rescan flow stays -TMDB-free. - -TTL & placeholder policy ------------------------- - -``ttl_days`` (passed by the caller, sourced from -:attr:`Settings.tmdb_cache_ttl_days`) gates refreshes for entries -that already carry real TMDB facts. Placeholder entries — those -produced by the library index's auto-heal path, recognizable by -``status == "unknown"`` — always refresh regardless of TTL, because -auto-heal leaves the cache empty on purpose. ``force=True`` overrides -both gates. - -Missing on-disk release ------------------------ - -If the library index has an entry for ``tmdb_id`` but the per-show -sidecar is absent or corrupt, the sync still proceeds: ``release`` is -passed as ``None`` to :meth:`DotAlfredTVShowLibraryIndex.upsert`, -which produces an entry with TMDB facts but an empty episode-slot -map. Callers can then run :func:`rescan_show` to repopulate the slot -map. This is the "library knows the show, files temporarily missing" -state — a stale index pointing at a deleted folder. - -If both index and release sidecar are missing, the show genuinely -isn't in the library and we raise -:class:`ShowNotFoundInLibrary` — sync cannot invent a folder anchor. -""" - -from __future__ import annotations - -import logging -from collections.abc import Callable -from datetime import UTC, datetime -from pathlib import Path - -from alfred.application.exceptions import ShowNotFoundInLibrary -from alfred.domain.shared.value_objects import TmdbId -from alfred.infrastructure.api.tmdb.client import TMDBClient -from alfred.infrastructure.persistence.dot_alfred.v2.repository import ( - DotAlfredSeriesReleaseRepository, - DotAlfredTVShowLibraryIndex, -) -from alfred.infrastructure.persistence.dot_alfred.v2.sidecar_root import ( - ShowIndexEntry, -) - -_LOG = logging.getLogger(__name__) - -# Placeholder marker written by the library index's auto-heal path. -# See ``DotAlfredTVShowLibraryIndex._build_from_releases``. -_PLACEHOLDER_STATUS = "unknown" - - -def sync_show( - library_root: Path, - *, - tmdb_id: TmdbId, - index: DotAlfredTVShowLibraryIndex, - release_repo: DotAlfredSeriesReleaseRepository, - tmdb_client: TMDBClient, - ttl_days: int, - now: Callable[[], datetime] = lambda: datetime.now(UTC), - force: bool = False, -) -> ShowIndexEntry: - """Refresh TMDB-cached fields for ``tmdb_id`` on the TV index. - - Args: - library_root: TV shows library root (informational — the - index and repos already carry their own root). - tmdb_id: show identifier. - index: library-root index to read and upsert into. - release_repo: per-show sidecar repository, used to resolve - the on-disk release + folder anchor. - tmdb_client: TMDB HTTP client. - ttl_days: max age (days) for an already-synced entry before - it is considered stale. Sourced from - :attr:`Settings.tmdb_cache_ttl_days` by the caller. - now: clock injection for deterministic tests. - force: bypass TTL gate; placeholders always refresh - regardless of this flag. - - Returns: - The fresh :class:`ShowIndexEntry` (the one already in the - index when fresh, the newly-upserted one otherwise). - - Raises: - ShowNotFoundInLibrary: when neither index nor release repo - knows the show. - TMDBAPIError: re-raised from the client. - """ - del library_root # not needed for the algorithm; documented for symmetry with rescan_show - current_time = now() - existing = index.find_by_tmdb_id(tmdb_id) - - if existing is not None and not force and not _needs_refresh( - existing, ttl_days=ttl_days, now=current_time - ): - return existing - - info = tmdb_client.get_tv_show_info(tmdb_id.value) - loaded = release_repo.load_by_tmdb_id(tmdb_id) - - if loaded is None and existing is None: - raise ShowNotFoundInLibrary( - f"no on-disk TV show carries tmdb_id={tmdb_id.value}" - ) - - if loaded is not None: - release, folder = loaded - else: - # Index entry exists but per-show sidecar is gone or corrupt. - # Use the anchor recorded in the index; the slot map will be - # empty until a rescan repopulates it. - release = None - folder = existing.metadata.path - _LOG.warning( - "sync_show: per-show sidecar missing for tmdb_id=%s; " - "upserting index entry with empty episode slots (anchor=%s)", - tmdb_id.value, - folder, - ) - - index.upsert(info, release, path=folder, fetched_at=current_time) - refreshed = index.find_by_tmdb_id(tmdb_id) - # ``upsert`` writes synchronously; the follow-up read returns the - # entry we just persisted. Defensive ``assert``: if it isn't there, - # the index implementation has regressed. - assert refreshed is not None, "upsert did not persist entry" - return refreshed - - -def _needs_refresh( - entry: ShowIndexEntry, *, ttl_days: int, now: datetime -) -> bool: - """True if ``entry`` is a placeholder or older than ``ttl_days``.""" - if entry.status == _PLACEHOLDER_STATUS: - return True - age = now - entry.metadata.fetched_at - return age.days >= ttl_days diff --git a/alfred/application/tv_shows/walker.py b/alfred/application/tv_shows/walker.py index 2e7f821..2f963c5 100644 --- a/alfred/application/tv_shows/walker.py +++ b/alfred/application/tv_shows/walker.py @@ -50,7 +50,7 @@ import re from dataclasses import dataclass from pathlib import Path -from alfred.domain.release.ports import ReleaseKnowledge +from alfred.domain.releases.ports import ReleaseKnowledge from alfred.domain.releases.value_objects import ReleaseMode from alfred.domain.shared.ports import FilesystemScanner diff --git a/alfred/domain/movies/entities.py b/alfred/domain/movies/entities.py index fce2e70..5840083 100644 --- a/alfred/domain/movies/entities.py +++ b/alfred/domain/movies/entities.py @@ -55,6 +55,7 @@ class Movie: def __hash__(self) -> int: return hash(self.tmdb_id) + # WRONG def get_folder_name(self) -> str: """ Get the folder name for this movie. @@ -66,6 +67,7 @@ class Movie: return f"{self.title.value} ({self.release_year.value})" return self.title.value + # WRONG def get_filename(self) -> str: """ Get the suggested base filename (without extension) for this movie. diff --git a/alfred/domain/movies/value_objects.py b/alfred/domain/movies/value_objects.py index 7b74fbf..7bc199f 100644 --- a/alfred/domain/movies/value_objects.py +++ b/alfred/domain/movies/value_objects.py @@ -4,7 +4,6 @@ from dataclasses import dataclass from enum import Enum from ..shared.exceptions import ValidationError -from ..shared.value_objects import to_dot_folder_name class Quality(Enum): @@ -56,18 +55,11 @@ class MovieTitle: f"Movie title must be a string, got {type(self.value)}" ) - if len(self.value) > 500: + if len(self.value) > 100: raise ValidationError( - f"Movie title too long: {len(self.value)} characters (max 500)" + f"Movie title too long: {len(self.value)} characters (max 100)" ) - def normalized(self) -> str: - """ - Return normalized title for file system usage. - - Removes special characters and replaces spaces with dots. - """ - return to_dot_folder_name(self.value) def __str__(self) -> str: return self.value @@ -93,10 +85,6 @@ class ReleaseYear: f"Release year must be an integer, got {type(self.value)}" ) - # Movies started around 1888, and we shouldn't have movies from the future - if self.value < 1888 or self.value > 2100: - raise ValidationError(f"Invalid release year: {self.value}") - def __str__(self) -> str: return str(self.value) diff --git a/alfred/domain/release/__init__.py b/alfred/domain/release/__init__.py deleted file mode 100644 index 3c2b4c7..0000000 --- a/alfred/domain/release/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -"""Release domain — release name parsing and naming conventions.""" - -from .services import parse_release -from .value_objects import ParsedRelease, ParseReport - -__all__ = ["ParsedRelease", "ParseReport", "parse_release"] diff --git a/alfred/domain/releases/builders.py b/alfred/domain/releases/builders.py index 2bdff85..b164629 100644 --- a/alfred/domain/releases/builders.py +++ b/alfred/domain/releases/builders.py @@ -42,6 +42,15 @@ from .entities import ( ) from .value_objects import ReleaseMode + +# ════════════════════════════════════════════════════════════════════════════ +# MovieReleaseBuilder +# ════════════════════════════════════════════════════════════════════════════ +# ... + + + + # ════════════════════════════════════════════════════════════════════════════ # SeasonReleaseBuilder # ════════════════════════════════════════════════════════════════════════════ diff --git a/alfred/domain/release/parser/__init__.py b/alfred/domain/releases/parser/__init__.py similarity index 85% rename from alfred/domain/release/parser/__init__.py rename to alfred/domain/releases/parser/__init__.py index 37558c1..c54b522 100644 --- a/alfred/domain/release/parser/__init__.py +++ b/alfred/domain/releases/parser/__init__.py @@ -17,10 +17,6 @@ The pipeline has three internal paths driven by the detected release group: knowledge sets, with a 0-100 confidence score. - **PATH OF PAIN**: score below threshold OR critical chunks missing — signaled to the caller, who decides whether to involve the LLM/user. - -Today the package exposes scaffolding only (token VOs and a thin pipeline -stub). The legacy ``parse_release`` in ``release.services`` keeps serving -production until each piece of the v2 pipeline is wired in. """ from __future__ import annotations diff --git a/alfred/domain/release/parser/pipeline.py b/alfred/domain/releases/parser/pipeline.py similarity index 99% rename from alfred/domain/release/parser/pipeline.py rename to alfred/domain/releases/parser/pipeline.py index 5373edc..d3a582b 100644 --- a/alfred/domain/release/parser/pipeline.py +++ b/alfred/domain/releases/parser/pipeline.py @@ -29,7 +29,7 @@ arrives through ``kb: ReleaseKnowledge``. from __future__ import annotations from ..ports.knowledge import ReleaseKnowledge -from ..value_objects import MediaTypeToken +from alfred.domain.release.value_objects import MediaTypeToken from .schema import GroupSchema from .tokens import Token, TokenRole diff --git a/alfred/domain/releases/parser/roads/__init__.py b/alfred/domain/releases/parser/roads/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/alfred/domain/releases/parser/roads/easy.py b/alfred/domain/releases/parser/roads/easy.py new file mode 100644 index 0000000..e69de29 diff --git a/alfred/domain/releases/parser/roads/path_of_pain.py b/alfred/domain/releases/parser/roads/path_of_pain.py new file mode 100644 index 0000000..e69de29 diff --git a/alfred/domain/releases/parser/roads/shitty.py b/alfred/domain/releases/parser/roads/shitty.py new file mode 100644 index 0000000..e69de29 diff --git a/alfred/domain/release/parser/schema.py b/alfred/domain/releases/parser/schema.py similarity index 100% rename from alfred/domain/release/parser/schema.py rename to alfred/domain/releases/parser/schema.py diff --git a/alfred/domain/release/parser/scoring.py b/alfred/domain/releases/parser/scoring.py similarity index 98% rename from alfred/domain/release/parser/scoring.py rename to alfred/domain/releases/parser/scoring.py index e3a23da..965f69c 100644 --- a/alfred/domain/release/parser/scoring.py +++ b/alfred/domain/releases/parser/scoring.py @@ -27,7 +27,7 @@ from __future__ import annotations from enum import Enum from ..ports.knowledge import ReleaseKnowledge -from ..value_objects import ParsedRelease +from alfred.domain.release.value_objects import ParsedRelease from .tokens import Token, TokenRole diff --git a/alfred/domain/release/services.py b/alfred/domain/releases/parser/services.py similarity index 94% rename from alfred/domain/release/services.py rename to alfred/domain/releases/parser/services.py index bfcc3c4..7a02f2a 100644 --- a/alfred/domain/release/services.py +++ b/alfred/domain/releases/parser/services.py @@ -18,10 +18,9 @@ score, the road, and diagnostic info for downstream callers. from __future__ import annotations -from .parser import pipeline as _v2 -from .parser import scoring as _scoring -from .ports import ReleaseKnowledge -from .value_objects import MediaTypeToken, ParsedRelease, ParseReport, TokenizationRoute +from alfred.domain.releases.parser import scoring as _scoring, pipeline as _v2 +from alfred.domain.releases.ports import ReleaseKnowledge +from alfred.domain.release.value_objects import MediaTypeToken, ParsedRelease, ParseReport, TokenizationRoute def parse_release( diff --git a/alfred/domain/release/parser/tokens.py b/alfred/domain/releases/parser/tokens.py similarity index 100% rename from alfred/domain/release/parser/tokens.py rename to alfred/domain/releases/parser/tokens.py diff --git a/alfred/domain/release/ports/__init__.py b/alfred/domain/releases/ports/__init__.py similarity index 100% rename from alfred/domain/release/ports/__init__.py rename to alfred/domain/releases/ports/__init__.py diff --git a/alfred/domain/release/ports/knowledge.py b/alfred/domain/releases/ports/knowledge.py similarity index 100% rename from alfred/domain/release/ports/knowledge.py rename to alfred/domain/releases/ports/knowledge.py diff --git a/alfred/domain/release/value_objects.py b/alfred/domain/releases/value_objects_old_question_mark.py similarity index 67% rename from alfred/domain/release/value_objects.py rename to alfred/domain/releases/value_objects_old_question_mark.py index 8692bc6..4e8eb72 100644 --- a/alfred/domain/release/value_objects.py +++ b/alfred/domain/releases/value_objects_old_question_mark.py @@ -18,7 +18,7 @@ from __future__ import annotations from dataclasses import dataclass from enum import Enum -from ..shared.exceptions import ValidationError +from alfred.domain.shared.exceptions import ValidationError class MediaTypeToken(str, Enum): @@ -128,7 +128,6 @@ class ParsedRelease: """ raw: str # original release name (untouched) - clean: str # raw minus site_tag and apostrophes — used by season_folder_name() title: str # show/movie title (dots, no year/season/tech) title_sanitized: str # title with filesystem-forbidden chars stripped year: int | None # movie year or show start year (from TMDB) @@ -157,18 +156,6 @@ class ParsedRelease: raise ValidationError("ParsedRelease.raw cannot be empty") if not self.group: raise ValidationError("ParsedRelease.group cannot be empty") - if self.year is not None and not (1888 <= self.year <= 2100): - raise ValidationError( - f"ParsedRelease.year out of range: {self.year}" - ) - if self.season is not None and not (0 <= self.season <= 100): - raise ValidationError( - f"ParsedRelease.season out of range: {self.season}" - ) - if self.episode is not None and not (0 <= self.episode <= 9999): - raise ValidationError( - f"ParsedRelease.episode out of range: {self.episode}" - ) if self.episode_end is not None: if not (0 <= self.episode_end <= 9999): raise ValidationError( @@ -194,78 +181,3 @@ class ParsedRelease: def is_season_pack(self) -> bool: return self.season is not None and self.episode is None - @property - def tech_string(self) -> str: - """``quality.source.codec`` joined by dots, skipping ``None`` parts. - - Derived on every access so it stays in sync with the underlying - fields — no manual refresh needed after enrichment. - """ - return ".".join(p for p in (self.quality, self.source, self.codec) if p) - - def show_folder_name(self, tmdb_title_safe: str, tmdb_year: int) -> str: - """ - Build the series root folder name. - - Format: {Title}.{Year}.{Tech}-{Group} - Example: Oz.1997.1080p.WEBRip.x265-KONTRAST - - ``tmdb_title_safe`` must already be filesystem-safe (the caller is - expected to have run it through ``kb.sanitize_for_fs``). - """ - title_part = tmdb_title_safe.replace(" ", ".") - tech = self.tech_string or "Unknown" - return f"{title_part}.{tmdb_year}.{tech}-{self.group}" - - def season_folder_name(self) -> str: - """ - Build the season subfolder name = normalized release name (no episode). - - Example: Oz.S03.1080p.WEBRip.x265-KONTRAST - For a single-episode release we still strip the episode token so the - folder can hold the whole season. - """ - return _strip_episode_from_normalized(self.clean) - - def episode_filename(self, tmdb_episode_title_safe: str | None, ext: str) -> str: - """ - Build the episode filename. - - Format: {Title}.{SxxExx}.{EpisodeTitle}.{Tech}-{Group}.{ext} - Example: Oz.S01E01.The.Routine.1080p.WEBRip.x265-KONTRAST.mkv - - ``tmdb_episode_title_safe`` must already be filesystem-safe; pass - ``None`` to omit the episode title segment. - """ - title_part = self.title_sanitized - s = f"S{self.season:02d}" if self.season is not None else "" - e = f"E{self.episode:02d}" if self.episode is not None else "" - se = s + e - - ep_title = "" - if tmdb_episode_title_safe: - ep_title = "." + tmdb_episode_title_safe.replace(" ", ".") - - tech = self.tech_string or "Unknown" - ext_clean = ext.lstrip(".") - return f"{title_part}.{se}{ep_title}.{tech}-{self.group}.{ext_clean}" - - def movie_folder_name(self, tmdb_title_safe: str, tmdb_year: int) -> str: - """ - Build the movie folder name. - - Format: {Title}.{Year}.{Tech}-{Group} - Example: Inception.2010.1080p.BluRay.x265-GROUP - """ - return self.show_folder_name(tmdb_title_safe, tmdb_year) - - def movie_filename( - self, tmdb_title_safe: str, tmdb_year: int, ext: str - ) -> str: - """ - Build the movie filename (same as folder name + extension). - - Example: Inception.2010.1080p.BluRay.x265-GROUP.mkv - """ - ext_clean = ext.lstrip(".") - return f"{self.movie_folder_name(tmdb_title_safe, tmdb_year)}.{ext_clean}" diff --git a/alfred/domain/shared/value_objects.py b/alfred/domain/shared/value_objects.py index 306851c..7bf0b06 100644 --- a/alfred/domain/shared/value_objects.py +++ b/alfred/domain/shared/value_objects.py @@ -77,7 +77,7 @@ class TmdbId: def __int__(self) -> int: return self.value - +# GOOD @dataclass(frozen=True) class FilePath: """ @@ -265,7 +265,7 @@ class Language: # literal dots, and hyphens. Everything else is stripped. _FS_SAFE_CHARS = re.compile(r"[^\w\s\.\-]") - +# USELESS - TO REMOVE def to_dot_folder_name(title: str) -> str: """Sanitize ``title`` for filesystem use and convert spaces to dots. diff --git a/alfred/domain/tv_shows/entities.py b/alfred/domain/tv_shows/entities.py index bf9fd8c..92a91db 100644 --- a/alfred/domain/tv_shows/entities.py +++ b/alfred/domain/tv_shows/entities.py @@ -71,7 +71,6 @@ class Episode: season_number: SeasonNumber episode_number: EpisodeNumber - title: str def __post_init__(self) -> None: if not isinstance(self.season_number, SeasonNumber): @@ -97,7 +96,7 @@ class Episode: return hash((self.season_number, self.episode_number)) # ── Naming ───────────────────────────────────────────────────────────── - + # WRONG - NO TITLE REQUIRED def get_filename(self) -> str: """Suggested filename: ``S01E05.Pilot``.""" season_str = f"S{self.season_number.value:02d}" @@ -261,7 +260,7 @@ class TVShow: ) # ── Naming ───────────────────────────────────────────────────────────── - + # WRONG def get_folder_name(self) -> str: """Dot-separated folder name (e.g. ``Breaking.Bad``).""" return to_dot_folder_name(self.title) diff --git a/alfred/infrastructure/filesystem/find_video.py b/alfred/infrastructure/filesystem/find_video.py index 9260331..a31a434 100644 --- a/alfred/infrastructure/filesystem/find_video.py +++ b/alfred/infrastructure/filesystem/find_video.py @@ -4,7 +4,7 @@ from __future__ import annotations from pathlib import Path -from alfred.domain.release.ports import ReleaseKnowledge +from alfred.domain.releases.ports import ReleaseKnowledge def find_video_file(path: Path, kb: ReleaseKnowledge) -> Path | None: diff --git a/alfred/infrastructure/knowledge/release_kb.py b/alfred/infrastructure/knowledge/release_kb.py index 7f9d0b6..6c640ba 100644 --- a/alfred/infrastructure/knowledge/release_kb.py +++ b/alfred/infrastructure/knowledge/release_kb.py @@ -14,8 +14,8 @@ filesystem-level concerns. from __future__ import annotations -from alfred.domain.release.parser.schema import GroupSchema, SchemaChunk -from alfred.domain.release.parser.tokens import TokenRole +from alfred.domain.releases.parser import GroupSchema, SchemaChunk +from alfred.domain.releases.parser.tokens import TokenRole from .release import ( load_audio, diff --git a/testing/debug_release.py b/testing/debug_release.py deleted file mode 100644 index d92e14f..0000000 --- a/testing/debug_release.py +++ /dev/null @@ -1,434 +0,0 @@ -"""CLI de debug pour analyser une release et dry-run le déplacement.""" - -import json -import sys -from pathlib import Path - -# Permet de lancer le script depuis n'importe où sans install du package -sys.path.insert(0, str(Path(__file__).parent.parent)) - - -def _init_memory(): - from alfred.infrastructure.persistence import init_memory - from alfred.settings import settings - - init_memory(settings.data_storage_dir) - - -def _resolve_via_tmdb(release_name: str) -> tuple[str, int] | None: - """Parse le release name, interroge TMDB, retourne (tmdb_title, tmdb_year).""" - from alfred.application.movies import SearchMovieUseCase - from alfred.domain.release.services import parse_release - from alfred.infrastructure.api.tmdb import tmdb_client - - parsed = parse_release(release_name) - raw_title = parsed.title.replace(".", " ").strip() - - print(f" titre extrait : {raw_title}") - print(" interrogation TMDB...") - - use_case = SearchMovieUseCase(tmdb_client) - result = use_case.execute(raw_title).to_dict() - - if result.get("status") != "ok": - print(f" TMDB error: {result.get('message')}") - return None - - title = result["title"] - release_date = result.get("release_date", "") - year = int(release_date[:4]) if release_date and len(release_date) >= 4 else None - - if not year: - print(f" TMDB: pas d'année trouvée pour '{title}'") - return None - - print(f" TMDB: {title} ({year})") - return title, year - - -def _extract_release_name(release_name: str) -> tuple[str, str]: - """ - Retourne (release_name, source_path). - - Si c'est un path absolu existant → extrait le basename et utilise le path comme source. - Sinon → cherche le dossier dans workspace.download configuré en LTM. - """ - p = Path(release_name) - if p.is_absolute() and p.exists(): - return p.name, str(p) - - from alfred.infrastructure.persistence import get_memory - - memory = get_memory() - download_root = memory.ltm.workspace.download - if download_root: - candidate = Path(download_root) / release_name - if candidate.exists(): - return release_name, str(candidate) - - return release_name, "" - - -def analyze(release_name: str, source_path: str | None = None) -> None: - from alfred.domain.release.services import parse_release - - release_name, resolved_path = _extract_release_name(release_name) - if source_path is None and resolved_path: - source_path = resolved_path - - print(f"\n=== PARSE: {release_name} ===") - r = parse_release(release_name) - for k, v in vars(r).items(): - if v is not None and v != [] and v != "": - print(f" {k}: {v}") - - if source_path: - path = Path(source_path) - print(f"\n=== PROBE: {path} ===") - if not path.exists(): - print(" (chemin inexistant, probe skipped)") - else: - from alfred.infrastructure.filesystem.find_video import find_video_file - from alfred.infrastructure.probe import FfprobeMediaProber - - video = find_video_file(path) if path.is_dir() else path - if video: - print(f" video file: {video.name}") - info = FfprobeMediaProber().probe(video) - if info: - print(f" codec: {info.video_codec}") - print(f" resolution: {info.resolution}") - print( - f" audio_tracks: {[(t.codec, t.language) for t in info.audio_tracks]}" - ) - print( - f" subtitle_tracks: {[(t.codec, t.language) for t in info.subtitle_tracks]}" - ) - else: - print(" probe failed (ffprobe dispo ?)") - else: - print(" aucun fichier vidéo trouvé") - - -def dry_run(release_name: str) -> None: - _init_memory() - release_name, _ = _extract_release_name(release_name) - - print(f"\n=== DRY-RUN: {release_name} ===") - tmdb = _resolve_via_tmdb(release_name) - if not tmdb: - sys.exit(1) - - tmdb_title, tmdb_year = tmdb - - from alfred.application.filesystem.resolve_destination import ( - resolve_season_destination, - ) - from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge - from alfred.infrastructure.probe import FfprobeMediaProber - - result = resolve_season_destination( - release_name, - tmdb_title, - tmdb_year, - YamlReleaseKnowledge(), - FfprobeMediaProber(), - ) - d = result.to_dict() - print() - print(json.dumps(d, indent=2, ensure_ascii=False)) - - if d["status"] == "ok": - print("\n=== MOVE PREVIEW ===") - print(" src : ") - print(f" dst : {d['season_folder']}") - - -def _translate_path(path: str) -> str: - """Translate a host-side path to the qBittorrent container path.""" - from alfred.settings import settings - - host_prefix = settings.qbittorrent_host_path - container_prefix = settings.qbittorrent_container_path - if host_prefix and container_prefix and path.startswith(host_prefix): - return container_prefix + path[len(host_prefix) :] - return path - - -def _qbittorrent_update(torrent_name: str, new_location: str | None) -> None: - """ - Find the torrent in qBittorrent by name, update its save_path, and force recheck. - - Args: - torrent_name: Exact torrent name (release folder basename) - new_location: New save path on the host (parent of the torrent folder). - None if the torrent was sent to trash — skip location change. - """ - try: - from alfred.infrastructure.api.qbittorrent.client import QBittorrentClient - - client = QBittorrentClient() - client.login() - - torrent = client.find_by_name(torrent_name) - if torrent is None: - print(f" ⚠ qBittorrent: torrent '{torrent_name}' not found — skipping") - return - - print(f" qBittorrent: found '{torrent.name}' (hash={torrent.hash[:8]}…)") - - if new_location: - container_location = _translate_path(new_location) - client.set_location(torrent.hash, container_location) - print(f" ✓ qBittorrent: location → {container_location}") - - client.recheck(torrent.hash) - print(" ✓ qBittorrent: recheck triggered") - - except Exception as e: - # Non-fatal — the files are already in place - print(f" ⚠ qBittorrent update failed (non-fatal): {e}") - - -def do_move(release_name: str, source_folder: str | None = None) -> None: - _init_memory() - release_name, resolved_path = _extract_release_name(release_name) - if source_folder is None: - source_folder = resolved_path - if not source_folder: - print( - " Erreur: source introuvable. Configure workspace.download ou passe le path complet." - ) - sys.exit(1) - - print(f"\n=== MOVE: {release_name} ===") - tmdb = _resolve_via_tmdb(release_name) - if not tmdb: - sys.exit(1) - - tmdb_title, tmdb_year = tmdb - - from alfred.application.filesystem.resolve_destination import ( - resolve_season_destination, - ) - from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge - from alfred.infrastructure.probe import FfprobeMediaProber - - result = resolve_season_destination( - release_name, - tmdb_title, - tmdb_year, - YamlReleaseKnowledge(), - FfprobeMediaProber(), - ) - d = result.to_dict() - - if d["status"] == "needs_clarification": - print(f"\n {d['question']}") - for i, opt in enumerate(d["options"]): - print(f" {i + 1}. {opt}") - choice = input(" Choix (numéro) : ").strip() - try: - chosen = d["options"][int(choice) - 1] - except (ValueError, IndexError): - print(" Choix invalide.") - sys.exit(1) - result = resolve_season_destination( - release_name, tmdb_title, tmdb_year, confirmed_folder=chosen - ) - d = result.to_dict() - - if d["status"] != "ok": - print(json.dumps(d, indent=2, ensure_ascii=False)) - sys.exit(1) - - src_path = Path(source_folder) - season_folder = d["season_folder"] - mkv_files = sorted(src_path.glob("*.mkv")) or sorted(src_path.glob("*.mp4")) - - from alfred.infrastructure.persistence import get_memory - - memory = get_memory() - torrent_root = memory.ltm.workspace.torrent - trash_root = memory.ltm.workspace.trash - torrent_dst = str(Path(torrent_root) / src_path.name) if torrent_root else None - trash_dst = str(Path(trash_root) / src_path.name) if trash_root else None - - rebuild = input(" Recréer le torrent ? [y/N] : ").strip().lower() == "y" - - # --- PHASE 1: PLAN --- - print("\n=== PLAN ===") - print(f" destination : {season_folder}") - - from alfred.application.filesystem.manage_subtitles import ManageSubtitlesUseCase - from alfred.domain.release.services import parse_release - - parsed = parse_release(release_name) - - # Dict: video_path → sub_result (pre-scanned, files not yet moved) - plan: list[tuple[Path, str, object]] = [] # (src_file, dst_path, sub_result) - has_errors = False - - for f in mkv_files: - dst = str(Path(season_folder) / f.name) - ghost_src = str(src_path / f.name) - sub_result = ManageSubtitlesUseCase().execute( - source_video=ghost_src, - destination_video=dst, - media_type="tv_show", - release_group=parsed.group, - season=parsed.season, - dry_run=True, - ) - - print(f"\n {f.name}") - print(f" → {dst}") - - if sub_result.status == "ok": - if sub_result.placed: - for p in sub_result.placed: - print(f" sub: {p.filename}") - elif sub_result.available: - for a in sub_result.available: - print(f" sub (embedded): {a.language} {a.subtitle_type}") - else: - print(" subs: aucun") - elif sub_result.status == "needs_clarification": - print(" ✗ subs non résolus:") - for u in sub_result.unresolved: - print(f" {u.raw_tokens} ({u.reason})") - has_errors = True - elif sub_result.status == "error": - print(f" ✗ erreur subs: {sub_result.message}") - has_errors = True - - plan.append((f, dst, sub_result)) - - if rebuild and torrent_dst: - print(f"\n source → torrents : {torrent_dst}") - print(f" hard-links : {len(mkv_files)} fichier(s)") - elif trash_dst: - print(f"\n source → trash : {trash_dst}") - else: - print("\n source : laissée en place") - - if has_errors: - print("\n ✗ Plan invalide — subs non résolus, abandon.") - sys.exit(1) - - # --- CONFIRMATION --- - print() - confirm = input(" Confirmer ? [y/N] : ").strip().lower() - if confirm != "y": - print(" Annulé.") - sys.exit(0) - - # --- PHASE 2: EXECUTE --- - import os - - from alfred.infrastructure.filesystem.filesystem_operations import ( - create_folder, - move, - ) - - print("\n=== EXECUTE ===") - - # 1. Créer le season_folder - r = create_folder(season_folder) - if r["status"] != "ok": - print(f" ✗ create_folder: {r}") - sys.exit(1) - print(f" ✓ dossier : {season_folder}") - - # 2. Déplacer chaque fichier vidéo + placer les subs (re-run après move) - for f, dst, _pre_scan in plan: - r = move(str(f), dst) - if r["status"] != "ok": - print(f" ✗ {f.name}: {r['message']}") - sys.exit(1) - print(f" ✓ {f.name}") - - # Re-run manage_subtitles maintenant que dst existe (pour le hard-link) - ghost_src = str(src_path / f.name) - sub_result = ManageSubtitlesUseCase().execute( - source_video=ghost_src, - destination_video=dst, - media_type="tv_show", - release_group=parsed.group, - season=parsed.season, - ) - if sub_result.status == "ok" and sub_result.placed: - for p in sub_result.placed: - print(f" ✓ sub: {p.filename}") - - # 3. Dossier source → torrents ou trash - if rebuild and torrent_dst: - r = move(source_folder, torrent_dst) - if r["status"] != "ok": - print(f" ✗ source → torrents: {r['message']}") - sys.exit(1) - print(" ✓ source → torrents") - - # 4. Hard-link depuis season_folder → torrent_dst - torrent_dst_path = Path(torrent_dst) - for f, dst, _ in plan: - lib_file = Path(season_folder) / f.name - link_dst = torrent_dst_path / f.name - try: - os.link(lib_file, link_dst) - print(f" ✓ hard-link: {f.name}") - except OSError as e: - print(f" ✗ hard-link {f.name}: {e}") - sys.exit(1) - - elif trash_dst: - r = move(source_folder, trash_dst) - if r["status"] != "ok": - print(f" ✗ source → trash: {r['message']}") - sys.exit(1) - print(" ✓ source → trash") - - # 5. qBittorrent: update location + recheck - qb_location = torrent_root if (rebuild and torrent_dst) else None - _qbittorrent_update(src_path.name, qb_location) - - -if __name__ == "__main__": - import argparse - - parser = argparse.ArgumentParser(description="Debug release parsing + dry-run/move") - sub = parser.add_subparsers(dest="cmd") - - p_analyze = sub.add_parser( - "analyze", help="Parser une release (+ probe si path fourni)" - ) - p_analyze.add_argument("release_name") - p_analyze.add_argument("--path", help="Chemin vers le dossier/fichier source") - - p_dry = sub.add_parser( - "dryrun", help="Résout via TMDB et affiche les chemins sans rien bouger" - ) - p_dry.add_argument("release_name") - - p_move = sub.add_parser( - "move", help="Résout via TMDB et déplace le dossier (confirmation requise)" - ) - p_move.add_argument("release_name") - p_move.add_argument( - "source_folder", - nargs="?", - default=None, - help="Chemin absolu du dossier source (optionnel si workspace.download est configuré)", - ) - - args = parser.parse_args() - - if args.cmd == "analyze": - analyze(args.release_name, args.path) - elif args.cmd == "dryrun": - dry_run(args.release_name) - elif args.cmd == "move": - do_move(args.release_name, args.source_folder) - else: - parser.print_help() - sys.exit(1) diff --git a/testing/parse_release.py b/testing/parse_release.py deleted file mode 100644 index c40bef5..0000000 --- a/testing/parse_release.py +++ /dev/null @@ -1,270 +0,0 @@ -#!/usr/bin/env python3 -""" -parse_release.py — Test ParsedRelease interactively or via CLI args. - -Usage: - uv run testing/parse_release.py "Oz.S03.1080p.WEBRip.x265-KONTRAST" - uv run testing/parse_release.py "Oz.S03.1080p.WEBRip.x265-KONTRAST" --tmdb - uv run testing/parse_release.py "Inception.2010.1080p.BluRay.x265-GROUP" --tmdb-title "Inception" --tmdb-year 2010 - uv run testing/parse_release.py --interactive -""" - -import argparse -import sys -from pathlib import Path - -_PROJECT_ROOT = Path(__file__).resolve().parents[1] -if str(_PROJECT_ROOT) not in sys.path: - sys.path.insert(0, str(_PROJECT_ROOT)) - -# --------------------------------------------------------------------------- -# Colours -# --------------------------------------------------------------------------- - -RESET = "\033[0m" -BOLD = "\033[1m" -DIM = "\033[2m" -GREEN = "\033[32m" -YELLOW = "\033[33m" -RED = "\033[31m" -CYAN = "\033[36m" -BLUE = "\033[34m" - -USE_COLOR = True - - -def c(text: str, *codes: str) -> str: - if not USE_COLOR: - return str(text) - return "".join(codes) + str(text) + RESET - - -def kv(key: str, val: str, color: str = CYAN) -> None: - print(f" {c(key + ':', BOLD)} {c(val, color)}") - - -def hr() -> None: - print(c("─" * 64, DIM)) - - -# --------------------------------------------------------------------------- -# TMDB lookup -# --------------------------------------------------------------------------- - - -def _fetch_tmdb(title: str) -> tuple[str | None, int | None]: - """ - Call TMDBClient.search_media() and return (canonical_title, year). - Returns (None, None) on failure. - """ - try: - from alfred.infrastructure.api.tmdb import TMDBClient - - client = TMDBClient() - result = client.search_media(title) - year: int | None = None - if result.release_date: - try: - year = int(result.release_date[:4]) - except (ValueError, IndexError): - pass - print( - c( - f" TMDB → {result.title} ({year}) [{result.media_type}] imdb={result.imdb_id}", - DIM, - ) - ) - return result.title, year - except Exception as e: - print(c(f" TMDB lookup failed: {e}", YELLOW)) - return None, None - - -# --------------------------------------------------------------------------- -# Display -# --------------------------------------------------------------------------- - - -def _show( - release_name: str, - tmdb_title: str | None, - tmdb_year: int | None, - tmdb_episode_title: str | None, - ext: str, -) -> None: - from alfred.domain.release import parse_release - - p = parse_release(release_name) - - # Auto-fetch TMDB if requested and not already provided - if not (tmdb_title and tmdb_year): - fetched_title, fetched_year = _fetch_tmdb(p.title.replace(".", " ")) - tmdb_title = tmdb_title or fetched_title - tmdb_year = tmdb_year or fetched_year - - print() - print(c("━" * 64, BOLD)) - print(c(f" ParsedRelease — {p.raw}", BOLD, CYAN)) - print(c("━" * 64, BOLD)) - - # Core fields - hr() - kv("raw", p.raw) - kv("normalised", p.normalised) - kv("title", p.title) - kv("year", str(p.year) if p.year else c("None", DIM)) - kv("season", str(p.season) if p.season is not None else c("None", DIM)) - kv("episode", str(p.episode) if p.episode is not None else c("None", DIM)) - kv( - "episode_end", - str(p.episode_end) if p.episode_end is not None else c("None", DIM), - ) - kv("quality", p.quality or c("None", DIM)) - kv("source", p.source or c("None", DIM)) - kv("codec", p.codec or c("None", DIM)) - kv("group", p.group, YELLOW if p.group == "UNKNOWN" else GREEN) - kv("tech_string", p.tech_string or c("(empty)", DIM)) - - # Derived booleans - hr() - kv("is_movie", c(str(p.is_movie), GREEN if p.is_movie else DIM)) - kv("is_season_pack", c(str(p.is_season_pack), GREEN if p.is_season_pack else DIM)) - - # Generated names - hr() - title_for_names = tmdb_title or p.title.replace(".", " ") - year_for_names = tmdb_year or p.year or 0 - - if p.is_movie: - kv("movie_folder_name", p.movie_folder_name(title_for_names, year_for_names)) - kv("movie_filename", p.movie_filename(title_for_names, year_for_names, ext)) - else: - kv("show_folder_name", p.show_folder_name(title_for_names, year_for_names)) - kv("season_folder_name", p.season_folder_name()) - if not p.is_season_pack: - kv("episode_filename", p.episode_filename(tmdb_episode_title, ext)) - else: - kv("episode_filename", c("(season pack — no episode filename)", DIM)) - - if tmdb_title or tmdb_year or tmdb_episode_title: - hr() - print(c(" TMDB data used:", DIM)) - if tmdb_title: - kv(" tmdb_title", tmdb_title) - if tmdb_year: - kv(" tmdb_year", str(tmdb_year)) - if tmdb_episode_title: - kv(" tmdb_episode_title", tmdb_episode_title) - - print(c("━" * 64, BOLD)) - print() - - -# --------------------------------------------------------------------------- -# Interactive mode -# --------------------------------------------------------------------------- - - -def _interactive() -> None: - print(c("\n Alfred — Release Parser REPL", BOLD, CYAN)) - print(c(" Type a release name, or 'q' to quit.", DIM)) - print( - c( - " Inline overrides: ::title=Oz ::year=1997 ::ep=The.Routine ::ext=.mkv\n", - DIM, - ) - ) - - while True: - try: - raw = input(c(" release> ", BOLD)).strip() - except (EOFError, KeyboardInterrupt): - print() - break - - if not raw or raw.lower() in ("q", "quit", "exit"): - break - - # Parse inline overrides: "Oz.S03E01... ::title=Oz ::year=1997 ::tmdb" - parts = raw.split("::") - release = parts[0].strip() - overrides: dict[str, str] = {} - for part in parts[1:]: - part = part.strip() - if "=" in part: - k, _, v = part.partition("=") - overrides[k.strip()] = v.strip() - else: - overrides[part] = "1" # flag-style: ::tmdb - - tmdb_title = overrides.get("title") - tmdb_year = int(overrides["year"]) if "year" in overrides else None - tmdb_episode_title = overrides.get("ep") - ext = overrides.get("ext", ".mkv") - try: - _show(release, tmdb_title, tmdb_year, tmdb_episode_title, ext) - except Exception as e: - print(c(f" Error: {e}", RED)) - - -# --------------------------------------------------------------------------- -# CLI -# --------------------------------------------------------------------------- - - -def main() -> None: - global USE_COLOR - - parser = argparse.ArgumentParser( - description="Test ParsedRelease from domain/release/release_parser.py", - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - parser.add_argument("release", nargs="?", help="Release name to parse") - parser.add_argument( - "-i", "--interactive", action="store_true", help="Interactive REPL mode" - ) - parser.add_argument( - "--tmdb-title", metavar="TITLE", help="Override TMDB title for name generation" - ) - parser.add_argument( - "--tmdb-year", - metavar="YEAR", - type=int, - help="Override TMDB year for name generation", - ) - parser.add_argument( - "--episode-title", - metavar="TITLE", - help="TMDB episode title for episode_filename()", - ) - parser.add_argument( - "--ext", - default=".mkv", - metavar="EXT", - help="File extension for filename generation (default: .mkv)", - ) - parser.add_argument("--no-color", action="store_true") - args = parser.parse_args() - - if args.no_color or not sys.stdout.isatty(): - USE_COLOR = False - - if args.interactive: - _interactive() - return - - if not args.release: - parser.print_help() - sys.exit(1) - - try: - _show( - args.release, args.tmdb_title, args.tmdb_year, args.episode_title, args.ext - ) - except Exception as e: - print(c(f"Error: {e}", RED), file=sys.stderr) - sys.exit(1) - - -if __name__ == "__main__": - main() diff --git a/testing/probe_video.py b/testing/probe_video.py deleted file mode 100644 index 6dd84e2..0000000 --- a/testing/probe_video.py +++ /dev/null @@ -1,164 +0,0 @@ -#!/usr/bin/env python3 -""" -probe_video.py — Display MediaInfo extracted by ffprobe for a video file. - -Usage: - uv run testing/probe_video.py /path/to/video.mkv - uv run testing/probe_video.py /path/to/video.mkv --no-color -""" - -import argparse -import sys -from pathlib import Path - -_PROJECT_ROOT = Path(__file__).resolve().parents[1] -if str(_PROJECT_ROOT) not in sys.path: - sys.path.insert(0, str(_PROJECT_ROOT)) - -# --------------------------------------------------------------------------- -# Colours -# --------------------------------------------------------------------------- - -RESET = "\033[0m" -BOLD = "\033[1m" -DIM = "\033[2m" -GREEN = "\033[32m" -YELLOW = "\033[33m" -RED = "\033[31m" -CYAN = "\033[36m" -BLUE = "\033[34m" - -USE_COLOR = True - - -def c(text: str, *codes: str) -> str: - if not USE_COLOR: - return str(text) - return "".join(codes) + str(text) + RESET - - -def kv(key: str, val: str, indent: int = 4, color: str = CYAN) -> None: - print(f"{' ' * indent}{c(key + ':', BOLD)} {c(val, color)}") - - -def section(title: str) -> None: - print() - print(f" {c('▸ ' + title, BOLD, BLUE)}") - - -def hr() -> None: - print(c("─" * 70, DIM)) - - -# --------------------------------------------------------------------------- -# Formatting helpers -# --------------------------------------------------------------------------- - - -def fmt_duration(seconds: float) -> str: - h = int(seconds // 3600) - m = int((seconds % 3600) // 60) - s = int(seconds % 60) - if h: - return f"{h}h {m:02d}m {s:02d}s" - return f"{m}m {s:02d}s" - - -def fmt_channels(channels: int | None, layout: str | None) -> str: - parts = [] - if channels is not None: - parts.append(str(channels) + "ch") - if layout: - parts.append(f"({layout})") - return " ".join(parts) if parts else "—" - - -def flag(val: bool) -> str: - return c("yes", GREEN) if val else c("no", DIM) - - -# --------------------------------------------------------------------------- -# Main -# --------------------------------------------------------------------------- - - -def main() -> None: - global USE_COLOR - - parser = argparse.ArgumentParser(description="Probe a video file with ffprobe") - parser.add_argument("file", help="Path to the video file") - parser.add_argument("--no-color", action="store_true") - args = parser.parse_args() - - if args.no_color or not sys.stdout.isatty(): - USE_COLOR = False - - path = Path(args.file) - if not path.exists(): - print(c(f"Error: {path} does not exist", RED), file=sys.stderr) - sys.exit(1) - - from alfred.infrastructure.probe import FfprobeMediaProber - - info = FfprobeMediaProber().probe(path) - if info is None: - print(c("Error: ffprobe failed to probe the file", RED), file=sys.stderr) - sys.exit(1) - - print() - print(c("━" * 70, BOLD)) - print(c(f" {path.name}", BOLD, CYAN)) - print(c(f" {path}", DIM)) - print(c("━" * 70, BOLD)) - - # --- Video --- - section("Video") - kv("codec", info.video_codec or c("—", DIM)) - kv("resolution", info.resolution or c("—", DIM)) - if info.width and info.height: - kv("dimensions", f"{info.width} × {info.height}") - if info.duration_seconds is not None: - kv("duration", fmt_duration(info.duration_seconds)) - if info.bitrate_kbps is not None: - kv("bitrate", f"{info.bitrate_kbps} kbps") - - # --- Audio --- - section(f"Audio {c(str(len(info.audio_tracks)) + ' track(s)', DIM)}") - if not info.audio_tracks: - print(f" {c('no audio tracks found', DIM)}") - for track in info.audio_tracks: - lang = track.language or "und" - default_marker = f" {c('default', GREEN, DIM)}" if track.is_default else "" - print(f" {c(f'[{track.index}]', BOLD)} {c(lang, YELLOW)}{default_marker}") - kv("codec", track.codec or c("—", DIM), indent=8) - kv("channels", fmt_channels(track.channels, track.channel_layout), indent=8) - - # --- Subtitles --- - section(f"Subtitles {c(str(len(info.subtitle_tracks)) + ' track(s)', DIM)}") - if not info.subtitle_tracks: - print(f" {c('no embedded subtitle tracks', DIM)}") - for track in info.subtitle_tracks: - lang = track.language or "und" - markers = [] - if track.is_default: - markers.append(c("default", GREEN, DIM)) - if track.is_forced: - markers.append(c("forced", YELLOW, DIM)) - marker_str = (" " + " ".join(markers)) if markers else "" - print(f" {c(f'[{track.index}]', BOLD)} {c(lang, YELLOW)}{marker_str}") - kv("codec", track.codec or c("—", DIM), indent=8) - - # --- Summary --- - print() - hr() - multi = c("yes", GREEN) if info.is_multi_audio else c("no", DIM) - langs = ", ".join(info.audio_languages) if info.audio_languages else c("—", DIM) - print( - f" {c('multi-audio:', BOLD)} {multi} {c('languages:', BOLD)} {c(langs, CYAN)}" - ) - hr() - print() - - -if __name__ == "__main__": - main() diff --git a/testing/recognize_folders_in_downloads.py b/testing/recognize_folders_in_downloads.py deleted file mode 100644 index 3069e05..0000000 --- a/testing/recognize_folders_in_downloads.py +++ /dev/null @@ -1,220 +0,0 @@ -#!/usr/bin/env python3 -""" -recognize_folders_in_downloads.py — Parse every folder/file in the downloads directory. - -Usage: - uv run testing/recognize_folders_in_downloads.py - uv run testing/recognize_folders_in_downloads.py --path /mnt/testipool/downloads - uv run testing/recognize_folders_in_downloads.py --failures-only - uv run testing/recognize_folders_in_downloads.py --successes-only -""" - -import argparse -import sys -from pathlib import Path - -_PROJECT_ROOT = Path(__file__).resolve().parents[1] -if str(_PROJECT_ROOT) not in sys.path: - sys.path.insert(0, str(_PROJECT_ROOT)) - -# --------------------------------------------------------------------------- -# Colours -# --------------------------------------------------------------------------- - -RESET = "\033[0m" -BOLD = "\033[1m" -DIM = "\033[2m" -GREEN = "\033[32m" -YELLOW = "\033[33m" -RED = "\033[31m" -CYAN = "\033[36m" - -USE_COLOR = True - - -def c(text: str, *codes: str) -> str: - if not USE_COLOR: - return str(text) - return "".join(codes) + str(text) + RESET - - -def kv(key: str, val: str, indent: int = 4, color: str = CYAN) -> None: - print(f"{' ' * indent}{c(key + ':', BOLD)} {c(val, color)}") - - -def hr() -> None: - print(c("─" * 70, DIM)) - - -# --------------------------------------------------------------------------- -# Parsing quality check -# --------------------------------------------------------------------------- - - -def _assess(p) -> list[str]: - """Return a list of warning strings for fields that look wrong.""" - if p.media_type in ("other", "unknown"): - return [] - warnings = [] - if p.group == "UNKNOWN": - warnings.append("group not found") - if not p.quality: - warnings.append("resolution not found") - if not p.codec: - warnings.append("codec not found") - if not p.title or p.title == p.normalised: - warnings.append("title extraction likely wrong") - return warnings - - -# --------------------------------------------------------------------------- -# Main -# --------------------------------------------------------------------------- - - -def main() -> None: - global USE_COLOR - - parser = argparse.ArgumentParser( - description="Recognize release folders in downloads" - ) - parser.add_argument( - "--path", - default="/mnt/testipool/downloads", - help="Downloads directory (default: /mnt/testipool/downloads)", - ) - parser.add_argument( - "--failures-only", action="store_true", help="Show only entries with warnings" - ) - parser.add_argument( - "--successes-only", action="store_true", help="Show only fully parsed entries" - ) - parser.add_argument("--no-color", action="store_true") - args = parser.parse_args() - - if args.no_color or not sys.stdout.isatty(): - USE_COLOR = False - - downloads = Path(args.path) - if not downloads.exists(): - print(c(f"Error: {downloads} does not exist", RED), file=sys.stderr) - sys.exit(1) - - from dataclasses import replace - - from alfred.application.release.detect_media_type import detect_media_type - from alfred.application.release.enrich_from_probe import enrich_from_probe - from alfred.domain.release.services import parse_release - from alfred.domain.release.value_objects import MediaTypeToken - from alfred.infrastructure.filesystem.find_video import find_video_file - from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge - from alfred.infrastructure.probe import FfprobeMediaProber - - _kb = YamlReleaseKnowledge() - _prober = FfprobeMediaProber() - - entries = sorted(downloads.iterdir(), key=lambda p: p.name.lower()) - total = len(entries) - ok_count = 0 - warn_count = 0 - - print() - print(c("━" * 70, BOLD)) - print(c(f" Downloads — {downloads}", BOLD, CYAN)) - print(c(f" {total} entries", DIM)) - print(c("━" * 70, BOLD)) - - for entry in entries: - name = entry.name - - try: - p, _report = parse_release(name, _kb) - p = replace(p, media_type=MediaTypeToken(detect_media_type(p, entry, _kb))) - if p.media_type not in ("unknown", "other"): - video_file = find_video_file(entry) - if video_file: - media_info = _prober.probe(video_file) - if media_info: - p = enrich_from_probe(p, media_info, _kb) - warnings = _assess(p) - except Exception as e: - warnings = [f"parse error: {e}"] - p = None - - has_warnings = bool(warnings) - - if args.failures_only and not has_warnings: - continue - if args.successes_only and has_warnings: - continue - - print() - path_label = "" - if p: - path_label = { - "direct": c("direct", GREEN, DIM), - "sanitized": c("sanitized", YELLOW), - "ai": c("ai", RED), - }.get(p.parse_path, p.parse_path) - - if has_warnings: - warn_count += 1 - print(f" {c('⚠', YELLOW, BOLD)} {c(name, YELLOW)} {path_label}") - else: - ok_count += 1 - print(f" {c('✓', GREEN, BOLD)} {c(name, BOLD)} {path_label}") - - if p: - kind = { - "movie": "movie", - "tv_show": "season pack" if p.is_season_pack else "episode", - "tv_complete": c("tv complete", CYAN), - "documentary": c("documentary", CYAN), - "concert": c("concert", CYAN), - "other": c("other", RED), - "unknown": c("unknown", YELLOW), - }.get(p.media_type, p.media_type) - kv("type", kind) - kv("title", p.title) - if p.season is not None: - ep = f"E{p.episode:02d}" if p.episode is not None else "—" - kv("season/ep", f"S{p.season:02d} / {ep}") - if p.year: - kv("year", str(p.year)) - if p.languages: - kv("langs", " ".join(p.languages)) - kv("quality", p.quality or c("—", DIM)) - kv("source", p.source or c("—", DIM)) - kv("codec", p.codec or c("—", DIM)) - if p.audio_codec: - ch = f" {p.audio_channels}" if p.audio_channels else "" - kv("audio", f"{p.audio_codec}{ch}") - if p.bit_depth or p.hdr_format: - hdr_parts = [x for x in [p.bit_depth, p.hdr_format] if x] - kv("hdr/depth", " ".join(hdr_parts)) - if p.edition: - kv("edition", p.edition, color=YELLOW) - kv("group", p.group, color=YELLOW if p.group == "UNKNOWN" else GREEN) - if p.site_tag: - kv("site tag", p.site_tag, color=YELLOW) - - if warnings: - for w in warnings: - print(f" {c('→ ' + w, YELLOW)}") - - # Summary - print() - hr() - skipped = total - ok_count - warn_count - print( - f" {c('Total:', BOLD)} {total} " - f"{c(str(ok_count) + ' ok', GREEN, BOLD)} " - f"{c(str(warn_count) + ' warnings', YELLOW, BOLD)}" - + (f" {c(str(skipped) + ' filtered', DIM)}" if skipped else "") - ) - hr() - print() - - -if __name__ == "__main__": - main() diff --git a/testing/subtitles/scan_subtitles.py b/testing/subtitles/scan_subtitles.py deleted file mode 100644 index 7adb304..0000000 --- a/testing/subtitles/scan_subtitles.py +++ /dev/null @@ -1,575 +0,0 @@ -#!/usr/bin/env python3 -""" -scan_subtitles.py — CLI pour tester le pipeline de scan de sous-titres Alfred. - -Usage: - uv run testing/subtitles/scan_subtitles.py [options] - -Options: - --release-group RARBG Groupe de release (optionnel — active les known patterns) - --pattern adjacent Forcer un pattern (adjacent|flat|episode_subfolder|embedded) - --video FILE Fichier vidéo de référence (défaut: premier .mkv/.mp4 trouvé) - --verbose Détails sur chaque token analysé - --no-color Désactive la colorisation - -Exemples: - uv run scripts/scan_subtitles.py "/media/tv/The X-Files/Season 01" - uv run scripts/scan_subtitles.py "/media/tv/The X-Files/Season 01" --release-group RARBG - uv run scripts/scan_subtitles.py "/media/tv/The X-Files/Season 01" --pattern episode_subfolder --verbose -""" - -import argparse -import sys -import textwrap -from pathlib import Path - -# Ajoute la racine du projet au path (testing/subtitles/ → ../../) -_PROJECT_ROOT = Path(__file__).resolve().parents[2] -if str(_PROJECT_ROOT) not in sys.path: - sys.path.insert(0, str(_PROJECT_ROOT)) - -# --------------------------------------------------------------------------- -# Colorisation simple (pas de dépendance externe) -# --------------------------------------------------------------------------- - -USE_COLOR = True - -RESET = "\033[0m" -BOLD = "\033[1m" -DIM = "\033[2m" -GREEN = "\033[32m" -YELLOW = "\033[33m" -RED = "\033[31m" -CYAN = "\033[36m" -BLUE = "\033[34m" -MAGENTA = "\033[35m" - - -def c(text: str, *codes: str) -> str: - if not USE_COLOR: - return text - return "".join(codes) + text + RESET - - -def section(title: str) -> None: - width = 70 - print() - print(c("─" * width, DIM)) - print(c(f" {title}", BOLD, CYAN)) - print(c("─" * width, DIM)) - - -def ok(msg: str) -> None: - print(c(" ✓ ", GREEN, BOLD) + msg) - - -def warn(msg: str) -> None: - print(c(" ⚠ ", YELLOW, BOLD) + msg) - - -def err(msg: str) -> None: - print(c(" ✗ ", RED, BOLD) + msg) - - -def info(msg: str, indent: int = 2) -> None: - print(" " * indent + msg) - - -def kv(key: str, value: str, indent: int = 4) -> None: - print(" " * indent + c(f"{key}: ", BOLD) + value) - - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - -VIDEO_EXTS = {".mkv", ".mp4", ".avi", ".mov", ".ts", ".m2ts"} - - -def find_videos(folder: Path) -> list[Path]: - return sorted( - p for p in folder.iterdir() if p.is_file() and p.suffix.lower() in VIDEO_EXTS - ) - - -def confidence_bar(conf: float, width: int = 20) -> str: - filled = int(conf * width) - bar = "█" * filled + "░" * (width - filled) - if conf >= 0.8: - color = GREEN - elif conf >= 0.5: - color = YELLOW - else: - color = RED - return c(bar, color) + c(f" {conf:.0%}", BOLD) - - -def track_summary(track, verbose: bool = False) -> None: - lang = track.language.code if track.language else c("?", RED) - fmt = track.format.id if track.format else c("?", RED) - typ = track.subtitle_type.value - src = ( - "embedded" - if track.is_embedded - else (track.file_path.name if track.file_path else "?") - ) - - # Couleur du type - type_colors = { - "standard": GREEN, - "sdh": YELLOW, - "forced": BLUE, - "unknown": RED, - } - typ_str = c(typ, type_colors.get(typ, RESET)) - - unresolved = not track.is_embedded and track.language is None - clarif = c(" [langue inconnue]", RED, BOLD) if unresolved else "" - - print(f" {c(src, BOLD)}") - print(f" lang={c(lang, CYAN)} type={typ_str} format={fmt}") - conf_str = ( - c("n/a (embedded)", DIM) - if track.is_embedded - else confidence_bar(track.confidence) - ) - print(f" confidence={conf_str}{clarif}") - - if track.entry_count is not None: - print( - f" entries={track.entry_count} size={track.file_size_kb:.1f} KB" - if track.file_size_kb - else f" entries={track.entry_count}" - ) - - if verbose and track.raw_tokens: - print(f" tokens={track.raw_tokens}") - - if track.is_resolved() and track.language and track.format: - try: - dest = track.destination_name - print(f" → {c(dest, GREEN, BOLD)}") - except ValueError: - pass - - -# --------------------------------------------------------------------------- -# Étapes du pipeline -# --------------------------------------------------------------------------- - - -def step_load_kb() -> SubtitleKnowledgeBase: - from alfred.domain.subtitles.knowledge.base import SubtitleKnowledgeBase - from alfred.domain.subtitles.knowledge.loader import KnowledgeLoader - - section("ÉTAPE 1 — Chargement de la base de connaissances") - kb = SubtitleKnowledgeBase(KnowledgeLoader()) - - fmts = kb.formats() - langs = kb.languages() - patterns = kb.patterns() - - ok(f"{len(fmts)} format(s) connu(s): {', '.join(fmts.keys())}") - ok(f"{len(langs)} langue(s) connue(s): {', '.join(langs.keys())}") - ok(f"{len(patterns)} pattern(s) connu(s): {', '.join(patterns.keys())}") - - total_tokens = sum(len(l.tokens) for l in langs.values()) - info(c(f"→ {total_tokens} tokens de langue au total", DIM), indent=4) - - return kb - - -def step_detect_pattern( - kb: SubtitleKnowledgeBase, - season_folder: Path, - sample_video: Path, - release_group: str | None, - forced_pattern: str | None, -) -> SubtitlePattern: - from alfred.domain.subtitles.services.pattern_detector import PatternDetector - - section("ÉTAPE 2 — Détection du pattern de release") - - # Priorité: forced > known patterns from release_group > auto-detect - if forced_pattern: - pattern = kb.pattern(forced_pattern) - if not pattern: - err(f"Pattern inconnu: '{forced_pattern}'") - print(f" Patterns disponibles: {', '.join(kb.patterns().keys())}") - sys.exit(1) - ok(f"Pattern forcé: {c(forced_pattern, CYAN, BOLD)}") - return pattern - - if release_group: - known = kb.patterns_for_group(release_group) - if known: - kv("Release group", release_group) - ok( - f"Pattern(s) connu(s) pour {release_group}: {', '.join(p.id for p in known)}" - ) - pattern = known[0] - kv("Pattern sélectionné", c(pattern.id, CYAN, BOLD)) - return pattern - else: - warn(f"Groupe '{release_group}' inconnu — lancement de la détection auto") - - # Auto-detect - kv("Dossier analysé", str(season_folder)) - kv("Vidéo de référence", sample_video.name) - - detector = PatternDetector(kb) - result = detector.detect(season_folder, sample_video) - - findings = result.get("raw_findings", {}) - info(c("Observations:", BOLD), indent=4) - for key, val in findings.items(): - if val not in (False, None, 0): - info(f" {key}: {c(str(val), CYAN)}", indent=4) - - detected = result.get("detected") - confidence = result.get("confidence", 0.0) - description = result.get("description", "") - - print() - info(c(f'Description: "{description}"', DIM), indent=4) - print(f" Confiance: {confidence_bar(confidence)}") - - if detected: - ok(f"Pattern détecté: {c(detected.id, CYAN, BOLD)}") - kv("Stratégie de scan", detected.scan_strategy.value) - kv("Détection de type", detected.type_detection.value) - if detected.root_folder: - kv("Dossier racine", detected.root_folder) - return detected - else: - warn("Aucun pattern détecté avec confiance suffisante — fallback: adjacent") - fallback = kb.pattern("adjacent") - if not fallback: - err("Pattern 'adjacent' introuvable dans la KB !") - sys.exit(1) - return fallback - - -def step_identify_tracks( - kb: SubtitleKnowledgeBase, - sample_video: Path, - pattern: SubtitlePattern, - release_group: str | None, - verbose: bool, -) -> MediaSubtitleMetadata: - from alfred.domain.subtitles.services.identifier import SubtitleIdentifier - - section("ÉTAPE 3 — Identification des pistes") - - kv("Vidéo", sample_video.name) - kv("Pattern", pattern.id) - - identifier = SubtitleIdentifier(kb) - metadata = identifier.identify( - video_path=sample_video, - pattern=pattern, - media_id=None, - media_type="tv_show", - release_group=release_group, - ) - - n_emb = len(metadata.embedded_tracks) - n_ext = len(metadata.external_tracks) - n_unresolved = len(metadata.unresolved_tracks) - - print() - ok(f"{n_ext} piste(s) externe(s) trouvée(s)") - if n_emb: - ok(f"{n_emb} piste(s) embarquée(s) (ffprobe)") - if n_unresolved: - warn(f"{n_unresolved} piste(s) externe(s) sans langue reconnue") - - if metadata.external_tracks: - print() - info(c("Pistes externes:", BOLD)) - for track in metadata.external_tracks: - track_summary(track, verbose) - - if metadata.embedded_tracks: - print() - info(c("Pistes embarquées:", BOLD)) - for track in metadata.embedded_tracks: - track_summary(track, verbose) - - return metadata - - -def step_apply_rules( - metadata: MediaSubtitleMetadata, - release_group: str | None, -) -> tuple[SubtitleMatchingRules | None, list, list]: - from alfred.domain.subtitles.aggregates import DEFAULT_RULES - from alfred.domain.subtitles.services.matcher import SubtitleMatcher - from alfred.domain.subtitles.services.utils import available_subtitles - from alfred.domain.subtitles.value_objects import ScanStrategy - - section("ÉTAPE 4 — Application des règles") - - # Cas embedded : pas de matcher, on liste directement les pistes disponibles - if metadata.detected_pattern_id == ScanStrategy.EMBEDDED.value: - info(c("Pattern embedded — le matcher est court-circuité", DIM), indent=4) - tracks = available_subtitles(metadata.embedded_tracks) - ok(f"{len(tracks)} piste(s) disponible(s)") - return None, tracks, [] - - rules = DEFAULT_RULES() - kv("Langues préférées", str(rules.preferred_languages)) - kv("Formats préférés", str(rules.preferred_formats)) - kv("Types autorisés", str(rules.allowed_types)) - kv("Confiance min", str(rules.min_confidence)) - info( - c("(règles globales par défaut — pas de .alfred/ en mode scan)", DIM), indent=4 - ) - - matcher = SubtitleMatcher() - matched, unresolved = matcher.match(metadata.external_tracks, rules) - - print() - ok(f"{len(matched)} piste(s) retenue(s)") - if unresolved: - warn(f"{len(unresolved)} piste(s) écartée(s) ou non résolue(s)") - - return rules, matched, unresolved - - -def step_show_results( - matched: list, - unresolved: list, - is_embedded: bool, - verbose: bool, -) -> None: - section("RÉSULTAT FINAL") - - if matched: - label = ( - "piste(s) disponible(s)" if is_embedded else "piste(s) qui seraient placées" - ) - ok(f"{len(matched)} {label}:") - for track in matched: - lang = track.language.code if track.language else "?" - typ = track.subtitle_type.value - if is_embedded: - print(f" {c(lang, CYAN)} {c(typ, GREEN)}") - else: - try: - dest = track.destination_name - src = track.file_path.name if track.file_path else "?" - print(f" {c(src, DIM)} → {c(dest, GREEN, BOLD)}") - except ValueError: - warn(f" Piste incomplète (lang ou format manquant): {track}") - else: - warn("Aucune piste retenue.") - - if unresolved: - print() - warn(f"{len(unresolved)} piste(s) écartées ou à clarifier:") - for track in unresolved: - src = track.file_path.name if track.file_path else "?" - reason = ( - "langue inconnue" - if track.language is None - else "confiance insuffisante" - ) - line = f" {c(src, DIM)} ({reason})" - if verbose and track.raw_tokens: - line += c(f" tokens: {track.raw_tokens}", YELLOW) - print(line) - - print() - - -# --------------------------------------------------------------------------- -# Scan multi-épisodes (résumé) -# --------------------------------------------------------------------------- - - -def scan_season( - kb: SubtitleKnowledgeBase, - pattern: SubtitlePattern, - season_folder: Path, - release_group: str | None, - verbose: bool, -) -> None: - from alfred.domain.subtitles.aggregates import DEFAULT_RULES - from alfred.domain.subtitles.services.identifier import SubtitleIdentifier - from alfred.domain.subtitles.services.matcher import SubtitleMatcher - - videos = find_videos(season_folder) - - section(f"SCAN COMPLET DE LA SAISON ({len(videos)} épisode(s))") - - if not videos: - warn("Aucun fichier vidéo trouvé dans ce dossier.") - return - - identifier = SubtitleIdentifier(kb) - matcher = SubtitleMatcher() - rules = DEFAULT_RULES() - - col_w = max(len(v.name) for v in videos) + 2 - - for video in videos: - metadata = identifier.identify( - video_path=video, - pattern=pattern, - media_id=None, - media_type="tv_show", - release_group=release_group, - ) - matched, unresolved = matcher.match(metadata.external_tracks, rules) - - placed_names = [] - for t in matched: - try: - placed_names.append(t.destination_name) - except ValueError: - pass - - status_icon = c("✓", GREEN, BOLD) if placed_names else c("✗", RED, BOLD) - warn_icon = ( - c(f" [{len(unresolved)} non-résolue(s)]", YELLOW) if unresolved else "" - ) - - print( - f" {status_icon} {video.name:{col_w}} {c(', '.join(placed_names) or '—', GREEN if placed_names else DIM)}{warn_icon}" - ) - - -# --------------------------------------------------------------------------- -# Main -# --------------------------------------------------------------------------- - - -def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser( - description="Scanner de sous-titres Alfred — pipeline de diagnostic", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=textwrap.dedent(__doc__ or ""), - ) - parser.add_argument("season_folder", help="Dossier de la saison (ou du film)") - parser.add_argument( - "--release-group", - "-g", - metavar="GROUP", - help="Groupe de release (ex: RARBG, KONSTRAST)", - ) - parser.add_argument( - "--pattern", - "-p", - metavar="PATTERN", - help="Forcer un pattern (adjacent|flat|episode_subfolder|embedded)", - ) - parser.add_argument( - "--video", - "-v", - metavar="FILE", - help="Fichier vidéo de référence (défaut: premier trouvé)", - ) - parser.add_argument( - "--verbose", action="store_true", help="Affiche les tokens bruts par piste" - ) - parser.add_argument( - "--no-color", action="store_true", help="Désactive la colorisation ANSI" - ) - parser.add_argument( - "--season-scan", - action="store_true", - help="Après le diagnostic, scanner tous les épisodes de la saison", - ) - return parser.parse_args() - - -def main() -> None: - global USE_COLOR - - args = parse_args() - - if args.no_color or not sys.stdout.isatty(): - USE_COLOR = False - - season_folder = Path(args.season_folder).expanduser().resolve() - if not season_folder.is_dir(): - print(f"Erreur: '{season_folder}' n'est pas un dossier.", file=sys.stderr) - sys.exit(1) - - print() - print(c("━" * 70, BOLD)) - print(c(" Alfred — Subtitle Scanner", BOLD, MAGENTA)) - print(c("━" * 70, BOLD)) - kv("Dossier", str(season_folder), indent=2) - - # Trouver la vidéo de référence - if args.video: - sample_video = Path(args.video).expanduser().resolve() - if not sample_video.exists(): - print(f"Erreur: '{sample_video}' introuvable.", file=sys.stderr) - sys.exit(1) - else: - videos = find_videos(season_folder) - if not videos: - # Chercher un niveau plus bas (structure release root) - for sub in season_folder.iterdir(): - if sub.is_dir(): - videos = find_videos(sub) - if videos: - break - if not videos: - print( - "Erreur: aucun fichier vidéo trouvé dans ce dossier.", file=sys.stderr - ) - sys.exit(1) - sample_video = videos[0] - - kv("Vidéo de référence", sample_video.name, indent=2) - - # ---- Pipeline ---- - kb = step_load_kb() - - pattern = step_detect_pattern( - kb=kb, - season_folder=season_folder, - sample_video=sample_video, - release_group=args.release_group, - forced_pattern=args.pattern, - ) - - metadata = step_identify_tracks( - kb=kb, - sample_video=sample_video, - pattern=pattern, - release_group=args.release_group, - verbose=args.verbose, - ) - - rules, matched, unresolved = step_apply_rules( - metadata=metadata, - release_group=args.release_group, - ) - - step_show_results( - matched=matched, - unresolved=unresolved, - is_embedded=rules is None, - verbose=args.verbose, - ) - - if args.season_scan: - scan_season( - kb=kb, - pattern=pattern, - season_folder=season_folder, - release_group=args.release_group, - verbose=args.verbose, - ) - - print(c("━" * 70, BOLD)) - print() - - -if __name__ == "__main__": - main() diff --git a/testing/workflows/run_workflow.py b/testing/workflows/run_workflow.py deleted file mode 100755 index f238226..0000000 --- a/testing/workflows/run_workflow.py +++ /dev/null @@ -1,643 +0,0 @@ -#!/usr/bin/env python3 -""" -run_workflow.py — Simulate an Alfred workflow step by step (dry-run or live). - -Usage: - uv run testing/workflows/run_workflow.py organize_media [options] - -Options: - --dry-run Print what each step would do without executing tools (default). - --live Actually execute the tools (uses real filesystem + memory). - --source PATH Source video file (download folder). - --dest PATH Destination video file (library path). - --download-folder P Original download folder (for create_seed_links). - --imdb-id ID IMDb ID for identify_media step (tt1234567). - --seed Answer "yes" to the seeding question. - --no-color Disable ANSI colours. - -Examples: - uv run testing/workflows/run_workflow.py organize_media --dry-run \\ - --source "/downloads/Breaking.Bad.S01E01.mkv" \\ - --dest "/tv/Breaking Bad/Season 01/Breaking Bad.S01E01.mkv" - - uv run testing/workflows/run_workflow.py organize_media --live \\ - --source "/downloads/BB/Breaking.Bad.S01E01.mkv" \\ - --dest "/tv/Breaking Bad/Season 01/Breaking Bad.S01E01.mkv" \\ - --download-folder "/downloads/BB" --seed -""" - -import argparse -import sys -import textwrap -from pathlib import Path -from typing import Any - -# Project root on sys.path -_PROJECT_ROOT = Path(__file__).resolve().parents[2] -if str(_PROJECT_ROOT) not in sys.path: - sys.path.insert(0, str(_PROJECT_ROOT)) - -# --------------------------------------------------------------------------- -# Colours -# --------------------------------------------------------------------------- - -USE_COLOR = True - -RESET = "\033[0m" -BOLD = "\033[1m" -DIM = "\033[2m" -GREEN = "\033[32m" -YELLOW = "\033[33m" -RED = "\033[31m" -CYAN = "\033[36m" -BLUE = "\033[34m" -MAGENTA = "\033[35m" - - -def c(text: str, *codes: str) -> str: - if not USE_COLOR: - return text - return "".join(codes) + str(text) + RESET - - -def section(title: str) -> None: - print() - print(c("─" * 70, DIM)) - print(c(f" {title}", BOLD, CYAN)) - print(c("─" * 70, DIM)) - - -def ok(msg: str) -> None: - print(c(" ✓ ", GREEN, BOLD) + msg) - - -def warn(msg: str) -> None: - print(c(" ⚠ ", YELLOW, BOLD) + msg) - - -def err(msg: str) -> None: - print(c(" ✗ ", RED, BOLD) + msg) - - -def info(msg: str) -> None: - print(f" {msg}") - - -def kv(key: str, val: str) -> None: - print(f" {c(key + ':', BOLD)} {val}") - - -# --------------------------------------------------------------------------- -# Dry-run tool stubs -# --------------------------------------------------------------------------- - - -def _real_list_folder(folder_type: str, path: str = ".") -> dict[str, Any]: - """Call the real list_folder (read-only, safe in dry-run).""" - # TODO: remove hardcoded fallback once download path is configured in LTM - _HARDCODED_DOWNLOAD_ROOT = "/mnt/testipool/downloads" - - try: - from alfred.infrastructure.persistence import get_memory, init_memory - - try: - get_memory() - except Exception: - init_memory() - from alfred.agent.tools.filesystem import list_folder - - result = list_folder(folder_type=folder_type, path=path) - if result.get("status") == "error" and folder_type == "download": - raise RuntimeError(result.get("message", "not configured")) - return result - except Exception as e: - if folder_type == "download": - warn( - f"list_folder: {e} — using hardcoded download root: {_HARDCODED_DOWNLOAD_ROOT}" - ) - import os - - resolved = ( - os.path.join(_HARDCODED_DOWNLOAD_ROOT, path) - if path != "." - else _HARDCODED_DOWNLOAD_ROOT - ) - try: - entries = sorted(os.listdir(resolved)) - except OSError as oe: - return {"status": "error", "error": "os_error", "message": str(oe)} - return { - "status": "ok", - "folder_type": folder_type, - "path": resolved, - "entries": entries, - "count": len(entries), - } - warn(f"list_folder: filesystem unavailable ({e}), falling back to stub") - return { - "status": "ok", - "folder_type": folder_type, - "path": path, - "entries": ["[stub — filesystem unavailable]"], - "count": 1, - } - - -def _real_find_media_imdb_id(media_title: str, **kwargs) -> dict[str, Any]: - """Call the real TMDB API even in dry-run (read-only, no filesystem side effects).""" - try: - from alfred.infrastructure.persistence import get_memory, init_memory - - try: - get_memory() - except Exception: - init_memory() - from alfred.agent.tools.api import find_media_imdb_id - - return find_media_imdb_id(media_title=media_title) - except Exception as e: - warn(f"find_media_imdb_id: TMDB unavailable ({e}), falling back to stub") - return { - "status": "ok", - "imdb_id": "tt0000000", - "title": media_title, - "media_type": "tv_show", - "year": 2024, - } - - -def _dry_resolve_destination( - release_name: str, - source_file: str, - tmdb_title: str, - tmdb_year: int, - tmdb_episode_title: str | None = None, - confirmed_folder: str | None = None, -) -> dict[str, Any]: - from alfred.domain.release import parse_release - - parsed = parse_release(release_name) - ext = Path(source_file).suffix - if parsed.is_movie: - folder = parsed.movie_folder_name(tmdb_title, tmdb_year) - fname = parsed.movie_filename(tmdb_title, tmdb_year, ext) - return { - "status": "ok", - "library_file": f"/movies/{folder}/{fname}", - "series_folder": f"/movies/{folder}", - "series_folder_name": folder, - "season_folder": None, - "season_folder_name": None, - "filename": fname, - "is_new_series_folder": True, - } - season_folder = parsed.season_folder_name() - show_folder = confirmed_folder or parsed.show_folder_name(tmdb_title, tmdb_year) - fname = ( - parsed.episode_filename(tmdb_episode_title, ext) - if not parsed.is_season_pack - else season_folder + ext - ) - return { - "status": "ok", - "library_file": f"/tv/{show_folder}/{season_folder}/{fname}", - "series_folder": f"/tv/{show_folder}", - "season_folder": f"/tv/{show_folder}/{season_folder}", - "series_folder_name": show_folder, - "season_folder_name": season_folder, - "filename": fname, - "is_new_series_folder": confirmed_folder is None, - } - - -def _dry_move_media(source: str, destination: str) -> dict[str, Any]: - return { - "status": "ok", - "source": source, - "destination": destination, - "filename": Path(destination).name, - "size": 0, - } - - -def _dry_manage_subtitles(source_video: str, destination_video: str) -> dict[str, Any]: - return { - "status": "ok", - "video_path": destination_video, - "placed": [], - "placed_count": 0, - "skipped_count": 0, - } - - -def _dry_create_seed_links( - library_file: str, original_download_folder: str -) -> dict[str, Any]: - return { - "status": "ok", - "torrent_subfolder": f"/torrents/{Path(original_download_folder).name}", - "linked_file": f"/torrents/{Path(original_download_folder).name}/{Path(library_file).name}", - "copied_files": ["[dry-run — no real copy]"], - "copied_count": 1, - "skipped": [], - } - - -DRY_RUN_TOOLS: dict[str, Any] = { - "list_folder": _real_list_folder, - "find_media_imdb_id": _real_find_media_imdb_id, - "resolve_destination": _dry_resolve_destination, - "move_media": _dry_move_media, - "manage_subtitles": _dry_manage_subtitles, - "create_seed_links": _dry_create_seed_links, -} - - -# --------------------------------------------------------------------------- -# Live tools -# --------------------------------------------------------------------------- - - -def _load_live_tools() -> dict[str, Any]: - from alfred.agent.tools.filesystem import ( - create_seed_links, - list_folder, - manage_subtitles, - move_media, - ) - - # find_media_imdb_id lives in the api tools - try: - from alfred.agent.tools.api import find_media_imdb_id - except ImportError: - - def find_media_imdb_id(**kwargs): # type: ignore[misc] - return { - "status": "error", - "error": "not_available", - "message": "api tools not loaded", - } - - return { - "list_folder": list_folder, - "find_media_imdb_id": find_media_imdb_id, - "move_media": move_media, - "manage_subtitles": manage_subtitles, - "create_seed_links": create_seed_links, - } - - -# --------------------------------------------------------------------------- -# Workflow runner -# --------------------------------------------------------------------------- - - -class WorkflowRunner: - def __init__( - self, - workflow: dict, - tools: dict[str, Any], - live: bool, - args: argparse.Namespace, - ): - self.workflow = workflow - self.tools = tools - self.live = live - self.args = args - self.context: dict[str, Any] = {} # step results accumulate here - self.step_results: list[dict] = [] - - def run(self) -> None: - name = self.workflow.get("name", "?") - desc = self.workflow.get("description", "").strip() - mode = c("LIVE", RED, BOLD) if self.live else c("DRY-RUN", YELLOW, BOLD) - - print() - print(c("━" * 70, BOLD)) - print(c(f" Alfred — Workflow Simulator [{mode}]", BOLD, MAGENTA)) - print(c("━" * 70, BOLD)) - kv("Workflow", c(name, CYAN, BOLD)) - kv("Description", desc) - kv("Tools allowed", ", ".join(self.workflow.get("tools", []))) - - steps = self.workflow.get("steps", []) - for step in steps: - self._run_step(step) - - section("SIMULATION TERMINÉE") - ok(f"{len(self.step_results)} step(s) exécuté(s)") - errors = [ - r for r in self.step_results if r.get("result", {}).get("status") == "error" - ] - if errors: - warn(f"{len(errors)} step(s) en erreur") - for r in errors: - err( - f" {r['id']}: {r['result'].get('error')} — {r['result'].get('message')}" - ) - print() - print(c("━" * 70, BOLD)) - print() - - def _run_step(self, step: dict) -> None: - step_id = step.get("id", "?") - - # --- ask_user step --- - if "ask_user" in step: - section(f"STEP [{step_id}] — ask_user") - q = step["ask_user"].get("question", "") - answers = step["ask_user"].get("answers", {}) - info(c(f'Question: "{q}"', BOLD)) - info(f"Réponses possibles: {', '.join(str(k) for k in answers.keys())}") - - answer = "yes" if self.args.seed else "no" - # PyYAML parses bare yes/no as booleans — normalise keys to str - answers_str = {str(k): v for k, v in answers.items()} - next_step = answers_str.get(answer, {}).get("next_step", "update_library") - ok(f"Réponse simulée: {c(answer, CYAN)} → next: {c(next_step, CYAN)}") - self.context["seeding"] = answer == "yes" - self.context["ask_seeding_answer"] = answer - self.context["next_after_ask"] = next_step - - # If "no", skip create_seed_links - if answer == "no": - self.context["skip_create_seed_links"] = True - return - - # --- memory_write step --- - if "memory_write" in step: - section(f"STEP [{step_id}] — memory_write ({step['memory_write']})") - if self.live: - warn("memory_write: pas encore implémenté dans le simulator live") - else: - ok("(dry-run) Library entry would be written to LTM") - self.step_results.append({"id": step_id, "result": {"status": "ok"}}) - return - - # --- tool step --- - tool_name = step.get("tool") - if not tool_name: - warn(f"Step '{step_id}' has no tool or ask_user — skipped") - return - - # Skip create_seed_links if user said no to seeding - if tool_name == "create_seed_links" and self.context.get( - "skip_create_seed_links" - ): - section(f"STEP [{step_id}] — {tool_name}") - warn("Skipped (user chose not to seed)") - return - - section(f"STEP [{step_id}] — {c(tool_name, CYAN, BOLD)}") - - desc = step.get("description", "").strip() - if desc: - info(c(desc, DIM)) - - kwargs = self._build_kwargs(tool_name, step) - for k, v in kwargs.items(): - kv(k, str(v)) - - if tool_name not in self.tools: - err(f"Tool '{tool_name}' not found in tool registry") - self.step_results.append( - {"id": step_id, "result": {"status": "error", "error": "unknown_tool"}} - ) - return - - try: - result = self.tools[tool_name](**kwargs) - except Exception as e: - err(f"Tool raised an exception: {e}") - self.step_results.append( - {"id": step_id, "result": {"status": "error", "error": str(e)}} - ) - return - - self._print_result(result, tool_name=tool_name) - self.context[step_id] = result - self.step_results.append({"id": step_id, "result": result}) - - # After list_downloads: confirm the requested media folder exists in downloads - if ( - tool_name == "list_folder" - and result.get("status") == "ok" - and self.args.source - ): - folder_path = result.get("path", "") - entries = result.get("entries", []) - if self.args.source in entries: - media_folder = str(Path(folder_path) / self.args.source) - self.context["media_folder"] = media_folder - print() - print( - f" {c('Dossier media trouvé:', BOLD, GREEN)} {c(media_folder, CYAN, BOLD)}" - ) - else: - warn(f"Dossier '{self.args.source}' introuvable dans {folder_path}") - - def _build_kwargs(self, tool_name: str, step: dict) -> dict[str, Any]: - """Build tool kwargs from step params + CLI args + previous context.""" - # Start from step-level params (static defaults from YAML) - kwargs: dict[str, Any] = dict(step.get("params") or {}) - - a = self.args - - if tool_name == "list_folder": - kwargs.setdefault("folder_type", "download") - - elif tool_name == "find_media_imdb_id": - if a.imdb_id: - kwargs["imdb_id"] = a.imdb_id - - elif tool_name == "resolve_destination": - media_folder = self.context.get("media_folder") - if a.release: - kwargs["release_name"] = a.release - elif a.source: - kwargs.setdefault("release_name", a.source) - if media_folder: - kwargs["source_file"] = media_folder - if a.tmdb_title: - kwargs["tmdb_title"] = a.tmdb_title - if a.tmdb_year: - kwargs["tmdb_year"] = a.tmdb_year - if a.episode_title: - kwargs["tmdb_episode_title"] = a.episode_title - - elif tool_name == "move_media": - # If resolve_destination ran, use its library_file as destination - resolved = self.context.get("resolve_destination", {}) - media_folder = self.context.get("media_folder") - if media_folder: - kwargs["source"] = media_folder - dest = a.dest or resolved.get("library_file") - if dest: - kwargs["destination"] = dest - - elif tool_name == "manage_subtitles": - resolved = self.context.get("resolve_destination", {}) - media_folder = self.context.get("media_folder") - if media_folder: - kwargs["source_video"] = media_folder - dest = a.dest or resolved.get("library_file") - if dest: - kwargs["destination_video"] = dest - - elif tool_name == "create_seed_links": - resolved = self.context.get("resolve_destination", {}) - library_file = a.dest or resolved.get("library_file") - if library_file: - kwargs["library_file"] = library_file - if a.download_folder: - kwargs["original_download_folder"] = a.download_folder - else: - # Use the resolved folder path from list_downloads context - list_result = self.context.get("list_downloads", {}) - folder_path = list_result.get("path") - if folder_path: - kwargs.setdefault("original_download_folder", folder_path) - - return kwargs - - def _print_result(self, result: dict, tool_name: str = "") -> None: - status = result.get("status", "?") - if status == "ok": - ok(f"status={c('ok', GREEN)}") - elif status == "needs_clarification": - warn(f"status={c('needs_clarification', YELLOW)}") - else: - err( - f"status={c(status, RED)} error={result.get('error')} msg={result.get('message')}" - ) - return - - # Highlight resolved folder path for list_folder - if tool_name == "list_folder" and result.get("path"): - print() - print( - f" {c('Dossier résolu:', BOLD, GREEN)} {c(result['path'], CYAN, BOLD)}" - ) - - # Pretty-print notable fields - skip = {"status", "error", "message"} - for k, v in result.items(): - if k in skip: - continue - if isinstance(v, list): - if v: - info(c(f"{k}:", BOLD)) - for item in v[:10]: - info(f" • {item}") - if len(v) > 10: - info(c(f" … and {len(v) - 10} more", DIM)) - else: - info(f"{c(k + ':', BOLD)} (empty)") - else: - kv(k, str(v)) - - -# --------------------------------------------------------------------------- -# CLI -# --------------------------------------------------------------------------- - - -def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser( - description="Alfred workflow simulator", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=textwrap.dedent(__doc__ or ""), - ) - parser.add_argument("workflow", help="Workflow name (e.g. organize_media)") - parser.add_argument( - "--dry-run", - dest="dry_run", - action="store_true", - default=True, - help="Simulate steps without executing tools (default)", - ) - parser.add_argument( - "--live", - action="store_true", - help="Actually execute tools against the real filesystem", - ) - parser.add_argument( - "--source", - metavar="FOLDER_NAME", - help="Release folder name inside the download root (e.g. Oz.S03.1080p.WEBRip.x265-KONTRAST)", - ) - parser.add_argument( - "--dest", - metavar="PATH", - help="Destination video file (in library, overrides resolve_destination)", - ) - parser.add_argument( - "--download-folder", - metavar="PATH", - help="Original download folder (for create_seed_links)", - ) - parser.add_argument( - "--imdb-id", metavar="ID", help="IMDb ID for identify_media (tt1234567)" - ) - parser.add_argument( - "--release", - metavar="NAME", - help="Release name (e.g. Oz.S03.1080p.WEBRip.x265-KONTRAST)", - ) - parser.add_argument( - "--tmdb-title", metavar="TITLE", help="Canonical title from TMDB (e.g. 'Oz')" - ) - parser.add_argument( - "--tmdb-year", - metavar="YEAR", - type=int, - help="Start/release year from TMDB (e.g. 1997)", - ) - parser.add_argument( - "--episode-title", - metavar="TITLE", - help="Episode title from TMDB for single-episode releases", - ) - parser.add_argument( - "--seed", action="store_true", help='Answer "yes" to the seeding question' - ) - parser.add_argument("--no-color", action="store_true") - return parser.parse_args() - - -def main() -> None: - global USE_COLOR - args = parse_args() - - if args.no_color or not sys.stdout.isatty(): - USE_COLOR = False - - if args.live: - args.dry_run = False - - # Load workflow - from alfred.agent.workflows.loader import WorkflowLoader - - loader = WorkflowLoader() - workflow = loader.get(args.workflow) - if not workflow: - print(f"Erreur: workflow '{args.workflow}' introuvable.", file=sys.stderr) - print(f"Disponibles: {', '.join(loader.names())}", file=sys.stderr) - sys.exit(1) - - # Load tools - if args.live: - try: - tools = _load_live_tools() - except Exception as e: - print(f"Erreur chargement des tools live: {e}", file=sys.stderr) - sys.exit(1) - else: - tools = DRY_RUN_TOOLS - - runner = WorkflowRunner(workflow, tools, live=args.live, args=args) - runner.run() - - -if __name__ == "__main__": - main() diff --git a/tests/application/test_detect_media_type.py b/tests/application/test_detect_media_type.py index 1d55a62..82ba398 100644 --- a/tests/application/test_detect_media_type.py +++ b/tests/application/test_detect_media_type.py @@ -19,7 +19,7 @@ from pathlib import Path import pytest from alfred.application.release.detect_media_type import detect_media_type -from alfred.domain.release.services import parse_release +from alfred.domain.releases.parser.services import parse_release from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge _KB = YamlReleaseKnowledge() diff --git a/tests/domain/release/test_parser_v2_easy.py b/tests/domain/release/test_parser_v2_easy.py index 9721c5e..cbf899c 100644 --- a/tests/domain/release/test_parser_v2_easy.py +++ b/tests/domain/release/test_parser_v2_easy.py @@ -11,8 +11,8 @@ can't quietly drop EASY without us noticing. from __future__ import annotations -from alfred.domain.release.parser import TokenRole -from alfred.domain.release.parser.pipeline import ( +from alfred.domain.releases.parser import TokenRole +from alfred.domain.releases.parser import ( _detect_group, annotate, assemble, diff --git a/tests/domain/release/test_parser_v2_scaffolding.py b/tests/domain/release/test_parser_v2_scaffolding.py index 995c242..b1ee54f 100644 --- a/tests/domain/release/test_parser_v2_scaffolding.py +++ b/tests/domain/release/test_parser_v2_scaffolding.py @@ -8,8 +8,8 @@ is implemented and the fixtures-based suite switches over. from __future__ import annotations -from alfred.domain.release.parser import Token, TokenRole -from alfred.domain.release.parser.pipeline import strip_site_tag, tokenize +from alfred.domain.releases.parser import Token, TokenRole +from alfred.domain.releases.parser import strip_site_tag, tokenize from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge _KB = YamlReleaseKnowledge() diff --git a/tests/domain/release/test_parser_v2_scoring.py b/tests/domain/release/test_parser_v2_scoring.py index 9dca58b..4bd51a7 100644 --- a/tests/domain/release/test_parser_v2_scoring.py +++ b/tests/domain/release/test_parser_v2_scoring.py @@ -10,15 +10,15 @@ from __future__ import annotations import pytest -from alfred.domain.release.parser.scoring import ( +from alfred.domain.releases.parser import ( Road, collect_missing_critical, collect_unknown_tokens, compute_score, decide_road, ) -from alfred.domain.release.parser.tokens import Token, TokenRole -from alfred.domain.release.services import parse_release +from alfred.domain.releases.parser.tokens import Token, TokenRole +from alfred.domain.releases.parser.services import parse_release from alfred.domain.release.value_objects import ( MediaTypeToken, ParsedRelease, diff --git a/tests/domain/test_release.py b/tests/domain/test_release.py index b09e235..2016685 100644 --- a/tests/domain/test_release.py +++ b/tests/domain/test_release.py @@ -18,7 +18,7 @@ from __future__ import annotations import pytest -from alfred.domain.release.services import parse_release +from alfred.domain.releases.parser.services import parse_release from alfred.domain.release.value_objects import ParsedRelease from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge diff --git a/tests/domain/test_release_fixtures.py b/tests/domain/test_release_fixtures.py index 9c410c2..8cccbe7 100644 --- a/tests/domain/test_release_fixtures.py +++ b/tests/domain/test_release_fixtures.py @@ -18,7 +18,7 @@ from dataclasses import asdict import pytest -from alfred.domain.release.services import parse_release +from alfred.domain.releases.parser.services import parse_release from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge from tests.fixtures.releases.conftest import ReleaseFixture, discover_fixtures