CHOP CHOP CHOP

This commit is contained in:
2026-05-26 05:45:07 +02:00
parent b3abad4da4
commit cffafa2e60
47 changed files with 48 additions and 3132 deletions
+4 -29
View File
@@ -53,31 +53,6 @@ class MoveMediaResponse:
}
@dataclass
class SetFolderPathResponse:
"""Response from setting a folder path."""
status: str
folder_name: str | None = None
path: str | None = None
error: str | None = None
message: str | None = None
def to_dict(self):
"""Convert to dict for agent compatibility."""
result = {"status": self.status}
if self.error:
result["error"] = self.error
result["message"] = self.message
else:
if self.folder_name:
result["folder_name"] = self.folder_name
if self.path:
result["path"] = self.path
return result
@dataclass
class PlacedSubtitle:
@@ -186,10 +161,10 @@ class ListFolderResponse:
"""Response from listing a folder."""
status: str
folder_type: str | None = None
path: str | None = None
entries: list[str] | None = None
count: int | None = None
folder_type: str | None = None # SHOULD BE A PROPERTY
path: str | None = None # NOT NONE - Should be path
entries: list[str] | None = None # NOT NONE - Empty list of path
count: int | None = None # USELESS
error: str | None = None
message: str | None = None
@@ -24,7 +24,7 @@ from pathlib import Path
from alfred.application.release import inspect_release
from alfred.domain.release import parse_release
from alfred.domain.release.ports import ReleaseKnowledge
from alfred.domain.releases.ports import ReleaseKnowledge
from alfred.domain.release.value_objects import ParsedRelease
from alfred.domain.shared.ports import MediaProber
from alfred.infrastructure.persistence import get_memory
@@ -1,50 +0,0 @@
"""Set folder path use case."""
import logging
from alfred.infrastructure.filesystem import FileManager
from .dto import SetFolderPathResponse
logger = logging.getLogger(__name__)
class SetFolderPathUseCase:
"""
Use case for setting a folder path in configuration.
This orchestrates the FileManager to set folder paths.
"""
def __init__(self, file_manager: FileManager):
"""
Initialize use case.
Args:
file_manager: FileManager instance
"""
self.file_manager = file_manager
def execute(self, folder_name: str, path_value: str) -> SetFolderPathResponse:
"""
Set a folder path in configuration.
Args:
folder_name: Name of folder to set (download, tvshow, movie, torrent)
path_value: Absolute path to the folder
Returns:
SetFolderPathResponse with success or error information
"""
result = self.file_manager.set_folder_path(folder_name, path_value)
if result.get("status") == "ok":
return SetFolderPathResponse(
status="ok",
folder_name=result.get("folder_name"),
path=result.get("path"),
)
else:
return SetFolderPathResponse(
status="error", error=result.get("error"), message=result.get("message")
)
-6
View File
@@ -12,9 +12,7 @@ class SearchMovieResponse:
title: str | None = None
media_type: str | None = None
tmdb_id: int | None = None
overview: str | None = None
release_date: str | None = None
vote_average: float | None = None
error: str | None = None
message: str | None = None
@@ -34,11 +32,7 @@ class SearchMovieResponse:
result["media_type"] = self.media_type
if self.tmdb_id:
result["tmdb_id"] = self.tmdb_id
if self.overview:
result["overview"] = self.overview
if self.release_date:
result["release_date"] = self.release_date
if self.vote_average:
result["vote_average"] = self.vote_average
return result
-122
View File
@@ -1,122 +0,0 @@
"""``rescan_movie`` — rebuild a MovieRelease from disk and persist it.
The orchestrator locates the main video inside a movie folder, runs
``inspect_release`` on it (same single-source-of-truth as the TV
rescan flow), and assembles the result into a frozen
:class:`MovieRelease` written to the per-movie v2 ``.alfred`` sidecar.
Folder convention
-----------------
Movies are **one folder, one main file** in Alfred's library layout:
movies/
Inception (2010)/
Inception.2010.1080p.BluRay.x264-GROUP.mkv
optional.srt
optional.nfo
``find_video_file`` is responsible for picking the main video
(recursive walk, deterministic ordering). Adjacent subtitles / nfos
are ignored by this orchestrator — only embedded subtitle tracks are
captured (same scope as TV rescan).
TMDB
----
``rescan_movie`` does **not** call TMDB. Identity (``tmdb_id``,
optional ``imdb_id``) is supplied by the caller; the library index
auto-heals from the new sidecar on its next read. A subsequent TMDB
sync (Phase 5) layers identity facts (``name``, ``release_year``) on
top of the on-disk truth.
"""
from __future__ import annotations
import logging
from datetime import UTC, datetime
from pathlib import Path
from alfred.application.release.inspect import inspect_release
from alfred.domain.release.ports import ReleaseKnowledge
from alfred.domain.releases.entities import MovieRelease, TrackProfile
from alfred.domain.shared.ports import MediaProber
from alfred.domain.shared.value_objects import FilePath, ImdbId, TmdbId
from alfred.infrastructure.filesystem.find_video import find_video_file
from alfred.infrastructure.persistence.dot_alfred.v2.repository import (
DotAlfredMovieReleaseRepository,
)
_LOG = logging.getLogger(__name__)
class MovieRescanFailed(RuntimeError):
"""Raised when ``rescan_movie`` cannot produce a release.
The orchestrator surfaces a single explicit failure mode — no main
video found inside ``movie_dir``. All other adapter-level errors
(probe failure, parser low-confidence) degrade gracefully into a
sidecar with empty / partial fields, since the file is on disk
regardless.
"""
def rescan_movie(
movie_dir: Path,
*,
tmdb_id: TmdbId,
imdb_id: ImdbId | None = None,
movie_repo: DotAlfredMovieReleaseRepository,
prober: MediaProber,
kb: ReleaseKnowledge,
) -> MovieRelease:
"""Rebuild and persist the :class:`MovieRelease` for ``movie_dir``.
``movie_dir.name`` is used as both the sidecar location (relative
to the movie library root) and the ``folder`` field on the
aggregate.
Args:
movie_dir: absolute path to the movie folder under the movie
library root.
tmdb_id: TMDB primary key (required, no coercion).
imdb_id: optional secondary anchor.
movie_repo: v2 per-movie ``.alfred`` repository.
prober: ffprobe adapter (or stub).
kb: release knowledge base (video extensions, codecs, …).
Returns:
The rebuilt :class:`MovieRelease` (also written to disk).
Raises:
MovieRescanFailed: if no video file can be located inside
``movie_dir``. ``added_at`` is fresh
(``datetime.now(UTC)``) on every rescan — the v2 sidecar
records when the release was last reconciled with disk,
not when the file appeared on the filesystem.
"""
main_video = find_video_file(movie_dir, kb)
if main_video is None:
raise MovieRescanFailed(
f"no video file found in {movie_dir}"
)
result = inspect_release(main_video.name, main_video, kb, prober)
media_info = result.media_info
audio_tracks = media_info.audio_tracks if media_info else ()
subtitle_tracks = media_info.subtitle_tracks if media_info else ()
rel_path = main_video.relative_to(movie_dir)
release = MovieRelease(
tmdb_id=tmdb_id,
imdb_id=imdb_id,
folder=movie_dir.name,
file_path=FilePath(str(rel_path)),
added_at=datetime.now(UTC),
tracks=TrackProfile(
audio_tracks=audio_tracks,
subtitle_tracks=subtitle_tracks,
),
)
movie_repo.save(release)
return release
+1 -3
View File
@@ -44,7 +44,7 @@ class SearchMovieUseCase:
# Use the TMDB client to search for media
result = self.tmdb_client.search_media(media_title)
# Check if IMDb ID was found
# Check if IMDb ID was found # FUCK THIS
if result.imdb_id:
logger.info(f"IMDb ID found for '{media_title}': {result.imdb_id}")
return SearchMovieResponse(
@@ -53,9 +53,7 @@ class SearchMovieUseCase:
title=result.title,
media_type=result.media_type,
tmdb_id=result.tmdb_id,
overview=result.overview,
release_date=result.release_date,
vote_average=result.vote_average,
)
else:
logger.warning(f"No IMDb ID available for '{media_title}'")
-122
View File
@@ -1,122 +0,0 @@
"""``sync_movie`` — refresh TMDB-cached fields on the movies library index.
Parallel to :func:`alfred.application.tv_shows.sync.sync_show`. See
that module for the TTL / placeholder / force-flag policy — the
movies flow is structurally identical, differing only in the DTO
shape (movies have no seasons) and the placeholder marker (movies
use ``name == ""`` rather than ``status == "unknown"``).
"""
from __future__ import annotations
import logging
from collections.abc import Callable
from datetime import UTC, datetime
from pathlib import Path
from alfred.application.exceptions import MovieNotFoundInLibrary
from alfred.domain.shared.value_objects import TmdbId
from alfred.infrastructure.api.tmdb.client import TMDBClient
from alfred.infrastructure.persistence.dot_alfred.v2.repository import (
DotAlfredMovieLibraryIndex,
DotAlfredMovieReleaseRepository,
)
from alfred.infrastructure.persistence.dot_alfred.v2.sidecar_root import (
MovieIndexEntry,
)
_LOG = logging.getLogger(__name__)
# Placeholder signature written by the movie library index's
# auto-heal path: ``MovieIndexEntry.name == metadata.path`` (the
# heal copies the folder name into ``name`` because it has no TMDB
# title to write). The sidecar schema requires ``name`` to be
# non-empty, so we cannot use ``name == ""`` as the marker — the
# folder-name equality is the next best signature.
# See ``DotAlfredMovieLibraryIndex._build_from_releases``.
def sync_movie(
library_root: Path,
*,
tmdb_id: TmdbId,
index: DotAlfredMovieLibraryIndex,
release_repo: DotAlfredMovieReleaseRepository,
tmdb_client: TMDBClient,
ttl_days: int,
now: Callable[[], datetime] = lambda: datetime.now(UTC),
force: bool = False,
) -> MovieIndexEntry:
"""Refresh TMDB-cached fields for ``tmdb_id`` on the movie index.
Args:
library_root: movies library root (informational).
tmdb_id: movie identifier.
index: library-root index to read and upsert into.
release_repo: per-movie sidecar repository.
tmdb_client: TMDB HTTP client.
ttl_days: max age (days) for an already-synced entry before
it is considered stale. Sourced from
:attr:`Settings.tmdb_cache_ttl_days`.
now: clock injection for deterministic tests.
force: bypass TTL gate; placeholders always refresh.
Returns:
The fresh :class:`MovieIndexEntry`.
Raises:
MovieNotFoundInLibrary: when no on-disk movie carries
``tmdb_id`` (no per-movie sidecar and no index entry).
TMDBAPIError: re-raised from the client.
"""
del library_root # documented for symmetry with sync_show
current_time = now()
existing = index.find_by_tmdb_id(tmdb_id)
if existing is not None and not force and not _needs_refresh(
existing, ttl_days=ttl_days, now=current_time
):
return existing
info = tmdb_client.get_movie_info(tmdb_id.value)
release = release_repo.load_by_tmdb_id(tmdb_id)
if release is None and existing is None:
raise MovieNotFoundInLibrary(
f"no on-disk movie carries tmdb_id={tmdb_id.value}"
)
if release is None:
# Index entry exists but per-movie sidecar is gone or corrupt.
# We cannot upsert because index.upsert(release=...) is the
# only path that knows the imdb_id and folder anchor for
# movies (it's all carried on the release). Warn and skip —
# let the caller rescan to repopulate the per-movie sidecar.
_LOG.warning(
"sync_movie: per-movie sidecar missing for tmdb_id=%s; "
"skipping index upsert (anchor=%s) — rescan to repair",
tmdb_id.value,
existing.metadata.path,
)
return existing
index.upsert(
release,
name=info.title,
release_year=info.release_year,
path=release.folder,
fetched_at=current_time,
)
refreshed = index.find_by_tmdb_id(tmdb_id)
assert refreshed is not None, "upsert did not persist entry"
return refreshed
def _needs_refresh(
entry: MovieIndexEntry, *, ttl_days: int, now: datetime
) -> bool:
"""True if ``entry`` is a placeholder or older than ``ttl_days``."""
if entry.name == entry.metadata.path:
return True
age = now - entry.metadata.fetched_at
return age.days >= ttl_days
@@ -19,7 +19,7 @@ from __future__ import annotations
from pathlib import Path
from alfred.domain.release.ports import ReleaseKnowledge
from alfred.domain.releases.ports import ReleaseKnowledge
from alfred.domain.release.value_objects import ParsedRelease
@@ -4,7 +4,7 @@ from __future__ import annotations
from dataclasses import replace
from alfred.domain.release.ports import ReleaseKnowledge
from alfred.domain.releases.ports import ReleaseKnowledge
from alfred.domain.release.value_objects import ParsedRelease
from alfred.domain.shared.media import MediaInfo
+2 -2
View File
@@ -51,8 +51,8 @@ from pathlib import Path
from alfred.application.release.detect_media_type import detect_media_type
from alfred.application.release.enrich_from_probe import enrich_from_probe
from alfred.application.release.supported_media import find_main_video
from alfred.domain.release.ports import ReleaseKnowledge
from alfred.domain.release.services import parse_release
from alfred.domain.releases.ports import ReleaseKnowledge
from alfred.domain.releases.parser.services import parse_release
from alfred.domain.release.value_objects import (
MediaTypeToken,
ParsedRelease,
@@ -32,7 +32,7 @@ from __future__ import annotations
from pathlib import Path
from alfred.domain.release.ports.knowledge import ReleaseKnowledge
from alfred.domain.releases.ports.knowledge import ReleaseKnowledge
def is_supported_video(path: Path, kb: ReleaseKnowledge) -> bool:
-204
View File
@@ -1,204 +0,0 @@
"""``rescan_show`` — rebuild a SeriesRelease from disk and persist it.
The orchestrator walks the show folder, runs the existing release
pipeline (``inspect_release``) on every video file, and assembles the
result into a frozen :class:`SeriesRelease` written to the per-show
v2 ``.alfred`` sidecar.
Why reuse ``inspect_release``?
-------------------------------
The "fresh download" flow already parses release names, picks a main
video, runs ffprobe and refines media type. We want exactly the same
intelligence applied to library content — running it again here keeps
a single source of truth for parsing / probing rules. The orchestrator
just translates per-file :class:`InspectedResult` into release
aggregate construction.
PACK vs EPISODIC
----------------
Classification is done by the walker, by inspecting the season
folder's filesystem structure (flat videos → PACK, subfolders →
EPISODIC). See :mod:`alfred.application.tv_shows.walker`. The
orchestrator trusts ``season_folder.mode`` and never re-derives.
Files whose parser yields ``season is None`` or ``episode is None``
are logged and skipped — a real PACK or EPISODIC file always carries
both. Mixed-season folders (two different ``Sxx`` numbers in the
same directory) are skipped with a warning.
TMDB
----
``rescan_show`` does **not** call TMDB. It writes the release
sidecar; the library index is updated transparently by its auto-heal
path on the next read. A subsequent TMDB sync (Phase 5) layers
identity / season cache facts on top of the on-disk truth.
Out of scope (tracked as tech debt):
* Adjacent ``.srt`` files — only embedded subtitle tracks are
captured.
* Multi-episode files — ``ParsedRelease`` has no ``episode_end``
field yet.
"""
from __future__ import annotations
import logging
from pathlib import Path
from alfred.application.release.inspect import inspect_release
from alfred.application.tv_shows.walker import SeasonFolder, walk_show
from alfred.domain.release.ports import ReleaseKnowledge
from alfred.domain.releases.entities import (
EpisodeRelease,
SeasonRelease,
SeriesRelease,
TrackProfile,
)
from alfred.domain.releases.value_objects import EpisodeRange, ReleaseMode
from alfred.domain.shared.media import MediaInfo
from alfred.domain.shared.ports import FilesystemScanner, MediaProber
from alfred.domain.shared.value_objects import FilePath, ImdbId, TmdbId
from alfred.domain.tv_shows.value_objects import EpisodeNumber, SeasonNumber
from alfred.infrastructure.persistence.dot_alfred.v2.repository import (
DotAlfredSeriesReleaseRepository,
)
_LOG = logging.getLogger(__name__)
def rescan_show(
show_root: Path,
*,
tmdb_id: TmdbId,
imdb_id: ImdbId | None = None,
series_repo: DotAlfredSeriesReleaseRepository,
scanner: FilesystemScanner,
prober: MediaProber,
kb: ReleaseKnowledge,
) -> SeriesRelease:
"""Rebuild and persist the :class:`SeriesRelease` for ``show_root``.
The show's folder name (``show_root.name``) is used as the sidecar
location relative to the library root. TMDB identity comes from the
caller — the orchestrator does not call TMDB.
Returns the rebuilt frozen aggregate (also written to disk by
``series_repo.save``).
"""
tree = walk_show(show_root, scanner=scanner, kb=kb)
seasons: list[SeasonRelease] = []
for season_folder in tree.season_folders:
season = _ingest_season(season_folder, show_root, kb, prober)
if season is not None:
seasons.append(season)
release = SeriesRelease(
tmdb_id=tmdb_id,
imdb_id=imdb_id,
seasons=tuple(seasons),
)
series_repo.save(release, show_folder=show_root.name)
return release
# --------------------------------------------------------------------------- #
# Per-season ingestion #
# --------------------------------------------------------------------------- #
def _ingest_season(
season_folder: SeasonFolder,
show_root: Path,
kb: ReleaseKnowledge,
prober: MediaProber,
) -> SeasonRelease | None:
if season_folder.mode is None:
# Walker already logged the reason (empty / malformed mix /
# multi-video subfolder). Just skip.
return None
if not season_folder.video_files:
_LOG.warning(
"rescan_show: season folder %s contains no video file — skipping",
season_folder.season_dir,
)
return None
# Inspect every video to extract season + episode numbers.
inspected = []
for video_path in season_folder.video_files:
result = inspect_release(video_path.name, video_path, kb, prober)
inspected.append((video_path, result))
season_numbers = {
r.parsed.season for _, r in inspected if r.parsed.season is not None
}
if not season_numbers:
_LOG.warning(
"rescan_show: no season number parsed in %s — skipping",
season_folder.season_dir,
)
return None
if len(season_numbers) > 1:
_LOG.warning(
"rescan_show: mixed season numbers %s in %s — skipping",
sorted(season_numbers),
season_folder.season_dir,
)
return None
season_number = SeasonNumber(season_numbers.pop())
folder_name = season_folder.season_dir.name
episodes: list[EpisodeRelease] = []
for video_path, result in inspected:
if result.parsed.episode is None:
_LOG.warning(
"rescan_show: no episode number parsed for %s — skipping",
video_path,
)
continue
episodes.append(
_make_episode_release(
episode_number=EpisodeNumber(result.parsed.episode),
video_path=video_path,
show_root=show_root,
media_info=result.media_info,
)
)
if not episodes:
_LOG.warning(
"rescan_show: no parseable episodes in %s — skipping",
season_folder.season_dir,
)
return None
return SeasonRelease(
season_number=season_number,
folder=folder_name,
mode=season_folder.mode,
episodes=tuple(episodes),
)
def _make_episode_release(
*,
episode_number: EpisodeNumber,
video_path: Path,
show_root: Path,
media_info: MediaInfo | None,
) -> EpisodeRelease:
rel_path = video_path.relative_to(show_root)
audio_tracks = media_info.audio_tracks if media_info else ()
subtitle_tracks = media_info.subtitle_tracks if media_info else ()
return EpisodeRelease(
episodes=EpisodeRange(start=episode_number, end=episode_number),
file_path=FilePath(str(rel_path)),
tracks=TrackProfile(
audio_tracks=audio_tracks,
subtitle_tracks=subtitle_tracks,
),
)
-146
View File
@@ -1,146 +0,0 @@
"""``sync_show`` — refresh TMDB-cached fields on the TV library index.
The orchestrator hits TMDB for one show, combines the response with
the on-disk release (if any), and upserts the result into the
library-root index. It is the only place that calls
:meth:`TMDBClient.get_tv_show_info`; the rescan flow stays
TMDB-free.
TTL & placeholder policy
------------------------
``ttl_days`` (passed by the caller, sourced from
:attr:`Settings.tmdb_cache_ttl_days`) gates refreshes for entries
that already carry real TMDB facts. Placeholder entries — those
produced by the library index's auto-heal path, recognizable by
``status == "unknown"`` — always refresh regardless of TTL, because
auto-heal leaves the cache empty on purpose. ``force=True`` overrides
both gates.
Missing on-disk release
-----------------------
If the library index has an entry for ``tmdb_id`` but the per-show
sidecar is absent or corrupt, the sync still proceeds: ``release`` is
passed as ``None`` to :meth:`DotAlfredTVShowLibraryIndex.upsert`,
which produces an entry with TMDB facts but an empty episode-slot
map. Callers can then run :func:`rescan_show` to repopulate the slot
map. This is the "library knows the show, files temporarily missing"
state — a stale index pointing at a deleted folder.
If both index and release sidecar are missing, the show genuinely
isn't in the library and we raise
:class:`ShowNotFoundInLibrary` — sync cannot invent a folder anchor.
"""
from __future__ import annotations
import logging
from collections.abc import Callable
from datetime import UTC, datetime
from pathlib import Path
from alfred.application.exceptions import ShowNotFoundInLibrary
from alfred.domain.shared.value_objects import TmdbId
from alfred.infrastructure.api.tmdb.client import TMDBClient
from alfred.infrastructure.persistence.dot_alfred.v2.repository import (
DotAlfredSeriesReleaseRepository,
DotAlfredTVShowLibraryIndex,
)
from alfred.infrastructure.persistence.dot_alfred.v2.sidecar_root import (
ShowIndexEntry,
)
_LOG = logging.getLogger(__name__)
# Placeholder marker written by the library index's auto-heal path.
# See ``DotAlfredTVShowLibraryIndex._build_from_releases``.
_PLACEHOLDER_STATUS = "unknown"
def sync_show(
library_root: Path,
*,
tmdb_id: TmdbId,
index: DotAlfredTVShowLibraryIndex,
release_repo: DotAlfredSeriesReleaseRepository,
tmdb_client: TMDBClient,
ttl_days: int,
now: Callable[[], datetime] = lambda: datetime.now(UTC),
force: bool = False,
) -> ShowIndexEntry:
"""Refresh TMDB-cached fields for ``tmdb_id`` on the TV index.
Args:
library_root: TV shows library root (informational — the
index and repos already carry their own root).
tmdb_id: show identifier.
index: library-root index to read and upsert into.
release_repo: per-show sidecar repository, used to resolve
the on-disk release + folder anchor.
tmdb_client: TMDB HTTP client.
ttl_days: max age (days) for an already-synced entry before
it is considered stale. Sourced from
:attr:`Settings.tmdb_cache_ttl_days` by the caller.
now: clock injection for deterministic tests.
force: bypass TTL gate; placeholders always refresh
regardless of this flag.
Returns:
The fresh :class:`ShowIndexEntry` (the one already in the
index when fresh, the newly-upserted one otherwise).
Raises:
ShowNotFoundInLibrary: when neither index nor release repo
knows the show.
TMDBAPIError: re-raised from the client.
"""
del library_root # not needed for the algorithm; documented for symmetry with rescan_show
current_time = now()
existing = index.find_by_tmdb_id(tmdb_id)
if existing is not None and not force and not _needs_refresh(
existing, ttl_days=ttl_days, now=current_time
):
return existing
info = tmdb_client.get_tv_show_info(tmdb_id.value)
loaded = release_repo.load_by_tmdb_id(tmdb_id)
if loaded is None and existing is None:
raise ShowNotFoundInLibrary(
f"no on-disk TV show carries tmdb_id={tmdb_id.value}"
)
if loaded is not None:
release, folder = loaded
else:
# Index entry exists but per-show sidecar is gone or corrupt.
# Use the anchor recorded in the index; the slot map will be
# empty until a rescan repopulates it.
release = None
folder = existing.metadata.path
_LOG.warning(
"sync_show: per-show sidecar missing for tmdb_id=%s; "
"upserting index entry with empty episode slots (anchor=%s)",
tmdb_id.value,
folder,
)
index.upsert(info, release, path=folder, fetched_at=current_time)
refreshed = index.find_by_tmdb_id(tmdb_id)
# ``upsert`` writes synchronously; the follow-up read returns the
# entry we just persisted. Defensive ``assert``: if it isn't there,
# the index implementation has regressed.
assert refreshed is not None, "upsert did not persist entry"
return refreshed
def _needs_refresh(
entry: ShowIndexEntry, *, ttl_days: int, now: datetime
) -> bool:
"""True if ``entry`` is a placeholder or older than ``ttl_days``."""
if entry.status == _PLACEHOLDER_STATUS:
return True
age = now - entry.metadata.fetched_at
return age.days >= ttl_days
+1 -1
View File
@@ -50,7 +50,7 @@ import re
from dataclasses import dataclass
from pathlib import Path
from alfred.domain.release.ports import ReleaseKnowledge
from alfred.domain.releases.ports import ReleaseKnowledge
from alfred.domain.releases.value_objects import ReleaseMode
from alfred.domain.shared.ports import FilesystemScanner
+2
View File
@@ -55,6 +55,7 @@ class Movie:
def __hash__(self) -> int:
return hash(self.tmdb_id)
# WRONG
def get_folder_name(self) -> str:
"""
Get the folder name for this movie.
@@ -66,6 +67,7 @@ class Movie:
return f"{self.title.value} ({self.release_year.value})"
return self.title.value
# WRONG
def get_filename(self) -> str:
"""
Get the suggested base filename (without extension) for this movie.
+2 -14
View File
@@ -4,7 +4,6 @@ from dataclasses import dataclass
from enum import Enum
from ..shared.exceptions import ValidationError
from ..shared.value_objects import to_dot_folder_name
class Quality(Enum):
@@ -56,18 +55,11 @@ class MovieTitle:
f"Movie title must be a string, got {type(self.value)}"
)
if len(self.value) > 500:
if len(self.value) > 100:
raise ValidationError(
f"Movie title too long: {len(self.value)} characters (max 500)"
f"Movie title too long: {len(self.value)} characters (max 100)"
)
def normalized(self) -> str:
"""
Return normalized title for file system usage.
Removes special characters and replaces spaces with dots.
"""
return to_dot_folder_name(self.value)
def __str__(self) -> str:
return self.value
@@ -93,10 +85,6 @@ class ReleaseYear:
f"Release year must be an integer, got {type(self.value)}"
)
# Movies started around 1888, and we shouldn't have movies from the future
if self.value < 1888 or self.value > 2100:
raise ValidationError(f"Invalid release year: {self.value}")
def __str__(self) -> str:
return str(self.value)
-6
View File
@@ -1,6 +0,0 @@
"""Release domain — release name parsing and naming conventions."""
from .services import parse_release
from .value_objects import ParsedRelease, ParseReport
__all__ = ["ParsedRelease", "ParseReport", "parse_release"]
+9
View File
@@ -42,6 +42,15 @@ from .entities import (
)
from .value_objects import ReleaseMode
# ════════════════════════════════════════════════════════════════════════════
# MovieReleaseBuilder
# ════════════════════════════════════════════════════════════════════════════
# ...
# ════════════════════════════════════════════════════════════════════════════
# SeasonReleaseBuilder
# ════════════════════════════════════════════════════════════════════════════
@@ -17,10 +17,6 @@ The pipeline has three internal paths driven by the detected release group:
knowledge sets, with a 0-100 confidence score.
- **PATH OF PAIN**: score below threshold OR critical chunks missing
signaled to the caller, who decides whether to involve the LLM/user.
Today the package exposes scaffolding only (token VOs and a thin pipeline
stub). The legacy ``parse_release`` in ``release.services`` keeps serving
production until each piece of the v2 pipeline is wired in.
"""
from __future__ import annotations
@@ -29,7 +29,7 @@ arrives through ``kb: ReleaseKnowledge``.
from __future__ import annotations
from ..ports.knowledge import ReleaseKnowledge
from ..value_objects import MediaTypeToken
from alfred.domain.release.value_objects import MediaTypeToken
from .schema import GroupSchema
from .tokens import Token, TokenRole
@@ -27,7 +27,7 @@ from __future__ import annotations
from enum import Enum
from ..ports.knowledge import ReleaseKnowledge
from ..value_objects import ParsedRelease
from alfred.domain.release.value_objects import ParsedRelease
from .tokens import Token, TokenRole
@@ -18,10 +18,9 @@ score, the road, and diagnostic info for downstream callers.
from __future__ import annotations
from .parser import pipeline as _v2
from .parser import scoring as _scoring
from .ports import ReleaseKnowledge
from .value_objects import MediaTypeToken, ParsedRelease, ParseReport, TokenizationRoute
from alfred.domain.releases.parser import scoring as _scoring, pipeline as _v2
from alfred.domain.releases.ports import ReleaseKnowledge
from alfred.domain.release.value_objects import MediaTypeToken, ParsedRelease, ParseReport, TokenizationRoute
def parse_release(
@@ -18,7 +18,7 @@ from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from ..shared.exceptions import ValidationError
from alfred.domain.shared.exceptions import ValidationError
class MediaTypeToken(str, Enum):
@@ -128,7 +128,6 @@ class ParsedRelease:
"""
raw: str # original release name (untouched)
clean: str # raw minus site_tag and apostrophes — used by season_folder_name()
title: str # show/movie title (dots, no year/season/tech)
title_sanitized: str # title with filesystem-forbidden chars stripped
year: int | None # movie year or show start year (from TMDB)
@@ -157,18 +156,6 @@ class ParsedRelease:
raise ValidationError("ParsedRelease.raw cannot be empty")
if not self.group:
raise ValidationError("ParsedRelease.group cannot be empty")
if self.year is not None and not (1888 <= self.year <= 2100):
raise ValidationError(
f"ParsedRelease.year out of range: {self.year}"
)
if self.season is not None and not (0 <= self.season <= 100):
raise ValidationError(
f"ParsedRelease.season out of range: {self.season}"
)
if self.episode is not None and not (0 <= self.episode <= 9999):
raise ValidationError(
f"ParsedRelease.episode out of range: {self.episode}"
)
if self.episode_end is not None:
if not (0 <= self.episode_end <= 9999):
raise ValidationError(
@@ -194,78 +181,3 @@ class ParsedRelease:
def is_season_pack(self) -> bool:
return self.season is not None and self.episode is None
@property
def tech_string(self) -> str:
"""``quality.source.codec`` joined by dots, skipping ``None`` parts.
Derived on every access so it stays in sync with the underlying
fields no manual refresh needed after enrichment.
"""
return ".".join(p for p in (self.quality, self.source, self.codec) if p)
def show_folder_name(self, tmdb_title_safe: str, tmdb_year: int) -> str:
"""
Build the series root folder name.
Format: {Title}.{Year}.{Tech}-{Group}
Example: Oz.1997.1080p.WEBRip.x265-KONTRAST
``tmdb_title_safe`` must already be filesystem-safe (the caller is
expected to have run it through ``kb.sanitize_for_fs``).
"""
title_part = tmdb_title_safe.replace(" ", ".")
tech = self.tech_string or "Unknown"
return f"{title_part}.{tmdb_year}.{tech}-{self.group}"
def season_folder_name(self) -> str:
"""
Build the season subfolder name = normalized release name (no episode).
Example: Oz.S03.1080p.WEBRip.x265-KONTRAST
For a single-episode release we still strip the episode token so the
folder can hold the whole season.
"""
return _strip_episode_from_normalized(self.clean)
def episode_filename(self, tmdb_episode_title_safe: str | None, ext: str) -> str:
"""
Build the episode filename.
Format: {Title}.{SxxExx}.{EpisodeTitle}.{Tech}-{Group}.{ext}
Example: Oz.S01E01.The.Routine.1080p.WEBRip.x265-KONTRAST.mkv
``tmdb_episode_title_safe`` must already be filesystem-safe; pass
``None`` to omit the episode title segment.
"""
title_part = self.title_sanitized
s = f"S{self.season:02d}" if self.season is not None else ""
e = f"E{self.episode:02d}" if self.episode is not None else ""
se = s + e
ep_title = ""
if tmdb_episode_title_safe:
ep_title = "." + tmdb_episode_title_safe.replace(" ", ".")
tech = self.tech_string or "Unknown"
ext_clean = ext.lstrip(".")
return f"{title_part}.{se}{ep_title}.{tech}-{self.group}.{ext_clean}"
def movie_folder_name(self, tmdb_title_safe: str, tmdb_year: int) -> str:
"""
Build the movie folder name.
Format: {Title}.{Year}.{Tech}-{Group}
Example: Inception.2010.1080p.BluRay.x265-GROUP
"""
return self.show_folder_name(tmdb_title_safe, tmdb_year)
def movie_filename(
self, tmdb_title_safe: str, tmdb_year: int, ext: str
) -> str:
"""
Build the movie filename (same as folder name + extension).
Example: Inception.2010.1080p.BluRay.x265-GROUP.mkv
"""
ext_clean = ext.lstrip(".")
return f"{self.movie_folder_name(tmdb_title_safe, tmdb_year)}.{ext_clean}"
+2 -2
View File
@@ -77,7 +77,7 @@ class TmdbId:
def __int__(self) -> int:
return self.value
# GOOD
@dataclass(frozen=True)
class FilePath:
"""
@@ -265,7 +265,7 @@ class Language:
# literal dots, and hyphens. Everything else is stripped.
_FS_SAFE_CHARS = re.compile(r"[^\w\s\.\-]")
# USELESS - TO REMOVE
def to_dot_folder_name(title: str) -> str:
"""Sanitize ``title`` for filesystem use and convert spaces to dots.
+2 -3
View File
@@ -71,7 +71,6 @@ class Episode:
season_number: SeasonNumber
episode_number: EpisodeNumber
title: str
def __post_init__(self) -> None:
if not isinstance(self.season_number, SeasonNumber):
@@ -97,7 +96,7 @@ class Episode:
return hash((self.season_number, self.episode_number))
# ── Naming ─────────────────────────────────────────────────────────────
# WRONG - NO TITLE REQUIRED
def get_filename(self) -> str:
"""Suggested filename: ``S01E05.Pilot``."""
season_str = f"S{self.season_number.value:02d}"
@@ -261,7 +260,7 @@ class TVShow:
)
# ── Naming ─────────────────────────────────────────────────────────────
# WRONG
def get_folder_name(self) -> str:
"""Dot-separated folder name (e.g. ``Breaking.Bad``)."""
return to_dot_folder_name(self.title)
@@ -4,7 +4,7 @@ from __future__ import annotations
from pathlib import Path
from alfred.domain.release.ports import ReleaseKnowledge
from alfred.domain.releases.ports import ReleaseKnowledge
def find_video_file(path: Path, kb: ReleaseKnowledge) -> Path | None:
@@ -14,8 +14,8 @@ filesystem-level concerns.
from __future__ import annotations
from alfred.domain.release.parser.schema import GroupSchema, SchemaChunk
from alfred.domain.release.parser.tokens import TokenRole
from alfred.domain.releases.parser import GroupSchema, SchemaChunk
from alfred.domain.releases.parser.tokens import TokenRole
from .release import (
load_audio,
-434
View File
@@ -1,434 +0,0 @@
"""CLI de debug pour analyser une release et dry-run le déplacement."""
import json
import sys
from pathlib import Path
# Permet de lancer le script depuis n'importe où sans install du package
sys.path.insert(0, str(Path(__file__).parent.parent))
def _init_memory():
from alfred.infrastructure.persistence import init_memory
from alfred.settings import settings
init_memory(settings.data_storage_dir)
def _resolve_via_tmdb(release_name: str) -> tuple[str, int] | None:
"""Parse le release name, interroge TMDB, retourne (tmdb_title, tmdb_year)."""
from alfred.application.movies import SearchMovieUseCase
from alfred.domain.release.services import parse_release
from alfred.infrastructure.api.tmdb import tmdb_client
parsed = parse_release(release_name)
raw_title = parsed.title.replace(".", " ").strip()
print(f" titre extrait : {raw_title}")
print(" interrogation TMDB...")
use_case = SearchMovieUseCase(tmdb_client)
result = use_case.execute(raw_title).to_dict()
if result.get("status") != "ok":
print(f" TMDB error: {result.get('message')}")
return None
title = result["title"]
release_date = result.get("release_date", "")
year = int(release_date[:4]) if release_date and len(release_date) >= 4 else None
if not year:
print(f" TMDB: pas d'année trouvée pour '{title}'")
return None
print(f" TMDB: {title} ({year})")
return title, year
def _extract_release_name(release_name: str) -> tuple[str, str]:
"""
Retourne (release_name, source_path).
Si c'est un path absolu existant → extrait le basename et utilise le path comme source.
Sinon → cherche le dossier dans workspace.download configuré en LTM.
"""
p = Path(release_name)
if p.is_absolute() and p.exists():
return p.name, str(p)
from alfred.infrastructure.persistence import get_memory
memory = get_memory()
download_root = memory.ltm.workspace.download
if download_root:
candidate = Path(download_root) / release_name
if candidate.exists():
return release_name, str(candidate)
return release_name, ""
def analyze(release_name: str, source_path: str | None = None) -> None:
from alfred.domain.release.services import parse_release
release_name, resolved_path = _extract_release_name(release_name)
if source_path is None and resolved_path:
source_path = resolved_path
print(f"\n=== PARSE: {release_name} ===")
r = parse_release(release_name)
for k, v in vars(r).items():
if v is not None and v != [] and v != "":
print(f" {k}: {v}")
if source_path:
path = Path(source_path)
print(f"\n=== PROBE: {path} ===")
if not path.exists():
print(" (chemin inexistant, probe skipped)")
else:
from alfred.infrastructure.filesystem.find_video import find_video_file
from alfred.infrastructure.probe import FfprobeMediaProber
video = find_video_file(path) if path.is_dir() else path
if video:
print(f" video file: {video.name}")
info = FfprobeMediaProber().probe(video)
if info:
print(f" codec: {info.video_codec}")
print(f" resolution: {info.resolution}")
print(
f" audio_tracks: {[(t.codec, t.language) for t in info.audio_tracks]}"
)
print(
f" subtitle_tracks: {[(t.codec, t.language) for t in info.subtitle_tracks]}"
)
else:
print(" probe failed (ffprobe dispo ?)")
else:
print(" aucun fichier vidéo trouvé")
def dry_run(release_name: str) -> None:
_init_memory()
release_name, _ = _extract_release_name(release_name)
print(f"\n=== DRY-RUN: {release_name} ===")
tmdb = _resolve_via_tmdb(release_name)
if not tmdb:
sys.exit(1)
tmdb_title, tmdb_year = tmdb
from alfred.application.filesystem.resolve_destination import (
resolve_season_destination,
)
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
from alfred.infrastructure.probe import FfprobeMediaProber
result = resolve_season_destination(
release_name,
tmdb_title,
tmdb_year,
YamlReleaseKnowledge(),
FfprobeMediaProber(),
)
d = result.to_dict()
print()
print(json.dumps(d, indent=2, ensure_ascii=False))
if d["status"] == "ok":
print("\n=== MOVE PREVIEW ===")
print(" src : <source_folder>")
print(f" dst : {d['season_folder']}")
def _translate_path(path: str) -> str:
"""Translate a host-side path to the qBittorrent container path."""
from alfred.settings import settings
host_prefix = settings.qbittorrent_host_path
container_prefix = settings.qbittorrent_container_path
if host_prefix and container_prefix and path.startswith(host_prefix):
return container_prefix + path[len(host_prefix) :]
return path
def _qbittorrent_update(torrent_name: str, new_location: str | None) -> None:
"""
Find the torrent in qBittorrent by name, update its save_path, and force recheck.
Args:
torrent_name: Exact torrent name (release folder basename)
new_location: New save path on the host (parent of the torrent folder).
None if the torrent was sent to trash — skip location change.
"""
try:
from alfred.infrastructure.api.qbittorrent.client import QBittorrentClient
client = QBittorrentClient()
client.login()
torrent = client.find_by_name(torrent_name)
if torrent is None:
print(f" ⚠ qBittorrent: torrent '{torrent_name}' not found — skipping")
return
print(f" qBittorrent: found '{torrent.name}' (hash={torrent.hash[:8]}…)")
if new_location:
container_location = _translate_path(new_location)
client.set_location(torrent.hash, container_location)
print(f" ✓ qBittorrent: location → {container_location}")
client.recheck(torrent.hash)
print(" ✓ qBittorrent: recheck triggered")
except Exception as e:
# Non-fatal — the files are already in place
print(f" ⚠ qBittorrent update failed (non-fatal): {e}")
def do_move(release_name: str, source_folder: str | None = None) -> None:
_init_memory()
release_name, resolved_path = _extract_release_name(release_name)
if source_folder is None:
source_folder = resolved_path
if not source_folder:
print(
" Erreur: source introuvable. Configure workspace.download ou passe le path complet."
)
sys.exit(1)
print(f"\n=== MOVE: {release_name} ===")
tmdb = _resolve_via_tmdb(release_name)
if not tmdb:
sys.exit(1)
tmdb_title, tmdb_year = tmdb
from alfred.application.filesystem.resolve_destination import (
resolve_season_destination,
)
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
from alfred.infrastructure.probe import FfprobeMediaProber
result = resolve_season_destination(
release_name,
tmdb_title,
tmdb_year,
YamlReleaseKnowledge(),
FfprobeMediaProber(),
)
d = result.to_dict()
if d["status"] == "needs_clarification":
print(f"\n {d['question']}")
for i, opt in enumerate(d["options"]):
print(f" {i + 1}. {opt}")
choice = input(" Choix (numéro) : ").strip()
try:
chosen = d["options"][int(choice) - 1]
except (ValueError, IndexError):
print(" Choix invalide.")
sys.exit(1)
result = resolve_season_destination(
release_name, tmdb_title, tmdb_year, confirmed_folder=chosen
)
d = result.to_dict()
if d["status"] != "ok":
print(json.dumps(d, indent=2, ensure_ascii=False))
sys.exit(1)
src_path = Path(source_folder)
season_folder = d["season_folder"]
mkv_files = sorted(src_path.glob("*.mkv")) or sorted(src_path.glob("*.mp4"))
from alfred.infrastructure.persistence import get_memory
memory = get_memory()
torrent_root = memory.ltm.workspace.torrent
trash_root = memory.ltm.workspace.trash
torrent_dst = str(Path(torrent_root) / src_path.name) if torrent_root else None
trash_dst = str(Path(trash_root) / src_path.name) if trash_root else None
rebuild = input(" Recréer le torrent ? [y/N] : ").strip().lower() == "y"
# --- PHASE 1: PLAN ---
print("\n=== PLAN ===")
print(f" destination : {season_folder}")
from alfred.application.filesystem.manage_subtitles import ManageSubtitlesUseCase
from alfred.domain.release.services import parse_release
parsed = parse_release(release_name)
# Dict: video_path → sub_result (pre-scanned, files not yet moved)
plan: list[tuple[Path, str, object]] = [] # (src_file, dst_path, sub_result)
has_errors = False
for f in mkv_files:
dst = str(Path(season_folder) / f.name)
ghost_src = str(src_path / f.name)
sub_result = ManageSubtitlesUseCase().execute(
source_video=ghost_src,
destination_video=dst,
media_type="tv_show",
release_group=parsed.group,
season=parsed.season,
dry_run=True,
)
print(f"\n {f.name}")
print(f"{dst}")
if sub_result.status == "ok":
if sub_result.placed:
for p in sub_result.placed:
print(f" sub: {p.filename}")
elif sub_result.available:
for a in sub_result.available:
print(f" sub (embedded): {a.language} {a.subtitle_type}")
else:
print(" subs: aucun")
elif sub_result.status == "needs_clarification":
print(" ✗ subs non résolus:")
for u in sub_result.unresolved:
print(f" {u.raw_tokens} ({u.reason})")
has_errors = True
elif sub_result.status == "error":
print(f" ✗ erreur subs: {sub_result.message}")
has_errors = True
plan.append((f, dst, sub_result))
if rebuild and torrent_dst:
print(f"\n source → torrents : {torrent_dst}")
print(f" hard-links : {len(mkv_files)} fichier(s)")
elif trash_dst:
print(f"\n source → trash : {trash_dst}")
else:
print("\n source : laissée en place")
if has_errors:
print("\n ✗ Plan invalide — subs non résolus, abandon.")
sys.exit(1)
# --- CONFIRMATION ---
print()
confirm = input(" Confirmer ? [y/N] : ").strip().lower()
if confirm != "y":
print(" Annulé.")
sys.exit(0)
# --- PHASE 2: EXECUTE ---
import os
from alfred.infrastructure.filesystem.filesystem_operations import (
create_folder,
move,
)
print("\n=== EXECUTE ===")
# 1. Créer le season_folder
r = create_folder(season_folder)
if r["status"] != "ok":
print(f" ✗ create_folder: {r}")
sys.exit(1)
print(f" ✓ dossier : {season_folder}")
# 2. Déplacer chaque fichier vidéo + placer les subs (re-run après move)
for f, dst, _pre_scan in plan:
r = move(str(f), dst)
if r["status"] != "ok":
print(f"{f.name}: {r['message']}")
sys.exit(1)
print(f"{f.name}")
# Re-run manage_subtitles maintenant que dst existe (pour le hard-link)
ghost_src = str(src_path / f.name)
sub_result = ManageSubtitlesUseCase().execute(
source_video=ghost_src,
destination_video=dst,
media_type="tv_show",
release_group=parsed.group,
season=parsed.season,
)
if sub_result.status == "ok" and sub_result.placed:
for p in sub_result.placed:
print(f" ✓ sub: {p.filename}")
# 3. Dossier source → torrents ou trash
if rebuild and torrent_dst:
r = move(source_folder, torrent_dst)
if r["status"] != "ok":
print(f" ✗ source → torrents: {r['message']}")
sys.exit(1)
print(" ✓ source → torrents")
# 4. Hard-link depuis season_folder → torrent_dst
torrent_dst_path = Path(torrent_dst)
for f, dst, _ in plan:
lib_file = Path(season_folder) / f.name
link_dst = torrent_dst_path / f.name
try:
os.link(lib_file, link_dst)
print(f" ✓ hard-link: {f.name}")
except OSError as e:
print(f" ✗ hard-link {f.name}: {e}")
sys.exit(1)
elif trash_dst:
r = move(source_folder, trash_dst)
if r["status"] != "ok":
print(f" ✗ source → trash: {r['message']}")
sys.exit(1)
print(" ✓ source → trash")
# 5. qBittorrent: update location + recheck
qb_location = torrent_root if (rebuild and torrent_dst) else None
_qbittorrent_update(src_path.name, qb_location)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Debug release parsing + dry-run/move")
sub = parser.add_subparsers(dest="cmd")
p_analyze = sub.add_parser(
"analyze", help="Parser une release (+ probe si path fourni)"
)
p_analyze.add_argument("release_name")
p_analyze.add_argument("--path", help="Chemin vers le dossier/fichier source")
p_dry = sub.add_parser(
"dryrun", help="Résout via TMDB et affiche les chemins sans rien bouger"
)
p_dry.add_argument("release_name")
p_move = sub.add_parser(
"move", help="Résout via TMDB et déplace le dossier (confirmation requise)"
)
p_move.add_argument("release_name")
p_move.add_argument(
"source_folder",
nargs="?",
default=None,
help="Chemin absolu du dossier source (optionnel si workspace.download est configuré)",
)
args = parser.parse_args()
if args.cmd == "analyze":
analyze(args.release_name, args.path)
elif args.cmd == "dryrun":
dry_run(args.release_name)
elif args.cmd == "move":
do_move(args.release_name, args.source_folder)
else:
parser.print_help()
sys.exit(1)
-270
View File
@@ -1,270 +0,0 @@
#!/usr/bin/env python3
"""
parse_release.py — Test ParsedRelease interactively or via CLI args.
Usage:
uv run testing/parse_release.py "Oz.S03.1080p.WEBRip.x265-KONTRAST"
uv run testing/parse_release.py "Oz.S03.1080p.WEBRip.x265-KONTRAST" --tmdb
uv run testing/parse_release.py "Inception.2010.1080p.BluRay.x265-GROUP" --tmdb-title "Inception" --tmdb-year 2010
uv run testing/parse_release.py --interactive
"""
import argparse
import sys
from pathlib import Path
_PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(_PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(_PROJECT_ROOT))
# ---------------------------------------------------------------------------
# Colours
# ---------------------------------------------------------------------------
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"
CYAN = "\033[36m"
BLUE = "\033[34m"
USE_COLOR = True
def c(text: str, *codes: str) -> str:
if not USE_COLOR:
return str(text)
return "".join(codes) + str(text) + RESET
def kv(key: str, val: str, color: str = CYAN) -> None:
print(f" {c(key + ':', BOLD)} {c(val, color)}")
def hr() -> None:
print(c("" * 64, DIM))
# ---------------------------------------------------------------------------
# TMDB lookup
# ---------------------------------------------------------------------------
def _fetch_tmdb(title: str) -> tuple[str | None, int | None]:
"""
Call TMDBClient.search_media() and return (canonical_title, year).
Returns (None, None) on failure.
"""
try:
from alfred.infrastructure.api.tmdb import TMDBClient
client = TMDBClient()
result = client.search_media(title)
year: int | None = None
if result.release_date:
try:
year = int(result.release_date[:4])
except (ValueError, IndexError):
pass
print(
c(
f" TMDB → {result.title} ({year}) [{result.media_type}] imdb={result.imdb_id}",
DIM,
)
)
return result.title, year
except Exception as e:
print(c(f" TMDB lookup failed: {e}", YELLOW))
return None, None
# ---------------------------------------------------------------------------
# Display
# ---------------------------------------------------------------------------
def _show(
release_name: str,
tmdb_title: str | None,
tmdb_year: int | None,
tmdb_episode_title: str | None,
ext: str,
) -> None:
from alfred.domain.release import parse_release
p = parse_release(release_name)
# Auto-fetch TMDB if requested and not already provided
if not (tmdb_title and tmdb_year):
fetched_title, fetched_year = _fetch_tmdb(p.title.replace(".", " "))
tmdb_title = tmdb_title or fetched_title
tmdb_year = tmdb_year or fetched_year
print()
print(c("" * 64, BOLD))
print(c(f" ParsedRelease — {p.raw}", BOLD, CYAN))
print(c("" * 64, BOLD))
# Core fields
hr()
kv("raw", p.raw)
kv("normalised", p.normalised)
kv("title", p.title)
kv("year", str(p.year) if p.year else c("None", DIM))
kv("season", str(p.season) if p.season is not None else c("None", DIM))
kv("episode", str(p.episode) if p.episode is not None else c("None", DIM))
kv(
"episode_end",
str(p.episode_end) if p.episode_end is not None else c("None", DIM),
)
kv("quality", p.quality or c("None", DIM))
kv("source", p.source or c("None", DIM))
kv("codec", p.codec or c("None", DIM))
kv("group", p.group, YELLOW if p.group == "UNKNOWN" else GREEN)
kv("tech_string", p.tech_string or c("(empty)", DIM))
# Derived booleans
hr()
kv("is_movie", c(str(p.is_movie), GREEN if p.is_movie else DIM))
kv("is_season_pack", c(str(p.is_season_pack), GREEN if p.is_season_pack else DIM))
# Generated names
hr()
title_for_names = tmdb_title or p.title.replace(".", " ")
year_for_names = tmdb_year or p.year or 0
if p.is_movie:
kv("movie_folder_name", p.movie_folder_name(title_for_names, year_for_names))
kv("movie_filename", p.movie_filename(title_for_names, year_for_names, ext))
else:
kv("show_folder_name", p.show_folder_name(title_for_names, year_for_names))
kv("season_folder_name", p.season_folder_name())
if not p.is_season_pack:
kv("episode_filename", p.episode_filename(tmdb_episode_title, ext))
else:
kv("episode_filename", c("(season pack — no episode filename)", DIM))
if tmdb_title or tmdb_year or tmdb_episode_title:
hr()
print(c(" TMDB data used:", DIM))
if tmdb_title:
kv(" tmdb_title", tmdb_title)
if tmdb_year:
kv(" tmdb_year", str(tmdb_year))
if tmdb_episode_title:
kv(" tmdb_episode_title", tmdb_episode_title)
print(c("" * 64, BOLD))
print()
# ---------------------------------------------------------------------------
# Interactive mode
# ---------------------------------------------------------------------------
def _interactive() -> None:
print(c("\n Alfred — Release Parser REPL", BOLD, CYAN))
print(c(" Type a release name, or 'q' to quit.", DIM))
print(
c(
" Inline overrides: ::title=Oz ::year=1997 ::ep=The.Routine ::ext=.mkv\n",
DIM,
)
)
while True:
try:
raw = input(c(" release> ", BOLD)).strip()
except (EOFError, KeyboardInterrupt):
print()
break
if not raw or raw.lower() in ("q", "quit", "exit"):
break
# Parse inline overrides: "Oz.S03E01... ::title=Oz ::year=1997 ::tmdb"
parts = raw.split("::")
release = parts[0].strip()
overrides: dict[str, str] = {}
for part in parts[1:]:
part = part.strip()
if "=" in part:
k, _, v = part.partition("=")
overrides[k.strip()] = v.strip()
else:
overrides[part] = "1" # flag-style: ::tmdb
tmdb_title = overrides.get("title")
tmdb_year = int(overrides["year"]) if "year" in overrides else None
tmdb_episode_title = overrides.get("ep")
ext = overrides.get("ext", ".mkv")
try:
_show(release, tmdb_title, tmdb_year, tmdb_episode_title, ext)
except Exception as e:
print(c(f" Error: {e}", RED))
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> None:
global USE_COLOR
parser = argparse.ArgumentParser(
description="Test ParsedRelease from domain/release/release_parser.py",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("release", nargs="?", help="Release name to parse")
parser.add_argument(
"-i", "--interactive", action="store_true", help="Interactive REPL mode"
)
parser.add_argument(
"--tmdb-title", metavar="TITLE", help="Override TMDB title for name generation"
)
parser.add_argument(
"--tmdb-year",
metavar="YEAR",
type=int,
help="Override TMDB year for name generation",
)
parser.add_argument(
"--episode-title",
metavar="TITLE",
help="TMDB episode title for episode_filename()",
)
parser.add_argument(
"--ext",
default=".mkv",
metavar="EXT",
help="File extension for filename generation (default: .mkv)",
)
parser.add_argument("--no-color", action="store_true")
args = parser.parse_args()
if args.no_color or not sys.stdout.isatty():
USE_COLOR = False
if args.interactive:
_interactive()
return
if not args.release:
parser.print_help()
sys.exit(1)
try:
_show(
args.release, args.tmdb_title, args.tmdb_year, args.episode_title, args.ext
)
except Exception as e:
print(c(f"Error: {e}", RED), file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
-164
View File
@@ -1,164 +0,0 @@
#!/usr/bin/env python3
"""
probe_video.py — Display MediaInfo extracted by ffprobe for a video file.
Usage:
uv run testing/probe_video.py /path/to/video.mkv
uv run testing/probe_video.py /path/to/video.mkv --no-color
"""
import argparse
import sys
from pathlib import Path
_PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(_PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(_PROJECT_ROOT))
# ---------------------------------------------------------------------------
# Colours
# ---------------------------------------------------------------------------
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"
CYAN = "\033[36m"
BLUE = "\033[34m"
USE_COLOR = True
def c(text: str, *codes: str) -> str:
if not USE_COLOR:
return str(text)
return "".join(codes) + str(text) + RESET
def kv(key: str, val: str, indent: int = 4, color: str = CYAN) -> None:
print(f"{' ' * indent}{c(key + ':', BOLD)} {c(val, color)}")
def section(title: str) -> None:
print()
print(f" {c('' + title, BOLD, BLUE)}")
def hr() -> None:
print(c("" * 70, DIM))
# ---------------------------------------------------------------------------
# Formatting helpers
# ---------------------------------------------------------------------------
def fmt_duration(seconds: float) -> str:
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
s = int(seconds % 60)
if h:
return f"{h}h {m:02d}m {s:02d}s"
return f"{m}m {s:02d}s"
def fmt_channels(channels: int | None, layout: str | None) -> str:
parts = []
if channels is not None:
parts.append(str(channels) + "ch")
if layout:
parts.append(f"({layout})")
return " ".join(parts) if parts else ""
def flag(val: bool) -> str:
return c("yes", GREEN) if val else c("no", DIM)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
global USE_COLOR
parser = argparse.ArgumentParser(description="Probe a video file with ffprobe")
parser.add_argument("file", help="Path to the video file")
parser.add_argument("--no-color", action="store_true")
args = parser.parse_args()
if args.no_color or not sys.stdout.isatty():
USE_COLOR = False
path = Path(args.file)
if not path.exists():
print(c(f"Error: {path} does not exist", RED), file=sys.stderr)
sys.exit(1)
from alfred.infrastructure.probe import FfprobeMediaProber
info = FfprobeMediaProber().probe(path)
if info is None:
print(c("Error: ffprobe failed to probe the file", RED), file=sys.stderr)
sys.exit(1)
print()
print(c("" * 70, BOLD))
print(c(f" {path.name}", BOLD, CYAN))
print(c(f" {path}", DIM))
print(c("" * 70, BOLD))
# --- Video ---
section("Video")
kv("codec", info.video_codec or c("", DIM))
kv("resolution", info.resolution or c("", DIM))
if info.width and info.height:
kv("dimensions", f"{info.width} × {info.height}")
if info.duration_seconds is not None:
kv("duration", fmt_duration(info.duration_seconds))
if info.bitrate_kbps is not None:
kv("bitrate", f"{info.bitrate_kbps} kbps")
# --- Audio ---
section(f"Audio {c(str(len(info.audio_tracks)) + ' track(s)', DIM)}")
if not info.audio_tracks:
print(f" {c('no audio tracks found', DIM)}")
for track in info.audio_tracks:
lang = track.language or "und"
default_marker = f" {c('default', GREEN, DIM)}" if track.is_default else ""
print(f" {c(f'[{track.index}]', BOLD)} {c(lang, YELLOW)}{default_marker}")
kv("codec", track.codec or c("", DIM), indent=8)
kv("channels", fmt_channels(track.channels, track.channel_layout), indent=8)
# --- Subtitles ---
section(f"Subtitles {c(str(len(info.subtitle_tracks)) + ' track(s)', DIM)}")
if not info.subtitle_tracks:
print(f" {c('no embedded subtitle tracks', DIM)}")
for track in info.subtitle_tracks:
lang = track.language or "und"
markers = []
if track.is_default:
markers.append(c("default", GREEN, DIM))
if track.is_forced:
markers.append(c("forced", YELLOW, DIM))
marker_str = (" " + " ".join(markers)) if markers else ""
print(f" {c(f'[{track.index}]', BOLD)} {c(lang, YELLOW)}{marker_str}")
kv("codec", track.codec or c("", DIM), indent=8)
# --- Summary ---
print()
hr()
multi = c("yes", GREEN) if info.is_multi_audio else c("no", DIM)
langs = ", ".join(info.audio_languages) if info.audio_languages else c("", DIM)
print(
f" {c('multi-audio:', BOLD)} {multi} {c('languages:', BOLD)} {c(langs, CYAN)}"
)
hr()
print()
if __name__ == "__main__":
main()
-220
View File
@@ -1,220 +0,0 @@
#!/usr/bin/env python3
"""
recognize_folders_in_downloads.py — Parse every folder/file in the downloads directory.
Usage:
uv run testing/recognize_folders_in_downloads.py
uv run testing/recognize_folders_in_downloads.py --path /mnt/testipool/downloads
uv run testing/recognize_folders_in_downloads.py --failures-only
uv run testing/recognize_folders_in_downloads.py --successes-only
"""
import argparse
import sys
from pathlib import Path
_PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(_PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(_PROJECT_ROOT))
# ---------------------------------------------------------------------------
# Colours
# ---------------------------------------------------------------------------
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"
CYAN = "\033[36m"
USE_COLOR = True
def c(text: str, *codes: str) -> str:
if not USE_COLOR:
return str(text)
return "".join(codes) + str(text) + RESET
def kv(key: str, val: str, indent: int = 4, color: str = CYAN) -> None:
print(f"{' ' * indent}{c(key + ':', BOLD)} {c(val, color)}")
def hr() -> None:
print(c("" * 70, DIM))
# ---------------------------------------------------------------------------
# Parsing quality check
# ---------------------------------------------------------------------------
def _assess(p) -> list[str]:
"""Return a list of warning strings for fields that look wrong."""
if p.media_type in ("other", "unknown"):
return []
warnings = []
if p.group == "UNKNOWN":
warnings.append("group not found")
if not p.quality:
warnings.append("resolution not found")
if not p.codec:
warnings.append("codec not found")
if not p.title or p.title == p.normalised:
warnings.append("title extraction likely wrong")
return warnings
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
global USE_COLOR
parser = argparse.ArgumentParser(
description="Recognize release folders in downloads"
)
parser.add_argument(
"--path",
default="/mnt/testipool/downloads",
help="Downloads directory (default: /mnt/testipool/downloads)",
)
parser.add_argument(
"--failures-only", action="store_true", help="Show only entries with warnings"
)
parser.add_argument(
"--successes-only", action="store_true", help="Show only fully parsed entries"
)
parser.add_argument("--no-color", action="store_true")
args = parser.parse_args()
if args.no_color or not sys.stdout.isatty():
USE_COLOR = False
downloads = Path(args.path)
if not downloads.exists():
print(c(f"Error: {downloads} does not exist", RED), file=sys.stderr)
sys.exit(1)
from dataclasses import replace
from alfred.application.release.detect_media_type import detect_media_type
from alfred.application.release.enrich_from_probe import enrich_from_probe
from alfred.domain.release.services import parse_release
from alfred.domain.release.value_objects import MediaTypeToken
from alfred.infrastructure.filesystem.find_video import find_video_file
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
from alfred.infrastructure.probe import FfprobeMediaProber
_kb = YamlReleaseKnowledge()
_prober = FfprobeMediaProber()
entries = sorted(downloads.iterdir(), key=lambda p: p.name.lower())
total = len(entries)
ok_count = 0
warn_count = 0
print()
print(c("" * 70, BOLD))
print(c(f" Downloads — {downloads}", BOLD, CYAN))
print(c(f" {total} entries", DIM))
print(c("" * 70, BOLD))
for entry in entries:
name = entry.name
try:
p, _report = parse_release(name, _kb)
p = replace(p, media_type=MediaTypeToken(detect_media_type(p, entry, _kb)))
if p.media_type not in ("unknown", "other"):
video_file = find_video_file(entry)
if video_file:
media_info = _prober.probe(video_file)
if media_info:
p = enrich_from_probe(p, media_info, _kb)
warnings = _assess(p)
except Exception as e:
warnings = [f"parse error: {e}"]
p = None
has_warnings = bool(warnings)
if args.failures_only and not has_warnings:
continue
if args.successes_only and has_warnings:
continue
print()
path_label = ""
if p:
path_label = {
"direct": c("direct", GREEN, DIM),
"sanitized": c("sanitized", YELLOW),
"ai": c("ai", RED),
}.get(p.parse_path, p.parse_path)
if has_warnings:
warn_count += 1
print(f" {c('', YELLOW, BOLD)} {c(name, YELLOW)} {path_label}")
else:
ok_count += 1
print(f" {c('', GREEN, BOLD)} {c(name, BOLD)} {path_label}")
if p:
kind = {
"movie": "movie",
"tv_show": "season pack" if p.is_season_pack else "episode",
"tv_complete": c("tv complete", CYAN),
"documentary": c("documentary", CYAN),
"concert": c("concert", CYAN),
"other": c("other", RED),
"unknown": c("unknown", YELLOW),
}.get(p.media_type, p.media_type)
kv("type", kind)
kv("title", p.title)
if p.season is not None:
ep = f"E{p.episode:02d}" if p.episode is not None else ""
kv("season/ep", f"S{p.season:02d} / {ep}")
if p.year:
kv("year", str(p.year))
if p.languages:
kv("langs", " ".join(p.languages))
kv("quality", p.quality or c("", DIM))
kv("source", p.source or c("", DIM))
kv("codec", p.codec or c("", DIM))
if p.audio_codec:
ch = f" {p.audio_channels}" if p.audio_channels else ""
kv("audio", f"{p.audio_codec}{ch}")
if p.bit_depth or p.hdr_format:
hdr_parts = [x for x in [p.bit_depth, p.hdr_format] if x]
kv("hdr/depth", " ".join(hdr_parts))
if p.edition:
kv("edition", p.edition, color=YELLOW)
kv("group", p.group, color=YELLOW if p.group == "UNKNOWN" else GREEN)
if p.site_tag:
kv("site tag", p.site_tag, color=YELLOW)
if warnings:
for w in warnings:
print(f" {c('' + w, YELLOW)}")
# Summary
print()
hr()
skipped = total - ok_count - warn_count
print(
f" {c('Total:', BOLD)} {total} "
f"{c(str(ok_count) + ' ok', GREEN, BOLD)} "
f"{c(str(warn_count) + ' warnings', YELLOW, BOLD)}"
+ (f" {c(str(skipped) + ' filtered', DIM)}" if skipped else "")
)
hr()
print()
if __name__ == "__main__":
main()
-575
View File
@@ -1,575 +0,0 @@
#!/usr/bin/env python3
"""
scan_subtitles.py — CLI pour tester le pipeline de scan de sous-titres Alfred.
Usage:
uv run testing/subtitles/scan_subtitles.py <season_folder> [options]
Options:
--release-group RARBG Groupe de release (optionnel — active les known patterns)
--pattern adjacent Forcer un pattern (adjacent|flat|episode_subfolder|embedded)
--video FILE Fichier vidéo de référence (défaut: premier .mkv/.mp4 trouvé)
--verbose Détails sur chaque token analysé
--no-color Désactive la colorisation
Exemples:
uv run scripts/scan_subtitles.py "/media/tv/The X-Files/Season 01"
uv run scripts/scan_subtitles.py "/media/tv/The X-Files/Season 01" --release-group RARBG
uv run scripts/scan_subtitles.py "/media/tv/The X-Files/Season 01" --pattern episode_subfolder --verbose
"""
import argparse
import sys
import textwrap
from pathlib import Path
# Ajoute la racine du projet au path (testing/subtitles/ → ../../)
_PROJECT_ROOT = Path(__file__).resolve().parents[2]
if str(_PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(_PROJECT_ROOT))
# ---------------------------------------------------------------------------
# Colorisation simple (pas de dépendance externe)
# ---------------------------------------------------------------------------
USE_COLOR = True
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"
CYAN = "\033[36m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
def c(text: str, *codes: str) -> str:
if not USE_COLOR:
return text
return "".join(codes) + text + RESET
def section(title: str) -> None:
width = 70
print()
print(c("" * width, DIM))
print(c(f" {title}", BOLD, CYAN))
print(c("" * width, DIM))
def ok(msg: str) -> None:
print(c("", GREEN, BOLD) + msg)
def warn(msg: str) -> None:
print(c("", YELLOW, BOLD) + msg)
def err(msg: str) -> None:
print(c("", RED, BOLD) + msg)
def info(msg: str, indent: int = 2) -> None:
print(" " * indent + msg)
def kv(key: str, value: str, indent: int = 4) -> None:
print(" " * indent + c(f"{key}: ", BOLD) + value)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
VIDEO_EXTS = {".mkv", ".mp4", ".avi", ".mov", ".ts", ".m2ts"}
def find_videos(folder: Path) -> list[Path]:
return sorted(
p for p in folder.iterdir() if p.is_file() and p.suffix.lower() in VIDEO_EXTS
)
def confidence_bar(conf: float, width: int = 20) -> str:
filled = int(conf * width)
bar = "" * filled + "" * (width - filled)
if conf >= 0.8:
color = GREEN
elif conf >= 0.5:
color = YELLOW
else:
color = RED
return c(bar, color) + c(f" {conf:.0%}", BOLD)
def track_summary(track, verbose: bool = False) -> None:
lang = track.language.code if track.language else c("?", RED)
fmt = track.format.id if track.format else c("?", RED)
typ = track.subtitle_type.value
src = (
"embedded"
if track.is_embedded
else (track.file_path.name if track.file_path else "?")
)
# Couleur du type
type_colors = {
"standard": GREEN,
"sdh": YELLOW,
"forced": BLUE,
"unknown": RED,
}
typ_str = c(typ, type_colors.get(typ, RESET))
unresolved = not track.is_embedded and track.language is None
clarif = c(" [langue inconnue]", RED, BOLD) if unresolved else ""
print(f" {c(src, BOLD)}")
print(f" lang={c(lang, CYAN)} type={typ_str} format={fmt}")
conf_str = (
c("n/a (embedded)", DIM)
if track.is_embedded
else confidence_bar(track.confidence)
)
print(f" confidence={conf_str}{clarif}")
if track.entry_count is not None:
print(
f" entries={track.entry_count} size={track.file_size_kb:.1f} KB"
if track.file_size_kb
else f" entries={track.entry_count}"
)
if verbose and track.raw_tokens:
print(f" tokens={track.raw_tokens}")
if track.is_resolved() and track.language and track.format:
try:
dest = track.destination_name
print(f"{c(dest, GREEN, BOLD)}")
except ValueError:
pass
# ---------------------------------------------------------------------------
# Étapes du pipeline
# ---------------------------------------------------------------------------
def step_load_kb() -> SubtitleKnowledgeBase:
from alfred.domain.subtitles.knowledge.base import SubtitleKnowledgeBase
from alfred.domain.subtitles.knowledge.loader import KnowledgeLoader
section("ÉTAPE 1 — Chargement de la base de connaissances")
kb = SubtitleKnowledgeBase(KnowledgeLoader())
fmts = kb.formats()
langs = kb.languages()
patterns = kb.patterns()
ok(f"{len(fmts)} format(s) connu(s): {', '.join(fmts.keys())}")
ok(f"{len(langs)} langue(s) connue(s): {', '.join(langs.keys())}")
ok(f"{len(patterns)} pattern(s) connu(s): {', '.join(patterns.keys())}")
total_tokens = sum(len(l.tokens) for l in langs.values())
info(c(f"{total_tokens} tokens de langue au total", DIM), indent=4)
return kb
def step_detect_pattern(
kb: SubtitleKnowledgeBase,
season_folder: Path,
sample_video: Path,
release_group: str | None,
forced_pattern: str | None,
) -> SubtitlePattern:
from alfred.domain.subtitles.services.pattern_detector import PatternDetector
section("ÉTAPE 2 — Détection du pattern de release")
# Priorité: forced > known patterns from release_group > auto-detect
if forced_pattern:
pattern = kb.pattern(forced_pattern)
if not pattern:
err(f"Pattern inconnu: '{forced_pattern}'")
print(f" Patterns disponibles: {', '.join(kb.patterns().keys())}")
sys.exit(1)
ok(f"Pattern forcé: {c(forced_pattern, CYAN, BOLD)}")
return pattern
if release_group:
known = kb.patterns_for_group(release_group)
if known:
kv("Release group", release_group)
ok(
f"Pattern(s) connu(s) pour {release_group}: {', '.join(p.id for p in known)}"
)
pattern = known[0]
kv("Pattern sélectionné", c(pattern.id, CYAN, BOLD))
return pattern
else:
warn(f"Groupe '{release_group}' inconnu — lancement de la détection auto")
# Auto-detect
kv("Dossier analysé", str(season_folder))
kv("Vidéo de référence", sample_video.name)
detector = PatternDetector(kb)
result = detector.detect(season_folder, sample_video)
findings = result.get("raw_findings", {})
info(c("Observations:", BOLD), indent=4)
for key, val in findings.items():
if val not in (False, None, 0):
info(f" {key}: {c(str(val), CYAN)}", indent=4)
detected = result.get("detected")
confidence = result.get("confidence", 0.0)
description = result.get("description", "")
print()
info(c(f'Description: "{description}"', DIM), indent=4)
print(f" Confiance: {confidence_bar(confidence)}")
if detected:
ok(f"Pattern détecté: {c(detected.id, CYAN, BOLD)}")
kv("Stratégie de scan", detected.scan_strategy.value)
kv("Détection de type", detected.type_detection.value)
if detected.root_folder:
kv("Dossier racine", detected.root_folder)
return detected
else:
warn("Aucun pattern détecté avec confiance suffisante — fallback: adjacent")
fallback = kb.pattern("adjacent")
if not fallback:
err("Pattern 'adjacent' introuvable dans la KB !")
sys.exit(1)
return fallback
def step_identify_tracks(
kb: SubtitleKnowledgeBase,
sample_video: Path,
pattern: SubtitlePattern,
release_group: str | None,
verbose: bool,
) -> MediaSubtitleMetadata:
from alfred.domain.subtitles.services.identifier import SubtitleIdentifier
section("ÉTAPE 3 — Identification des pistes")
kv("Vidéo", sample_video.name)
kv("Pattern", pattern.id)
identifier = SubtitleIdentifier(kb)
metadata = identifier.identify(
video_path=sample_video,
pattern=pattern,
media_id=None,
media_type="tv_show",
release_group=release_group,
)
n_emb = len(metadata.embedded_tracks)
n_ext = len(metadata.external_tracks)
n_unresolved = len(metadata.unresolved_tracks)
print()
ok(f"{n_ext} piste(s) externe(s) trouvée(s)")
if n_emb:
ok(f"{n_emb} piste(s) embarquée(s) (ffprobe)")
if n_unresolved:
warn(f"{n_unresolved} piste(s) externe(s) sans langue reconnue")
if metadata.external_tracks:
print()
info(c("Pistes externes:", BOLD))
for track in metadata.external_tracks:
track_summary(track, verbose)
if metadata.embedded_tracks:
print()
info(c("Pistes embarquées:", BOLD))
for track in metadata.embedded_tracks:
track_summary(track, verbose)
return metadata
def step_apply_rules(
metadata: MediaSubtitleMetadata,
release_group: str | None,
) -> tuple[SubtitleMatchingRules | None, list, list]:
from alfred.domain.subtitles.aggregates import DEFAULT_RULES
from alfred.domain.subtitles.services.matcher import SubtitleMatcher
from alfred.domain.subtitles.services.utils import available_subtitles
from alfred.domain.subtitles.value_objects import ScanStrategy
section("ÉTAPE 4 — Application des règles")
# Cas embedded : pas de matcher, on liste directement les pistes disponibles
if metadata.detected_pattern_id == ScanStrategy.EMBEDDED.value:
info(c("Pattern embedded — le matcher est court-circuité", DIM), indent=4)
tracks = available_subtitles(metadata.embedded_tracks)
ok(f"{len(tracks)} piste(s) disponible(s)")
return None, tracks, []
rules = DEFAULT_RULES()
kv("Langues préférées", str(rules.preferred_languages))
kv("Formats préférés", str(rules.preferred_formats))
kv("Types autorisés", str(rules.allowed_types))
kv("Confiance min", str(rules.min_confidence))
info(
c("(règles globales par défaut — pas de .alfred/ en mode scan)", DIM), indent=4
)
matcher = SubtitleMatcher()
matched, unresolved = matcher.match(metadata.external_tracks, rules)
print()
ok(f"{len(matched)} piste(s) retenue(s)")
if unresolved:
warn(f"{len(unresolved)} piste(s) écartée(s) ou non résolue(s)")
return rules, matched, unresolved
def step_show_results(
matched: list,
unresolved: list,
is_embedded: bool,
verbose: bool,
) -> None:
section("RÉSULTAT FINAL")
if matched:
label = (
"piste(s) disponible(s)" if is_embedded else "piste(s) qui seraient placées"
)
ok(f"{len(matched)} {label}:")
for track in matched:
lang = track.language.code if track.language else "?"
typ = track.subtitle_type.value
if is_embedded:
print(f" {c(lang, CYAN)} {c(typ, GREEN)}")
else:
try:
dest = track.destination_name
src = track.file_path.name if track.file_path else "?"
print(f" {c(src, DIM)}{c(dest, GREEN, BOLD)}")
except ValueError:
warn(f" Piste incomplète (lang ou format manquant): {track}")
else:
warn("Aucune piste retenue.")
if unresolved:
print()
warn(f"{len(unresolved)} piste(s) écartées ou à clarifier:")
for track in unresolved:
src = track.file_path.name if track.file_path else "?"
reason = (
"langue inconnue"
if track.language is None
else "confiance insuffisante"
)
line = f" {c(src, DIM)} ({reason})"
if verbose and track.raw_tokens:
line += c(f" tokens: {track.raw_tokens}", YELLOW)
print(line)
print()
# ---------------------------------------------------------------------------
# Scan multi-épisodes (résumé)
# ---------------------------------------------------------------------------
def scan_season(
kb: SubtitleKnowledgeBase,
pattern: SubtitlePattern,
season_folder: Path,
release_group: str | None,
verbose: bool,
) -> None:
from alfred.domain.subtitles.aggregates import DEFAULT_RULES
from alfred.domain.subtitles.services.identifier import SubtitleIdentifier
from alfred.domain.subtitles.services.matcher import SubtitleMatcher
videos = find_videos(season_folder)
section(f"SCAN COMPLET DE LA SAISON ({len(videos)} épisode(s))")
if not videos:
warn("Aucun fichier vidéo trouvé dans ce dossier.")
return
identifier = SubtitleIdentifier(kb)
matcher = SubtitleMatcher()
rules = DEFAULT_RULES()
col_w = max(len(v.name) for v in videos) + 2
for video in videos:
metadata = identifier.identify(
video_path=video,
pattern=pattern,
media_id=None,
media_type="tv_show",
release_group=release_group,
)
matched, unresolved = matcher.match(metadata.external_tracks, rules)
placed_names = []
for t in matched:
try:
placed_names.append(t.destination_name)
except ValueError:
pass
status_icon = c("", GREEN, BOLD) if placed_names else c("", RED, BOLD)
warn_icon = (
c(f" [{len(unresolved)} non-résolue(s)]", YELLOW) if unresolved else ""
)
print(
f" {status_icon} {video.name:{col_w}} {c(', '.join(placed_names) or '', GREEN if placed_names else DIM)}{warn_icon}"
)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Scanner de sous-titres Alfred — pipeline de diagnostic",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=textwrap.dedent(__doc__ or ""),
)
parser.add_argument("season_folder", help="Dossier de la saison (ou du film)")
parser.add_argument(
"--release-group",
"-g",
metavar="GROUP",
help="Groupe de release (ex: RARBG, KONSTRAST)",
)
parser.add_argument(
"--pattern",
"-p",
metavar="PATTERN",
help="Forcer un pattern (adjacent|flat|episode_subfolder|embedded)",
)
parser.add_argument(
"--video",
"-v",
metavar="FILE",
help="Fichier vidéo de référence (défaut: premier trouvé)",
)
parser.add_argument(
"--verbose", action="store_true", help="Affiche les tokens bruts par piste"
)
parser.add_argument(
"--no-color", action="store_true", help="Désactive la colorisation ANSI"
)
parser.add_argument(
"--season-scan",
action="store_true",
help="Après le diagnostic, scanner tous les épisodes de la saison",
)
return parser.parse_args()
def main() -> None:
global USE_COLOR
args = parse_args()
if args.no_color or not sys.stdout.isatty():
USE_COLOR = False
season_folder = Path(args.season_folder).expanduser().resolve()
if not season_folder.is_dir():
print(f"Erreur: '{season_folder}' n'est pas un dossier.", file=sys.stderr)
sys.exit(1)
print()
print(c("" * 70, BOLD))
print(c(" Alfred — Subtitle Scanner", BOLD, MAGENTA))
print(c("" * 70, BOLD))
kv("Dossier", str(season_folder), indent=2)
# Trouver la vidéo de référence
if args.video:
sample_video = Path(args.video).expanduser().resolve()
if not sample_video.exists():
print(f"Erreur: '{sample_video}' introuvable.", file=sys.stderr)
sys.exit(1)
else:
videos = find_videos(season_folder)
if not videos:
# Chercher un niveau plus bas (structure release root)
for sub in season_folder.iterdir():
if sub.is_dir():
videos = find_videos(sub)
if videos:
break
if not videos:
print(
"Erreur: aucun fichier vidéo trouvé dans ce dossier.", file=sys.stderr
)
sys.exit(1)
sample_video = videos[0]
kv("Vidéo de référence", sample_video.name, indent=2)
# ---- Pipeline ----
kb = step_load_kb()
pattern = step_detect_pattern(
kb=kb,
season_folder=season_folder,
sample_video=sample_video,
release_group=args.release_group,
forced_pattern=args.pattern,
)
metadata = step_identify_tracks(
kb=kb,
sample_video=sample_video,
pattern=pattern,
release_group=args.release_group,
verbose=args.verbose,
)
rules, matched, unresolved = step_apply_rules(
metadata=metadata,
release_group=args.release_group,
)
step_show_results(
matched=matched,
unresolved=unresolved,
is_embedded=rules is None,
verbose=args.verbose,
)
if args.season_scan:
scan_season(
kb=kb,
pattern=pattern,
season_folder=season_folder,
release_group=args.release_group,
verbose=args.verbose,
)
print(c("" * 70, BOLD))
print()
if __name__ == "__main__":
main()
-643
View File
@@ -1,643 +0,0 @@
#!/usr/bin/env python3
"""
run_workflow.py — Simulate an Alfred workflow step by step (dry-run or live).
Usage:
uv run testing/workflows/run_workflow.py organize_media [options]
Options:
--dry-run Print what each step would do without executing tools (default).
--live Actually execute the tools (uses real filesystem + memory).
--source PATH Source video file (download folder).
--dest PATH Destination video file (library path).
--download-folder P Original download folder (for create_seed_links).
--imdb-id ID IMDb ID for identify_media step (tt1234567).
--seed Answer "yes" to the seeding question.
--no-color Disable ANSI colours.
Examples:
uv run testing/workflows/run_workflow.py organize_media --dry-run \\
--source "/downloads/Breaking.Bad.S01E01.mkv" \\
--dest "/tv/Breaking Bad/Season 01/Breaking Bad.S01E01.mkv"
uv run testing/workflows/run_workflow.py organize_media --live \\
--source "/downloads/BB/Breaking.Bad.S01E01.mkv" \\
--dest "/tv/Breaking Bad/Season 01/Breaking Bad.S01E01.mkv" \\
--download-folder "/downloads/BB" --seed
"""
import argparse
import sys
import textwrap
from pathlib import Path
from typing import Any
# Project root on sys.path
_PROJECT_ROOT = Path(__file__).resolve().parents[2]
if str(_PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(_PROJECT_ROOT))
# ---------------------------------------------------------------------------
# Colours
# ---------------------------------------------------------------------------
USE_COLOR = True
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"
CYAN = "\033[36m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
def c(text: str, *codes: str) -> str:
if not USE_COLOR:
return text
return "".join(codes) + str(text) + RESET
def section(title: str) -> None:
print()
print(c("" * 70, DIM))
print(c(f" {title}", BOLD, CYAN))
print(c("" * 70, DIM))
def ok(msg: str) -> None:
print(c("", GREEN, BOLD) + msg)
def warn(msg: str) -> None:
print(c("", YELLOW, BOLD) + msg)
def err(msg: str) -> None:
print(c("", RED, BOLD) + msg)
def info(msg: str) -> None:
print(f" {msg}")
def kv(key: str, val: str) -> None:
print(f" {c(key + ':', BOLD)} {val}")
# ---------------------------------------------------------------------------
# Dry-run tool stubs
# ---------------------------------------------------------------------------
def _real_list_folder(folder_type: str, path: str = ".") -> dict[str, Any]:
"""Call the real list_folder (read-only, safe in dry-run)."""
# TODO: remove hardcoded fallback once download path is configured in LTM
_HARDCODED_DOWNLOAD_ROOT = "/mnt/testipool/downloads"
try:
from alfred.infrastructure.persistence import get_memory, init_memory
try:
get_memory()
except Exception:
init_memory()
from alfred.agent.tools.filesystem import list_folder
result = list_folder(folder_type=folder_type, path=path)
if result.get("status") == "error" and folder_type == "download":
raise RuntimeError(result.get("message", "not configured"))
return result
except Exception as e:
if folder_type == "download":
warn(
f"list_folder: {e} — using hardcoded download root: {_HARDCODED_DOWNLOAD_ROOT}"
)
import os
resolved = (
os.path.join(_HARDCODED_DOWNLOAD_ROOT, path)
if path != "."
else _HARDCODED_DOWNLOAD_ROOT
)
try:
entries = sorted(os.listdir(resolved))
except OSError as oe:
return {"status": "error", "error": "os_error", "message": str(oe)}
return {
"status": "ok",
"folder_type": folder_type,
"path": resolved,
"entries": entries,
"count": len(entries),
}
warn(f"list_folder: filesystem unavailable ({e}), falling back to stub")
return {
"status": "ok",
"folder_type": folder_type,
"path": path,
"entries": ["[stub — filesystem unavailable]"],
"count": 1,
}
def _real_find_media_imdb_id(media_title: str, **kwargs) -> dict[str, Any]:
"""Call the real TMDB API even in dry-run (read-only, no filesystem side effects)."""
try:
from alfred.infrastructure.persistence import get_memory, init_memory
try:
get_memory()
except Exception:
init_memory()
from alfred.agent.tools.api import find_media_imdb_id
return find_media_imdb_id(media_title=media_title)
except Exception as e:
warn(f"find_media_imdb_id: TMDB unavailable ({e}), falling back to stub")
return {
"status": "ok",
"imdb_id": "tt0000000",
"title": media_title,
"media_type": "tv_show",
"year": 2024,
}
def _dry_resolve_destination(
release_name: str,
source_file: str,
tmdb_title: str,
tmdb_year: int,
tmdb_episode_title: str | None = None,
confirmed_folder: str | None = None,
) -> dict[str, Any]:
from alfred.domain.release import parse_release
parsed = parse_release(release_name)
ext = Path(source_file).suffix
if parsed.is_movie:
folder = parsed.movie_folder_name(tmdb_title, tmdb_year)
fname = parsed.movie_filename(tmdb_title, tmdb_year, ext)
return {
"status": "ok",
"library_file": f"/movies/{folder}/{fname}",
"series_folder": f"/movies/{folder}",
"series_folder_name": folder,
"season_folder": None,
"season_folder_name": None,
"filename": fname,
"is_new_series_folder": True,
}
season_folder = parsed.season_folder_name()
show_folder = confirmed_folder or parsed.show_folder_name(tmdb_title, tmdb_year)
fname = (
parsed.episode_filename(tmdb_episode_title, ext)
if not parsed.is_season_pack
else season_folder + ext
)
return {
"status": "ok",
"library_file": f"/tv/{show_folder}/{season_folder}/{fname}",
"series_folder": f"/tv/{show_folder}",
"season_folder": f"/tv/{show_folder}/{season_folder}",
"series_folder_name": show_folder,
"season_folder_name": season_folder,
"filename": fname,
"is_new_series_folder": confirmed_folder is None,
}
def _dry_move_media(source: str, destination: str) -> dict[str, Any]:
return {
"status": "ok",
"source": source,
"destination": destination,
"filename": Path(destination).name,
"size": 0,
}
def _dry_manage_subtitles(source_video: str, destination_video: str) -> dict[str, Any]:
return {
"status": "ok",
"video_path": destination_video,
"placed": [],
"placed_count": 0,
"skipped_count": 0,
}
def _dry_create_seed_links(
library_file: str, original_download_folder: str
) -> dict[str, Any]:
return {
"status": "ok",
"torrent_subfolder": f"/torrents/{Path(original_download_folder).name}",
"linked_file": f"/torrents/{Path(original_download_folder).name}/{Path(library_file).name}",
"copied_files": ["[dry-run — no real copy]"],
"copied_count": 1,
"skipped": [],
}
DRY_RUN_TOOLS: dict[str, Any] = {
"list_folder": _real_list_folder,
"find_media_imdb_id": _real_find_media_imdb_id,
"resolve_destination": _dry_resolve_destination,
"move_media": _dry_move_media,
"manage_subtitles": _dry_manage_subtitles,
"create_seed_links": _dry_create_seed_links,
}
# ---------------------------------------------------------------------------
# Live tools
# ---------------------------------------------------------------------------
def _load_live_tools() -> dict[str, Any]:
from alfred.agent.tools.filesystem import (
create_seed_links,
list_folder,
manage_subtitles,
move_media,
)
# find_media_imdb_id lives in the api tools
try:
from alfred.agent.tools.api import find_media_imdb_id
except ImportError:
def find_media_imdb_id(**kwargs): # type: ignore[misc]
return {
"status": "error",
"error": "not_available",
"message": "api tools not loaded",
}
return {
"list_folder": list_folder,
"find_media_imdb_id": find_media_imdb_id,
"move_media": move_media,
"manage_subtitles": manage_subtitles,
"create_seed_links": create_seed_links,
}
# ---------------------------------------------------------------------------
# Workflow runner
# ---------------------------------------------------------------------------
class WorkflowRunner:
def __init__(
self,
workflow: dict,
tools: dict[str, Any],
live: bool,
args: argparse.Namespace,
):
self.workflow = workflow
self.tools = tools
self.live = live
self.args = args
self.context: dict[str, Any] = {} # step results accumulate here
self.step_results: list[dict] = []
def run(self) -> None:
name = self.workflow.get("name", "?")
desc = self.workflow.get("description", "").strip()
mode = c("LIVE", RED, BOLD) if self.live else c("DRY-RUN", YELLOW, BOLD)
print()
print(c("" * 70, BOLD))
print(c(f" Alfred — Workflow Simulator [{mode}]", BOLD, MAGENTA))
print(c("" * 70, BOLD))
kv("Workflow", c(name, CYAN, BOLD))
kv("Description", desc)
kv("Tools allowed", ", ".join(self.workflow.get("tools", [])))
steps = self.workflow.get("steps", [])
for step in steps:
self._run_step(step)
section("SIMULATION TERMINÉE")
ok(f"{len(self.step_results)} step(s) exécuté(s)")
errors = [
r for r in self.step_results if r.get("result", {}).get("status") == "error"
]
if errors:
warn(f"{len(errors)} step(s) en erreur")
for r in errors:
err(
f" {r['id']}: {r['result'].get('error')}{r['result'].get('message')}"
)
print()
print(c("" * 70, BOLD))
print()
def _run_step(self, step: dict) -> None:
step_id = step.get("id", "?")
# --- ask_user step ---
if "ask_user" in step:
section(f"STEP [{step_id}] — ask_user")
q = step["ask_user"].get("question", "")
answers = step["ask_user"].get("answers", {})
info(c(f'Question: "{q}"', BOLD))
info(f"Réponses possibles: {', '.join(str(k) for k in answers.keys())}")
answer = "yes" if self.args.seed else "no"
# PyYAML parses bare yes/no as booleans — normalise keys to str
answers_str = {str(k): v for k, v in answers.items()}
next_step = answers_str.get(answer, {}).get("next_step", "update_library")
ok(f"Réponse simulée: {c(answer, CYAN)} → next: {c(next_step, CYAN)}")
self.context["seeding"] = answer == "yes"
self.context["ask_seeding_answer"] = answer
self.context["next_after_ask"] = next_step
# If "no", skip create_seed_links
if answer == "no":
self.context["skip_create_seed_links"] = True
return
# --- memory_write step ---
if "memory_write" in step:
section(f"STEP [{step_id}] — memory_write ({step['memory_write']})")
if self.live:
warn("memory_write: pas encore implémenté dans le simulator live")
else:
ok("(dry-run) Library entry would be written to LTM")
self.step_results.append({"id": step_id, "result": {"status": "ok"}})
return
# --- tool step ---
tool_name = step.get("tool")
if not tool_name:
warn(f"Step '{step_id}' has no tool or ask_user — skipped")
return
# Skip create_seed_links if user said no to seeding
if tool_name == "create_seed_links" and self.context.get(
"skip_create_seed_links"
):
section(f"STEP [{step_id}] — {tool_name}")
warn("Skipped (user chose not to seed)")
return
section(f"STEP [{step_id}] — {c(tool_name, CYAN, BOLD)}")
desc = step.get("description", "").strip()
if desc:
info(c(desc, DIM))
kwargs = self._build_kwargs(tool_name, step)
for k, v in kwargs.items():
kv(k, str(v))
if tool_name not in self.tools:
err(f"Tool '{tool_name}' not found in tool registry")
self.step_results.append(
{"id": step_id, "result": {"status": "error", "error": "unknown_tool"}}
)
return
try:
result = self.tools[tool_name](**kwargs)
except Exception as e:
err(f"Tool raised an exception: {e}")
self.step_results.append(
{"id": step_id, "result": {"status": "error", "error": str(e)}}
)
return
self._print_result(result, tool_name=tool_name)
self.context[step_id] = result
self.step_results.append({"id": step_id, "result": result})
# After list_downloads: confirm the requested media folder exists in downloads
if (
tool_name == "list_folder"
and result.get("status") == "ok"
and self.args.source
):
folder_path = result.get("path", "")
entries = result.get("entries", [])
if self.args.source in entries:
media_folder = str(Path(folder_path) / self.args.source)
self.context["media_folder"] = media_folder
print()
print(
f" {c('Dossier media trouvé:', BOLD, GREEN)} {c(media_folder, CYAN, BOLD)}"
)
else:
warn(f"Dossier '{self.args.source}' introuvable dans {folder_path}")
def _build_kwargs(self, tool_name: str, step: dict) -> dict[str, Any]:
"""Build tool kwargs from step params + CLI args + previous context."""
# Start from step-level params (static defaults from YAML)
kwargs: dict[str, Any] = dict(step.get("params") or {})
a = self.args
if tool_name == "list_folder":
kwargs.setdefault("folder_type", "download")
elif tool_name == "find_media_imdb_id":
if a.imdb_id:
kwargs["imdb_id"] = a.imdb_id
elif tool_name == "resolve_destination":
media_folder = self.context.get("media_folder")
if a.release:
kwargs["release_name"] = a.release
elif a.source:
kwargs.setdefault("release_name", a.source)
if media_folder:
kwargs["source_file"] = media_folder
if a.tmdb_title:
kwargs["tmdb_title"] = a.tmdb_title
if a.tmdb_year:
kwargs["tmdb_year"] = a.tmdb_year
if a.episode_title:
kwargs["tmdb_episode_title"] = a.episode_title
elif tool_name == "move_media":
# If resolve_destination ran, use its library_file as destination
resolved = self.context.get("resolve_destination", {})
media_folder = self.context.get("media_folder")
if media_folder:
kwargs["source"] = media_folder
dest = a.dest or resolved.get("library_file")
if dest:
kwargs["destination"] = dest
elif tool_name == "manage_subtitles":
resolved = self.context.get("resolve_destination", {})
media_folder = self.context.get("media_folder")
if media_folder:
kwargs["source_video"] = media_folder
dest = a.dest or resolved.get("library_file")
if dest:
kwargs["destination_video"] = dest
elif tool_name == "create_seed_links":
resolved = self.context.get("resolve_destination", {})
library_file = a.dest or resolved.get("library_file")
if library_file:
kwargs["library_file"] = library_file
if a.download_folder:
kwargs["original_download_folder"] = a.download_folder
else:
# Use the resolved folder path from list_downloads context
list_result = self.context.get("list_downloads", {})
folder_path = list_result.get("path")
if folder_path:
kwargs.setdefault("original_download_folder", folder_path)
return kwargs
def _print_result(self, result: dict, tool_name: str = "") -> None:
status = result.get("status", "?")
if status == "ok":
ok(f"status={c('ok', GREEN)}")
elif status == "needs_clarification":
warn(f"status={c('needs_clarification', YELLOW)}")
else:
err(
f"status={c(status, RED)} error={result.get('error')} msg={result.get('message')}"
)
return
# Highlight resolved folder path for list_folder
if tool_name == "list_folder" and result.get("path"):
print()
print(
f" {c('Dossier résolu:', BOLD, GREEN)} {c(result['path'], CYAN, BOLD)}"
)
# Pretty-print notable fields
skip = {"status", "error", "message"}
for k, v in result.items():
if k in skip:
continue
if isinstance(v, list):
if v:
info(c(f"{k}:", BOLD))
for item in v[:10]:
info(f"{item}")
if len(v) > 10:
info(c(f" … and {len(v) - 10} more", DIM))
else:
info(f"{c(k + ':', BOLD)} (empty)")
else:
kv(k, str(v))
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Alfred workflow simulator",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=textwrap.dedent(__doc__ or ""),
)
parser.add_argument("workflow", help="Workflow name (e.g. organize_media)")
parser.add_argument(
"--dry-run",
dest="dry_run",
action="store_true",
default=True,
help="Simulate steps without executing tools (default)",
)
parser.add_argument(
"--live",
action="store_true",
help="Actually execute tools against the real filesystem",
)
parser.add_argument(
"--source",
metavar="FOLDER_NAME",
help="Release folder name inside the download root (e.g. Oz.S03.1080p.WEBRip.x265-KONTRAST)",
)
parser.add_argument(
"--dest",
metavar="PATH",
help="Destination video file (in library, overrides resolve_destination)",
)
parser.add_argument(
"--download-folder",
metavar="PATH",
help="Original download folder (for create_seed_links)",
)
parser.add_argument(
"--imdb-id", metavar="ID", help="IMDb ID for identify_media (tt1234567)"
)
parser.add_argument(
"--release",
metavar="NAME",
help="Release name (e.g. Oz.S03.1080p.WEBRip.x265-KONTRAST)",
)
parser.add_argument(
"--tmdb-title", metavar="TITLE", help="Canonical title from TMDB (e.g. 'Oz')"
)
parser.add_argument(
"--tmdb-year",
metavar="YEAR",
type=int,
help="Start/release year from TMDB (e.g. 1997)",
)
parser.add_argument(
"--episode-title",
metavar="TITLE",
help="Episode title from TMDB for single-episode releases",
)
parser.add_argument(
"--seed", action="store_true", help='Answer "yes" to the seeding question'
)
parser.add_argument("--no-color", action="store_true")
return parser.parse_args()
def main() -> None:
global USE_COLOR
args = parse_args()
if args.no_color or not sys.stdout.isatty():
USE_COLOR = False
if args.live:
args.dry_run = False
# Load workflow
from alfred.agent.workflows.loader import WorkflowLoader
loader = WorkflowLoader()
workflow = loader.get(args.workflow)
if not workflow:
print(f"Erreur: workflow '{args.workflow}' introuvable.", file=sys.stderr)
print(f"Disponibles: {', '.join(loader.names())}", file=sys.stderr)
sys.exit(1)
# Load tools
if args.live:
try:
tools = _load_live_tools()
except Exception as e:
print(f"Erreur chargement des tools live: {e}", file=sys.stderr)
sys.exit(1)
else:
tools = DRY_RUN_TOOLS
runner = WorkflowRunner(workflow, tools, live=args.live, args=args)
runner.run()
if __name__ == "__main__":
main()
+1 -1
View File
@@ -19,7 +19,7 @@ from pathlib import Path
import pytest
from alfred.application.release.detect_media_type import detect_media_type
from alfred.domain.release.services import parse_release
from alfred.domain.releases.parser.services import parse_release
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
_KB = YamlReleaseKnowledge()
+2 -2
View File
@@ -11,8 +11,8 @@ can't quietly drop EASY without us noticing.
from __future__ import annotations
from alfred.domain.release.parser import TokenRole
from alfred.domain.release.parser.pipeline import (
from alfred.domain.releases.parser import TokenRole
from alfred.domain.releases.parser import (
_detect_group,
annotate,
assemble,
@@ -8,8 +8,8 @@ is implemented and the fixtures-based suite switches over.
from __future__ import annotations
from alfred.domain.release.parser import Token, TokenRole
from alfred.domain.release.parser.pipeline import strip_site_tag, tokenize
from alfred.domain.releases.parser import Token, TokenRole
from alfred.domain.releases.parser import strip_site_tag, tokenize
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
_KB = YamlReleaseKnowledge()
@@ -10,15 +10,15 @@ from __future__ import annotations
import pytest
from alfred.domain.release.parser.scoring import (
from alfred.domain.releases.parser import (
Road,
collect_missing_critical,
collect_unknown_tokens,
compute_score,
decide_road,
)
from alfred.domain.release.parser.tokens import Token, TokenRole
from alfred.domain.release.services import parse_release
from alfred.domain.releases.parser.tokens import Token, TokenRole
from alfred.domain.releases.parser.services import parse_release
from alfred.domain.release.value_objects import (
MediaTypeToken,
ParsedRelease,
+1 -1
View File
@@ -18,7 +18,7 @@ from __future__ import annotations
import pytest
from alfred.domain.release.services import parse_release
from alfred.domain.releases.parser.services import parse_release
from alfred.domain.release.value_objects import ParsedRelease
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
+1 -1
View File
@@ -18,7 +18,7 @@ from dataclasses import asdict
import pytest
from alfred.domain.release.services import parse_release
from alfred.domain.releases.parser.services import parse_release
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
from tests.fixtures.releases.conftest import ReleaseFixture, discover_fixtures