Files
alfred/testing/recognize_folders_in_downloads.py
T
francwa c303efea48 refactor(probe): consolidate full probe() into MediaProber port
Add probe(video) -> MediaInfo | None to the MediaProber Protocol and
implement it on FfprobeMediaProber. The standalone
alfred/infrastructure/filesystem/ffprobe.py module is removed; all
callers (analyze_release / probe_media tools, testing scripts) now go
through the adapter.

Tests for the probe path moved to tests/infrastructure/test_ffprobe_prober.py
(patching subprocess.run at the adapter module level).

Unblocks the upcoming inspect_release orchestrator, which needs the
port — not a free function — to compose parse + main-video selection
+ probe in one shot.
2026-05-20 09:11:24 +02:00

216 lines
6.8 KiB
Python

#!/usr/bin/env python3
"""
recognize_folders_in_downloads.py — Parse every folder/file in the downloads directory.
Usage:
uv run testing/recognize_folders_in_downloads.py
uv run testing/recognize_folders_in_downloads.py --path /mnt/testipool/downloads
uv run testing/recognize_folders_in_downloads.py --failures-only
uv run testing/recognize_folders_in_downloads.py --successes-only
"""
import argparse
import sys
from pathlib import Path
# Directory one level above this script's own folder (parents[0] is the
# script's folder, parents[1] is that folder's parent).
_PROJECT_ROOT = Path(__file__).resolve().parents[1]
# Prepend it to sys.path once. NOTE(review): presumably this is what lets the
# `alfred.*` imports inside main() resolve when the script is run directly —
# confirm against the repository layout.
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))
# ---------------------------------------------------------------------------
# Colours
# ---------------------------------------------------------------------------
# ANSI SGR escape sequences used to style terminal output.
RESET = "\033[0m"  # clear every attribute set so far
BOLD = "\033[1m"  # bold / increased intensity
DIM = "\033[2m"  # faint / decreased intensity
GREEN = "\033[32m"  # green foreground
YELLOW = "\033[33m"  # yellow foreground
RED = "\033[31m"  # red foreground
CYAN = "\033[36m"  # cyan foreground
# Global switch read by c(); main() sets it to False for --no-color or when
# stdout is not a TTY.
USE_COLOR = True
def c(text: str, *codes: str) -> str:
    """Return *text* wrapped in the given escape codes, or plain when colour is off.

    Args:
        text: Value to render; it is converted with ``str()`` in every case.
        *codes: Escape sequences emitted, in order, before the text.

    Returns:
        ``codes + text + RESET`` while the module-level ``USE_COLOR`` flag is
        true, otherwise just the stringified text.
    """
    plain = str(text)
    if USE_COLOR:
        return f"{''.join(codes)}{plain}{RESET}"
    return plain
def kv(key: str, val: str, indent: int = 4, color: str = CYAN) -> None:
    """Print one ``key: value`` line.

    Args:
        key: Label; printed in bold with a trailing colon.
        val: Value; printed in *color*.
        indent: Number of spaces placed before the label.
        color: Escape code applied to the value.
    """
    padding = " " * indent
    label = c(key + ":", BOLD)
    value = c(val, color)
    print(f"{padding}{label} {value}")
def hr() -> None:
    """Print a dimmed horizontal rule, 70 characters wide."""
    # The rule has to be built from a real character: repeating an empty
    # string 70 times is still an empty string and draws nothing.
    print(c("─" * 70, DIM))
