Files
alfred/testing/subtitles/scan_subtitles.py
T
francwa e45465d52d feat: split resolve_destination, persona-driven prompts, qBittorrent relocation
Destination resolution
- Replace the single ResolveDestinationUseCase with four dedicated
  functions, one per release type:
    resolve_season_destination    (pack season, folder move)
    resolve_episode_destination   (single episode, file move)
    resolve_movie_destination     (movie, file move)
    resolve_series_destination    (multi-season pack, folder move)
- Each returns a dedicated DTO carrying only the fields relevant to
  that release type — no more polymorphic ResolvedDestination with
  half the fields unused depending on the case.
- Looser series folder matching: exact computed-name match is reused
  silently; any deviation (different group, multiple candidates) now
  prompts the user with all options including the computed name.

Agent tools
- Four new tools wrapping the use cases above; old resolve_destination
  removed from the registry.
- New move_to_destination tool: create_folder + move, chained — used
  after a resolve_* call to perform the actual relocation.
- Low-level filesystem_operations module (create_folder, move via mv)
  for instant same-FS renames (ZFS).

Prompt & persona
- New PromptBuilder (alfred/agent/prompt.py) replacing prompts.py:
  identity + personality block, situational expressions, memory
  schema, episodic/STM/config context, tool catalogue.
- Per-user expression system: knowledge/users/common.yaml +
  {username}.yaml are merged at runtime; one phrase per situation
  (greeting/success/error/...) is sampled into the system prompt.

qBittorrent integration
- Credentials now come from settings (qbittorrent_url/username/password)
  instead of hardcoded defaults.
- New client methods: find_by_name, set_location, recheck — the trio
  needed to update a torrent's save path and re-verify after a move.
- Host→container path translation settings (qbittorrent_host_path /
  qbittorrent_container_path) for docker-mounted setups.

Subtitles
- Identifier: strip parenthesized qualifiers (simplified, brazil…) at
  tokenization; new _tokenize_suffix used for the episode_subfolder
  pattern so episode-stem tokens no longer pollute language detection.
- Placer: extract _build_dest_name so it can be reused by the new
  dry_run path in ManageSubtitlesUseCase.
- Knowledge: add yue, ell, ind, msa, rus, vie, heb, tam, tel, tha,
  hin, ukr; add 'fre' to fra; add 'simplified'/'traditional' to zho.

Misc
- LTM workspace: add 'trash' folder slot.
- Default LLM provider switched to deepseek.
- testing/debug_release.py: CLI to parse a release, hit TMDB, and
  dry-run the destination resolution end-to-end.
2026-05-14 05:01:59 +02:00

576 lines
18 KiB
Python

#!/usr/bin/env python3
"""
scan_subtitles.py — CLI pour tester le pipeline de scan de sous-titres Alfred.
Usage:
uv run testing/subtitles/scan_subtitles.py <season_folder> [options]
Options:
--release-group RARBG Groupe de release (optionnel — active les known patterns)
--pattern adjacent Forcer un pattern (adjacent|flat|episode_subfolder|embedded)
--video FILE Fichier vidéo de référence (défaut: premier .mkv/.mp4 trouvé)
--verbose Détails sur chaque token analysé
--no-color Désactive la colorisation
Exemples:
uv run testing/subtitles/scan_subtitles.py "/media/tv/The X-Files/Season 01"
uv run testing/subtitles/scan_subtitles.py "/media/tv/The X-Files/Season 01" --release-group RARBG
uv run testing/subtitles/scan_subtitles.py "/media/tv/The X-Files/Season 01" --pattern episode_subfolder --verbose
"""
import argparse
import sys
import textwrap
from pathlib import Path
# Add the project root to sys.path (testing/subtitles/ -> ../../) so that the
# function-level `alfred.*` imports below can be resolved when this file is
# run directly as a script.
_PROJECT_ROOT = Path(__file__).resolve().parents[2]
if sys.path.count(str(_PROJECT_ROOT)) == 0:
    sys.path.insert(0, str(_PROJECT_ROOT))
