Files
alfred/testing/recognize_folders_in_downloads.py
T
francwa b7979c0f8b refactor(release): freeze ParsedRelease + enrich_from_probe returns new instance
ParsedRelease is now @dataclass(frozen=True). The enrichment passes that
used to patch fields in place now produce new instances:

- enrich_from_probe(parsed, info, kb) returns a new ParsedRelease via
  dataclasses.replace (no allocation when no field changed).
- inspect_release rebinds 'parsed' after detect_media_type (wrapped in
  MediaTypeToken — the strict isinstance check now also runs on
  replace) and after enrich_from_probe.

languages becomes a tuple[str, ...] so the VO is properly immutable.
Parser pipeline packs languages as a tuple in the assemble dict.

Callers updated: inspect_release, testing/recognize_folders_in_downloads.py.
Tests updated: 22 enrich_from_probe call sites rebound, language
assertions switched to tuple literals, test_release_fixtures normalizes
result['languages'] back to list for YAML-fixture comparison.

Suite: 1077 passed.
2026-05-21 07:51:49 +02:00

221 lines
7.1 KiB
Python

#!/usr/bin/env python3
"""
recognize_folders_in_downloads.py — Parse every folder/file in the downloads directory.
Usage:
uv run testing/recognize_folders_in_downloads.py
uv run testing/recognize_folders_in_downloads.py --path /mnt/testipool/downloads
uv run testing/recognize_folders_in_downloads.py --failures-only
uv run testing/recognize_folders_in_downloads.py --successes-only
"""
import argparse
import sys
from pathlib import Path
# Put the directory one level above this script's folder at the front of the
# import path (once), so the script can be launched directly as shown in the
# usage examples.
_PROJECT_ROOT = Path(__file__).resolve().parents[1]
if sys.path.count(str(_PROJECT_ROOT)) == 0:
    sys.path.insert(0, str(_PROJECT_ROOT))
# ---------------------------------------------------------------------------
# Colours
# ---------------------------------------------------------------------------
# ANSI SGR escape sequences used to style terminal output.
RESET = "\x1b[0m"  # clears every attribute set before it
BOLD = "\x1b[1m"
DIM = "\x1b[2m"
# Foreground colours.
GREEN = "\x1b[32m"
YELLOW = "\x1b[33m"
RED = "\x1b[31m"
CYAN = "\x1b[36m"
# Global switch consulted by c(); main() clears it for --no-color or when
# stdout is not a terminal.
USE_COLOR = True
def c(text: str, *codes: str) -> str:
    """Return *text* wrapped in the given ANSI codes, or plain when colour is off.

    ``text`` is always passed through ``str()``. When ``USE_COLOR`` is true
    the result is the concatenated ``codes``, the text, then ``RESET``;
    otherwise it is just the stringified text.
    """
    if USE_COLOR:
        prefix = "".join(codes)
        return f"{prefix}{text!s}{RESET}"
    return str(text)
def kv(key: str, val: str, indent: int = 4, color: str = CYAN) -> None:
    """Print one ``key: value`` line.

    The line starts with ``indent`` spaces; the key (with a trailing colon)
    is rendered bold and the value in ``color``.
    """
    padding = " " * indent
    label = c(key + ":", BOLD)
    value = c(val, color)
    print(f"{padding}{label} {value}")
def hr() -> None:
    """Print a dim horizontal rule, 70 characters wide."""
    # The glyph must be non-empty: repeating "" gives "", which would print
    # a blank line instead of a rule.
    print(c("─" * 70, DIM))
