0246f85ef8
The three module-level dicts in enrich_from_probe (ffprobe codec name to scene token, channel count to layout) were exactly the kind of domain lookup table CLAUDE.md says belongs in YAML, not in Python. Move them to alfred/knowledge/release/probe_mappings.yaml, load through a new ReleaseKnowledge.probe_mappings port field, and add a kb parameter to enrich_from_probe so the consumer reads the maps via the same injection pattern as everything else. - New knowledge file: alfred/knowledge/release/probe_mappings.yaml - New loader: load_probe_mappings() in infrastructure/knowledge/release.py (normalizes channel-count keys back to int). - Port: ReleaseKnowledge gains probe_mappings: dict. - Adapter: YamlReleaseKnowledge populates it at __init__. - Consumer: enrich_from_probe(parsed, info, kb) reads the three sub-maps from kb.probe_mappings; unknown codecs still fall back to uppercase raw value, same behaviour as before. - Call sites updated: inspect_release passes kb through; the testing script gets its kb wiring (it was already broken since the ReleaseKnowledge refactor); all 22 enrich_from_probe call sites in tests/application/test_enrich_from_probe.py pass _KB.
218 lines
7.0 KiB
Python
218 lines
7.0 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
recognize_folders_in_downloads.py — Parse every folder/file in the downloads directory.
|
|
|
|
Usage:
|
|
uv run testing/recognize_folders_in_downloads.py
|
|
uv run testing/recognize_folders_in_downloads.py --path /mnt/testipool/downloads
|
|
uv run testing/recognize_folders_in_downloads.py --failures-only
|
|
uv run testing/recognize_folders_in_downloads.py --successes-only
|
|
"""
|
|
|
|
import argparse
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Resolve the repository root. parents[0] is this script's own directory and
# parents[1] is its parent; per the usage examples above the script is run as
# ``testing/recognize_folders_in_downloads.py``, so parents[1] is the root.
_PROJECT_ROOT = Path(__file__).resolve().parents[1]
# Prepend the root to sys.path (only if it is not already there) so that the
# ``alfred.*`` imports performed inside main() resolve when the script is
# executed directly.
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Colours
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Text attributes (ANSI SGR escape sequences).
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"

# Foreground colours, listed in SGR code order.
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
CYAN = "\033[36m"

# Global switch read by c(); main() turns it off for --no-color or when
# stdout is not a terminal.
USE_COLOR = True
|
|
|
|
|
|
def c(text: str, *codes: str) -> str:
    """Return *text* wrapped in the given ANSI escape codes.

    The codes are concatenated in the order given, followed by the text and
    a trailing ``RESET`` sequence. When the module-level ``USE_COLOR`` flag
    is false the text is returned without any escape sequences. In both
    cases *text* is passed through ``str()`` first.
    """
    plain = str(text)
    if USE_COLOR:
        return f"{''.join(codes)}{plain}{RESET}"
    return plain
|
|
|
|
|
|
def kv(key: str, val: str, indent: int = 4, color: str = CYAN) -> None:
    """Print a single ``key: value`` line.

    The key (with a trailing colon) is rendered in bold, the value in
    *color*, and the whole line is prefixed with *indent* spaces.
    """
    padding = " " * indent
    label = c(key + ":", BOLD)
    value = c(val, color)
    print(f"{padding}{label} {value}")
|
|
|
|
|
|
def hr() -> None:
    """Print a dimmed horizontal rule made of 70 box-drawing characters."""
    rule = "─" * 70
    print(c(rule, DIM))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Parsing quality check
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _assess(p) -> list[str]:
    """Collect warnings for fields of a parsed release that look wrong.

    Entries whose ``media_type`` is ``"other"`` or ``"unknown"`` are not
    assessed and always yield an empty list. Otherwise one message is
    produced, in a fixed order, for each of: group equal to ``"UNKNOWN"``,
    falsy ``quality``, falsy ``codec``, and a title that is either falsy or
    identical to ``p.normalised``.
    """
    if p.media_type in ("other", "unknown"):
        return []

    # (condition that signals a problem, message) pairs, in report order.
    checks = (
        (p.group == "UNKNOWN", "group not found"),
        (not p.quality, "resolution not found"),
        (not p.codec, "codec not found"),
        (not p.title or p.title == p.normalised, "title extraction likely wrong"),
    )
    return [message for failed, message in checks if failed]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _print_details(p) -> None:
    """Print the parsed fields of one release as indented ``key: value`` lines."""
    kind = {
        "movie": "movie",
        "tv_show": "season pack" if p.is_season_pack else "episode",
        "tv_complete": c("tv complete", CYAN),
        "documentary": c("documentary", CYAN),
        "concert": c("concert", CYAN),
        "other": c("other", RED),
        "unknown": c("unknown", YELLOW),
    }.get(p.media_type, p.media_type)
    kv("type", kind)
    kv("title", p.title)
    if p.season is not None:
        ep = f"E{p.episode:02d}" if p.episode is not None else "—"
        kv("season/ep", f"S{p.season:02d} / {ep}")
    if p.year:
        kv("year", str(p.year))
    if p.languages:
        kv("langs", " ".join(p.languages))
    # Missing quality/source/codec are shown as a dimmed dash rather than omitted.
    kv("quality", p.quality or c("—", DIM))
    kv("source", p.source or c("—", DIM))
    kv("codec", p.codec or c("—", DIM))
    if p.audio_codec:
        ch = f" {p.audio_channels}" if p.audio_channels else ""
        kv("audio", f"{p.audio_codec}{ch}")
    if p.bit_depth or p.hdr_format:
        hdr_parts = [x for x in [p.bit_depth, p.hdr_format] if x]
        kv("hdr/depth", " ".join(hdr_parts))
    if p.edition:
        kv("edition", p.edition, color=YELLOW)
    kv("group", p.group, color=YELLOW if p.group == "UNKNOWN" else GREEN)
    if p.site_tag:
        kv("site tag", p.site_tag, color=YELLOW)


def main() -> None:
    """Parse every entry of the downloads directory and print a report.

    Command-line options: ``--path`` (directory to scan), ``--failures-only``
    and ``--successes-only`` (filter the listing by whether an entry has
    warnings) and ``--no-color``. Colour is also disabled when stdout is not
    a terminal.

    Entries are processed in case-insensitive name order. Each name is parsed
    with ``parse_release`` and its media type detected; unless that type is
    ``"unknown"`` or ``"other"``, the video file returned by
    ``find_video_file`` (if any) is probed and the probe result is passed to
    ``enrich_from_probe``. Any exception raised while processing an entry is
    reported as a ``parse error`` warning for that entry instead of aborting
    the run. A summary line with ok / warning / filtered counts is printed at
    the end.

    Exits with status 1 when the path does not exist or is not a directory.
    """
    global USE_COLOR

    parser = argparse.ArgumentParser(
        description="Recognize release folders in downloads"
    )
    parser.add_argument(
        "--path",
        default="/mnt/testipool/downloads",
        help="Downloads directory (default: /mnt/testipool/downloads)",
    )
    parser.add_argument(
        "--failures-only", action="store_true", help="Show only entries with warnings"
    )
    parser.add_argument(
        "--successes-only", action="store_true", help="Show only fully parsed entries"
    )
    parser.add_argument(
        "--no-color", action="store_true", help="Disable ANSI colours in the output"
    )
    args = parser.parse_args()

    if args.no_color or not sys.stdout.isatty():
        USE_COLOR = False

    downloads = Path(args.path)
    if not downloads.exists():
        print(c(f"Error: {downloads} does not exist", RED), file=sys.stderr)
        sys.exit(1)
    # A regular file passes exists() but would make iterdir() raise
    # NotADirectoryError below, so reject it with a clear message instead.
    if not downloads.is_dir():
        print(c(f"Error: {downloads} is not a directory", RED), file=sys.stderr)
        sys.exit(1)

    # Project imports are done here, after argument validation, as in the
    # original layout of this function.
    from alfred.application.release.detect_media_type import detect_media_type
    from alfred.application.release.enrich_from_probe import enrich_from_probe
    from alfred.domain.release.services import parse_release
    from alfred.infrastructure.filesystem.find_video import find_video_file
    from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
    from alfred.infrastructure.probe import FfprobeMediaProber

    _kb = YamlReleaseKnowledge()
    _prober = FfprobeMediaProber()

    entries = sorted(downloads.iterdir(), key=lambda p: p.name.lower())
    total = len(entries)
    ok_count = 0
    warn_count = 0

    print()
    print(c("━" * 70, BOLD))
    print(c(f" Downloads — {downloads}", BOLD, CYAN))
    print(c(f" {total} entries", DIM))
    print(c("━" * 70, BOLD))

    # parse_path -> coloured label. Built once: it does not depend on the
    # entry, and USE_COLOR is final by this point.
    path_labels = {
        "direct": c("direct", GREEN, DIM),
        "sanitized": c("sanitized", YELLOW),
        "ai": c("ai", RED),
    }

    for entry in entries:
        name = entry.name

        try:
            p, _report = parse_release(name, _kb)
            p.media_type = detect_media_type(p, entry, _kb)
            if p.media_type not in ("unknown", "other"):
                video_file = find_video_file(entry)
                if video_file:
                    media_info = _prober.probe(video_file)
                    if media_info:
                        enrich_from_probe(p, media_info, _kb)
            warnings = _assess(p)
        except Exception as e:
            # Deliberately broad: one bad entry must not abort the whole
            # report, so the failure is surfaced as a warning on that entry.
            warnings = [f"parse error: {e}"]
            p = None

        has_warnings = bool(warnings)

        # Filtered entries are not counted as ok or warning; they show up in
        # the "filtered" figure of the summary.
        if args.failures_only and not has_warnings:
            continue
        if args.successes_only and has_warnings:
            continue

        print()
        path_label = path_labels.get(p.parse_path, p.parse_path) if p else ""

        if has_warnings:
            warn_count += 1
            print(f" {c('⚠', YELLOW, BOLD)} {c(name, YELLOW)} {path_label}")
        else:
            ok_count += 1
            print(f" {c('✓', GREEN, BOLD)} {c(name, BOLD)} {path_label}")

        if p:
            _print_details(p)

        for w in warnings:
            print(f" {c('→ ' + w, YELLOW)}")

    # Summary
    print()
    hr()
    skipped = total - ok_count - warn_count
    print(
        f" {c('Total:', BOLD)} {total} "
        f"{c(str(ok_count) + ' ok', GREEN, BOLD)} "
        f"{c(str(warn_count) + ' warnings', YELLOW, BOLD)}"
        + (f" {c(str(skipped) + ' filtered', DIM)}" if skipped else "")
    )
    hr()
    print()
|
|
|
|
|
|
# Run the report only when this file is executed as a script; importing the
# module does not call main().
if __name__ == "__main__":
    main()
|