feat: major architectural refactor
- Refactor memory system (episodic/STM/LTM with components)
- Implement complete subtitle domain (scanner, matcher, placer)
- Add YAML workflow infrastructure
- Externalize knowledge base (patterns, release groups)
- Add comprehensive testing suite
- Create manual testing CLIs
This commit is contained in:
@@ -0,0 +1,13 @@
|
||||
from .identifier import SubtitleIdentifier
|
||||
from .matcher import SubtitleMatcher
|
||||
from .pattern_detector import PatternDetector
|
||||
from .placer import PlacedTrack, PlaceResult, SubtitlePlacer
|
||||
|
||||
__all__ = [
|
||||
"SubtitleIdentifier",
|
||||
"SubtitleMatcher",
|
||||
"PatternDetector",
|
||||
"SubtitlePlacer",
|
||||
"PlacedTrack",
|
||||
"PlaceResult",
|
||||
]
|
||||
@@ -0,0 +1,287 @@
|
||||
"""SubtitleIdentifier — finds and classifies all subtitle tracks for a video file."""
|
||||
|
||||
import logging
|
||||
import re
|
||||
import subprocess
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
from ...shared.value_objects import ImdbId
|
||||
from ..entities import MediaSubtitleMetadata, SubtitleTrack
|
||||
from ..knowledge.base import SubtitleKnowledgeBase
|
||||
from ..value_objects import ScanStrategy, SubtitlePattern, SubtitleType
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _tokenize(name: str) -> list[str]:
    """Split a filename stem into lowercase tokens.

    Tokens are separated by any run of dots, whitespace, underscores,
    hyphens or bracket characters (``[]``, ``()``, ``{}``), so tags written
    as ``[Forced]`` or ``English(SDH)`` yield clean tokens instead of staying
    glued to their brackets. Empty fragments are dropped.
    """
    return [t.lower() for t in re.split(r"[.\s_\-\[\](){}]+", name) if t]
|
||||
|
||||
|
||||
def _count_entries(path: Path) -> int:
    """Return the entry count of an SRT file by finding the last cue number.

    The file is scanned from the end for an all-digit line that is directly
    followed by a timing line (one containing ``-->``); that line is the
    index of the last cue. Requiring the timing line prevents a dialogue line
    that happens to be a number from being taken for the cue index. If the
    file has no such pair, the last all-digit line is used instead.

    Returns 0 when the file cannot be read or contains no numeric line.
    """
    try:
        # utf-8-sig strips a leading BOM, which would otherwise hide cue "1"
        # in a single-cue file.
        with open(path, encoding="utf-8-sig", errors="replace") as f:
            lines = f.read().splitlines()

        fallback: int | None = None
        for idx in range(len(lines) - 1, -1, -1):
            text = lines[idx].strip()
            # isdecimal() only accepts characters that int() can parse.
            if not text.isdecimal():
                continue
            if idx + 1 < len(lines) and "-->" in lines[idx + 1]:
                return int(text)
            if fallback is None:
                fallback = int(text)
        return fallback or 0
    except (OSError, ValueError):
        # Unreadable file, or a digit run too long for int(): best effort.
        return 0