# ---------------------------------------------------------------------------
# Minimal ANSI colouring (no external dependency)
# ---------------------------------------------------------------------------
# Global switch read by c(); main() turns it off for --no-color or when
# stdout is not a TTY.
USE_COLOR = True

# ANSI SGR escape sequences.
RESET = "\x1b[0m"
BOLD = "\x1b[1m"
DIM = "\x1b[2m"
GREEN = "\x1b[32m"
YELLOW = "\x1b[33m"
RED = "\x1b[31m"
CYAN = "\x1b[36m"
BLUE = "\x1b[34m"
MAGENTA = "\x1b[35m"


def c(text: str, *codes: str) -> str:
    """Return *text* wrapped in the given ANSI codes and terminated by RESET.

    When USE_COLOR is false the text is returned untouched.
    """
    if USE_COLOR:
        return "".join([*codes, text, RESET])
    return text
def section(title: str) -> None:
    """Print a section header: a blank line, then *title* between two rules.

    Args:
        title: Heading text, shown in bold cyan when colours are enabled.
    """
    width = 70
    # The rule must be built from a visible character: repeating an empty
    # string yields an empty line whatever the width.
    rule = c("─" * width, DIM)
    print()
    print(rule)
    print(c(f" {title}", BOLD, CYAN))
    print(rule)
def ok(msg: str) -> None:
    """Print *msg* as a success line, prefixed with a bold green check mark."""
    # The marker has to be non-empty: colouring an empty string renders
    # nothing, which would make this helper look like a plain print().
    print(c("  ✓ ", GREEN, BOLD) + msg)
def warn(msg: str) -> None:
    """Print *msg* as a warning line, prefixed with a bold yellow marker."""
    # The marker has to be non-empty: colouring an empty string renders
    # nothing, which would make this helper look like a plain print().
    print(c("  ⚠ ", YELLOW, BOLD) + msg)
def err(msg: str) -> None:
    """Print *msg* as an error line, prefixed with a bold red cross.

    Output goes to stdout, like the other status helpers.
    """
    # The marker has to be non-empty: colouring an empty string renders
    # nothing, which would make this helper look like a plain print().
    print(c("  ✗ ", RED, BOLD) + msg)
def info(msg: str, indent: int = 2) -> None:
    """Print *msg* preceded by *indent* spaces (no colour applied here)."""
    padding = " " * indent
    print(padding + msg)
def kv(key: str, value: str, indent: int = 4) -> None:
    """Print an indented ``key: value`` line; the ``key: `` label is bold."""
    label = c(f"{key}: ", BOLD)
    print("".join((" " * indent, label, value)))
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
# Suffixes (lower-case, dot included) that find_videos() accepts.
VIDEO_EXTS = {".mkv", ".mp4", ".avi", ".mov", ".ts", ".m2ts"}


def find_videos(folder: Path) -> list[Path]:
    """Return the video files located directly in *folder*, sorted by path.

    Only regular files are considered and the suffix comparison is
    case-insensitive. Sub-directories are not descended into.
    """
    videos = []
    for entry in folder.iterdir():
        if not entry.is_file():
            continue
        if entry.suffix.lower() in VIDEO_EXTS:
            videos.append(entry)
    videos.sort()
    return videos
def confidence_bar(conf: float, width: int = 20) -> str:
    """Render a confidence score as a coloured gauge followed by a percentage.

    Args:
        conf: Confidence score; the gauge treats it as a 0.0-1.0 fraction.
        width: Number of cells in the gauge.

    Returns:
        The gauge (green from 0.8, yellow from 0.5, red below) followed by
        the bold percentage, e.g. ``"████░░ 67%"`` once colour codes are
        stripped.
    """
    # Clamp so an out-of-range score can neither overflow the gauge nor
    # produce a negative number of empty cells.
    filled = max(0, min(width, int(conf * width)))
    # Visible glyphs are required here: repeating an empty string always
    # gives an empty bar, whatever the score.
    bar = "█" * filled + "░" * (width - filled)
    if conf >= 0.8:
        color = GREEN
    elif conf >= 0.5:
        color = YELLOW
    else:
        color = RED
    return c(bar, color) + c(f" {conf:.0%}", BOLD)
