Files
alfred/testing/debug_release.py
francwa 9f1ce94690 refactor(application): inject kb/prober into resolve_destination use cases
Remove the module-level _KB / _PROBER singletons from
alfred/application/filesystem/resolve_destination.py. The four
resolve_{season,episode,movie,series}_destination use cases now take
kb: ReleaseKnowledge and prober: MediaProber as required arguments,
matching the shape of inspect_release.

The singletons now live at the agent-tools frontier
(alfred/agent/tools/filesystem.py), where the LLM-facing wrappers
instantiate YamlReleaseKnowledge / FfprobeMediaProber once and thread
them through. The wrappers' Python signatures are unchanged — the
inspect-based JSON-schema generator in agent/registry.py still sees the
same LLM-passable params.

analyze_release drops the dirty 'from ... import _KB' indirection.

Tests inject their own stubs by keyword (prober=_StubProber(...)) via
thin convenience wrappers, replacing the prior
monkeypatch.setattr(rd, '_PROBER', ...) pattern.

testing/debug_release.py: instantiate YamlReleaseKnowledge() /
FfprobeMediaProber() inline at the two call sites.

Suite: 1077 passed.
2026-05-21 07:46:13 +02:00

435 lines
14 KiB
Python

"""CLI de debug pour analyser une release et dry-run le déplacement."""
import json
import sys
from pathlib import Path
# Lets the script be launched from anywhere without installing the package:
# the directory two levels above this file is put first on sys.path.
sys.path.insert(0, str(Path(__file__).parent.parent))
def _init_memory():
    """Initialise the persistence layer's memory with the configured storage directory."""
    # Imports stay local so that merely importing this script has no side effects.
    from alfred.infrastructure.persistence import init_memory
    from alfred.settings import settings

    storage_dir = settings.data_storage_dir
    init_memory(storage_dir)
def _resolve_via_tmdb(release_name: str) -> tuple[str, int] | None:
    """
    Look up the release's title on TMDB.

    The release name is parsed, dots in the parsed title are replaced by
    spaces, and the result is passed to ``SearchMovieUseCase``.

    Returns:
        ``(tmdb_title, tmdb_year)`` on success, or ``None`` (after printing
        the reason) when the search status is not ``"ok"`` or no year can be
        read from the first four characters of ``release_date``.
    """
    from alfred.application.movies import SearchMovieUseCase
    from alfred.domain.release.services import parse_release
    from alfred.infrastructure.api.tmdb import tmdb_client

    query = parse_release(release_name).title.replace(".", " ").strip()
    print(f" titre extrait : {query}")
    print(" interrogation TMDB...")

    payload = SearchMovieUseCase(tmdb_client).execute(query).to_dict()
    if payload.get("status") != "ok":
        print(f" TMDB error: {payload.get('message')}")
        return None

    tmdb_title = payload["title"]
    date_text = payload.get("release_date", "")
    if date_text and len(date_text) >= 4:
        # The year is taken from the leading four characters of the date.
        tmdb_year = int(date_text[:4])
    else:
        tmdb_year = None

    if not tmdb_year:
        print(f" TMDB: pas d'année trouvée pour '{tmdb_title}'")
        return None

    print(f" TMDB: {tmdb_title} ({tmdb_year})")
    return tmdb_title, tmdb_year
def _extract_release_name(release_name: str) -> tuple[str, str]:
    """
    Split a CLI argument into ``(release_name, source_path)``.

    An absolute path that exists yields its basename together with the path
    itself. Anything else is looked up as a child of the ``workspace.download``
    directory read from long-term memory; ``source_path`` is ``""`` when that
    directory is not configured or the candidate does not exist.
    """
    as_path = Path(release_name)
    if as_path.is_absolute() and as_path.exists():
        return as_path.name, str(as_path)

    # Memory is only needed on this fallback path, hence the late import.
    from alfred.infrastructure.persistence import get_memory

    downloads_dir = get_memory().ltm.workspace.download
    if not downloads_dir:
        return release_name, ""

    in_downloads = Path(downloads_dir) / release_name
    found = str(in_downloads) if in_downloads.exists() else ""
    return release_name, found
