#!/usr/bin/env python3
"""
scan_subtitles.py — CLI pour tester le pipeline de scan de sous-titres Alfred.

Usage:
    uv run testing/subtitles/scan_subtitles.py [options]

Options:
    --release-group RARBG   Groupe de release (optionnel — active les known patterns)
    --pattern adjacent      Forcer un pattern (adjacent|flat|episode_subfolder|embedded)
    --video FILE            Fichier vidéo de référence (défaut: premier .mkv/.mp4 trouvé)
    --verbose               Détails sur chaque token analysé
    --no-color              Désactive la colorisation

Exemples:
    uv run scripts/scan_subtitles.py "/media/tv/The X-Files/Season 01"
    uv run scripts/scan_subtitles.py "/media/tv/The X-Files/Season 01" --release-group RARBG
    uv run scripts/scan_subtitles.py "/media/tv/The X-Files/Season 01" --pattern episode_subfolder --verbose
"""
# NOTE: the module docstring above is user-facing text — parse_args() prints it
# as the argparse epilog — which is why it stays in the CLI's own language.

import argparse
import sys
import textwrap
from pathlib import Path

# Put the project root on sys.path (expected layout: testing/subtitles/ → ../../).
_SCRIPT_PATH = Path(__file__).resolve()
# Clamp the ancestor index so that a copy of the script stored fewer than two
# directories deep falls back to its highest ancestor instead of raising
# IndexError at import time.
_PROJECT_ROOT = _SCRIPT_PATH.parents[min(2, len(_SCRIPT_PATH.parents) - 1)]
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

# ---------------------------------------------------------------------------
# Minimal colourisation (no external dependency)
# ---------------------------------------------------------------------------

# Global switch read by c(); main() turns it off for --no-color / non-TTY output.
USE_COLOR = True

# ANSI SGR escape sequences.
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"
CYAN = "\033[36m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"


def c(text: str, *codes: str) -> str:
    """Return *text* wrapped in the given ANSI codes followed by RESET.

    When ``USE_COLOR`` is false the text is returned unchanged.
    """
    if not USE_COLOR:
        return text
    return "".join(codes) + text + RESET


def section(title: str) -> None:
    """Print a blank line, then *title* framed by two 70-column rules."""
    width = 70
    rule = c("─" * width, DIM)
    print()
    print(rule)
    print(c(f" {title}", BOLD, CYAN))
    print(rule)


def ok(msg: str) -> None:
    """Print *msg* prefixed with a green check mark."""
    print(c(" ✓ ", GREEN, BOLD) + msg)


def warn(msg: str) -> None:
    """Print *msg* prefixed with a yellow warning sign."""
    print(c(" ⚠ ", YELLOW, BOLD) + msg)


def err(msg: str) -> None:
    """Print *msg* prefixed with a red cross (to stdout, like ok/warn)."""
    print(c(" ✗ ", RED, BOLD) + msg)


def info(msg: str, indent: int = 2) -> None:
    """Print *msg* indented by *indent* spaces."""
    print(" " * indent + msg)


def kv(key: str, value: str, indent: int = 4) -> None:
    """Print a ``key: value`` line with a bold key, indented by *indent* spaces."""
    print(" " * indent + c(f"{key}: ", BOLD) + value)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

# File suffixes (lower-case, with the dot) treated as video files.
VIDEO_EXTS = {".mkv", ".mp4", ".avi", ".mov", ".ts", ".m2ts"}


def find_videos(folder: Path) -> list[Path]:
    """Return the video files directly inside *folder*, sorted by path.

    Only regular files whose suffix (case-insensitively) is in ``VIDEO_EXTS``
    are returned; sub-directories are not descended into.
    """
    return sorted(
        entry
        for entry in folder.iterdir()
        if entry.is_file() and entry.suffix.lower() in VIDEO_EXTS
    )


def confidence_bar(conf: float, width: int = 20) -> str:
    """Render *conf* as a coloured gauge followed by its percentage.

    The gauge is always exactly *width* cells wide: the number of filled cells
    is clamped to ``[0, width]`` so an out-of-range confidence (below 0 or
    above 1) can no longer produce an over-long bar.  The percentage label and
    the colour are still derived from the raw value, so anomalies stay visible.

    Colour thresholds: green from 0.8, yellow from 0.5, red below.
    """
    filled = min(width, max(0, int(conf * width)))
    bar = "█" * filled + "░" * (width - filled)
    if conf >= 0.8:
        color = GREEN
    elif conf >= 0.5:
        color = YELLOW
    else:
        color = RED
    return c(bar, color) + c(f" {conf:.0%}", BOLD)