|
||||
|
||||
|
||||
class SubtitleIdentifier:
|
||||
"""
|
||||
Finds all subtitle tracks for a given video file using a known pattern,
|
||||
then attempts to classify each track (language, type, format).
|
||||
|
||||
Returns a MediaSubtitleMetadata with embedded + external tracks.
|
||||
External tracks with unknown language or low confidence are left as-is —
|
||||
the caller (use case) decides whether to ask the user for clarification.
|
||||
"""
|
||||
|
||||
def __init__(self, kb: SubtitleKnowledgeBase):
|
||||
self.kb = kb
|
||||
|
||||
def identify(
|
||||
self,
|
||||
video_path: Path,
|
||||
pattern: SubtitlePattern,
|
||||
media_id: ImdbId | None,
|
||||
media_type: str,
|
||||
release_group: str | None = None,
|
||||
) -> MediaSubtitleMetadata:
|
||||
metadata = MediaSubtitleMetadata(
|
||||
media_id=media_id,
|
||||
media_type=media_type,
|
||||
release_group=release_group,
|
||||
detected_pattern_id=pattern.id,
|
||||
)
|
||||
|
||||
if pattern.scan_strategy == ScanStrategy.EMBEDDED:
|
||||
metadata.embedded_tracks = self._scan_embedded(video_path)
|
||||
else:
|
||||
metadata.external_tracks = self._scan_external(video_path, pattern)
|
||||
# Always also check for embedded tracks
|
||||
metadata.embedded_tracks = self._scan_embedded(video_path)
|
||||
|
||||
return metadata
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Embedded tracks — ffprobe
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _scan_embedded(self, video_path: Path) -> list[SubtitleTrack]:
|
||||
if not video_path.exists():
|
||||
return []
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[
|
||||
"ffprobe", "-v", "quiet",
|
||||
"-print_format", "json",
|
||||
"-show_streams",
|
||||
"-select_streams", "s",
|
||||
str(video_path),
|
||||
],
|
||||
capture_output=True, text=True, timeout=30,
|
||||
)
|
||||
data = json.loads(result.stdout)
|
||||
except (subprocess.TimeoutExpired, json.JSONDecodeError, FileNotFoundError) as e:
|
||||
logger.debug(f"SubtitleIdentifier: ffprobe failed for {video_path.name}: {e}")
|
||||
return []
|
||||
|
||||
tracks = []
|
||||
for stream in data.get("streams", []):
|
||||
tags = stream.get("tags", {})
|
||||
disposition = stream.get("disposition", {})
|
||||
lang_code = tags.get("language", "")
|
||||
title = tags.get("title", "")
|
||||
|
||||
lang = self.kb.language_for_token(lang_code) if lang_code else None
|
||||
|
||||
if disposition.get("hearing_impaired"):
|
||||
stype = SubtitleType.SDH
|
||||
elif disposition.get("forced"):
|
||||
stype = SubtitleType.FORCED
|
||||
else:
|
||||
stype = SubtitleType.STANDARD
|
||||
|
||||
tracks.append(SubtitleTrack(
|
||||
language=lang,
|
||||
format=None,
|
||||
subtitle_type=stype,
|
||||
is_embedded=True,
|
||||
raw_tokens=[lang_code] if lang_code else [],
|
||||
))
|
||||
|
||||
logger.debug(f"SubtitleIdentifier: {len(tracks)} embedded track(s) in {video_path.name}")
|
||||
return tracks
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# External tracks — filesystem scan per pattern strategy
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _scan_external(self, video_path: Path, pattern: SubtitlePattern) -> list[SubtitleTrack]:
|
||||
strategy = pattern.scan_strategy
|
||||
|
||||
if strategy == ScanStrategy.ADJACENT:
|
||||
candidates = self._find_adjacent(video_path)
|
||||
elif strategy == ScanStrategy.FLAT:
|
||||
candidates = self._find_flat(video_path, pattern.root_folder or "Subs")
|
||||
elif strategy == ScanStrategy.EPISODE_SUBFOLDER:
|
||||
candidates = self._find_episode_subfolder(video_path, pattern.root_folder or "Subs")
|
||||
else:
|
||||
return []
|
||||
|
||||
return self._classify_files(candidates, pattern)
|
||||
|
||||
def _find_adjacent(self, video_path: Path) -> list[Path]:
|
||||
return [
|
||||
p for p in sorted(video_path.parent.iterdir())
|
||||
if p.is_file() and p.suffix.lower() in self.kb.known_extensions()
|
||||
and p.stem != video_path.stem
|
||||
]
|
||||
|
||||
def _find_flat(self, video_path: Path, root_folder: str) -> list[Path]:
|
||||
subs_dir = video_path.parent / root_folder
|
||||
if not subs_dir.is_dir():
|
||||
# Also look at release root (one level up)
|
||||
subs_dir = video_path.parent.parent / root_folder
|
||||
if not subs_dir.is_dir():
|
||||
return []
|
||||
return [
|
||||
p for p in sorted(subs_dir.iterdir())
|
||||
if p.is_file() and p.suffix.lower() in self.kb.known_extensions()
|
||||
]
|
||||
|
||||
def _find_episode_subfolder(self, video_path: Path, root_folder: str) -> list[Path]:
|
||||
"""
|
||||
Look for Subs/{episode_stem}/*.srt
|
||||
|
||||
Checks two locations:
|
||||
1. Adjacent to the video: video_path.parent / root_folder / video_path.stem
|
||||
2. Release root (one level up): video_path.parent.parent / root_folder / video_path.stem
|
||||
"""
|
||||
episode_stem = video_path.stem
|
||||
candidates_dirs = [
|
||||
video_path.parent / root_folder / episode_stem,
|
||||
video_path.parent.parent / root_folder / episode_stem,
|
||||
]
|
||||
for subs_dir in candidates_dirs:
|
||||
if subs_dir.is_dir():
|
||||
files = [
|
||||
p for p in sorted(subs_dir.iterdir())
|
||||
if p.is_file() and p.suffix.lower() in self.kb.known_extensions()
|
||||
]
|
||||
if files:
|
||||
logger.debug(f"SubtitleIdentifier: found {len(files)} file(s) in {subs_dir}")
|
||||
return files
|
||||
return []
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Classification
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _classify_files(self, paths: list[Path], pattern: SubtitlePattern) -> list[SubtitleTrack]:
|
||||
tracks = []
|
||||
for path in paths:
|
||||
track = self._classify_single(path)
|
||||
tracks.append(track)
|
||||
|
||||
# Post-process: if multiple tracks share same language but type is ambiguous,
|
||||
# apply size_and_count disambiguation
|
||||
if pattern.type_detection.value == "size_and_count":
|
||||
tracks = self._disambiguate_by_size(tracks)
|
||||
|
||||
return tracks
|
||||
|
||||
def _classify_single(self, path: Path) -> SubtitleTrack:
|
||||
fmt = self.kb.format_for_extension(path.suffix)
|
||||
tokens = _tokenize(path.stem)
|
||||
|
||||
language = None
|
||||
subtitle_type = SubtitleType.UNKNOWN
|
||||
unknown_tokens = []
|
||||
matched_tokens = 0
|
||||
|
||||
for token in tokens:
|
||||
if self.kb.is_known_lang_token(token):
|
||||
language = self.kb.language_for_token(token)
|
||||
matched_tokens += 1
|
||||
elif self.kb.is_known_type_token(token):
|
||||
subtitle_type = self.kb.type_for_token(token) or subtitle_type
|
||||
matched_tokens += 1
|
||||
elif token.isdigit():
|
||||
pass # numeric prefix — ignore
|
||||
elif len(token) > 1:
|
||||
unknown_tokens.append(token)
|
||||
|
||||
# Confidence: proportion of meaningful tokens that were recognized
|
||||
meaningful = [t for t in tokens if not t.isdigit() and len(t) > 1]
|
||||
confidence = matched_tokens / max(len(meaningful), 1) if meaningful else 0.5
|
||||
|
||||
if unknown_tokens:
|
||||
logger.debug(
|
||||
f"SubtitleIdentifier: unknown tokens in '{path.name}': {unknown_tokens}"
|
||||
)
|
||||
|
||||
size_kb = path.stat().st_size / 1024 if path.exists() else None
|
||||
entry_count = _count_entries(path) if path.exists() else None
|
||||
|
||||
return SubtitleTrack(
|
||||
language=language,
|
||||
format=fmt,
|
||||
subtitle_type=subtitle_type,
|
||||
is_embedded=False,
|
||||
file_path=path,
|
||||
file_size_kb=size_kb,
|
||||
entry_count=entry_count,
|
||||
confidence=confidence,
|
||||
raw_tokens=tokens,
|
||||
)
|
||||
|
||||
def _disambiguate_by_size(self, tracks: list[SubtitleTrack]) -> list[SubtitleTrack]:
|
||||
"""
|
||||
When multiple tracks share the same language and type is UNKNOWN/STANDARD,
|
||||
the one with the most entries (lines) is SDH, the smallest is FORCED if
|
||||
there are 3+, otherwise the smaller is STANDARD.
|
||||
|
||||
Only applied when type_detection = size_and_count.
|
||||
"""
|
||||
from itertools import groupby
|
||||
|
||||
# Group by language code
|
||||
lang_groups: dict[str, list[SubtitleTrack]] = {}
|
||||
for track in tracks:
|
||||
key = track.language.code if track.language else "__unknown__"
|
||||
lang_groups.setdefault(key, []).append(track)
|
||||
|
||||
result = []
|
||||
for lang_code, group in lang_groups.items():
|
||||
if len(group) == 1:
|
||||
result.extend(group)
|
||||
continue
|
||||
|
||||
# Sort by entry_count ascending (None treated as 0)
|
||||
sorted_group = sorted(group, key=lambda t: t.entry_count or 0)
|
||||
|
||||
if len(sorted_group) == 2:
|
||||
# smaller = standard, larger = sdh
|
||||
self._set_type(sorted_group[0], SubtitleType.STANDARD)
|
||||
self._set_type(sorted_group[1], SubtitleType.SDH)
|
||||
elif len(sorted_group) >= 3:
|
||||
# smallest = forced, middle = standard, largest = sdh
|
||||
self._set_type(sorted_group[0], SubtitleType.FORCED)
|
||||
for t in sorted_group[1:-1]:
|
||||
self._set_type(t, SubtitleType.STANDARD)
|
||||
self._set_type(sorted_group[-1], SubtitleType.SDH)
|
||||
|
||||
result.extend(sorted_group)
|
||||
|
||||
return result
|
||||
|
||||
def _set_type(self, track: SubtitleTrack, stype: SubtitleType) -> None:
|
||||
"""Mutate track type in-place."""
|
||||
track.subtitle_type = stype
|
||||
@@ -0,0 +1,118 @@
|
||||
"""SubtitleMatcher — filters tracks against resolved rules."""
|
||||
|
||||
import logging
|
||||
|
||||
from ..entities import SubtitleTrack
|
||||
from ..value_objects import SubtitleMatchingRules, SubtitleType
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SubtitleMatcher:
    """
    Applies the effective SubtitleMatchingRules to a list of SubtitleTrack.

    Produces two lists: the tracks that satisfy every filter with a
    confidence of at least ``min_confidence`` (matched), and the tracks that
    need user clarification (unresolved).

    When two matched tracks share the same language + type, only one is
    kept; ``format_priority`` decides which.
    """

    def match(
        self,
        tracks: list[SubtitleTrack],
        rules: SubtitleMatchingRules,
    ) -> tuple[list[SubtitleTrack], list[SubtitleTrack]]:
        """
        Split ``tracks`` into (matched, unresolved).

        Embedded tracks are ignored. A track without a language, or with a
        confidence below ``rules.min_confidence``, is unresolved. Tracks
        rejected by the filters end up in neither list.
        """
        accepted: list[SubtitleTrack] = []
        unresolved: list[SubtitleTrack] = []

        for track in tracks:
            if track.is_embedded:
                continue

            needs_clarification = (
                track.language is None or track.confidence < rules.min_confidence
            )
            if needs_clarification:
                unresolved.append(track)
            elif self._passes_filters(track, rules):
                accepted.append(track)
            else:
                logger.debug(f"SubtitleMatcher: filtered out {track}")

        matched = self._resolve_conflicts(accepted, rules)
        logger.info(
            f"SubtitleMatcher: {len(matched)} matched, {len(unresolved)} unresolved"
        )
        return matched, unresolved

    def _passes_filters(self, track: SubtitleTrack, rules: SubtitleMatchingRules) -> bool:
        """Return True when the track satisfies the language, format and type filters.

        An empty filter list disables that filter. The format filter is only
        applied to non-embedded tracks.
        """
        languages = rules.preferred_languages
        if languages and (not track.language or track.language.code not in languages):
            return False

        formats = rules.preferred_formats
        if formats and not track.is_embedded:
            if not track.format or track.format.id not in formats:
                return False

        types = rules.allowed_types
        return not types or track.subtitle_type.value in types

    def _resolve_conflicts(
        self,
        tracks: list[SubtitleTrack],
        rules: SubtitleMatchingRules,
    ) -> list[SubtitleTrack]:
        """
        Keep a single track per (language code, type value) pair.

        The first track seen for a pair is kept unless a later one ranks
        strictly better in ``rules.format_priority``.
        """
        winners: dict[tuple, SubtitleTrack] = {}

        for track in tracks:
            key = (
                track.language.code if track.language else None,
                track.subtitle_type.value,
            )

            if key not in winners:
                winners[key] = track
                continue

            existing = winners[key]
            if not self._prefer(track, existing, rules.format_priority):
                continue

            logger.debug(
                f"SubtitleMatcher: conflict {key} — "
                f"preferring {track.format.id if track.format else 'embedded'} "
                f"over {existing.format.id if existing.format else 'embedded'}"
            )
            winners[key] = track

        return list(winners.values())

    def _prefer(
        self,
        candidate: SubtitleTrack,
        existing: SubtitleTrack,
        format_priority: list[str],
    ) -> bool:
        """Return True if candidate ranks strictly ahead of existing in format_priority.

        A track whose format is missing or not listed gets rank 999. With an
        empty priority list the existing track is always kept.
        """
        if not format_priority:
            return False

        def rank(track: SubtitleTrack) -> int:
            fmt_id = track.format.id if track.format else ""
            return format_priority.index(fmt_id) if fmt_id in format_priority else 999

        return rank(candidate) < rank(existing)