def track_summary(track, verbose: bool = False) -> None:
    """Print a short multi-line description of one subtitle track.

    Shows the source (file name or "embedded"), language, type, format and
    confidence, then the entry count / size when available, the raw tokens
    when *verbose* is set, and finally the destination name when the track
    is resolved.
    """
    language_label = c("?", RED) if not track.language else track.language.code
    format_label = c("?", RED) if not track.format else track.format.id
    kind = track.subtitle_type.value

    if track.is_embedded:
        origin = "embedded"
    elif track.file_path:
        origin = track.file_path.name
    else:
        origin = "?"

    # Colour keyed by subtitle type; types not listed fall back to RESET.
    kind_palette = dict(standard=GREEN, sdh=YELLOW, forced=BLUE, unknown=RED)
    kind_label = c(kind, kind_palette.get(kind, RESET))

    # Only external tracks are flagged when no language was recognised.
    language_flag = ""
    if not track.is_embedded and track.language is None:
        language_flag = c(" [langue inconnue]", RED, BOLD)

    print(f" {c(origin, BOLD)}")
    print(f" lang={c(language_label, CYAN)} type={kind_label} format={format_label}")

    # Embedded tracks get a fixed label instead of a confidence gauge.
    if track.is_embedded:
        gauge = c("n/a (embedded)", DIM)
    else:
        gauge = confidence_bar(track.confidence)
    print(f" confidence={gauge}{language_flag}")

    if track.entry_count is not None:
        if track.file_size_kb:
            print(f" entries={track.entry_count} size={track.file_size_kb:.1f} KB")
        else:
            print(f" entries={track.entry_count}")

    if verbose and track.raw_tokens:
        print(f" tokens={track.raw_tokens}")

    if not (track.is_resolved() and track.language and track.format):
        return
    try:
        print(f"{c(track.destination_name, GREEN, BOLD)}")
    except ValueError:
        # Best effort: the destination line is simply omitted when
        # destination_name raises ValueError.
        pass
# ---------------------------------------------------------------------------
# Pipeline steps
# ---------------------------------------------------------------------------
def step_load_kb() -> "SubtitleKnowledgeBase":
    """Step 1: build the subtitle knowledge base and print what it contains.

    Returns:
        The SubtitleKnowledgeBase built from a fresh KnowledgeLoader.
    """
    # SubtitleKnowledgeBase is only bound inside this function, so the return
    # annotation is written as a string: an unquoted name would be looked up
    # at definition time on interpreters that evaluate annotations eagerly.
    from alfred.domain.subtitles.knowledge.base import SubtitleKnowledgeBase
    from alfred.domain.subtitles.knowledge.loader import KnowledgeLoader

    section("ÉTAPE 1 — Chargement de la base de connaissances")
    kb = SubtitleKnowledgeBase(KnowledgeLoader())
    fmts = kb.formats()
    langs = kb.languages()
    patterns = kb.patterns()
    ok(f"{len(fmts)} format(s) connu(s): {', '.join(fmts.keys())}")
    ok(f"{len(langs)} langue(s) connue(s): {', '.join(langs.keys())}")
    ok(f"{len(patterns)} pattern(s) connu(s): {', '.join(patterns.keys())}")
    total_tokens = sum(len(lang.tokens) for lang in langs.values())
    info(c(f"{total_tokens} tokens de langue au total", DIM), indent=4)
    return kb