def track_summary(track, verbose: bool = False) -> None:
    """Print a multi-line summary of one subtitle track.

    *track* is duck-typed; the attributes read here are ``language``,
    ``format``, ``subtitle_type``, ``is_embedded``, ``file_path``,
    ``confidence``, ``entry_count``, ``file_size_kb``, ``raw_tokens``,
    ``is_resolved()`` and ``destination_name``.

    With *verbose* set, the raw tokens are printed when the track has any.
    """
    lang = track.language.code if track.language else c("?", RED)
    fmt = track.format.id if track.format else c("?", RED)
    typ = track.subtitle_type.value
    if track.is_embedded:
        src = "embedded"
    else:
        src = track.file_path.name if track.file_path else "?"

    # Colour per subtitle type; unlisted types are shown uncoloured.
    type_colors = {
        "standard": GREEN,
        "sdh": YELLOW,
        "forced": BLUE,
        "unknown": RED,
    }
    typ_str = c(typ, type_colors.get(typ, RESET))

    # Only external tracks are flagged when their language is missing.
    unresolved = not track.is_embedded and track.language is None
    clarif = c(" [langue inconnue]", RED, BOLD) if unresolved else ""

    print(f" {c(src, BOLD)}")
    print(f" lang={c(lang, CYAN)} type={typ_str} format={fmt}")

    if track.is_embedded:
        conf_str = c("n/a (embedded)", DIM)
    else:
        conf_str = confidence_bar(track.confidence)
    print(f" confidence={conf_str}{clarif}")

    if track.entry_count is not None:
        if track.file_size_kb:
            print(f" entries={track.entry_count} size={track.file_size_kb:.1f} KB")
        else:
            print(f" entries={track.entry_count}")

    if verbose and track.raw_tokens:
        print(f" tokens={track.raw_tokens}")

    if track.is_resolved() and track.language and track.format:
        try:
            dest = track.destination_name
        except ValueError:
            # Deliberate best effort: a track whose destination name cannot be
            # built is simply shown without the "→ destination" line.
            return
        print(f" → {c(dest, GREEN, BOLD)}")