def analyze(release_name: str, source_path: str | None = None) -> None:
    """
    Print the parsed fields of a release and, when a source is known, probe its video.

    Args:
        release_name: Release name, or an absolute path to an existing release
            folder/file (its basename is then used as the name).
        source_path: Optional explicit path to probe. When ``None``, the path
            resolved from ``release_name`` (if any) is used instead.
    """
    from alfred.domain.release.services import parse_release

    # _extract_release_name() reads workspace.download from memory for anything
    # that is not an existing absolute path, so memory is initialised first,
    # exactly as dry_run() and do_move() do.
    _init_memory()
    release_name, resolved_path = _extract_release_name(release_name)
    if source_path is None and resolved_path:
        source_path = resolved_path

    print(f"\n=== PARSE: {release_name} ===")
    r = parse_release(release_name)
    # Only show the fields that were actually populated.
    for k, v in vars(r).items():
        if v is not None and v != [] and v != "":
            print(f" {k}: {v}")

    if not source_path:
        return

    path = Path(source_path)
    print(f"\n=== PROBE: {path} ===")
    if not path.exists():
        print(" (chemin inexistant, probe skipped)")
        return

    from alfred.infrastructure.filesystem.find_video import find_video_file
    from alfred.infrastructure.probe import FfprobeMediaProber

    # A directory is searched for its video file; a file is probed directly.
    video = find_video_file(path) if path.is_dir() else path
    if not video:
        print(" aucun fichier vidéo trouvé")
        return

    print(f" video file: {video.name}")
    info = FfprobeMediaProber().probe(video)
    if not info:
        print(" probe failed (ffprobe dispo ?)")
        return

    print(f" codec: {info.video_codec}")
    print(f" resolution: {info.resolution}")
    print(
        f" audio_tracks: {[(t.codec, t.language) for t in info.audio_tracks]}"
    )
    print(
        f" subtitle_tracks: {[(t.codec, t.language) for t in info.subtitle_tracks]}"
    )
def dry_run(release_name: str) -> None:
    """
    Resolve the season destination for a release and print it; nothing is moved.

    Exits with status 1 when the TMDB lookup yields no result.
    """
    _init_memory()
    release_name = _extract_release_name(release_name)[0]
    print(f"\n=== DRY-RUN: {release_name} ===")

    tmdb = _resolve_via_tmdb(release_name)
    if not tmdb:
        sys.exit(1)
    tmdb_title, tmdb_year = tmdb

    from alfred.application.filesystem.resolve_destination import (
        resolve_season_destination,
    )
    from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
    from alfred.infrastructure.probe import FfprobeMediaProber

    # The knowledge base and prober are created here and passed to the use case.
    outcome = resolve_season_destination(
        release_name,
        tmdb_title,
        tmdb_year,
        YamlReleaseKnowledge(),
        FfprobeMediaProber(),
    ).to_dict()

    print()
    print(json.dumps(outcome, indent=2, ensure_ascii=False))
    if outcome["status"] != "ok":
        return

    print("\n=== MOVE PREVIEW ===")
    print(" src : <source_folder>")
    print(f" dst : {outcome['season_folder']}")
def _translate_path(path: str) -> str:
    """
    Translate a host-side path to the qBittorrent container path.

    The host prefix is matched on whole path components, so a prefix of
    ``/data/torrents`` rewrites ``/data/torrents/x`` but leaves
    ``/data/torrents-old/x`` alone. Trailing slashes on either configured
    prefix are ignored, so the result always has exactly one separator at the
    junction.

    Returns:
        The translated path, or ``path`` unchanged when either prefix is
        unset or ``path`` is not located under the host prefix.
    """
    from alfred.settings import settings

    host_prefix = settings.qbittorrent_host_path
    container_prefix = settings.qbittorrent_container_path
    if not (host_prefix and container_prefix):
        return path

    host_root = host_prefix.rstrip("/")
    # Require a component boundary right after the prefix; a bare startswith()
    # would also match sibling directories sharing the same leading characters.
    if path != host_root and not path.startswith(host_root + "/"):
        return path

    translated = container_prefix.rstrip("/") + path[len(host_root):]
    # A container prefix of "/" combined with path == host root would collapse
    # to an empty string; fall back to the configured prefix in that case.
    return translated or container_prefix
def _qbittorrent_update(torrent_name: str, new_location: str | None) -> None:
    """
    Point qBittorrent at the torrent's new save path and trigger a recheck.

    Any exception is caught and reported as a warning only: the comment on
    the handler notes that the files are already in place by the time this
    runs.

    Args:
        torrent_name: Exact torrent name (release folder basename).
        new_location: New save path on the host (parent of the torrent
            folder), or None when the torrent went to trash, in which case
            the location is left untouched and only the recheck is issued.
    """
    try:
        from alfred.infrastructure.api.qbittorrent.client import QBittorrentClient

        qb = QBittorrentClient()
        qb.login()

        found = qb.find_by_name(torrent_name)
        if found is None:
            print(f" ⚠ qBittorrent: torrent '{torrent_name}' not found — skipping")
            return
        print(f" qBittorrent: found '{found.name}' (hash={found.hash[:8]}…)")

        if new_location:
            # Host path is mapped to the container path before being sent.
            mapped = _translate_path(new_location)
            qb.set_location(found.hash, mapped)
            print(f" ✓ qBittorrent: location → {mapped}")

        qb.recheck(found.hash)
        print(" ✓ qBittorrent: recheck triggered")
    except Exception as exc:
        # Non-fatal — the files are already in place
        print(f" ⚠ qBittorrent update failed (non-fatal): {exc}")