def step_detect_pattern(
    kb: "SubtitleKnowledgeBase",
    season_folder: Path,
    sample_video: Path,
    release_group: str | None,
    forced_pattern: str | None,
) -> "SubtitlePattern":
    """Step 2: choose the subtitle pattern used for the rest of the pipeline.

    Precedence:
      1. *forced_pattern*, looked up in the knowledge base;
      2. the first pattern the knowledge base lists for *release_group*;
      3. auto-detection with PatternDetector on the folder / sample video;
      4. the "adjacent" pattern as a last resort.

    Args:
        kb: Loaded subtitle knowledge base.
        season_folder: Folder handed to the detector.
        sample_video: Reference video handed to the detector.
        release_group: Optional release group name.
        forced_pattern: Optional pattern id that bypasses detection.

    Returns:
        The selected pattern.

    Raises:
        SystemExit: status 1 when *forced_pattern* is unknown, or when the
            "adjacent" fallback is missing from the knowledge base.
    """
    # The annotations above are quoted because the project types are not
    # bound at module scope (project imports are done inside functions).
    from alfred.domain.subtitles.services.pattern_detector import PatternDetector

    section("ÉTAPE 2 — Détection du pattern de release")

    # 1. Explicit override.
    if forced_pattern:
        pattern = kb.pattern(forced_pattern)
        if not pattern:
            err(f"Pattern inconnu: '{forced_pattern}'")
            print(f" Patterns disponibles: {', '.join(kb.patterns().keys())}")
            sys.exit(1)
        ok(f"Pattern forcé: {c(forced_pattern, CYAN, BOLD)}")
        return pattern

    # 2. Patterns already known for the release group.
    if release_group:
        known = kb.patterns_for_group(release_group)
        if known:
            kv("Release group", release_group)
            ok(
                f"Pattern(s) connu(s) pour {release_group}: {', '.join(p.id for p in known)}"
            )
            pattern = known[0]
            kv("Pattern sélectionné", c(pattern.id, CYAN, BOLD))
            return pattern
        # Unknown group: fall through to auto-detection.
        warn(f"Groupe '{release_group}' inconnu — lancement de la détection auto")

    # 3. Auto-detection.
    kv("Dossier analysé", str(season_folder))
    kv("Vidéo de référence", sample_video.name)
    detector = PatternDetector(kb)
    result = detector.detect(season_folder, sample_video)

    findings = result.get("raw_findings", {})
    info(c("Observations:", BOLD), indent=4)
    for key, val in findings.items():
        # Hide observations that carry no signal (False / None / 0).
        if val not in (False, None, 0):
            info(f" {key}: {c(str(val), CYAN)}", indent=4)

    detected = result.get("detected")
    confidence = result.get("confidence", 0.0)
    description = result.get("description", "")
    print()
    info(c(f'Description: "{description}"', DIM), indent=4)
    print(f" Confiance: {confidence_bar(confidence)}")

    if detected:
        ok(f"Pattern détecté: {c(detected.id, CYAN, BOLD)}")
        kv("Stratégie de scan", detected.scan_strategy.value)
        kv("Détection de type", detected.type_detection.value)
        if detected.root_folder:
            kv("Dossier racine", detected.root_folder)
        return detected

    # 4. Last resort.
    warn("Aucun pattern détecté avec confiance suffisante — fallback: adjacent")
    fallback = kb.pattern("adjacent")
    if not fallback:
        err("Pattern 'adjacent' introuvable dans la KB !")
        sys.exit(1)
    return fallback