# ---------------------------------------------------------------------------
# Parsing quality check
# ---------------------------------------------------------------------------
def _assess(p) -> list[str]:
    """Collect warnings for parsed fields that look missing or wrong.

    Args:
        p: Parsed release exposing ``media_type``, ``group``, ``quality``,
            ``codec``, ``title`` and ``normalised``.

    Returns:
        Warning messages in a fixed order (group, resolution, codec, title).
        Always empty when ``media_type`` is "other" or "unknown".
    """
    if p.media_type in ("other", "unknown"):
        return []

    # Checks are deferred so attributes are read in this order, and
    # ``p.normalised`` is only read when a title is present.
    checks = (
        (lambda: p.group == "UNKNOWN", "group not found"),
        (lambda: not p.quality, "resolution not found"),
        (lambda: not p.codec, "codec not found"),
        (
            lambda: not p.title or p.title == p.normalised,
            "title extraction likely wrong",
        ),
    )
    return [message for failed, message in checks if failed()]
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for this script."""
    parser = argparse.ArgumentParser(
        description="Recognize release folders in downloads"
    )
    parser.add_argument(
        "--path",
        default="/mnt/testipool/downloads",
        help="Downloads directory (default: /mnt/testipool/downloads)",
    )
    # The two filters contradict each other: given together they would hide
    # every entry, so argparse rejects the combination up front.
    only = parser.add_mutually_exclusive_group()
    only.add_argument(
        "--failures-only", action="store_true", help="Show only entries with warnings"
    )
    only.add_argument(
        "--successes-only", action="store_true", help="Show only fully parsed entries"
    )
    parser.add_argument("--no-color", action="store_true")
    return parser


def _print_details(p) -> None:
    """Print the parsed fields of one release as indented key/value lines."""
    missing = c("—", DIM)  # visible stand-in for a field that was not found

    kind = {
        "movie": "movie",
        "tv_show": "season pack" if p.is_season_pack else "episode",
        "tv_complete": c("tv complete", CYAN),
        "documentary": c("documentary", CYAN),
        "concert": c("concert", CYAN),
        "other": c("other", RED),
        "unknown": c("unknown", YELLOW),
    }.get(p.media_type, p.media_type)
    kv("type", kind)
    kv("title", p.title)
    if p.season is not None:
        # A season without an episode number gets the placeholder instead of
        # a dangling "S01 / ".
        ep = f"E{p.episode:02d}" if p.episode is not None else missing
        kv("season/ep", f"S{p.season:02d} / {ep}")
    if p.year:
        kv("year", str(p.year))
    if p.languages:
        kv("langs", " ".join(p.languages))
    kv("quality", p.quality or missing)
    kv("source", p.source or missing)
    kv("codec", p.codec or missing)
    if p.audio_codec:
        ch = f" {p.audio_channels}" if p.audio_channels else ""
        kv("audio", f"{p.audio_codec}{ch}")
    if p.bit_depth or p.hdr_format:
        hdr_parts = [x for x in [p.bit_depth, p.hdr_format] if x]
        kv("hdr/depth", " ".join(hdr_parts))
    if p.edition:
        kv("edition", p.edition, color=YELLOW)
    kv("group", p.group, color=YELLOW if p.group == "UNKNOWN" else GREEN)
    if p.site_tag:
        kv("site tag", p.site_tag, color=YELLOW)


def main() -> None:
    """Parse every entry of the downloads directory and print a report.

    For each entry (sorted case-insensitively by name) the name is parsed,
    its media type detected and, for recognised media, the parse result is
    enriched from a probe of the video file. Each shown entry is printed with
    its parsed fields and any warnings from ``_assess``; a summary line with
    ok / warning / filtered counts follows.

    Exits with status 1 when ``--path`` is missing or is not a directory.
    argparse exits with status 2 when ``--failures-only`` and
    ``--successes-only`` are combined.
    """
    global USE_COLOR

    args = _build_parser().parse_args()
    if args.no_color or not sys.stdout.isatty():
        USE_COLOR = False

    downloads = Path(args.path)
    if not downloads.is_dir():
        # Distinguish "missing" from "not a directory" so iterdir() below
        # never fails with a raw traceback on a file path.
        reason = "is not a directory" if downloads.exists() else "does not exist"
        print(c(f"Error: {downloads} {reason}", RED), file=sys.stderr)
        sys.exit(1)

    # Project imports are deferred until the arguments have been validated.
    from alfred.application.filesystem.detect_media_type import detect_media_type
    from alfred.application.filesystem.enrich_from_probe import enrich_from_probe
    from alfred.domain.release.services import parse_release
    from alfred.infrastructure.filesystem.find_video import find_video_file
    from alfred.infrastructure.probe import FfprobeMediaProber

    prober = FfprobeMediaProber()
    entries = sorted(downloads.iterdir(), key=lambda p: p.name.lower())
    total = len(entries)
    ok_count = 0
    warn_count = 0

    print()
    print(c("═" * 70, BOLD))
    print(c(f" Downloads — {downloads}", BOLD, CYAN))
    print(c(f" {total} entries", DIM))
    print(c("═" * 70, BOLD))

    for entry in entries:
        name = entry.name
        try:
            p = parse_release(name)
            p.media_type = detect_media_type(p, entry)
            if p.media_type not in ("unknown", "other"):
                video_file = find_video_file(entry)
                if video_file:
                    media_info = prober.probe(video_file)
                    if media_info:
                        enrich_from_probe(p, media_info)
            warnings = _assess(p)
        except Exception as e:
            # Deliberately broad: one unparseable entry must not abort the
            # scan, so the failure is reported as a warning on that entry.
            warnings = [f"parse error: {e}"]
            p = None

        has_warnings = bool(warnings)
        if args.failures_only and not has_warnings:
            continue
        if args.successes_only and has_warnings:
            continue

        print()
        path_label = ""
        if p is not None:
            path_label = {
                "direct": c("direct", GREEN, DIM),
                "sanitized": c("sanitized", YELLOW),
                "ai": c("ai", RED),
            }.get(p.parse_path, p.parse_path)

        # Only entries that pass the filter are counted, which is what lets
        # the summary derive the "filtered" figure by subtraction.
        if has_warnings:
            warn_count += 1
            print(f" {c('⚠', YELLOW, BOLD)} {c(name, YELLOW)} {path_label}")
        else:
            ok_count += 1
            print(f" {c('✓', GREEN, BOLD)} {c(name, BOLD)} {path_label}")

        if p is not None:
            _print_details(p)
        for w in warnings:
            print(f" {c('→ ' + w, YELLOW)}")

    # Summary
    print()
    hr()
    skipped = total - ok_count - warn_count
    print(
        f" {c('Total:', BOLD)} {total} "
        f"{c(str(ok_count) + ' ok', GREEN, BOLD)} "
        f"{c(str(warn_count) + ' warnings', YELLOW, BOLD)}"
        + (f" {c(str(skipped) + ' filtered', DIM)}" if skipped else "")
    )
    hr()
    print()
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()