Files
alfred/alfred/domain/subtitles/services/pattern_detector.py
T
francwa e6ee700825 refactor(subtitles): inject MediaProber/FilesystemScanner ports into domain services
Domain services no longer call subprocess or pathlib directly. Introduces
two Protocol ports in domain/shared/ports/:

  MediaProber.list_subtitle_streams(video) -> list[SubtitleStreamInfo]
  FilesystemScanner.scan_dir / stat / read_text  -> list[FileEntry] | ...

Concrete adapters live in infrastructure/:

  FfprobeMediaProber          (wraps subprocess + ffprobe + JSON)
  PathlibFilesystemScanner    (wraps pathlib + os reads)

SubtitleIdentifier and PatternDetector now take (kb, prober, scanner) at
construction time. Their internals work over FileEntry snapshots and
SubtitleStreamInfo records — no more ad-hoc Path.is_file/iterdir/stat or
embedded subprocess.run loops. _count_entries now takes raw SRT text
(returned by scanner.read_text) so SRT-only entry counting stays out of
the FS layer.

manage_subtitles use case instantiates the two adapters once and injects
them into both services. Tests pass real adapters and patch
`alfred.infrastructure.probe.ffprobe_prober.subprocess.run` for the
ffprobe-failure cases. _classify_single tests build FileEntry via a
small helper.

Domain is now free of subprocess / direct filesystem reads in the
subtitle pipeline. The only remaining I/O hooks are FilePath VO
convenience methods (exists/is_file/is_dir) which stay as a deliberate
affordance on the value object.
2026-05-19 14:52:24 +02:00