def step_identify_tracks(
    kb: "SubtitleKnowledgeBase",
    sample_video: Path,
    pattern: "SubtitlePattern",
    release_group: str | None,
    verbose: bool,
) -> "MediaSubtitleMetadata":
    """Step 3: identify the subtitle tracks of *sample_video* and print them.

    Args:
        kb: Loaded subtitle knowledge base.
        sample_video: Video whose subtitles are identified.
        pattern: Pattern selected by the previous step.
        release_group: Optional release group, forwarded to the identifier.
        verbose: Forwarded to track_summary() to also print raw tokens.

    Returns:
        The metadata object returned by SubtitleIdentifier.identify().
    """
    # The annotations above are quoted because the project types are not
    # bound at module scope (project imports are done inside functions).
    from alfred.domain.subtitles.services.identifier import SubtitleIdentifier

    section("ÉTAPE 3 — Identification des pistes")
    kv("Vidéo", sample_video.name)
    kv("Pattern", pattern.id)

    identifier = SubtitleIdentifier(kb)
    metadata = identifier.identify(
        video_path=sample_video,
        pattern=pattern,
        media_id=None,
        media_type="tv_show",
        release_group=release_group,
    )

    n_emb = len(metadata.embedded_tracks)
    n_ext = len(metadata.external_tracks)
    n_unresolved = len(metadata.unresolved_tracks)

    print()
    ok(f"{n_ext} piste(s) externe(s) trouvée(s)")
    if n_emb:
        ok(f"{n_emb} piste(s) embarquée(s) (ffprobe)")
    if n_unresolved:
        warn(f"{n_unresolved} piste(s) externe(s) sans langue reconnue")

    if metadata.external_tracks:
        print()
        info(c("Pistes externes:", BOLD))
        for track in metadata.external_tracks:
            track_summary(track, verbose)

    if metadata.embedded_tracks:
        print()
        info(c("Pistes embarquées:", BOLD))
        for track in metadata.embedded_tracks:
            track_summary(track, verbose)

    return metadata
def step_apply_rules(
    metadata: "MediaSubtitleMetadata",
    release_group: str | None,
) -> "tuple[SubtitleMatchingRules | None, list, list]":
    """Step 4: apply the default matching rules to the identified tracks.

    Args:
        metadata: Result of the identification step.
        release_group: Not used by this step; kept so the signature stays
            parallel to the other pipeline steps.

    Returns:
        ``(rules, matched, unresolved)``. For the embedded case the matcher
        is skipped and the result is ``(None, available_tracks, [])``.
    """
    # The annotations above are quoted because the project types are not
    # bound at module scope (project imports are done inside functions).
    from alfred.domain.subtitles.aggregates import DEFAULT_RULES
    from alfred.domain.subtitles.services.matcher import SubtitleMatcher
    from alfred.domain.subtitles.services.utils import available_subtitles
    from alfred.domain.subtitles.value_objects import ScanStrategy

    section("ÉTAPE 4 — Application des règles")

    # Embedded case: no matcher, the available tracks are listed directly.
    if metadata.detected_pattern_id == ScanStrategy.EMBEDDED.value:
        info(c("Pattern embedded — le matcher est court-circuité", DIM), indent=4)
        tracks = available_subtitles(metadata.embedded_tracks)
        ok(f"{len(tracks)} piste(s) disponible(s)")
        return None, tracks, []

    rules = DEFAULT_RULES()
    kv("Langues préférées", str(rules.preferred_languages))
    kv("Formats préférés", str(rules.preferred_formats))
    kv("Types autorisés", str(rules.allowed_types))
    kv("Confiance min", str(rules.min_confidence))
    info(
        c("(règles globales par défaut — pas de .alfred/ en mode scan)", DIM), indent=4
    )

    matcher = SubtitleMatcher()
    matched, unresolved = matcher.match(metadata.external_tracks, rules)

    print()
    ok(f"{len(matched)} piste(s) retenue(s)")
    if unresolved:
        warn(f"{len(unresolved)} piste(s) écartée(s) ou non résolue(s)")
    return rules, matched, unresolved