# ---------------------------------------------------------------------------
# Parsing quality check
# ---------------------------------------------------------------------------
def _assess(p) -> list[str]:
    """Collect warnings for parsed fields that look missing or wrong.

    Releases whose ``media_type`` is ``"other"`` or ``"unknown"`` are not
    assessed and always yield an empty list. Otherwise the group, quality,
    codec and title fields are checked, and one message is returned per
    failed check, in that order.
    """
    if p.media_type in ("other", "unknown"):
        return []

    # (check failed?, message) pairs, evaluated in reporting order.
    checks = (
        (p.group == "UNKNOWN", "group not found"),
        (not p.quality, "resolution not found"),
        (not p.codec, "codec not found"),
        # A title equal to the normalised name means nothing was stripped.
        (not p.title or p.title == p.normalised, "title extraction likely wrong"),
    )
    return [message for failed, message in checks if failed]
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the downloads scanner."""
    parser = argparse.ArgumentParser(
        description="Recognize release folders in downloads"
    )
    parser.add_argument(
        "--path",
        default="/mnt/testipool/downloads",
        help="Downloads directory (default: /mnt/testipool/downloads)",
    )
    parser.add_argument(
        "--failures-only", action="store_true", help="Show only entries with warnings"
    )
    parser.add_argument(
        "--successes-only", action="store_true", help="Show only fully parsed entries"
    )
    parser.add_argument("--no-color", action="store_true")
    return parser


def _print_release_details(p) -> None:
    """Print the key/value detail lines for one successfully parsed release."""
    # Visible placeholder for absent fields; an empty string would leave the
    # value column blank.
    missing = c("—", DIM)

    kind = {
        "movie": "movie",
        "tv_show": "season pack" if p.is_season_pack else "episode",
        "tv_complete": c("tv complete", CYAN),
        "documentary": c("documentary", CYAN),
        "concert": c("concert", CYAN),
        "other": c("other", RED),
        "unknown": c("unknown", YELLOW),
    }.get(p.media_type, p.media_type)
    kv("type", kind)
    kv("title", p.title)
    if p.season is not None:
        ep = f"E{p.episode:02d}" if p.episode is not None else ""
        kv("season/ep", f"S{p.season:02d} / {ep}")
    if p.year:
        kv("year", str(p.year))
    if p.languages:
        kv("langs", " ".join(p.languages))
    kv("quality", p.quality or missing)
    kv("source", p.source or missing)
    kv("codec", p.codec or missing)
    if p.audio_codec:
        ch = f" {p.audio_channels}" if p.audio_channels else ""
        kv("audio", f"{p.audio_codec}{ch}")
    if p.bit_depth or p.hdr_format:
        hdr_parts = [x for x in [p.bit_depth, p.hdr_format] if x]
        kv("hdr/depth", " ".join(hdr_parts))
    if p.edition:
        kv("edition", p.edition, color=YELLOW)
    kv("group", p.group, color=YELLOW if p.group == "UNKNOWN" else GREEN)
    if p.site_tag:
        kv("site tag", p.site_tag, color=YELLOW)


def main() -> None:
    """Parse every entry of the downloads directory and print a report.

    Each directory entry name is run through ``parse_release``; the result is
    given a detected media type and, when a video file is found and probed,
    enriched from the probe data. Entries are then printed with their parsed
    fields and any warnings from ``_assess``, followed by a summary line.

    ``--failures-only`` / ``--successes-only`` filter what is printed; the
    summary counts only printed entries and reports the rest as "filtered".
    Colour is disabled for ``--no-color`` or when stdout is not a terminal.

    Exits with status 1 if ``--path`` does not exist or is not a directory.
    """
    global USE_COLOR

    args = _build_arg_parser().parse_args()
    if args.no_color or not sys.stdout.isatty():
        USE_COLOR = False

    downloads = Path(args.path)
    # is_dir() rather than exists(): a regular file would pass exists() and
    # then make iterdir() raise NotADirectoryError below.
    if not downloads.is_dir():
        reason = "is not a directory" if downloads.exists() else "does not exist"
        print(c(f"Error: {downloads} {reason}", RED), file=sys.stderr)
        sys.exit(1)

    # Project imports happen only once the arguments and path have been checked.
    from dataclasses import replace

    from alfred.application.release.detect_media_type import detect_media_type
    from alfred.application.release.enrich_from_probe import enrich_from_probe
    from alfred.domain.release.services import parse_release
    from alfred.domain.release.value_objects import MediaTypeToken
    from alfred.infrastructure.filesystem.find_video import find_video_file
    from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
    from alfred.infrastructure.probe import FfprobeMediaProber

    kb = YamlReleaseKnowledge()
    prober = FfprobeMediaProber()

    entries = sorted(downloads.iterdir(), key=lambda path: path.name.lower())
    total = len(entries)
    ok_count = 0
    warn_count = 0

    # Non-empty glyph: "" * 70 would be "" and the banner would be invisible.
    banner = c("─" * 70, BOLD)
    print()
    print(banner)
    print(c(f" Downloads — {downloads}", BOLD, CYAN))
    print(c(f" {total} entries", DIM))
    print(banner)

    for entry in entries:
        name = entry.name
        try:
            p, _report = parse_release(name, kb)
            # Each step returns a new instance, so the name is rebound rather
            # than mutated in place.
            p = replace(p, media_type=MediaTypeToken(detect_media_type(p, entry, kb)))
            if p.media_type not in ("unknown", "other"):
                video_file = find_video_file(entry)
                if video_file:
                    media_info = prober.probe(video_file)
                    if media_info:
                        p = enrich_from_probe(p, media_info, kb)
            warnings = _assess(p)
        except Exception as e:
            # Deliberately broad: one unparsable entry is reported as a
            # warning instead of aborting the whole scan.
            warnings = [f"parse error: {e}"]
            p = None

        has_warnings = bool(warnings)
        if args.failures_only and not has_warnings:
            continue
        if args.successes_only and has_warnings:
            continue

        print()
        path_label = ""
        if p is not None:
            path_label = {
                "direct": c("direct", GREEN, DIM),
                "sanitized": c("sanitized", YELLOW),
                "ai": c("ai", RED),
            }.get(p.parse_path, p.parse_path)

        # Textual markers keep ok/warning entries distinguishable even when
        # colour is disabled.
        if has_warnings:
            warn_count += 1
            print(f" {c('⚠', YELLOW, BOLD)} {c(name, YELLOW)} {path_label}")
        else:
            ok_count += 1
            print(f" {c('✓', GREEN, BOLD)} {c(name, BOLD)} {path_label}")

        if p is not None:
            _print_release_details(p)

        for w in warnings:
            print(f" {c('→ ' + w, YELLOW)}")

    # Summary
    print()
    hr()
    # Only printed entries were counted, so the remainder is what the
    # --failures-only / --successes-only filters hid.
    skipped = total - ok_count - warn_count
    print(
        f" {c('Total:', BOLD)} {total} "
        f"{c(str(ok_count) + ' ok', GREEN, BOLD)} "
        f"{c(str(warn_count) + ' warnings', YELLOW, BOLD)}"
        + (f" {c(str(skipped) + ' filtered', DIM)}" if skipped else "")
    )
    hr()
    print()
# Run the scan only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()