210 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""PatternDetector — discovers the subtitle structure of a release folder."""
import logging
from pathlib import Path
from alfred.infrastructure.knowledge.subtitles.base import SubtitleKnowledgeBase
from ...shared.ports import FilesystemScanner, MediaProber
from ..value_objects import ScanStrategy, SubtitlePattern
logger = logging.getLogger(__name__)
class PatternDetector:
    """
    Inspects a release folder and returns the best matching known pattern,
    plus a confidence score and a description of what was found.

    Used for "pattern discovery" — when we don't yet know which pattern
    a release follows. The result is proposed to the user for confirmation.

    Directory listings come from the injected ``scanner`` and embedded-stream
    information from the injected ``prober``; known extensions, language
    tokens and candidate patterns come from ``kb``.
    """

    # A best score strictly below this value means "no pattern proposed".
    MIN_CONFIDENCE = 0.4

    # Separator placed between the fragments of the human-readable description.
    _DESCRIPTION_SEPARATOR = "; "

    def __init__(
        self,
        kb: SubtitleKnowledgeBase,
        prober: MediaProber,
        scanner: FilesystemScanner,
    ):
        """Store the knowledge base and the two I/O collaborators."""
        self.kb = kb
        self.prober = prober
        self.scanner = scanner

    def detect(self, release_root: Path, sample_video: Path) -> dict:
        """
        Analyse the release folder and return:
        {
            "detected": SubtitlePattern | None,
            "confidence": float,
            "description": str,   # human-readable description of what was found
            "candidate_pattern_ids": list[str],  # [detected.id], or [] if None
            "raw_findings": dict,  # structural facts gathered by _inspect()
        }
        """
        findings = self._inspect(release_root, sample_video)
        best, confidence = self._match_pattern(findings)
        return {
            "detected": best,
            "confidence": confidence,
            "description": self._describe(findings),
            "candidate_pattern_ids": [best.id] if best else [],
            "raw_findings": findings,
        }

    def _has_embedded_subtitles(self, video_path: Path) -> bool:
        """Return True when the prober reports at least one subtitle stream."""
        return bool(self.prober.list_subtitle_streams(video_path))

    @staticmethod
    def _subtitle_files(entries, known_exts) -> list:
        """Keep only the file entries whose lower-cased suffix is a known one."""
        return [
            e for e in entries if e.is_file and e.suffix.lower() in known_exts
        ]

    def _stem_has_lang_token(self, stem: str) -> bool:
        """Return True if any "."- or "_"-separated token of *stem* is a known language token."""
        return any(
            self.kb.is_known_lang_token(token.lower())
            for token in stem.replace("_", ".").split(".")
        )

    def _inspect_subs_folder(self, children, known_exts, findings: dict) -> None:
        """Classify a non-empty Subs/ listing and record the result in *findings*."""
        sub_files = self._subtitle_files(children, known_exts)
        sub_dirs = [c for c in children if c.is_dir]

        if sub_dirs and not sub_files:
            # Only sub-directories at the top level: one folder per episode.
            findings["subs_strategy"] = "episode_subfolder"
            # The first sub-directory is used as a representative sample.
            sample_files = self._subtitle_files(
                self.scanner.scan_dir(sub_dirs[0].path), known_exts
            )
            findings["files_per_episode"] = len(sample_files)
            # Naming conventions, e.g. "2_English.srt".
            findings["has_numeric_prefix"] = any(
                f.stem.split("_")[0].isdigit() for f in sample_files
            )
            findings["has_lang_tokens"] = any(
                self._stem_has_lang_token(f.stem) for f in sample_files
            )
        else:
            # Subtitle files directly in Subs/ (or nothing recognisable at all).
            findings["subs_strategy"] = "flat"
            findings["files_per_episode"] = len(sub_files)
            findings["has_lang_tokens"] = any(
                self._stem_has_lang_token(f.stem) for f in sub_files
            )

    def _inspect(self, release_root: Path, sample_video: Path) -> dict:
        """
        Gather structural facts about the release.

        Looks for a ``Subs`` directory next to the sample video first, then at
        the release root; the first one with a non-empty listing is used.
        If none is found, subtitle files sitting next to the video are counted.
        """
        known_exts = self.kb.known_extensions()
        findings: dict = {
            "has_subs_folder": False,
            "subs_strategy": None,  # "flat" | "episode_subfolder"
            "subs_root": None,
            "adjacent_subs": False,
            "has_embedded": self._has_embedded_subtitles(sample_video),
            "files_per_episode": 0,
            "has_lang_tokens": False,
            "has_numeric_prefix": False,
        }

        # Check for Subs/ folder — adjacent or at release root.
        # When the video sits at the release root both candidates are the
        # same path; scan it only once.
        adjacent_subs_dir = sample_video.parent / "Subs"
        root_subs_dir = release_root / "Subs"
        candidates = [adjacent_subs_dir]
        if root_subs_dir != adjacent_subs_dir:
            candidates.append(root_subs_dir)

        for subs_candidate in candidates:
            children = self.scanner.scan_dir(subs_candidate)
            if not children:
                # Empty listing (presumably also what a missing directory
                # yields — confirm against the scanner adapter).
                continue
            findings["has_subs_folder"] = True
            findings["subs_root"] = str(subs_candidate)
            self._inspect_subs_folder(children, known_exts, findings)
            # First usable Subs/ wins, so findings never mix two folders.
            break

        # Check adjacent subs (next to the video)
        if not findings["has_subs_folder"]:
            adjacent = self._subtitle_files(
                self.scanner.scan_dir(sample_video.parent), known_exts
            )
            if adjacent:
                findings["adjacent_subs"] = True
                findings["files_per_episode"] = len(adjacent)

        return findings

    def _match_pattern(self, findings: dict) -> tuple[SubtitlePattern | None, float]:
        """
        Score all known patterns against the findings.

        Returns ``(pattern, score)`` for the highest-scoring pattern, or
        ``(None, score)`` when that score is below ``MIN_CONFIDENCE``, or
        ``(None, 0.0)`` when the knowledge base has no patterns. On ties the
        pattern that comes first in ``kb.patterns()`` is kept.
        """
        scores: list[tuple[float, SubtitlePattern]] = [
            (self._score(pattern, findings), pattern)
            for pattern in self.kb.patterns().values()
        ]
        if not scores:
            return None, 0.0

        # max() returns the first maximal item, i.e. the same winner a stable
        # descending sort would put first.
        best_score, best_pattern = max(scores, key=lambda pair: pair[0])
        if best_score < self.MIN_CONFIDENCE:
            return None, best_score
        return best_pattern, best_score

    def _score(self, pattern: SubtitlePattern, findings: dict) -> float:
        """
        Return a match score between 0.0 and 1.0 for this pattern.

        ``score`` accumulates the points earned and ``total`` the points that
        were available; the result is their ratio (0.0 for a scan strategy
        that is not handled here).
        """
        score = 0.0
        total = 0.0
        strategy = pattern.scan_strategy

        if strategy == ScanStrategy.EMBEDDED:
            total += 1
            if findings.get("has_embedded"):
                score += 1.0
            if not findings.get("has_subs_folder") and not findings.get(
                "adjacent_subs"
            ):
                # Bonus for "no external subtitles at all": it raises both the
                # earned and the available points, so it never penalises.
                score += 0.5
                total += 0.5
        elif strategy == ScanStrategy.EPISODE_SUBFOLDER:
            total += 3
            if findings.get("has_subs_folder"):
                score += 1.0
            if findings.get("subs_strategy") == "episode_subfolder":
                score += 2.0
        elif strategy == ScanStrategy.FLAT:
            total += 2
            if findings.get("has_subs_folder"):
                score += 1.0
            if findings.get("subs_strategy") == "flat":
                score += 1.0
        elif strategy == ScanStrategy.ADJACENT:
            total += 2
            if findings.get("adjacent_subs"):
                score += 1.0
            if not findings.get("has_subs_folder"):
                score += 1.0

        return score / total if total > 0 else 0.0

    def _describe(self, findings: dict) -> str:
        """Build a one-line, human-readable summary of *findings*."""
        parts = []
        if findings.get("has_subs_folder"):
            # The key may be present with a None value, so fall back with
            # ``or`` rather than a .get() default.
            strategy = findings.get("subs_strategy") or "?"
            n = findings.get("files_per_episode", 0)
            parts.append(f"Subs/ folder found ({strategy}), ~{n} file(s) per episode")
            if findings.get("has_numeric_prefix"):
                parts.append("files have numeric prefix (e.g. 2_English.srt)")
            if findings.get("has_lang_tokens"):
                parts.append("language tokens found in filenames")
        elif findings.get("adjacent_subs"):
            parts.append("subtitle files adjacent to video")
        else:
            parts.append("no external subtitle files found")
        if findings.get("has_embedded"):
            parts.append("embedded tracks detected")
        # Fragments are joined with a visible separator so they stay readable.
        return self._DESCRIPTION_SEPARATOR.join(parts) if parts else "nothing found"