# ---------------------------------------------------------------------------
# Pipeline steps
--------------------------------------------------------------------------- def step_load_kb() -> SubtitleKnowledgeBase: from alfred.domain.subtitles.knowledge.base import SubtitleKnowledgeBase from alfred.domain.subtitles.knowledge.loader import KnowledgeLoader section("ÉTAPE 1 — Chargement de la base de connaissances") kb = SubtitleKnowledgeBase(KnowledgeLoader()) fmts = kb.formats() langs = kb.languages() patterns = kb.patterns() ok(f"{len(fmts)} format(s) connu(s): {', '.join(fmts.keys())}") ok(f"{len(langs)} langue(s) connue(s): {', '.join(langs.keys())}") ok(f"{len(patterns)} pattern(s) connu(s): {', '.join(patterns.keys())}") total_tokens = sum(len(l.tokens) for l in langs.values()) info(c(f"→ {total_tokens} tokens de langue au total", DIM), indent=4) return kb def step_detect_pattern( kb: SubtitleKnowledgeBase, season_folder: Path, sample_video: Path, release_group: str | None, forced_pattern: str | None, ) -> SubtitlePattern: from alfred.domain.subtitles.services.pattern_detector import PatternDetector section("ÉTAPE 2 — Détection du pattern de release") # Priorité: forced > known patterns from release_group > auto-detect if forced_pattern: pattern = kb.pattern(forced_pattern) if not pattern: err(f"Pattern inconnu: '{forced_pattern}'") print(f" Patterns disponibles: {', '.join(kb.patterns().keys())}") sys.exit(1) ok(f"Pattern forcé: {c(forced_pattern, CYAN, BOLD)}") return pattern if release_group: known = kb.patterns_for_group(release_group) if known: kv("Release group", release_group) ok( f"Pattern(s) connu(s) pour {release_group}: {', '.join(p.id for p in known)}" ) pattern = known[0] kv("Pattern sélectionné", c(pattern.id, CYAN, BOLD)) return pattern else: warn(f"Groupe '{release_group}' inconnu — lancement de la détection auto") # Auto-detect kv("Dossier analysé", str(season_folder)) kv("Vidéo de référence", sample_video.name) detector = PatternDetector(kb) result = detector.detect(season_folder, sample_video) findings = result.get("raw_findings", {}) 
info(c("Observations:", BOLD), indent=4) for key, val in findings.items(): if val not in (False, None, 0): info(f" {key}: {c(str(val), CYAN)}", indent=4) detected = result.get("detected") confidence = result.get("confidence", 0.0) description = result.get("description", "") print() info(c(f'Description: "{description}"', DIM), indent=4) print(f" Confiance: {confidence_bar(confidence)}") if detected: ok(f"Pattern détecté: {c(detected.id, CYAN, BOLD)}") kv("Stratégie de scan", detected.scan_strategy.value) kv("Détection de type", detected.type_detection.value) if detected.root_folder: kv("Dossier racine", detected.root_folder) return detected else: warn("Aucun pattern détecté avec confiance suffisante — fallback: adjacent") fallback = kb.pattern("adjacent") if not fallback: err("Pattern 'adjacent' introuvable dans la KB !") sys.exit(1) return fallback def step_identify_tracks( kb: SubtitleKnowledgeBase, sample_video: Path, pattern: SubtitlePattern, release_group: str | None, verbose: bool, ) -> MediaSubtitleMetadata: from alfred.domain.subtitles.services.identifier import SubtitleIdentifier section("ÉTAPE 3 — Identification des pistes") kv("Vidéo", sample_video.name) kv("Pattern", pattern.id) identifier = SubtitleIdentifier(kb) metadata = identifier.identify( video_path=sample_video, pattern=pattern, media_id=None, media_type="tv_show", release_group=release_group, ) n_emb = len(metadata.embedded_tracks) n_ext = len(metadata.external_tracks) n_unresolved = len(metadata.unresolved_tracks) print() ok(f"{n_ext} piste(s) externe(s) trouvée(s)") if n_emb: ok(f"{n_emb} piste(s) embarquée(s) (ffprobe)") if n_unresolved: warn(f"{n_unresolved} piste(s) externe(s) sans langue reconnue") if metadata.external_tracks: print() info(c("Pistes externes:", BOLD)) for track in metadata.external_tracks: track_summary(track, verbose) if metadata.embedded_tracks: print() info(c("Pistes embarquées:", BOLD)) for track in metadata.embedded_tracks: track_summary(track, verbose) return 
metadata def step_apply_rules( metadata: MediaSubtitleMetadata, release_group: str | None, ) -> tuple[SubtitleMatchingRules | None, list, list]: from alfred.domain.subtitles.aggregates import DEFAULT_RULES from alfred.domain.subtitles.services.matcher import SubtitleMatcher from alfred.domain.subtitles.services.utils import available_subtitles from alfred.domain.subtitles.value_objects import ScanStrategy section("ÉTAPE 4 — Application des règles") # Cas embedded : pas de matcher, on liste directement les pistes disponibles if metadata.detected_pattern_id == ScanStrategy.EMBEDDED.value: info(c("Pattern embedded — le matcher est court-circuité", DIM), indent=4) tracks = available_subtitles(metadata.embedded_tracks) ok(f"{len(tracks)} piste(s) disponible(s)") return None, tracks, [] rules = DEFAULT_RULES() kv("Langues préférées", str(rules.preferred_languages)) kv("Formats préférés", str(rules.preferred_formats)) kv("Types autorisés", str(rules.allowed_types)) kv("Confiance min", str(rules.min_confidence)) info( c("(règles globales par défaut — pas de .alfred/ en mode scan)", DIM), indent=4 ) matcher = SubtitleMatcher() matched, unresolved = matcher.match(metadata.external_tracks, rules) print() ok(f"{len(matched)} piste(s) retenue(s)") if unresolved: warn(f"{len(unresolved)} piste(s) écartée(s) ou non résolue(s)") return rules, matched, unresolved def step_show_results( matched: list, unresolved: list, is_embedded: bool, verbose: bool, ) -> None: section("RÉSULTAT FINAL") if matched: label = ( "piste(s) disponible(s)" if is_embedded else "piste(s) qui seraient placées" ) ok(f"{len(matched)} {label}:") for track in matched: lang = track.language.code if track.language else "?" typ = track.subtitle_type.value if is_embedded: print(f" {c(lang, CYAN)} {c(typ, GREEN)}") else: try: dest = track.destination_name src = track.file_path.name if track.file_path else "?" 
print(f" {c(src, DIM)} → {c(dest, GREEN, BOLD)}") except ValueError: warn(f" Piste incomplète (lang ou format manquant): {track}") else: warn("Aucune piste retenue.") if unresolved: print() warn(f"{len(unresolved)} piste(s) écartées ou à clarifier:") for track in unresolved: src = track.file_path.name if track.file_path else "?" reason = ( "langue inconnue" if track.language is None else "confiance insuffisante" ) line = f" {c(src, DIM)} ({reason})" if verbose and track.raw_tokens: line += c(f" tokens: {track.raw_tokens}", YELLOW) print(line) print() # --------------------------------------------------------------------------- # Scan multi-épisodes (résumé) # --------------------------------------------------------------------------- def scan_season( kb: SubtitleKnowledgeBase, pattern: SubtitlePattern, season_folder: Path, release_group: str | None, verbose: bool, ) -> None: from alfred.domain.subtitles.aggregates import DEFAULT_RULES from alfred.domain.subtitles.services.identifier import SubtitleIdentifier from alfred.domain.subtitles.services.matcher import SubtitleMatcher videos = find_videos(season_folder) section(f"SCAN COMPLET DE LA SAISON ({len(videos)} épisode(s))") if not videos: warn("Aucun fichier vidéo trouvé dans ce dossier.") return identifier = SubtitleIdentifier(kb) matcher = SubtitleMatcher() rules = DEFAULT_RULES() col_w = max(len(v.name) for v in videos) + 2 for video in videos: metadata = identifier.identify( video_path=video, pattern=pattern, media_id=None, media_type="tv_show", release_group=release_group, ) matched, unresolved = matcher.match(metadata.external_tracks, rules) placed_names = [] for t in matched: try: placed_names.append(t.destination_name) except ValueError: pass status_icon = c("✓", GREEN, BOLD) if placed_names else c("✗", RED, BOLD) warn_icon = ( c(f" [{len(unresolved)} non-résolue(s)]", YELLOW) if unresolved else "" ) print( f" {status_icon} {video.name:{col_w}} {c(', '.join(placed_names) or '—', GREEN if placed_names else 
DIM)}{warn_icon}" ) # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Scanner de sous-titres Alfred — pipeline de diagnostic", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=textwrap.dedent(__doc__ or ""), ) parser.add_argument("season_folder", help="Dossier de la saison (ou du film)") parser.add_argument( "--release-group", "-g", metavar="GROUP", help="Groupe de release (ex: RARBG, KONSTRAST)", ) parser.add_argument( "--pattern", "-p", metavar="PATTERN", help="Forcer un pattern (adjacent|flat|episode_subfolder|embedded)", ) parser.add_argument( "--video", "-v", metavar="FILE", help="Fichier vidéo de référence (défaut: premier trouvé)", ) parser.add_argument( "--verbose", action="store_true", help="Affiche les tokens bruts par piste" ) parser.add_argument( "--no-color", action="store_true", help="Désactive la colorisation ANSI" ) parser.add_argument( "--season-scan", action="store_true", help="Après le diagnostic, scanner tous les épisodes de la saison", ) return parser.parse_args() def main() -> None: global USE_COLOR args = parse_args() if args.no_color or not sys.stdout.isatty(): USE_COLOR = False season_folder = Path(args.season_folder).expanduser().resolve() if not season_folder.is_dir(): print(f"Erreur: '{season_folder}' n'est pas un dossier.", file=sys.stderr) sys.exit(1) print() print(c("━" * 70, BOLD)) print(c(" Alfred — Subtitle Scanner", BOLD, MAGENTA)) print(c("━" * 70, BOLD)) kv("Dossier", str(season_folder), indent=2) # Trouver la vidéo de référence if args.video: sample_video = Path(args.video).expanduser().resolve() if not sample_video.exists(): print(f"Erreur: '{sample_video}' introuvable.", file=sys.stderr) sys.exit(1) else: videos = find_videos(season_folder) if not videos: # Chercher un niveau plus bas (structure release root) for sub 
in season_folder.iterdir(): if sub.is_dir(): videos = find_videos(sub) if videos: break if not videos: print( "Erreur: aucun fichier vidéo trouvé dans ce dossier.", file=sys.stderr ) sys.exit(1) sample_video = videos[0] kv("Vidéo de référence", sample_video.name, indent=2) # ---- Pipeline ---- kb = step_load_kb() pattern = step_detect_pattern( kb=kb, season_folder=season_folder, sample_video=sample_video, release_group=args.release_group, forced_pattern=args.pattern, ) metadata = step_identify_tracks( kb=kb, sample_video=sample_video, pattern=pattern, release_group=args.release_group, verbose=args.verbose, ) rules, matched, unresolved = step_apply_rules( metadata=metadata, release_group=args.release_group, ) step_show_results( matched=matched, unresolved=unresolved, is_embedded=rules is None, verbose=args.verbose, ) if args.season_scan: scan_season( kb=kb, pattern=pattern, season_folder=season_folder, release_group=args.release_group, verbose=args.verbose, ) print(c("━" * 70, BOLD)) print() if __name__ == "__main__": main()