|
||||
@@ -0,0 +1,205 @@
|
||||
"""PatternDetector — discovers the subtitle structure of a release folder."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from ..knowledge.base import SubtitleKnowledgeBase
|
||||
from ..value_objects import ScanStrategy, SubtitlePattern
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PatternDetector:
|
||||
"""
|
||||
Inspects a release folder and returns the best matching known pattern,
|
||||
plus a confidence score and a description of what was found.
|
||||
|
||||
Used for "pattern discovery" — when we don't yet know which pattern
|
||||
a release follows. The result is proposed to the user for confirmation.
|
||||
"""
|
||||
|
||||
def __init__(self, kb: SubtitleKnowledgeBase):
|
||||
self.kb = kb
|
||||
|
||||
def detect(self, release_root: Path, sample_video: Path) -> dict:
|
||||
"""
|
||||
Analyse the release folder and return:
|
||||
{
|
||||
"detected": SubtitlePattern | None,
|
||||
"confidence": float,
|
||||
"description": str, # human-readable description of what was found
|
||||
"candidate_pattern_ids": list[str],
|
||||
}
|
||||
"""
|
||||
findings = self._inspect(release_root, sample_video)
|
||||
best, confidence = self._match_pattern(findings)
|
||||
|
||||
return {
|
||||
"detected": best,
|
||||
"confidence": confidence,
|
||||
"description": self._describe(findings),
|
||||
"candidate_pattern_ids": [best.id] if best else [],
|
||||
"raw_findings": findings,
|
||||
}
|
||||
|
||||
def _has_embedded_subtitles(self, video_path: Path) -> bool:
|
||||
"""Run ffprobe to check whether the video has embedded subtitle streams."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[
|
||||
"ffprobe", "-v", "quiet",
|
||||
"-print_format", "json",
|
||||
"-show_streams",
|
||||
"-select_streams", "s",
|
||||
str(video_path),
|
||||
],
|
||||
capture_output=True, text=True, timeout=30,
|
||||
)
|
||||
data = json.loads(result.stdout)
|
||||
return len(data.get("streams", [])) > 0
|
||||
except (subprocess.TimeoutExpired, json.JSONDecodeError, FileNotFoundError):
|
||||
return False
|
||||
|
||||
def _inspect(self, release_root: Path, sample_video: Path) -> dict:
|
||||
"""Gather structural facts about the release."""
|
||||
known_exts = self.kb.known_extensions()
|
||||
findings: dict = {
|
||||
"has_subs_folder": False,
|
||||
"subs_strategy": None, # "flat" | "episode_subfolder"
|
||||
"subs_root": None,
|
||||
"adjacent_subs": False,
|
||||
"has_embedded": self._has_embedded_subtitles(sample_video),
|
||||
"files_per_episode": 0,
|
||||
"has_lang_tokens": False,
|
||||
"has_numeric_prefix": False,
|
||||
}
|
||||
|
||||
# Check for Subs/ folder — adjacent or at release root
|
||||
for subs_candidate in [
|
||||
sample_video.parent / "Subs",
|
||||
release_root / "Subs",
|
||||
]:
|
||||
if subs_candidate.is_dir():
|
||||
findings["has_subs_folder"] = True
|
||||
findings["subs_root"] = str(subs_candidate)
|
||||
|
||||
# Is it flat or episode_subfolder?
|
||||
children = list(subs_candidate.iterdir())
|
||||
sub_files = [c for c in children if c.is_file() and c.suffix.lower() in known_exts]
|
||||
sub_dirs = [c for c in children if c.is_dir()]
|
||||
|
||||
if sub_dirs and not sub_files:
|
||||
findings["subs_strategy"] = "episode_subfolder"
|
||||
# Count files in a sample subfolder
|
||||
sample_sub = sub_dirs[0]
|
||||
sample_files = [f for f in sample_sub.iterdir()
|
||||
if f.is_file() and f.suffix.lower() in known_exts]
|
||||
findings["files_per_episode"] = len(sample_files)
|
||||
# Check naming conventions
|
||||
for f in sample_files:
|
||||
stem = f.stem
|
||||
parts = stem.split("_")
|
||||
if parts[0].isdigit():
|
||||
findings["has_numeric_prefix"] = True
|
||||
if any(self.kb.is_known_lang_token(t.lower())
|
||||
for t in stem.replace("_", ".").split(".")):
|
||||
findings["has_lang_tokens"] = True
|
||||
else:
|
||||
findings["subs_strategy"] = "flat"
|
||||
findings["files_per_episode"] = len(sub_files)
|
||||
for f in sub_files:
|
||||
if any(self.kb.is_known_lang_token(t.lower())
|
||||
for t in f.stem.replace("_", ".").split(".")):
|
||||
findings["has_lang_tokens"] = True
|
||||
break
|
||||
|
||||
# Check adjacent subs (next to the video)
|
||||
if not findings["has_subs_folder"]:
|
||||
adjacent = [
|
||||
p for p in sample_video.parent.iterdir()
|
||||
if p.is_file() and p.suffix.lower() in known_exts
|
||||
]
|
||||
if adjacent:
|
||||
findings["adjacent_subs"] = True
|
||||
findings["files_per_episode"] = len(adjacent)
|
||||
|
||||
return findings
|
||||
|
||||
def _match_pattern(self, findings: dict) -> tuple[SubtitlePattern | None, float]:
|
||||
"""Score all known patterns against the findings."""
|
||||
scores: list[tuple[float, SubtitlePattern]] = []
|
||||
|
||||
for pattern in self.kb.patterns().values():
|
||||
score = self._score(pattern, findings)
|
||||
scores.append((score, pattern))
|
||||
|
||||
if not scores:
|
||||
return None, 0.0
|
||||
|
||||
scores.sort(key=lambda x: x[0], reverse=True)
|
||||
best_score, best_pattern = scores[0]
|
||||
|
||||
if best_score < 0.4:
|
||||
return None, best_score
|
||||
|
||||
return best_pattern, best_score
|
||||
|
||||
def _score(self, pattern: SubtitlePattern, findings: dict) -> float:
|
||||
"""Return a 0.0–1.0 match score for this pattern against the findings."""
|
||||
score = 0.0
|
||||
total = 0.0
|
||||
|
||||
strategy = pattern.scan_strategy
|
||||
|
||||
if strategy == ScanStrategy.EMBEDDED:
|
||||
total += 1
|
||||
if findings.get("has_embedded"):
|
||||
score += 1.0
|
||||
if not findings.get("has_subs_folder") and not findings.get("adjacent_subs"):
|
||||
score += 0.5
|
||||
total += 0.5
|
||||
|
||||
elif strategy == ScanStrategy.EPISODE_SUBFOLDER:
|
||||
total += 3
|
||||
if findings.get("has_subs_folder"):
|
||||
score += 1.0
|
||||
if findings.get("subs_strategy") == "episode_subfolder":
|
||||
score += 2.0
|
||||
|
||||
elif strategy == ScanStrategy.FLAT:
|
||||
total += 2
|
||||
if findings.get("has_subs_folder"):
|
||||
score += 1.0
|
||||
if findings.get("subs_strategy") == "flat":
|
||||
score += 1.0
|
||||
|
||||
elif strategy == ScanStrategy.ADJACENT:
|
||||
total += 2
|
||||
if findings.get("adjacent_subs"):
|
||||
score += 1.0
|
||||
if not findings.get("has_subs_folder"):
|
||||
score += 1.0
|
||||
|
||||
return score / total if total > 0 else 0.0
|
||||
|
||||
def _describe(self, findings: dict) -> str:
|
||||
parts = []
|
||||
if findings.get("has_subs_folder"):
|
||||
strategy = findings.get("subs_strategy", "?")
|
||||
n = findings.get("files_per_episode", 0)
|
||||
parts.append(f"Subs/ folder found ({strategy}), ~{n} file(s) per episode")
|
||||
if findings.get("has_numeric_prefix"):
|
||||
parts.append("files have numeric prefix (e.g. 2_English.srt)")
|
||||
if findings.get("has_lang_tokens"):
|
||||
parts.append("language tokens found in filenames")
|
||||
elif findings.get("adjacent_subs"):
|
||||
parts.append("subtitle files adjacent to video")
|
||||
else:
|
||||
parts.append("no external subtitle files found")
|
||||
|
||||
if findings.get("has_embedded"):
|
||||
parts.append("embedded tracks detected (ffprobe)")
|
||||
|
||||
return " — ".join(parts) if parts else "nothing found"
|
||||
@@ -0,0 +1,93 @@
|
||||
"""SubtitlePlacer — hard-links matched subtitle tracks next to the destination video."""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from ..entities import SubtitleTrack
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
class PlacedTrack:
    """Record of one subtitle file that was hard-linked into place."""

    # Original subtitle file the link points to.
    source: Path
    # Full path of the link created in the destination directory.
    destination: Path
    # File name of the link (last component of `destination`).
    filename: str