def do_move(release_name: str, source_folder: str | None = None) -> None:
    """
    Interactively move a season release into its resolved season folder.

    Steps: resolve the destination via TMDB (asking the user to pick a folder
    when the use case needs clarification), print a plan including a dry-run
    of subtitle placement, ask for confirmation, then move the video files,
    place subtitles, send the source folder to the torrent or trash workspace
    and finally notify qBittorrent.

    Exits the process with status 1 on any unrecoverable step and status 0
    when the user declines the confirmation.

    Args:
        release_name: Release name, or absolute path to an existing release
            folder.
        source_folder: Folder holding the release files. When ``None``, the
            path resolved from ``release_name`` is used.
    """
    _init_memory()
    release_name, resolved_path = _extract_release_name(release_name)
    if source_folder is None:
        source_folder = resolved_path
    if not source_folder:
        print(
            " Erreur: source introuvable. Configure workspace.download ou passe le path complet."
        )
        sys.exit(1)

    print(f"\n=== MOVE: {release_name} ===")
    tmdb = _resolve_via_tmdb(release_name)
    if not tmdb:
        sys.exit(1)
    tmdb_title, tmdb_year = tmdb

    from alfred.application.filesystem.resolve_destination import (
        resolve_season_destination,
    )
    from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
    from alfred.infrastructure.probe import FfprobeMediaProber

    # Built once and reused for both resolution calls below.
    kb = YamlReleaseKnowledge()
    prober = FfprobeMediaProber()

    result = resolve_season_destination(
        release_name,
        tmdb_title,
        tmdb_year,
        kb,
        prober,
    )
    d = result.to_dict()

    if d["status"] == "needs_clarification":
        print(f"\n {d['question']}")
        for i, opt in enumerate(d["options"]):
            print(f" {i + 1}. {opt}")
        choice = input(" Choix (numéro) : ").strip()
        try:
            index = int(choice) - 1
            # Reject 0 and negatives explicitly: a negative index would
            # otherwise silently select an option from the end of the list.
            if index < 0:
                raise IndexError(choice)
            chosen = d["options"][index]
        except (ValueError, IndexError):
            print(" Choix invalide.")
            sys.exit(1)
        # kb and prober are passed here exactly as in the first call above.
        result = resolve_season_destination(
            release_name,
            tmdb_title,
            tmdb_year,
            kb,
            prober,
            confirmed_folder=chosen,
        )
        d = result.to_dict()

    if d["status"] != "ok":
        print(json.dumps(d, indent=2, ensure_ascii=False))
        sys.exit(1)

    src_path = Path(source_folder)
    season_folder = d["season_folder"]
    # .mp4 files are only considered when the folder holds no .mkv at all.
    mkv_files = sorted(src_path.glob("*.mkv")) or sorted(src_path.glob("*.mp4"))

    from alfred.infrastructure.persistence import get_memory

    memory = get_memory()
    torrent_root = memory.ltm.workspace.torrent
    trash_root = memory.ltm.workspace.trash
    torrent_dst = str(Path(torrent_root) / src_path.name) if torrent_root else None
    trash_dst = str(Path(trash_root) / src_path.name) if trash_root else None
    rebuild = input(" Recréer le torrent ? [y/N] : ").strip().lower() == "y"

    # --- PHASE 1: PLAN ---
    print("\n=== PLAN ===")
    print(f" destination : {season_folder}")

    from alfred.application.filesystem.manage_subtitles import ManageSubtitlesUseCase
    from alfred.domain.release.services import parse_release

    parsed = parse_release(release_name)
    # One entry per video: (src_file, dst_path, sub_result). The subtitle
    # result is a dry-run pre-scan made before any file has been moved.
    plan: list[tuple[Path, str, object]] = []
    has_errors = False
    for f in mkv_files:
        dst = str(Path(season_folder) / f.name)
        ghost_src = str(src_path / f.name)
        sub_result = ManageSubtitlesUseCase().execute(
            source_video=ghost_src,
            destination_video=dst,
            media_type="tv_show",
            release_group=parsed.group,
            season=parsed.season,
            dry_run=True,
        )
        print(f"\n {f.name}")
        print(f"{dst}")
        if sub_result.status == "ok":
            if sub_result.placed:
                for p in sub_result.placed:
                    print(f" sub: {p.filename}")
            elif sub_result.available:
                for a in sub_result.available:
                    print(f" sub (embedded): {a.language} {a.subtitle_type}")
            else:
                print(" subs: aucun")
        elif sub_result.status == "needs_clarification":
            print(" ✗ subs non résolus:")
            for u in sub_result.unresolved:
                print(f" {u.raw_tokens} ({u.reason})")
            has_errors = True
        elif sub_result.status == "error":
            print(f" ✗ erreur subs: {sub_result.message}")
            has_errors = True
        plan.append((f, dst, sub_result))

    if rebuild and torrent_dst:
        print(f"\n source → torrents : {torrent_dst}")
        print(f" hard-links : {len(mkv_files)} fichier(s)")
    elif trash_dst:
        print(f"\n source → trash : {trash_dst}")
    else:
        print("\n source : laissée en place")

    if has_errors:
        print("\n ✗ Plan invalide — subs non résolus, abandon.")
        sys.exit(1)

    # --- CONFIRMATION ---
    print()
    confirm = input(" Confirmer ? [y/N] : ").strip().lower()
    if confirm != "y":
        print(" Annulé.")
        sys.exit(0)

    # --- PHASE 2: EXECUTE ---
    import os

    from alfred.infrastructure.filesystem.filesystem_operations import (
        create_folder,
        move,
    )

    print("\n=== EXECUTE ===")

    # 1. Create the season folder
    r = create_folder(season_folder)
    if r["status"] != "ok":
        print(f" ✗ create_folder: {r}")
        sys.exit(1)
    print(f" ✓ dossier : {season_folder}")

    # 2. Move each video file, then place its subtitles (re-run after the move)
    for f, dst, _pre_scan in plan:
        r = move(str(f), dst)
        if r["status"] != "ok":
            print(f"{f.name}: {r['message']}")
            sys.exit(1)
        print(f"{f.name}")
        # Re-run manage_subtitles now that dst exists (for the hard-link)
        ghost_src = str(src_path / f.name)
        sub_result = ManageSubtitlesUseCase().execute(
            source_video=ghost_src,
            destination_video=dst,
            media_type="tv_show",
            release_group=parsed.group,
            season=parsed.season,
        )
        if sub_result.status == "ok" and sub_result.placed:
            for p in sub_result.placed:
                print(f" ✓ sub: {p.filename}")

    # 3. Source folder → torrents or trash
    if rebuild and torrent_dst:
        r = move(source_folder, torrent_dst)
        if r["status"] != "ok":
            print(f" ✗ source → torrents: {r['message']}")
            sys.exit(1)
        print(" ✓ source → torrents")

        # 4. Hard-link each library file back into the torrent folder
        torrent_dst_path = Path(torrent_dst)
        for f, dst, _ in plan:
            lib_file = Path(season_folder) / f.name
            link_dst = torrent_dst_path / f.name
            try:
                os.link(lib_file, link_dst)
                print(f" ✓ hard-link: {f.name}")
            except OSError as e:
                print(f" ✗ hard-link {f.name}: {e}")
                sys.exit(1)
    elif trash_dst:
        r = move(source_folder, trash_dst)
        if r["status"] != "ok":
            print(f" ✗ source → trash: {r['message']}")
            sys.exit(1)
        print(" ✓ source → trash")

    # 5. qBittorrent: update location + recheck. The location is only sent
    # when the source was moved into the torrent workspace.
    qb_location = torrent_root if (rebuild and torrent_dst) else None
    _qbittorrent_update(src_path.name, qb_location)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Debug release parsing + dry-run/move")
sub = parser.add_subparsers(dest="cmd")
p_analyze = sub.add_parser(
"analyze", help="Parser une release (+ probe si path fourni)"
)
p_analyze.add_argument("release_name")
p_analyze.add_argument("--path", help="Chemin vers le dossier/fichier source")
p_dry = sub.add_parser(
"dryrun", help="Résout via TMDB et affiche les chemins sans rien bouger"
)
p_dry.add_argument("release_name")
p_move = sub.add_parser(
"move", help="Résout via TMDB et déplace le dossier (confirmation requise)"
)
p_move.add_argument("release_name")
p_move.add_argument(
"source_folder",
nargs="?",
default=None,
help="Chemin absolu du dossier source (optionnel si workspace.download est configuré)",
)
args = parser.parse_args()
if args.cmd == "analyze":
analyze(args.release_name, args.path)
elif args.cmd == "dryrun":
dry_run(args.release_name)
elif args.cmd == "move":
do_move(args.release_name, args.source_folder)
else:
parser.print_help()
sys.exit(1)