def step_show_results(
    matched: list,
    unresolved: list,
    is_embedded: bool,
    verbose: bool,
) -> None:
    """Print the final report: retained tracks, then discarded/unclear ones.

    Args:
        matched: Tracks retained by the rules step.
        unresolved: Tracks discarded or needing clarification.
        is_embedded: True when the tracks come from the embedded path; only
            language and type are shown, since no file would be placed.
        verbose: Also print the raw tokens of unresolved tracks.
    """
    section("RÉSULTAT FINAL")

    if matched:
        label = (
            "piste(s) disponible(s)" if is_embedded else "piste(s) qui seraient placées"
        )
        ok(f"{len(matched)} {label}:")
        for track in matched:
            lang = track.language.code if track.language else "?"
            typ = track.subtitle_type.value
            if is_embedded:
                print(f" {c(lang, CYAN)} {c(typ, GREEN)}")
                continue
            try:
                dest = track.destination_name
            except ValueError:
                warn(f" Piste incomplète (lang ou format manquant): {track}")
                continue
            src = track.file_path.name if track.file_path else "?"
            # An explicit arrow separates the two names; without it they run
            # together as soon as colours are disabled.
            print(f" {c(src, DIM)} → {c(dest, GREEN, BOLD)}")
    else:
        warn("Aucune piste retenue.")

    if unresolved:
        print()
        warn(f"{len(unresolved)} piste(s) écartées ou à clarifier:")
        for track in unresolved:
            src = track.file_path.name if track.file_path else "?"
            reason = (
                "langue inconnue"
                if track.language is None
                else "confiance insuffisante"
            )
            line = f" {c(src, DIM)} ({reason})"
            if verbose and track.raw_tokens:
                line += c(f" tokens: {track.raw_tokens}", YELLOW)
            print(line)
    print()
