ced72547f7
The domain layer no longer reads YAML files. All knowledge loaders move
from `alfred/domain/*/knowledge/` to `alfred/infrastructure/knowledge/`:
domain/release/knowledge.py
→ infrastructure/knowledge/release.py
domain/shared/knowledge/language_registry.py
→ infrastructure/knowledge/language_registry.py
domain/subtitles/knowledge/{loader,base}.py
→ infrastructure/knowledge/subtitles/{loader,base}.py
Callers in domain/release/{services,value_objects}.py,
domain/subtitles/{aggregates,services/*}.py, and
application/filesystem/manage_subtitles.py updated to absolute imports.
Re-exports of KnowledgeLoader/SubtitleKnowledgeBase from
domain/subtitles/__init__.py dropped (no shim per project convention).
Tests follow the moved targets.
228 lines
8.2 KiB
Python
228 lines
8.2 KiB
Python
"""PatternDetector — discovers the subtitle structure of a release folder."""
|
||
|
||
import json
|
||
import logging
|
||
import subprocess
|
||
from pathlib import Path
|
||
|
||
from alfred.infrastructure.knowledge.subtitles.base import SubtitleKnowledgeBase
|
||
|
||
from ..value_objects import ScanStrategy, SubtitlePattern
|
||
|
||
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class PatternDetector:
    """
    Inspects a release folder and returns the best matching known pattern,
    plus a confidence score and a description of what was found.

    Used for "pattern discovery" — when we don't yet know which pattern
    a release follows. The result is proposed to the user for confirmation.
    """

    # A best score below this threshold is reported as "no pattern detected".
    _MIN_CONFIDENCE = 0.4

    # Seconds to wait for ffprobe before giving up on a video.
    _FFPROBE_TIMEOUT = 30

    def __init__(self, kb: SubtitleKnowledgeBase):
        # The knowledge base supplies known_extensions(), is_known_lang_token()
        # and patterns(), which are the only calls this class makes on it.
        self.kb = kb

    def detect(self, release_root: Path, sample_video: Path) -> dict:
        """
        Analyse the release folder and return:
            {
                "detected": SubtitlePattern | None,
                "confidence": float,
                "description": str,  # human-readable description of what was found
                "candidate_pattern_ids": list[str],
                "raw_findings": dict,  # the structural facts gathered by _inspect()
            }
        """
        findings = self._inspect(release_root, sample_video)
        best, confidence = self._match_pattern(findings)

        return {
            "detected": best,
            "confidence": confidence,
            "description": self._describe(findings),
            "candidate_pattern_ids": [best.id] if best else [],
            "raw_findings": findings,
        }

    def _has_embedded_subtitles(self, video_path: Path) -> bool:
        """Run ffprobe to check whether the video has embedded subtitle streams.

        Best-effort: returns False (after logging the reason) when ffprobe
        cannot be started, times out, or prints something that is not the
        expected JSON object.
        """
        command = [
            "ffprobe",
            "-v",
            "quiet",
            "-print_format",
            "json",
            "-show_streams",
            "-select_streams",
            "s",
            str(video_path),
        ]
        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=self._FFPROBE_TIMEOUT,
                check=False,
            )
        except (OSError, subprocess.TimeoutExpired) as exc:
            # OSError covers a missing binary (FileNotFoundError) as well as
            # one that exists but cannot be executed (PermissionError).
            logger.warning("ffprobe could not inspect %s: %s", video_path, exc)
            return False

        try:
            data = json.loads(result.stdout)
        except json.JSONDecodeError:
            # With "-v quiet" a failed probe typically leaves stdout empty.
            logger.debug("ffprobe produced no usable JSON for %s", video_path)
            return False

        # Guard against valid-but-unexpected JSON (non-object, null "streams").
        return isinstance(data, dict) and bool(data.get("streams"))

    @staticmethod
    def _is_subtitle_file(path: Path, known_exts) -> bool:
        """Return True if *path* is a regular file whose lowercased suffix is in *known_exts*."""
        return path.is_file() and path.suffix.lower() in known_exts

    def _has_lang_token(self, stem: str) -> bool:
        """Return True if any "."/"_"-separated token of *stem* is a known language token."""
        return any(
            self.kb.is_known_lang_token(token.lower())
            for token in stem.replace("_", ".").split(".")
        )

    def _inspect_subs_folder(self, subs_dir: Path, known_exts, findings: dict) -> None:
        """Record the layout and naming conventions of one Subs/ folder in *findings*."""
        findings["has_subs_folder"] = True
        findings["subs_root"] = str(subs_dir)

        # Is it flat or episode_subfolder?
        children = list(subs_dir.iterdir())
        sub_files = [c for c in children if self._is_subtitle_file(c, known_exts)]
        sub_dirs = [c for c in children if c.is_dir()]

        if sub_dirs and not sub_files:
            findings["subs_strategy"] = "episode_subfolder"
            # iterdir() yields entries in arbitrary order; sample the first
            # subfolder by name so repeated runs report the same findings.
            sample_sub = min(sub_dirs)
            sample_files = [
                f for f in sample_sub.iterdir() if self._is_subtitle_file(f, known_exts)
            ]
            findings["files_per_episode"] = len(sample_files)
            # Check naming conventions
            if any(f.stem.split("_")[0].isdigit() for f in sample_files):
                findings["has_numeric_prefix"] = True
            if any(self._has_lang_token(f.stem) for f in sample_files):
                findings["has_lang_tokens"] = True
        else:
            # Also reached for an empty folder or one mixing files and folders.
            findings["subs_strategy"] = "flat"
            findings["files_per_episode"] = len(sub_files)
            if any(self._has_lang_token(f.stem) for f in sub_files):
                findings["has_lang_tokens"] = True

    def _inspect(self, release_root: Path, sample_video: Path) -> dict:
        """Gather structural facts about the release."""
        known_exts = self.kb.known_extensions()
        findings: dict = {
            "has_subs_folder": False,
            "subs_strategy": None,  # "flat" | "episode_subfolder"
            "subs_root": None,
            "adjacent_subs": False,
            "has_embedded": self._has_embedded_subtitles(sample_video),
            "files_per_episode": 0,
            "has_lang_tokens": False,
            "has_numeric_prefix": False,
        }

        # Check for Subs/ folder — adjacent or at release root. When the video
        # sits directly in the release root both candidates are the same path,
        # so de-duplicate (order-preserving) instead of scanning it twice.
        # If two distinct folders exist, the release-root one is inspected
        # last and therefore determines subs_root / subs_strategy.
        candidates = dict.fromkeys(
            [sample_video.parent / "Subs", release_root / "Subs"]
        )
        for subs_candidate in candidates:
            if subs_candidate.is_dir():
                self._inspect_subs_folder(subs_candidate, known_exts, findings)

        # Check adjacent subs (next to the video)
        if not findings["has_subs_folder"]:
            adjacent = [
                p
                for p in sample_video.parent.iterdir()
                if self._is_subtitle_file(p, known_exts)
            ]
            if adjacent:
                findings["adjacent_subs"] = True
                findings["files_per_episode"] = len(adjacent)

        return findings

    def _match_pattern(self, findings: dict) -> tuple[SubtitlePattern | None, float]:
        """Score all known patterns against the findings.

        Returns ``(pattern, score)`` for the highest-scoring pattern, or
        ``(None, score)`` when that score is below the confidence threshold
        (``(None, 0.0)`` when the knowledge base has no patterns at all).
        """
        scored = [
            (self._score(pattern, findings), pattern)
            for pattern in self.kb.patterns().values()
        ]
        if not scored:
            return None, 0.0

        # max() keeps the first of several equally-scored patterns, i.e. ties
        # are resolved by the knowledge base's own ordering.
        best_score, best_pattern = max(scored, key=lambda item: item[0])

        if best_score < self._MIN_CONFIDENCE:
            return None, best_score

        return best_pattern, best_score

    def _score(self, pattern: SubtitlePattern, findings: dict) -> float:
        """Return a 0.0–1.0 match score for this pattern against the findings."""
        score = 0.0
        total = 0.0

        strategy = pattern.scan_strategy

        if strategy == ScanStrategy.EMBEDDED:
            total += 1
            if findings.get("has_embedded"):
                score += 1.0
            # Bonus when nothing external competes with the embedded tracks.
            # NOTE(review): the bonus raises the denominator only when it is
            # earned, so it never changes the result once embedded tracks are
            # present — confirm this weighting is intended.
            if not findings.get("has_subs_folder") and not findings.get(
                "adjacent_subs"
            ):
                score += 0.5
                total += 0.5

        elif strategy == ScanStrategy.EPISODE_SUBFOLDER:
            total += 3
            if findings.get("has_subs_folder"):
                score += 1.0
            if findings.get("subs_strategy") == "episode_subfolder":
                score += 2.0

        elif strategy == ScanStrategy.FLAT:
            total += 2
            if findings.get("has_subs_folder"):
                score += 1.0
            if findings.get("subs_strategy") == "flat":
                score += 1.0

        elif strategy == ScanStrategy.ADJACENT:
            total += 2
            if findings.get("adjacent_subs"):
                score += 1.0
            if not findings.get("has_subs_folder"):
                score += 1.0

        # Unrecognised strategies leave total at 0 and therefore score 0.0.
        return score / total if total > 0 else 0.0

    def _describe(self, findings: dict) -> str:
        """Build the human-readable summary of *findings* shown to the user."""
        parts = []
        if findings.get("has_subs_folder"):
            strategy = findings.get("subs_strategy", "?")
            n = findings.get("files_per_episode", 0)
            parts.append(f"Subs/ folder found ({strategy}), ~{n} file(s) per episode")
            if findings.get("has_numeric_prefix"):
                parts.append("files have numeric prefix (e.g. 2_English.srt)")
            if findings.get("has_lang_tokens"):
                parts.append("language tokens found in filenames")
        elif findings.get("adjacent_subs"):
            parts.append("subtitle files adjacent to video")
        else:
            parts.append("no external subtitle files found")

        if findings.get("has_embedded"):
            parts.append("embedded tracks detected (ffprobe)")

        return " — ".join(parts) if parts else "nothing found"
|