|
||||
|
||||
|
||||
@dataclass
class PlaceResult:
    """Outcome of a placement run: what was linked and what was left out."""

    # Tracks that were successfully hard-linked.
    placed: list[PlacedTrack]
    # Tracks that were not placed, each paired with the reason.
    skipped: list[tuple[SubtitleTrack, str]]  # (track, reason)

    @property
    def placed_count(self) -> int:
        """Number of tracks that were placed."""
        return len(self.placed)

    @property
    def skipped_count(self) -> int:
        """Number of tracks that were skipped."""
        return len(self.skipped)
|
||||
|
||||
|
||||
class SubtitlePlacer:
    """
    Creates hard links to matched SubtitleTrack files beside a destination video.

    Uses the same hard-link strategy as FileManager.copy_file:
    instant, no data duplication, qBittorrent keeps seeding.

    Embedded tracks are skipped — nothing to place on disk.
    """

    def place(
        self,
        tracks: list[SubtitleTrack],
        destination_video: Path,
    ) -> PlaceResult:
        """Hard-link each track into the directory of ``destination_video``.

        A track is skipped (and reported with a reason) when it is embedded,
        its source file is missing, its destination name cannot be built
        (ValueError), the destination already exists, or the link fails
        with an OSError.
        """
        linked: list[PlacedTrack] = []
        rejected: list[tuple[SubtitleTrack, str]] = []
        target_dir = destination_video.parent

        for track in tracks:
            if track.is_embedded:
                logger.debug(f"SubtitlePlacer: skip embedded track ({track.language})")
                rejected.append((track, "embedded — no file to place"))
                continue

            source_missing = not track.file_path or not track.file_path.exists()
            if source_missing:
                rejected.append((track, "source file not found"))
                continue

            try:
                dest_name = track.destination_name
            except ValueError as e:
                rejected.append((track, str(e)))
                continue

            dest_path = target_dir / dest_name
            if dest_path.exists():
                logger.debug(f"SubtitlePlacer: skip {dest_name} — already exists")
                rejected.append((track, "destination already exists"))
                continue

            try:
                os.link(track.file_path, dest_path)
                record = PlacedTrack(
                    source=track.file_path,
                    destination=dest_path,
                    filename=dest_name,
                )
                linked.append(record)
                logger.info(f"SubtitlePlacer: placed {dest_name}")
            except OSError as e:
                logger.warning(f"SubtitlePlacer: failed to place {dest_name}: {e}")
                rejected.append((track, str(e)))

        logger.info(
            f"SubtitlePlacer: {len(linked)} placed, {len(rejected)} skipped "
            f"for {destination_video.name}"
        )
        return PlaceResult(placed=linked, skipped=rejected)
|
||||
@@ -0,0 +1,21 @@
|
||||
"""Subtitle service utilities."""
|
||||
|
||||
from ..entities import SubtitleTrack
|
||||
|
||||
|
||||
def available_subtitles(tracks: list[SubtitleTrack]) -> list[SubtitleTrack]:
    """
    List the distinct subtitle tracks on offer, one per (language, type) pair.

    The first track seen for each pair is kept and input order is preserved.
    Handy for showing what exists for a media item independently of the
    user's preferences — e.g. eng, eng.sdh, fra each appear as their own entry.
    """
    first_seen: dict[tuple, SubtitleTrack] = {}
    for track in tracks:
        code = track.language.code if track.language else None
        # setdefault leaves an existing entry untouched, so later
        # duplicates of a pair are dropped.
        first_seen.setdefault((code, track.subtitle_type), track)
    return list(first_seen.values())
|
||||
Reference in New Issue
Block a user