# ---------------------------------------------------------------------------
# Multi-episode scan (summary)
# ---------------------------------------------------------------------------
def scan_season(
    kb: "SubtitleKnowledgeBase",
    pattern: "SubtitlePattern",
    season_folder: Path,
    release_group: str | None,
    verbose: bool,
) -> None:
    """Print a one-line subtitle summary for every video in *season_folder*.

    Each video is identified with *pattern* and matched against the default
    rules; the line shows the destination names of the retained tracks and
    how many tracks were left unresolved.

    Args:
        kb: Loaded subtitle knowledge base.
        pattern: Pattern applied to every episode.
        season_folder: Folder whose direct video files are scanned.
        release_group: Optional release group, forwarded to the identifier.
        verbose: Currently unused by this summary view.
    """
    # The annotations above are quoted because the project types are not
    # bound at module scope (project imports are done inside functions).
    from alfred.domain.subtitles.aggregates import DEFAULT_RULES
    from alfred.domain.subtitles.services.identifier import SubtitleIdentifier
    from alfred.domain.subtitles.services.matcher import SubtitleMatcher

    videos = find_videos(season_folder)
    section(f"SCAN COMPLET DE LA SAISON ({len(videos)} épisode(s))")
    if not videos:
        warn("Aucun fichier vidéo trouvé dans ce dossier.")
        return

    identifier = SubtitleIdentifier(kb)
    matcher = SubtitleMatcher()
    rules = DEFAULT_RULES()
    # Pad every file name to the longest one so the columns line up.
    col_w = max(len(v.name) for v in videos) + 2

    for video in videos:
        metadata = identifier.identify(
            video_path=video,
            pattern=pattern,
            media_id=None,
            media_type="tv_show",
            release_group=release_group,
        )
        matched, unresolved = matcher.match(metadata.external_tracks, rules)

        placed_names = []
        for track in matched:
            try:
                placed_names.append(track.destination_name)
            except ValueError:
                # Tracks whose destination name cannot be built are left out
                # of the summary.
                continue

        # Visible markers: an empty icon / empty placeholder would make rows
        # with and without placed tracks look alike once colours are off.
        if placed_names:
            status_icon = c("✓", GREEN, BOLD)
            placed = c(", ".join(placed_names), GREEN)
        else:
            status_icon = c("✗", RED, BOLD)
            placed = c("—", DIM)
        warn_icon = (
            c(f" [{len(unresolved)} non-résolue(s)]", YELLOW) if unresolved else ""
        )
        print(f" {status_icon} {video.name:{col_w}} {placed}{warn_icon}")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse the process arguments.

    The module docstring (dedented) is reused as the help epilog.
    """
    cli = argparse.ArgumentParser(
        description="Scanner de sous-titres Alfred — pipeline de diagnostic",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=textwrap.dedent(__doc__ or ""),
    )
    cli.add_argument("season_folder", help="Dossier de la saison (ou du film)")

    # Options that take a value: (long flag, short flag, metavar, help).
    valued_options = (
        (
            "--release-group",
            "-g",
            "GROUP",
            "Groupe de release (ex: RARBG, KONSTRAST)",
        ),
        (
            "--pattern",
            "-p",
            "PATTERN",
            "Forcer un pattern (adjacent|flat|episode_subfolder|embedded)",
        ),
        (
            "--video",
            "-v",
            "FILE",
            "Fichier vidéo de référence (défaut: premier trouvé)",
        ),
    )
    for long_flag, short_flag, placeholder, description in valued_options:
        cli.add_argument(
            long_flag, short_flag, metavar=placeholder, help=description
        )

    # Boolean switches: (flag, help).
    switches = (
        ("--verbose", "Affiche les tokens bruts par piste"),
        ("--no-color", "Désactive la colorisation ANSI"),
        (
            "--season-scan",
            "Après le diagnostic, scanner tous les épisodes de la saison",
        ),
    )
    for flag, description in switches:
        cli.add_argument(flag, action="store_true", help=description)

    return cli.parse_args()
def main() -> None:
    """CLI entry point: validate the inputs, then run the diagnostic pipeline.

    Steps: parse the arguments, pick the reference video, load the knowledge
    base, select a pattern, identify the tracks, apply the rules and print
    the results; with --season-scan, every episode of the folder is then
    summarised.

    Raises:
        SystemExit: status 1 when the folder is not a directory, when the
            --video file does not exist, or when no video file is found.
    """
    global USE_COLOR
    args = parse_args()

    # Colours are disabled on request or when stdout is not a terminal.
    if args.no_color or not sys.stdout.isatty():
        USE_COLOR = False

    season_folder = Path(args.season_folder).expanduser().resolve()
    if not season_folder.is_dir():
        print(f"Erreur: '{season_folder}' n'est pas un dossier.", file=sys.stderr)
        sys.exit(1)

    # The banner rule needs a visible character: repeating an empty string
    # prints nothing, whatever the width.
    banner_rule = c("═" * 70, BOLD)
    print()
    print(banner_rule)
    print(c(" Alfred — Subtitle Scanner", BOLD, MAGENTA))
    print(banner_rule)
    kv("Dossier", str(season_folder), indent=2)

    # Pick the reference video.
    if args.video:
        sample_video = Path(args.video).expanduser().resolve()
        if not sample_video.exists():
            print(f"Erreur: '{sample_video}' introuvable.", file=sys.stderr)
            sys.exit(1)
    else:
        videos = find_videos(season_folder)
        if not videos:
            # Look one level down (release-root layout). Sub-folders are
            # visited in sorted order so the chosen reference video does not
            # depend on the directory listing order.
            for sub in sorted(season_folder.iterdir()):
                if sub.is_dir():
                    videos = find_videos(sub)
                    if videos:
                        break
        if not videos:
            print(
                "Erreur: aucun fichier vidéo trouvé dans ce dossier.", file=sys.stderr
            )
            sys.exit(1)
        sample_video = videos[0]
    kv("Vidéo de référence", sample_video.name, indent=2)

    # ---- Pipeline ----
    kb = step_load_kb()
    pattern = step_detect_pattern(
        kb=kb,
        season_folder=season_folder,
        sample_video=sample_video,
        release_group=args.release_group,
        forced_pattern=args.pattern,
    )
    metadata = step_identify_tracks(
        kb=kb,
        sample_video=sample_video,
        pattern=pattern,
        release_group=args.release_group,
        verbose=args.verbose,
    )
    rules, matched, unresolved = step_apply_rules(
        metadata=metadata,
        release_group=args.release_group,
    )
    # step_apply_rules() returns no rules object for the embedded case.
    step_show_results(
        matched=matched,
        unresolved=unresolved,
        is_embedded=rules is None,
        verbose=args.verbose,
    )

    if args.season_scan:
        scan_season(
            kb=kb,
            pattern=pattern,
            season_folder=season_folder,
            release_group=args.release_group,
            verbose=args.verbose,
        )

    print(banner_rule)
    print()
# Run the CLI only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()