Compare commits
13 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 14941d47c0 | |||
| df798f55cc | |||
| 535935cc73 | |||
| 6e252d1e81 | |||
| 903e9e7117 | |||
| 9556bf9e08 | |||
| e6ee700825 | |||
| ced72547f7 | |||
| f338b08706 | |||
| da484d7474 | |||
| 481eeb5afd | |||
| 7cd24f3a31 | |||
| eb8995cfc3 |
@@ -112,6 +112,48 @@ callers).
|
||||
|
||||
### Internal
|
||||
|
||||
- **Domain I/O extraction** (`refactor/domain-io-extraction`): the domain
|
||||
layer no longer performs subprocess calls, filesystem scans, or YAML
|
||||
loading. This was achieved in a series of focused commits:
|
||||
- **Knowledge YAML loaders moved to infrastructure**:
|
||||
`alfred/domain/release/knowledge.py`,
|
||||
`alfred/domain/shared/knowledge/language_registry.py`, and
|
||||
`alfred/domain/subtitles/knowledge/{base,loader}.py` relocated to
|
||||
`alfred/infrastructure/knowledge/`. Re-exports were dropped — callers
|
||||
import directly from the new location.
|
||||
- **`MediaProber` and `FilesystemScanner` Protocol ports** introduced at
|
||||
`alfred/domain/shared/ports/` with frozen-dataclass DTOs
|
||||
(`SubtitleStreamInfo`, `FileEntry`). `SubtitleIdentifier` and
|
||||
`PatternDetector` are now constructor-injected with concrete adapters
|
||||
(`FfprobeMediaProber` wrapping `subprocess.run(ffprobe)` and
|
||||
`PathlibFilesystemScanner` wrapping `pathlib`). No more direct
|
||||
`subprocess`/`pathlib` usage from the subtitle domain services.
|
||||
- **Live filesystem methods removed from VOs and entities**:
|
||||
`FilePath.exists()` / `.is_file()` / `.is_dir()` deleted —
|
||||
`FilePath` is now a pure address VO. `Movie.has_file()` and
|
||||
`Episode.is_downloaded()` dropped. Callers either rely on a prior
|
||||
detection step or use try/except over pre-checks (eliminates
|
||||
TOCTOU races).
|
||||
- **`SubtitlePlacer` moved to the application layer** at
|
||||
`alfred/application/subtitles/placer.py` — it performs `os.link`
|
||||
I/O, which doesn't belong in the domain. Pre-checks replaced with
|
||||
try/except for `FileNotFoundError`/`FileExistsError`.
|
||||
- **`SubtitleRuleSet.resolve()` no longer reaches into the knowledge
|
||||
base**: the implicit `DEFAULT_RULES()` helper is gone, replaced by
|
||||
an explicit `default_rules: SubtitleMatchingRules` parameter. The
|
||||
`ManageSubtitles` use case loads defaults from the KB once and
|
||||
passes them in.
|
||||
- **`SubtitleKnowledge` Protocol port** at
|
||||
`alfred/domain/subtitles/ports/knowledge.py` declares the read-only
|
||||
query surface domain services consume (7 methods:
|
||||
`known_extensions`, `format_for_extension`, `language_for_token`,
|
||||
`is_known_lang_token`, `type_for_token`, `is_known_type_token`,
|
||||
`patterns`). `SubtitleIdentifier` and `PatternDetector` depend on
|
||||
this Protocol instead of the concrete `SubtitleKnowledgeBase` from
|
||||
infrastructure — `domain/subtitles/` now has zero imports from
|
||||
`infrastructure/`. The remaining domain → infra leak
|
||||
(`domain/release/` loading separator YAML at import time) is
|
||||
documented in tech-debt and scheduled for its own branch.
|
||||
- **`to_dot_folder_name(title)` helper** in
|
||||
`alfred/domain/shared/value_objects.py` — extracts the
|
||||
`re.sub(r"[^\w\s\.\-]", "", title).replace(" ", ".")` pattern that was
|
||||
|
||||
@@ -5,19 +5,21 @@ from pathlib import Path
|
||||
|
||||
from alfred.domain.shared.value_objects import ImdbId
|
||||
from alfred.domain.subtitles.entities import SubtitleCandidate
|
||||
from alfred.domain.subtitles.knowledge.base import SubtitleKnowledgeBase
|
||||
from alfred.domain.subtitles.knowledge.loader import KnowledgeLoader
|
||||
from alfred.domain.subtitles.services.identifier import SubtitleIdentifier
|
||||
from alfred.domain.subtitles.services.matcher import SubtitleMatcher
|
||||
from alfred.domain.subtitles.services.pattern_detector import PatternDetector
|
||||
from alfred.domain.subtitles.services.placer import (
|
||||
from alfred.application.subtitles.placer import (
|
||||
PlacedTrack,
|
||||
SubtitlePlacer,
|
||||
_build_dest_name,
|
||||
)
|
||||
from alfred.domain.subtitles.services.utils import available_subtitles
|
||||
from alfred.domain.subtitles.value_objects import ScanStrategy
|
||||
from alfred.infrastructure.filesystem.scanner import PathlibFilesystemScanner
|
||||
from alfred.infrastructure.knowledge.subtitles.base import SubtitleKnowledgeBase
|
||||
from alfred.infrastructure.knowledge.subtitles.loader import KnowledgeLoader
|
||||
from alfred.infrastructure.persistence.context import get_memory
|
||||
from alfred.infrastructure.probe.ffprobe_prober import FfprobeMediaProber
|
||||
from alfred.infrastructure.subtitle.metadata_store import SubtitleMetadataStore
|
||||
from alfred.infrastructure.subtitle.rule_repository import RuleSetRepository
|
||||
|
||||
@@ -91,13 +93,21 @@ class ManageSubtitlesUseCase:
|
||||
)
|
||||
|
||||
kb = SubtitleKnowledgeBase(KnowledgeLoader())
|
||||
prober = FfprobeMediaProber()
|
||||
scanner = PathlibFilesystemScanner()
|
||||
library_root = _infer_library_root(dest_path, media_type)
|
||||
store = SubtitleMetadataStore(library_root)
|
||||
repo = RuleSetRepository(library_root)
|
||||
|
||||
# --- Pattern resolution ---
|
||||
pattern = self._resolve_pattern(
|
||||
kb, store, source_path, confirmed_pattern_id, release_group
|
||||
kb,
|
||||
prober,
|
||||
scanner,
|
||||
store,
|
||||
source_path,
|
||||
confirmed_pattern_id,
|
||||
release_group,
|
||||
)
|
||||
if pattern is None:
|
||||
return ManageSubtitlesResponse(
|
||||
@@ -108,7 +118,7 @@ class ManageSubtitlesUseCase:
|
||||
|
||||
# --- Identify ---
|
||||
media_id = _to_imdb_id(imdb_id)
|
||||
identifier = SubtitleIdentifier(kb)
|
||||
identifier = SubtitleIdentifier(kb, prober, scanner)
|
||||
metadata = identifier.identify(
|
||||
video_path=source_path,
|
||||
pattern=pattern,
|
||||
@@ -153,7 +163,7 @@ class ManageSubtitlesUseCase:
|
||||
subtitle_prefs = memory.ltm.subtitle_preferences
|
||||
except Exception:
|
||||
pass
|
||||
rules = repo.load(release_group, subtitle_prefs).resolve()
|
||||
rules = repo.load(release_group, subtitle_prefs).resolve(kb.default_rules())
|
||||
matcher = SubtitleMatcher()
|
||||
matched, unresolved = matcher.match(metadata.external_tracks, rules)
|
||||
|
||||
@@ -228,6 +238,8 @@ class ManageSubtitlesUseCase:
|
||||
def _resolve_pattern(
|
||||
self,
|
||||
kb: SubtitleKnowledgeBase,
|
||||
prober: FfprobeMediaProber,
|
||||
scanner: PathlibFilesystemScanner,
|
||||
store: SubtitleMetadataStore,
|
||||
source_path: Path,
|
||||
confirmed_pattern_id: str | None,
|
||||
@@ -250,7 +262,7 @@ class ManageSubtitlesUseCase:
|
||||
|
||||
# 3. Auto-detect
|
||||
release_root = source_path.parent
|
||||
detector = PatternDetector(kb)
|
||||
detector = PatternDetector(kb, prober, scanner)
|
||||
result = detector.detect(release_root, source_path)
|
||||
|
||||
if result["detected"] and result["confidence"] >= 0.6:
|
||||
|
||||
@@ -5,8 +5,8 @@ import os
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from ..entities import SubtitleCandidate
|
||||
from ..value_objects import SubtitleType
|
||||
from alfred.domain.subtitles.entities import SubtitleCandidate
|
||||
from alfred.domain.subtitles.value_objects import SubtitleType
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -78,8 +78,8 @@ class SubtitlePlacer:
|
||||
skipped.append((track, "embedded — no file to place"))
|
||||
continue
|
||||
|
||||
if not track.file_path or not track.file_path.exists():
|
||||
skipped.append((track, "source file not found"))
|
||||
if not track.file_path:
|
||||
skipped.append((track, "source file not set"))
|
||||
continue
|
||||
|
||||
try:
|
||||
@@ -90,11 +90,6 @@ class SubtitlePlacer:
|
||||
|
||||
dest_path = dest_dir / dest_name
|
||||
|
||||
if dest_path.exists():
|
||||
logger.debug(f"SubtitlePlacer: skip {dest_name} — already exists")
|
||||
skipped.append((track, "destination already exists"))
|
||||
continue
|
||||
|
||||
try:
|
||||
os.link(track.file_path, dest_path)
|
||||
placed.append(
|
||||
@@ -105,6 +100,11 @@ class SubtitlePlacer:
|
||||
)
|
||||
)
|
||||
logger.info(f"SubtitlePlacer: placed {dest_name}")
|
||||
except FileNotFoundError:
|
||||
skipped.append((track, "source file not found"))
|
||||
except FileExistsError:
|
||||
logger.debug(f"SubtitlePlacer: skip {dest_name} — already exists")
|
||||
skipped.append((track, "destination already exists"))
|
||||
except OSError as e:
|
||||
logger.warning(f"SubtitlePlacer: failed to place {dest_name}: {e}")
|
||||
skipped.append((track, str(e)))
|
||||
@@ -3,13 +3,13 @@
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
|
||||
from ..shared.media import AudioTrack, SubtitleTrack, track_lang_matches
|
||||
from ..shared.value_objects import FilePath, FileSize, ImdbId, Language
|
||||
from ..shared.media import AudioTrack, MediaWithTracks, SubtitleTrack
|
||||
from ..shared.value_objects import FilePath, FileSize, ImdbId
|
||||
from .value_objects import MovieTitle, Quality, ReleaseYear
|
||||
|
||||
|
||||
@dataclass
|
||||
class Movie:
|
||||
@dataclass(eq=False)
|
||||
class Movie(MediaWithTracks):
|
||||
"""
|
||||
Movie aggregate root for the movies domain.
|
||||
|
||||
@@ -20,6 +20,10 @@ class Movie:
|
||||
Track helpers follow the same "C+" contract as ``Episode``: pass a
|
||||
``Language`` for cross-format matching, or a ``str`` for case-insensitive
|
||||
direct comparison.
|
||||
|
||||
Equality is based on aggregate identity (not Python object identity): two ``Movie`` instances are equal iff they
|
||||
share the same ``imdb_id``, regardless of file/track contents. This is
|
||||
the DDD aggregate invariant — the aggregate is identified by its root id.
|
||||
"""
|
||||
|
||||
imdb_id: ImdbId
|
||||
@@ -38,7 +42,7 @@ class Movie:
|
||||
# Ensure ImdbId is actually an ImdbId instance
|
||||
if not isinstance(self.imdb_id, ImdbId):
|
||||
if isinstance(self.imdb_id, str):
|
||||
object.__setattr__(self, "imdb_id", ImdbId(self.imdb_id))
|
||||
self.imdb_id = ImdbId(self.imdb_id)
|
||||
else:
|
||||
raise ValueError(
|
||||
f"imdb_id must be ImdbId or str, got {type(self.imdb_id)}"
|
||||
@@ -47,55 +51,22 @@ class Movie:
|
||||
# Ensure MovieTitle is actually a MovieTitle instance
|
||||
if not isinstance(self.title, MovieTitle):
|
||||
if isinstance(self.title, str):
|
||||
object.__setattr__(self, "title", MovieTitle(self.title))
|
||||
self.title = MovieTitle(self.title)
|
||||
else:
|
||||
raise ValueError(
|
||||
f"title must be MovieTitle or str, got {type(self.title)}"
|
||||
)
|
||||
|
||||
def has_file(self) -> bool:
|
||||
"""Check if the movie has an associated file."""
|
||||
return self.file_path is not None and self.file_path.exists()
|
||||
def __eq__(self, other: object) -> bool:
|
||||
if not isinstance(other, Movie):
|
||||
return NotImplemented
|
||||
return self.imdb_id == other.imdb_id
|
||||
|
||||
def is_downloaded(self) -> bool:
|
||||
"""Check if the movie is downloaded (has a file)."""
|
||||
return self.has_file()
|
||||
def __hash__(self) -> int:
|
||||
return hash(self.imdb_id)
|
||||
|
||||
# ── Audio helpers ──────────────────────────────────────────────────────
|
||||
|
||||
def has_audio_in(self, lang: str | Language) -> bool:
|
||||
"""True if at least one audio track is in the given language."""
|
||||
return any(track_lang_matches(t.language, lang) for t in self.audio_tracks)
|
||||
|
||||
def audio_languages(self) -> list[str]:
|
||||
"""Unique audio languages across all tracks, in track order."""
|
||||
seen: set[str] = set()
|
||||
result: list[str] = []
|
||||
for t in self.audio_tracks:
|
||||
if t.language and t.language not in seen:
|
||||
seen.add(t.language)
|
||||
result.append(t.language)
|
||||
return result
|
||||
|
||||
# ── Subtitle helpers ───────────────────────────────────────────────────
|
||||
|
||||
def has_subtitles_in(self, lang: str | Language) -> bool:
|
||||
"""True if at least one subtitle track is in the given language."""
|
||||
return any(track_lang_matches(t.language, lang) for t in self.subtitle_tracks)
|
||||
|
||||
def has_forced_subs(self) -> bool:
|
||||
"""True if at least one subtitle track is flagged as forced."""
|
||||
return any(t.is_forced for t in self.subtitle_tracks)
|
||||
|
||||
def subtitle_languages(self) -> list[str]:
|
||||
"""Unique subtitle languages across all tracks, in track order."""
|
||||
seen: set[str] = set()
|
||||
result: list[str] = []
|
||||
for t in self.subtitle_tracks:
|
||||
if t.language and t.language not in seen:
|
||||
seen.add(t.language)
|
||||
result.append(t.language)
|
||||
return result
|
||||
# Track helpers (has_audio_in / audio_languages / has_subtitles_in /
|
||||
# has_forced_subs / subtitle_languages) come from MediaWithTracks.
|
||||
|
||||
def get_folder_name(self) -> str:
|
||||
"""
|
||||
|
||||
@@ -4,7 +4,7 @@ from __future__ import annotations
|
||||
|
||||
import re
|
||||
|
||||
from .knowledge import load_separators
|
||||
from alfred.infrastructure.knowledge.release import load_separators
|
||||
from .value_objects import (
|
||||
_AUDIO,
|
||||
_CODECS,
|
||||
@@ -16,7 +16,9 @@ from .value_objects import (
|
||||
_RESOLUTIONS,
|
||||
_SOURCES,
|
||||
_VIDEO_META,
|
||||
MediaTypeToken,
|
||||
ParsedRelease,
|
||||
ParsePath,
|
||||
)
|
||||
|
||||
|
||||
@@ -39,12 +41,12 @@ def parse_release(name: str) -> ParsedRelease:
|
||||
and run token-level matchers (season/episode, tech, languages, audio,
|
||||
video, edition, title, year).
|
||||
"""
|
||||
parse_path = "direct"
|
||||
parse_path = ParsePath.DIRECT.value
|
||||
|
||||
# Always try to extract a bracket-enclosed site tag first.
|
||||
clean, site_tag = _strip_site_tag(name)
|
||||
if site_tag is not None:
|
||||
parse_path = "sanitized"
|
||||
parse_path = ParsePath.SANITIZED.value
|
||||
|
||||
if not _is_well_formed(clean):
|
||||
return ParsedRelease(
|
||||
@@ -60,9 +62,9 @@ def parse_release(name: str) -> ParsedRelease:
|
||||
codec=None,
|
||||
group="UNKNOWN",
|
||||
tech_string="",
|
||||
media_type="unknown",
|
||||
media_type=MediaTypeToken.UNKNOWN.value,
|
||||
site_tag=site_tag,
|
||||
parse_path="ai",
|
||||
parse_path=ParsePath.AI.value,
|
||||
)
|
||||
|
||||
name = clean
|
||||
@@ -137,19 +139,19 @@ def _infer_media_type(
|
||||
integrale_tokens = {t.upper() for t in _MEDIA_TYPE_TOKENS.get("integrale", [])}
|
||||
|
||||
if upper_tokens & doc_tokens:
|
||||
return "documentary"
|
||||
return MediaTypeToken.DOCUMENTARY.value
|
||||
if upper_tokens & concert_tokens:
|
||||
return "concert"
|
||||
return MediaTypeToken.CONCERT.value
|
||||
if (
|
||||
edition in {"COMPLETE", "INTEGRALE", "COLLECTION"}
|
||||
or upper_tokens & integrale_tokens
|
||||
) and season is None:
|
||||
return "tv_complete"
|
||||
return MediaTypeToken.TV_COMPLETE.value
|
||||
if season is not None:
|
||||
return "tv_show"
|
||||
return MediaTypeToken.TV_SHOW.value
|
||||
if any([quality, source, codec, year]):
|
||||
return "movie"
|
||||
return "unknown"
|
||||
return MediaTypeToken.MOVIE.value
|
||||
return MediaTypeToken.UNKNOWN.value
|
||||
|
||||
|
||||
def _is_well_formed(name: str) -> bool:
|
||||
|
||||
@@ -3,8 +3,10 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
|
||||
from .knowledge import (
|
||||
from ..shared.exceptions import ValidationError
|
||||
from alfred.infrastructure.knowledge.release import (
|
||||
load_audio,
|
||||
load_codecs,
|
||||
load_editions,
|
||||
@@ -50,6 +52,38 @@ def _sanitize_for_fs(text: str) -> str:
|
||||
return text.translate(_WIN_FORBIDDEN_TABLE)
|
||||
|
||||
|
||||
class MediaTypeToken(str, Enum):
|
||||
"""
|
||||
Canonical values for ``ParsedRelease.media_type``.
|
||||
|
||||
Inherits from ``str`` so existing string-based comparisons (``== "movie"``,
|
||||
JSON serialization, TMDB DTO interop) keep working unchanged. The enum
|
||||
serves both as documentation and as the set of valid values for
|
||||
``__post_init__`` validation.
|
||||
"""
|
||||
|
||||
MOVIE = "movie"
|
||||
TV_SHOW = "tv_show"
|
||||
TV_COMPLETE = "tv_complete"
|
||||
DOCUMENTARY = "documentary"
|
||||
CONCERT = "concert"
|
||||
OTHER = "other"
|
||||
UNKNOWN = "unknown"
|
||||
|
||||
|
||||
class ParsePath(str, Enum):
|
||||
"""How a ``ParsedRelease`` was produced. ``str``-backed for the same
|
||||
reasons as :class:`MediaTypeToken`."""
|
||||
|
||||
DIRECT = "direct"
|
||||
SANITIZED = "sanitized"
|
||||
AI = "ai"
|
||||
|
||||
|
||||
_VALID_MEDIA_TYPES: frozenset[str] = frozenset(m.value for m in MediaTypeToken)
|
||||
_VALID_PARSE_PATHS: frozenset[str] = frozenset(p.value for p in ParsePath)
|
||||
|
||||
|
||||
def _strip_episode_from_normalized(normalized: str) -> str:
|
||||
"""
|
||||
Remove all episode parts (Exx) from a normalized release name, keeping Sxx.
|
||||
@@ -85,13 +119,11 @@ class ParsedRelease:
|
||||
codec: str | None # x265, HEVC, …
|
||||
group: str # release group, "UNKNOWN" if missing
|
||||
tech_string: str # quality.source.codec joined with dots
|
||||
media_type: str = (
|
||||
"unknown" # "movie" | "tv_show" | "tv_complete" | "other" | "unknown"
|
||||
)
|
||||
media_type: MediaTypeToken = MediaTypeToken.UNKNOWN
|
||||
site_tag: str | None = (
|
||||
None # site watermark stripped from name, e.g. "TGx", "OxTorrent.vc"
|
||||
)
|
||||
parse_path: str = "direct" # "direct" | "sanitized" | "ai"
|
||||
parse_path: ParsePath = ParsePath.DIRECT
|
||||
languages: list[str] = field(default_factory=list) # ["MULTI", "VFF"], ["FRENCH"], …
|
||||
audio_codec: str | None = None # "DTS-HD.MA", "DDP", "EAC3", …
|
||||
audio_channels: str | None = None # "5.1", "7.1", "2.0", …
|
||||
@@ -99,6 +131,51 @@ class ParsedRelease:
|
||||
hdr_format: str | None = None # "DV", "HDR10", "DV.HDR10", …
|
||||
edition: str | None = None # "UNRATED", "EXTENDED", "DIRECTORS.CUT", …
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if not self.raw:
|
||||
raise ValidationError("ParsedRelease.raw cannot be empty")
|
||||
if not self.group:
|
||||
raise ValidationError("ParsedRelease.group cannot be empty")
|
||||
if self.year is not None and not (1888 <= self.year <= 2100):
|
||||
raise ValidationError(
|
||||
f"ParsedRelease.year out of range: {self.year}"
|
||||
)
|
||||
if self.season is not None and not (0 <= self.season <= 100):
|
||||
raise ValidationError(
|
||||
f"ParsedRelease.season out of range: {self.season}"
|
||||
)
|
||||
if self.episode is not None and not (0 <= self.episode <= 9999):
|
||||
raise ValidationError(
|
||||
f"ParsedRelease.episode out of range: {self.episode}"
|
||||
)
|
||||
if self.episode_end is not None:
|
||||
if not (0 <= self.episode_end <= 9999):
|
||||
raise ValidationError(
|
||||
f"ParsedRelease.episode_end out of range: {self.episode_end}"
|
||||
)
|
||||
if self.episode is not None and self.episode_end < self.episode:
|
||||
raise ValidationError(
|
||||
f"ParsedRelease.episode_end ({self.episode_end}) < "
|
||||
f"episode ({self.episode})"
|
||||
)
|
||||
# Coerce raw strings into their enum form (tolerant constructor).
|
||||
if not isinstance(self.media_type, MediaTypeToken):
|
||||
try:
|
||||
self.media_type = MediaTypeToken(self.media_type)
|
||||
except ValueError:
|
||||
raise ValidationError(
|
||||
f"ParsedRelease.media_type invalid: {self.media_type!r} "
|
||||
f"(expected one of {sorted(_VALID_MEDIA_TYPES)})"
|
||||
) from None
|
||||
if not isinstance(self.parse_path, ParsePath):
|
||||
try:
|
||||
self.parse_path = ParsePath(self.parse_path)
|
||||
except ValueError:
|
||||
raise ValidationError(
|
||||
f"ParsedRelease.parse_path invalid: {self.parse_path!r} "
|
||||
f"(expected one of {sorted(_VALID_PARSE_PATHS)})"
|
||||
) from None
|
||||
|
||||
@property
|
||||
def is_season_pack(self) -> bool:
|
||||
return self.season is not None and self.episode is None
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
"""Shared knowledge loaders (cross-domain)."""
|
||||
|
||||
from .language_registry import LanguageRegistry
|
||||
|
||||
__all__ = ["LanguageRegistry"]
|
||||
@@ -8,11 +8,13 @@ from .audio import AudioTrack
|
||||
from .info import MediaInfo
|
||||
from .matching import track_lang_matches
|
||||
from .subtitle import SubtitleTrack
|
||||
from .tracks_mixin import MediaWithTracks
|
||||
from .video import VideoTrack
|
||||
|
||||
__all__ = [
|
||||
"AudioTrack",
|
||||
"MediaInfo",
|
||||
"MediaWithTracks",
|
||||
"SubtitleTrack",
|
||||
"VideoTrack",
|
||||
"track_lang_matches",
|
||||
|
||||
@@ -5,7 +5,7 @@ from __future__ import annotations
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(frozen=True)
|
||||
class AudioTrack:
|
||||
"""A single audio track as reported by ffprobe."""
|
||||
|
||||
|
||||
@@ -9,19 +9,21 @@ from .subtitle import SubtitleTrack
|
||||
from .video import VideoTrack
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(frozen=True)
|
||||
class MediaInfo:
|
||||
"""
|
||||
File-level media metadata extracted by ffprobe.
|
||||
File-level media metadata extracted by ffprobe — immutable snapshot.
|
||||
|
||||
Symmetric design: every stream type is a list of typed track objects.
|
||||
Symmetric design: every stream type is a tuple of typed track objects
|
||||
(immutable on purpose — a MediaInfo is a frozen view of one ffprobe run,
|
||||
not a mutable collection to append to).
|
||||
Backwards-compatible flat accessors (``resolution``, ``width``, …) read
|
||||
from the first video track when present.
|
||||
"""
|
||||
|
||||
video_tracks: list[VideoTrack] = field(default_factory=list)
|
||||
audio_tracks: list[AudioTrack] = field(default_factory=list)
|
||||
subtitle_tracks: list[SubtitleTrack] = field(default_factory=list)
|
||||
video_tracks: tuple[VideoTrack, ...] = field(default_factory=tuple)
|
||||
audio_tracks: tuple[AudioTrack, ...] = field(default_factory=tuple)
|
||||
subtitle_tracks: tuple[SubtitleTrack, ...] = field(default_factory=tuple)
|
||||
|
||||
# File-level (from ffprobe ``format`` block, not from any single stream)
|
||||
duration_seconds: float | None = None
|
||||
|
||||
@@ -14,7 +14,7 @@ from __future__ import annotations
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(frozen=True)
|
||||
class SubtitleTrack:
|
||||
"""A single embedded subtitle track as reported by ffprobe."""
|
||||
|
||||
|
||||
@@ -0,0 +1,77 @@
|
||||
"""Mixin shared by entities that carry audio + subtitle tracks.
|
||||
|
||||
Both ``Movie`` and ``Episode`` carry a ``list[AudioTrack]`` plus a
|
||||
``list[SubtitleTrack]`` and answer the same 5 queries about them (language
|
||||
presence, unique languages, forced flag). Keep that behavior in one place so a
|
||||
fix in one is a fix in both.
|
||||
|
||||
The mixin is plain Python (no dataclass machinery) so it composes cleanly with
|
||||
``@dataclass`` entities — it only reads ``self.audio_tracks`` and
|
||||
``self.subtitle_tracks``, which the host class provides as fields.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from ..value_objects import Language
|
||||
from .matching import track_lang_matches
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .audio import AudioTrack
|
||||
from .subtitle import SubtitleTrack
|
||||
|
||||
|
||||
class MediaWithTracks:
|
||||
"""
|
||||
Mixin providing audio/subtitle helpers for entities with track collections.
|
||||
|
||||
Hosts must expose two attributes:
|
||||
|
||||
* ``audio_tracks: list[AudioTrack]``
|
||||
* ``subtitle_tracks: list[SubtitleTrack]``
|
||||
|
||||
The helpers follow the "C+" matching contract: pass a :class:`Language`
|
||||
for cross-format matching, or a ``str`` for case-insensitive comparison.
|
||||
"""
|
||||
|
||||
# These attributes are provided by the host entity (Movie, Episode, …).
|
||||
# Declared here only for type-checkers and to make the contract explicit.
|
||||
audio_tracks: list["AudioTrack"]
|
||||
subtitle_tracks: list["SubtitleTrack"]
|
||||
|
||||
# ── Audio helpers ──────────────────────────────────────────────────────
|
||||
|
||||
def has_audio_in(self, lang: str | Language) -> bool:
|
||||
"""True if at least one audio track is in the given language."""
|
||||
return any(track_lang_matches(t.language, lang) for t in self.audio_tracks)
|
||||
|
||||
def audio_languages(self) -> list[str]:
|
||||
"""Unique audio languages across all tracks, in track order."""
|
||||
seen: set[str] = set()
|
||||
result: list[str] = []
|
||||
for t in self.audio_tracks:
|
||||
if t.language and t.language not in seen:
|
||||
seen.add(t.language)
|
||||
result.append(t.language)
|
||||
return result
|
||||
|
||||
# ── Subtitle helpers ───────────────────────────────────────────────────
|
||||
|
||||
def has_subtitles_in(self, lang: str | Language) -> bool:
|
||||
"""True if at least one subtitle track is in the given language."""
|
||||
return any(track_lang_matches(t.language, lang) for t in self.subtitle_tracks)
|
||||
|
||||
def has_forced_subs(self) -> bool:
|
||||
"""True if at least one subtitle track is flagged as forced."""
|
||||
return any(t.is_forced for t in self.subtitle_tracks)
|
||||
|
||||
def subtitle_languages(self) -> list[str]:
|
||||
"""Unique subtitle languages across all tracks, in track order."""
|
||||
seen: set[str] = set()
|
||||
result: list[str] = []
|
||||
for t in self.subtitle_tracks:
|
||||
if t.language and t.language not in seen:
|
||||
seen.add(t.language)
|
||||
result.append(t.language)
|
||||
return result
|
||||
@@ -5,7 +5,7 @@ from __future__ import annotations
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(frozen=True)
|
||||
class VideoTrack:
|
||||
"""A single video track as reported by ffprobe.
|
||||
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
"""Ports — Protocol interfaces the domain depends on.
|
||||
|
||||
Adapters live in ``alfred/infrastructure/`` and implement these protocols.
|
||||
Domain code never imports infrastructure; it accepts a port via constructor
|
||||
injection and calls it. Tests can pass in-memory fakes that satisfy the
|
||||
Protocol without going through real I/O.
|
||||
"""
|
||||
|
||||
from .filesystem_scanner import FileEntry, FilesystemScanner
|
||||
from .media_prober import MediaProber, SubtitleStreamInfo
|
||||
|
||||
__all__ = [
|
||||
"FileEntry",
|
||||
"FilesystemScanner",
|
||||
"MediaProber",
|
||||
"SubtitleStreamInfo",
|
||||
]
|
||||
@@ -0,0 +1,59 @@
|
||||
"""FilesystemScanner port — abstracts filesystem inspection.
|
||||
|
||||
The domain never calls ``Path.iterdir``, ``Path.is_file``, ``Path.stat`` or
|
||||
``open()`` directly. It asks the scanner for a ``FileEntry`` snapshot and
|
||||
reasons from there. One scan = one I/O round-trip; no callbacks back to disk.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Protocol
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class FileEntry:
|
||||
"""Frozen snapshot of one filesystem entry, taken at scan time.
|
||||
|
||||
The entry carries enough metadata for the domain to classify and order
|
||||
files without re-querying the OS. ``size_kb`` is ``None`` for directories
|
||||
and for files whose size could not be read.
|
||||
"""
|
||||
|
||||
path: Path
|
||||
is_file: bool
|
||||
is_dir: bool
|
||||
size_kb: float | None
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return self.path.name
|
||||
|
||||
@property
|
||||
def stem(self) -> str:
|
||||
return self.path.stem
|
||||
|
||||
@property
|
||||
def suffix(self) -> str:
|
||||
return self.path.suffix
|
||||
|
||||
|
||||
class FilesystemScanner(Protocol):
|
||||
"""Read-only filesystem inspection."""
|
||||
|
||||
def scan_dir(self, path: Path) -> list[FileEntry]:
|
||||
"""Return sorted entries directly inside ``path``.
|
||||
|
||||
Returns an empty list when ``path`` is not a directory or is
|
||||
unreadable. Adapters must not raise.
|
||||
"""
|
||||
...
|
||||
|
||||
def stat(self, path: Path) -> FileEntry | None:
|
||||
"""Stat a single path; ``None`` when it doesn't exist or is unreadable."""
|
||||
...
|
||||
|
||||
def read_text(self, path: Path, encoding: str = "utf-8") -> str | None:
|
||||
"""Read a text file in one go; ``None`` on any error."""
|
||||
...
|
||||
@@ -0,0 +1,39 @@
|
||||
"""MediaProber port — abstracts media stream inspection (e.g. ffprobe).
|
||||
|
||||
The adapter (typically wrapping ffprobe) maps low-level container metadata
|
||||
into the small set of stream attributes the domain reasons about. Replacing
|
||||
ffprobe with another tool only requires a new adapter — domain stays put.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Protocol
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class SubtitleStreamInfo:
|
||||
"""A single embedded subtitle stream, as seen by the prober.
|
||||
|
||||
``language`` is the raw language tag emitted by the container (typically
|
||||
ISO 639-2 like ``"fre"``, ``"eng"``); may be empty/None when the stream
|
||||
has no language tag. The domain resolves it to a canonical ``Language``
|
||||
via the knowledge base.
|
||||
"""
|
||||
|
||||
language: str | None
|
||||
is_hearing_impaired: bool
|
||||
is_forced: bool
|
||||
|
||||
|
||||
class MediaProber(Protocol):
|
||||
"""Inspect a media file's stream metadata."""
|
||||
|
||||
def list_subtitle_streams(self, video: Path) -> list[SubtitleStreamInfo]:
|
||||
"""Return all subtitle streams in ``video``.
|
||||
|
||||
Returns an empty list when the file is missing, unreadable, or has
|
||||
no subtitle streams. Adapters must not raise.
|
||||
"""
|
||||
...
|
||||
@@ -67,18 +67,6 @@ class FilePath:
|
||||
# Use object.__setattr__ because dataclass is frozen
|
||||
object.__setattr__(self, "value", path_obj)
|
||||
|
||||
def exists(self) -> bool:
|
||||
"""Check if the path exists."""
|
||||
return self.value.exists()
|
||||
|
||||
def is_file(self) -> bool:
|
||||
"""Check if the path is a file."""
|
||||
return self.value.is_file()
|
||||
|
||||
def is_dir(self) -> bool:
|
||||
"""Check if the path is a directory."""
|
||||
return self.value.is_dir()
|
||||
|
||||
def __str__(self) -> str:
|
||||
return str(self.value)
|
||||
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
from .aggregates import SubtitleRuleSet
|
||||
from .entities import MediaSubtitleMetadata, SubtitleCandidate
|
||||
from .exceptions import SubtitleNotFound
|
||||
from .knowledge import KnowledgeLoader, SubtitleKnowledgeBase
|
||||
from .services import PatternDetector, SubtitleIdentifier, SubtitleMatcher
|
||||
from .value_objects import (
|
||||
RuleScope,
|
||||
@@ -20,8 +19,6 @@ __all__ = [
|
||||
"SubtitleCandidate",
|
||||
"MediaSubtitleMetadata",
|
||||
"SubtitleRuleSet",
|
||||
"SubtitleKnowledgeBase",
|
||||
"KnowledgeLoader",
|
||||
"SubtitleIdentifier",
|
||||
"SubtitleMatcher",
|
||||
"PatternDetector",
|
||||
|
||||
@@ -4,15 +4,9 @@ from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
from ..shared.value_objects import ImdbId
|
||||
from .knowledge.base import SubtitleKnowledgeBase
|
||||
from .value_objects import RuleScope, SubtitleMatchingRules
|
||||
|
||||
|
||||
def DEFAULT_RULES() -> SubtitleMatchingRules:
|
||||
"""Load default matching rules from subtitles.yaml (defaults section)."""
|
||||
return SubtitleKnowledgeBase().default_rules()
|
||||
|
||||
|
||||
@dataclass
|
||||
class SubtitleRuleSet:
|
||||
"""
|
||||
@@ -36,12 +30,18 @@ class SubtitleRuleSet:
|
||||
_format_priority: list[str] | None = field(default=None, repr=False)
|
||||
_min_confidence: float | None = field(default=None, repr=False)
|
||||
|
||||
def resolve(self) -> SubtitleMatchingRules:
|
||||
def resolve(self, default_rules: SubtitleMatchingRules) -> SubtitleMatchingRules:
|
||||
"""
|
||||
Walk the parent chain and merge deltas into effective rules.
|
||||
Falls back to DEFAULT_RULES at the top of the chain.
|
||||
|
||||
``default_rules`` seeds the top of the chain — it is the caller's
|
||||
responsibility to load these from the knowledge base (infrastructure).
|
||||
Keeping the default rules as a parameter keeps the domain free of
|
||||
any I/O dependency.
|
||||
"""
|
||||
base = self.parent.resolve() if self.parent else DEFAULT_RULES()
|
||||
base = (
|
||||
self.parent.resolve(default_rules) if self.parent else default_rules
|
||||
)
|
||||
return SubtitleMatchingRules(
|
||||
preferred_languages=self._languages or base.preferred_languages,
|
||||
preferred_formats=self._formats or base.preferred_formats,
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
"""Domain ports for the subtitles domain — Protocol-based abstractions
|
||||
that decouple domain services from concrete infrastructure adapters."""
|
||||
|
||||
from .knowledge import SubtitleKnowledge
|
||||
|
||||
__all__ = ["SubtitleKnowledge"]
|
||||
@@ -0,0 +1,38 @@
|
||||
"""SubtitleKnowledge port — the query surface domain services need from the
|
||||
subtitle knowledge base, expressed as a Protocol so the domain never imports
|
||||
the infrastructure adapter that backs it.
|
||||
|
||||
The concrete implementation lives in
|
||||
``alfred/infrastructure/knowledge/subtitles/base.py`` (the YAML-backed
|
||||
``SubtitleKnowledgeBase``). Tests can supply any object that satisfies this
|
||||
structural contract.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Protocol
|
||||
|
||||
from ..value_objects import SubtitleFormat, SubtitleLanguage, SubtitlePattern, SubtitleType
|
||||
|
||||
|
||||
class SubtitleKnowledge(Protocol):
    """Structural, read-only view of the subtitle knowledge base.

    Domain services depend on this Protocol rather than on a concrete
    knowledge-base class, so any object exposing these methods with
    compatible signatures can be supplied. The surface is deliberately
    limited to what domain services call; defaults loading, reload,
    pattern groups and raw dicts stay on the concrete class and are
    reserved for the application layer.
    """

    def known_extensions(self) -> set[str]:
        """Return the set of subtitle file extensions known to the knowledge base."""
        ...

    def format_for_extension(self, ext: str) -> SubtitleFormat | None:
        """Return the format mapped to ``ext``, or ``None`` (presumably when unknown)."""
        ...

    def language_for_token(self, token: str) -> SubtitleLanguage | None:
        """Return the language mapped to ``token``, or ``None`` (presumably when unknown)."""
        ...

    def is_known_lang_token(self, token: str) -> bool:
        """Tell whether ``token`` is recognised as a language token."""
        ...

    def type_for_token(self, token: str) -> SubtitleType | None:
        """Return the subtitle type mapped to ``token``, or ``None`` (presumably when unknown)."""
        ...

    def is_known_type_token(self, token: str) -> bool:
        """Tell whether ``token`` is recognised as a subtitle-type token."""
        ...

    def patterns(self) -> dict[str, SubtitlePattern]:
        """Return the known subtitle patterns as a ``str``-keyed mapping."""
        ...
|
||||
@@ -1,207 +0,0 @@
|
||||
"""SubtitleScanner — inspects local subtitle files and filters them per user preferences.
|
||||
|
||||
Given a video file path, the scanner:
|
||||
1. Looks for subtitle files in the same directory as the video.
|
||||
2. Optionally also inspects a Subs/ subfolder adjacent to the video.
|
||||
3. Classifies each file (language, SDH, forced) from its filename, delegating
|
||||
all token knowledge to SubtitleKnowledgeBase (which itself merges
|
||||
LanguageRegistry + subtitle-specific tokens from subtitles.yaml).
|
||||
4. Filters according to SubtitlePreferences (languages, min_size_kb, keep_sdh,
|
||||
keep_forced).
|
||||
5. Returns a list of SubtitleCandidate — one per file that passes the filter,
|
||||
with the destination filename already computed.
|
||||
|
||||
Filename classification heuristics
|
||||
-----------------------------------
|
||||
We parse the stem of each subtitle file looking for known patterns:
|
||||
|
||||
fre.srt → lang=fre, sdh=False, forced=False
|
||||
fre.sdh.srt → lang=fre, sdh=True
|
||||
fre.forced.srt → lang=fre, forced=True
|
||||
Breaking.Bad.S01E01.French.srt → lang=fre (alias match via LanguageRegistry)
|
||||
Breaking.Bad.S01E01.VOSTFR.srt → lang=fre (subtitle-specific token)
|
||||
|
||||
ISO 639-2/B codes are used throughout (matching the project-wide canonical form
|
||||
from iso_languages.yaml — what ffprobe emits).
|
||||
|
||||
Output naming convention (matches SubtitlePreferences docstring):
|
||||
{lang}.srt
|
||||
{lang}.sdh.srt
|
||||
{lang}.forced.srt
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from .knowledge.base import SubtitleKnowledgeBase
|
||||
from .value_objects import SubtitleType
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_TOKEN_SPLIT = re.compile(r"[\.\s_\-]+")
|
||||
|
||||
|
||||
@dataclass
class SubtitleCandidate:
    """Subtitle file that survived filtering and is ready to be placed."""

    source_path: Path
    language: str  # ISO 639-2/B code, e.g. "fre"
    is_sdh: bool
    is_forced: bool
    extension: str  # e.g. ".srt"

    @property
    def destination_name(self) -> str:
        """Build the destination filename for this candidate.

        The result follows one of three shapes:
            {lang}.srt
            {lang}.sdh.srt
            {lang}.forced.srt

        When both flags are set, the SDH qualifier takes precedence.
        """
        if self.is_sdh:
            qualifier = ".sdh"
        elif self.is_forced:
            qualifier = ".forced"
        else:
            qualifier = ""
        # The stored extension may carry leading dots; the name supplies its own.
        bare_ext = self.extension.lstrip(".")
        return self.language + qualifier + "." + bare_ext
|
||||
|
||||
|
||||
# Module-level KB instance — built lazily on first use to avoid loading YAML at import.
|
||||
_KB: SubtitleKnowledgeBase | None = None
|
||||
|
||||
|
||||
def _kb() -> SubtitleKnowledgeBase:
    """Return the shared knowledge base, constructing it on first use.

    The instance is stored in the module-level ``_KB`` so later calls reuse it.
    """
    global _KB  # noqa: PLW0603 — intentional lazy module-level cache
    cached = _KB
    if cached is None:
        cached = _KB = SubtitleKnowledgeBase()
    return cached
|
||||
|
||||
|
||||
def _classify(path: Path) -> tuple[str | None, bool, bool]:
    """Derive ``(language_code, is_sdh, is_forced)`` from a subtitle filename.

    The lower-cased stem is split with ``_TOKEN_SPLIT`` and each non-empty
    token is looked up in the shared knowledge base. The first token that
    resolves to a language fixes ``language_code`` (its ``.code``); every
    other token is checked for an SDH or forced marker.

    Returns ``(None, <flags>)`` when no token resolves to a language.
    """
    tokens = _TOKEN_SPLIT.split(path.stem.lower())
    kb = _kb()

    language: str | None = None
    is_sdh = False
    is_forced = False

    for token in filter(None, tokens):
        if language is None:
            match = kb.language_for_token(token)
            if match is not None:
                # The language token itself is never interpreted as a type marker.
                language = match.code
                continue
        kind = kb.type_for_token(token)
        is_sdh = is_sdh or kind is SubtitleType.SDH
        is_forced = is_forced or kind is SubtitleType.FORCED

    return language, is_sdh, is_forced
|
||||
|
||||
|
||||
class SubtitleScanner:
    """Find subtitle files next to a video and keep those matching the preferences.

    Usage:
        scanner = SubtitleScanner(["fre", "eng"], min_size_kb=1, keep_sdh=True, keep_forced=True)
        candidates = scanner.scan(video_path)
        # Each candidate has .source_path and .destination_name
    """

    def __init__(
        self, languages: list[str], min_size_kb: int, keep_sdh: bool, keep_forced: bool
    ):
        # Language codes are compared lower-cased throughout.
        self.languages = [code.lower() for code in languages]
        self.min_size_kb = min_size_kb
        self.keep_sdh = keep_sdh
        self.keep_forced = keep_forced
        self._kb = _kb()
        self._subtitle_extensions = {ext.lower() for ext in self._kb.known_extensions()}

    def scan(self, video_path: Path) -> list[SubtitleCandidate]:
        """Collect every subtitle candidate near ``video_path`` that passes the filters.

        Two places are searched, in this order:
        - the directory containing the video (flat siblings)
        - a ``Subs/`` subfolder of that directory, when it exists
        """
        video_dir = video_path.parent
        search_dirs = [video_dir]

        subs_dir = video_dir / "Subs"
        if subs_dir.is_dir():
            search_dirs.append(subs_dir)
            logger.debug(f"SubtitleScanner: found Subs/ folder at {subs_dir}")

        candidates: list[SubtitleCandidate] = []
        for directory in search_dirs:
            # Lazy pipeline: each entry is checked and evaluated in sorted order.
            subtitle_files = (
                entry
                for entry in sorted(directory.iterdir())
                if entry.is_file()
                and entry.suffix.lower() in self._subtitle_extensions
            )
            evaluated = (self._evaluate(entry) for entry in subtitle_files)
            candidates.extend(kept for kept in evaluated if kept is not None)

        logger.info(
            f"SubtitleScanner: {len(candidates)} candidate(s) found for {video_path.name}"
        )
        return candidates

    def _evaluate(self, path: Path) -> SubtitleCandidate | None:
        """Run the size, language, SDH and forced filters on one file.

        Returns the resulting candidate, or ``None`` when any filter rejects it.
        """
        # Size filter
        size_kb = path.stat().st_size / 1024
        if size_kb < self.min_size_kb:
            logger.debug(
                f"SubtitleScanner: skip {path.name} (too small: {size_kb:.1f} KB)"
            )
            return None

        language, is_sdh, is_forced = _classify(path)

        # Language filter
        if language is None:
            logger.debug(f"SubtitleScanner: skip {path.name} (language unknown)")
            return None
        if language not in self.languages:
            logger.debug(
                f"SubtitleScanner: skip {path.name} (language '{language}' not in prefs)"
            )
            return None

        # SDH filter
        if is_sdh and not self.keep_sdh:
            logger.debug(f"SubtitleScanner: skip {path.name} (SDH not wanted)")
            return None

        # Forced filter
        if is_forced and not self.keep_forced:
            logger.debug(f"SubtitleScanner: skip {path.name} (forced not wanted)")
            return None

        return SubtitleCandidate(
            source_path=path,
            language=language,
            is_sdh=is_sdh,
            is_forced=is_forced,
            extension=path.suffix.lower(),
        )
|
||||
@@ -1,13 +1,9 @@
|
||||
from .identifier import SubtitleIdentifier
|
||||
from .matcher import SubtitleMatcher
|
||||
from .pattern_detector import PatternDetector
|
||||
from .placer import PlacedTrack, PlaceResult, SubtitlePlacer
|
||||
|
||||
__all__ = [
|
||||
"SubtitleIdentifier",
|
||||
"SubtitleMatcher",
|
||||
"PatternDetector",
|
||||
"SubtitlePlacer",
|
||||
"PlacedTrack",
|
||||
"PlaceResult",
|
||||
]
|
||||
|
||||
@@ -1,14 +1,13 @@
|
||||
"""SubtitleIdentifier — finds and classifies all subtitle tracks for a video file."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from ...shared.ports import FilesystemScanner, MediaProber
|
||||
from ..ports import SubtitleKnowledge
|
||||
from ...shared.value_objects import ImdbId
|
||||
from ..entities import MediaSubtitleMetadata, SubtitleCandidate
|
||||
from ..knowledge.base import SubtitleKnowledgeBase
|
||||
from ..value_objects import ScanStrategy, SubtitlePattern, SubtitleType
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -37,17 +36,14 @@ def _tokenize_suffix(stem: str, episode_stem: str) -> list[str]:
|
||||
return _tokenize(stem)
|
||||
|
||||
|
||||
def _count_entries(path: Path) -> int:
|
||||
"""Return the entry count of an SRT file by finding the last cue number."""
|
||||
try:
|
||||
with open(path, encoding="utf-8", errors="replace") as f:
|
||||
lines = f.read().splitlines()
|
||||
for line in reversed(lines):
|
||||
if line.strip().isdigit():
|
||||
return int(line.strip())
|
||||
return 0
|
||||
except Exception:
|
||||
return 0
|
||||
def _count_entries(text: str | None) -> int | None:
|
||||
"""Return the entry count of an SRT body by finding the last cue number."""
|
||||
if text is None:
|
||||
return None
|
||||
for line in reversed(text.splitlines()):
|
||||
if line.strip().isdigit():
|
||||
return int(line.strip())
|
||||
return 0
|
||||
|
||||
|
||||
class SubtitleIdentifier:
|
||||
@@ -60,8 +56,15 @@ class SubtitleIdentifier:
|
||||
the caller (use case) decides whether to ask the user for clarification.
|
||||
"""
|
||||
|
||||
def __init__(self, kb: SubtitleKnowledgeBase):
|
||||
def __init__(
|
||||
self,
|
||||
kb: SubtitleKnowledge,
|
||||
prober: MediaProber,
|
||||
scanner: FilesystemScanner,
|
||||
):
|
||||
self.kb = kb
|
||||
self.prober = prober
|
||||
self.scanner = scanner
|
||||
|
||||
def identify(
|
||||
self,
|
||||
@@ -88,52 +91,21 @@ class SubtitleIdentifier:
|
||||
return metadata
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Embedded tracks — ffprobe
|
||||
# Embedded tracks — via MediaProber
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _scan_embedded(self, video_path: Path) -> list[SubtitleCandidate]:
|
||||
if not video_path.exists():
|
||||
return []
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[
|
||||
"ffprobe",
|
||||
"-v",
|
||||
"quiet",
|
||||
"-print_format",
|
||||
"json",
|
||||
"-show_streams",
|
||||
"-select_streams",
|
||||
"s",
|
||||
str(video_path),
|
||||
],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30,
|
||||
check=False,
|
||||
)
|
||||
data = json.loads(result.stdout)
|
||||
except (
|
||||
subprocess.TimeoutExpired,
|
||||
json.JSONDecodeError,
|
||||
FileNotFoundError,
|
||||
) as e:
|
||||
logger.debug(
|
||||
f"SubtitleIdentifier: ffprobe failed for {video_path.name}: {e}"
|
||||
)
|
||||
return []
|
||||
streams = self.prober.list_subtitle_streams(video_path)
|
||||
|
||||
tracks = []
|
||||
for stream in data.get("streams", []):
|
||||
tags = stream.get("tags", {})
|
||||
disposition = stream.get("disposition", {})
|
||||
lang_code = tags.get("language", "")
|
||||
for stream in streams:
|
||||
lang = (
|
||||
self.kb.language_for_token(stream.language) if stream.language else None
|
||||
)
|
||||
|
||||
lang = self.kb.language_for_token(lang_code) if lang_code else None
|
||||
|
||||
if disposition.get("hearing_impaired"):
|
||||
if stream.is_hearing_impaired:
|
||||
stype = SubtitleType.SDH
|
||||
elif disposition.get("forced"):
|
||||
elif stream.is_forced:
|
||||
stype = SubtitleType.FORCED
|
||||
else:
|
||||
stype = SubtitleType.STANDARD
|
||||
@@ -144,7 +116,7 @@ class SubtitleIdentifier:
|
||||
format=None,
|
||||
subtitle_type=stype,
|
||||
is_embedded=True,
|
||||
raw_tokens=[lang_code] if lang_code else [],
|
||||
raw_tokens=[stream.language] if stream.language else [],
|
||||
)
|
||||
)
|
||||
|
||||
@@ -176,57 +148,47 @@ class SubtitleIdentifier:
|
||||
|
||||
return self._classify_files(candidates, pattern, episode_stem=episode_stem)
|
||||
|
||||
def _find_adjacent(self, video_path: Path) -> list[Path]:
|
||||
def _find_adjacent(self, video_path: Path) -> list:
|
||||
known = self.kb.known_extensions()
|
||||
return [
|
||||
p
|
||||
for p in sorted(video_path.parent.iterdir())
|
||||
if p.is_file()
|
||||
and p.suffix.lower() in self.kb.known_extensions()
|
||||
and p.stem != video_path.stem
|
||||
entry
|
||||
for entry in self.scanner.scan_dir(video_path.parent)
|
||||
if entry.is_file
|
||||
and entry.suffix.lower() in known
|
||||
and entry.stem != video_path.stem
|
||||
]
|
||||
|
||||
def _find_flat(self, video_path: Path, root_folder: str) -> list[Path]:
|
||||
subs_dir = video_path.parent / root_folder
|
||||
if not subs_dir.is_dir():
|
||||
# Also look at release root (one level up)
|
||||
subs_dir = video_path.parent.parent / root_folder
|
||||
if not subs_dir.is_dir():
|
||||
return []
|
||||
return [
|
||||
p
|
||||
for p in sorted(subs_dir.iterdir())
|
||||
if p.is_file() and p.suffix.lower() in self.kb.known_extensions()
|
||||
]
|
||||
def _find_flat(self, video_path: Path, root_folder: str) -> list:
|
||||
known = self.kb.known_extensions()
|
||||
# Adjacent first, then release root (one level up)
|
||||
for subs_dir in (
|
||||
video_path.parent / root_folder,
|
||||
video_path.parent.parent / root_folder,
|
||||
):
|
||||
entries = self.scanner.scan_dir(subs_dir)
|
||||
if entries:
|
||||
return [
|
||||
e for e in entries if e.is_file and e.suffix.lower() in known
|
||||
]
|
||||
return []
|
||||
|
||||
def _find_episode_subfolder(
|
||||
self, video_path: Path, root_folder: str
|
||||
) -> tuple[list[Path], str]:
|
||||
"""
|
||||
Look for Subs/{episode_stem}/*.srt
|
||||
|
||||
Checks two locations:
|
||||
1. Adjacent to the video: video_path.parent / root_folder / video_path.stem
|
||||
2. Release root (one level up): video_path.parent.parent / root_folder / video_path.stem
|
||||
|
||||
Returns (files, episode_stem) so the classifier can strip the prefix.
|
||||
"""
|
||||
) -> tuple[list, str]:
|
||||
"""Look for Subs/{episode_stem}/*.srt — adjacent or one level up."""
|
||||
episode_stem = video_path.stem
|
||||
candidates_dirs = [
|
||||
known = self.kb.known_extensions()
|
||||
for subs_dir in (
|
||||
video_path.parent / root_folder / episode_stem,
|
||||
video_path.parent.parent / root_folder / episode_stem,
|
||||
]
|
||||
for subs_dir in candidates_dirs:
|
||||
if subs_dir.is_dir():
|
||||
files = [
|
||||
p
|
||||
for p in sorted(subs_dir.iterdir())
|
||||
if p.is_file() and p.suffix.lower() in self.kb.known_extensions()
|
||||
]
|
||||
if files:
|
||||
logger.debug(
|
||||
f"SubtitleIdentifier: found {len(files)} file(s) in {subs_dir}"
|
||||
)
|
||||
return files, episode_stem
|
||||
):
|
||||
entries = self.scanner.scan_dir(subs_dir)
|
||||
files = [e for e in entries if e.is_file and e.suffix.lower() in known]
|
||||
if files:
|
||||
logger.debug(
|
||||
f"SubtitleIdentifier: found {len(files)} file(s) in {subs_dir}"
|
||||
)
|
||||
return files, episode_stem
|
||||
return [], episode_stem
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
@@ -235,14 +197,13 @@ class SubtitleIdentifier:
|
||||
|
||||
def _classify_files(
|
||||
self,
|
||||
paths: list[Path],
|
||||
entries: list,
|
||||
pattern: SubtitlePattern,
|
||||
episode_stem: str | None = None,
|
||||
) -> list[SubtitleCandidate]:
|
||||
tracks = []
|
||||
for path in paths:
|
||||
track = self._classify_single(path, episode_stem=episode_stem)
|
||||
tracks.append(track)
|
||||
tracks = [
|
||||
self._classify_single(entry, episode_stem=episode_stem) for entry in entries
|
||||
]
|
||||
|
||||
# Post-process: if multiple tracks share same language but type is ambiguous,
|
||||
# apply size_and_count disambiguation
|
||||
@@ -252,13 +213,13 @@ class SubtitleIdentifier:
|
||||
return tracks
|
||||
|
||||
def _classify_single(
|
||||
self, path: Path, episode_stem: str | None = None
|
||||
self, entry, episode_stem: str | None = None
|
||||
) -> SubtitleCandidate:
|
||||
fmt = self.kb.format_for_extension(path.suffix)
|
||||
fmt = self.kb.format_for_extension(entry.suffix)
|
||||
tokens = (
|
||||
_tokenize_suffix(path.stem, episode_stem)
|
||||
_tokenize_suffix(entry.stem, episode_stem)
|
||||
if episode_stem
|
||||
else _tokenize(path.stem)
|
||||
else _tokenize(entry.stem)
|
||||
)
|
||||
|
||||
language = None
|
||||
@@ -284,19 +245,21 @@ class SubtitleIdentifier:
|
||||
|
||||
if unknown_tokens:
|
||||
logger.debug(
|
||||
f"SubtitleIdentifier: unknown tokens in '{path.name}': {unknown_tokens}"
|
||||
f"SubtitleIdentifier: unknown tokens in '{entry.name}': {unknown_tokens}"
|
||||
)
|
||||
|
||||
size_kb = path.stat().st_size / 1024 if path.exists() else None
|
||||
entry_count = _count_entries(path) if path.exists() else None
|
||||
# Entry count: only meaningful for SRT files; read text on demand.
|
||||
entry_count: int | None = None
|
||||
if entry.suffix.lower() == ".srt":
|
||||
entry_count = _count_entries(self.scanner.read_text(entry.path))
|
||||
|
||||
return SubtitleCandidate(
|
||||
language=language,
|
||||
format=fmt,
|
||||
subtitle_type=subtitle_type,
|
||||
is_embedded=False,
|
||||
file_path=path,
|
||||
file_size_kb=size_kb,
|
||||
file_path=entry.path,
|
||||
file_size_kb=entry.size_kb,
|
||||
entry_count=entry_count,
|
||||
confidence=confidence,
|
||||
raw_tokens=tokens,
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
"""PatternDetector — discovers the subtitle structure of a release folder."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from ..knowledge.base import SubtitleKnowledgeBase
|
||||
from ...shared.ports import FilesystemScanner, MediaProber
|
||||
from ..ports import SubtitleKnowledge
|
||||
from ..value_objects import ScanStrategy, SubtitlePattern
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -20,8 +19,15 @@ class PatternDetector:
|
||||
a release follows. The result is proposed to the user for confirmation.
|
||||
"""
|
||||
|
||||
def __init__(self, kb: SubtitleKnowledgeBase):
|
||||
def __init__(
|
||||
self,
|
||||
kb: SubtitleKnowledge,
|
||||
prober: MediaProber,
|
||||
scanner: FilesystemScanner,
|
||||
):
|
||||
self.kb = kb
|
||||
self.prober = prober
|
||||
self.scanner = scanner
|
||||
|
||||
def detect(self, release_root: Path, sample_video: Path) -> dict:
|
||||
"""
|
||||
@@ -45,29 +51,7 @@ class PatternDetector:
|
||||
}
|
||||
|
||||
def _has_embedded_subtitles(self, video_path: Path) -> bool:
|
||||
"""Run ffprobe to check whether the video has embedded subtitle streams."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[
|
||||
"ffprobe",
|
||||
"-v",
|
||||
"quiet",
|
||||
"-print_format",
|
||||
"json",
|
||||
"-show_streams",
|
||||
"-select_streams",
|
||||
"s",
|
||||
str(video_path),
|
||||
],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30,
|
||||
check=False,
|
||||
)
|
||||
data = json.loads(result.stdout)
|
||||
return len(data.get("streams", [])) > 0
|
||||
except (subprocess.TimeoutExpired, json.JSONDecodeError, FileNotFoundError):
|
||||
return False
|
||||
return len(self.prober.list_subtitle_streams(video_path)) > 0
|
||||
|
||||
def _inspect(self, release_root: Path, sample_video: Path) -> dict:
|
||||
"""Gather structural facts about the release."""
|
||||
@@ -84,61 +68,59 @@ class PatternDetector:
|
||||
}
|
||||
|
||||
# Check for Subs/ folder — adjacent or at release root
|
||||
for subs_candidate in [
|
||||
for subs_candidate in (
|
||||
sample_video.parent / "Subs",
|
||||
release_root / "Subs",
|
||||
]:
|
||||
if subs_candidate.is_dir():
|
||||
findings["has_subs_folder"] = True
|
||||
findings["subs_root"] = str(subs_candidate)
|
||||
):
|
||||
children = self.scanner.scan_dir(subs_candidate)
|
||||
if not children:
|
||||
continue
|
||||
|
||||
# Is it flat or episode_subfolder?
|
||||
children = list(subs_candidate.iterdir())
|
||||
sub_files = [
|
||||
c
|
||||
for c in children
|
||||
if c.is_file() and c.suffix.lower() in known_exts
|
||||
findings["has_subs_folder"] = True
|
||||
findings["subs_root"] = str(subs_candidate)
|
||||
|
||||
# Is it flat or episode_subfolder?
|
||||
sub_files = [
|
||||
c for c in children if c.is_file and c.suffix.lower() in known_exts
|
||||
]
|
||||
sub_dirs = [c for c in children if c.is_dir]
|
||||
|
||||
if sub_dirs and not sub_files:
|
||||
findings["subs_strategy"] = "episode_subfolder"
|
||||
# Count files in a sample subfolder
|
||||
sample_files = [
|
||||
f
|
||||
for f in self.scanner.scan_dir(sub_dirs[0].path)
|
||||
if f.is_file and f.suffix.lower() in known_exts
|
||||
]
|
||||
sub_dirs = [c for c in children if c.is_dir()]
|
||||
|
||||
if sub_dirs and not sub_files:
|
||||
findings["subs_strategy"] = "episode_subfolder"
|
||||
# Count files in a sample subfolder
|
||||
sample_sub = sub_dirs[0]
|
||||
sample_files = [
|
||||
f
|
||||
for f in sample_sub.iterdir()
|
||||
if f.is_file() and f.suffix.lower() in known_exts
|
||||
]
|
||||
findings["files_per_episode"] = len(sample_files)
|
||||
# Check naming conventions
|
||||
for f in sample_files:
|
||||
stem = f.stem
|
||||
parts = stem.split("_")
|
||||
if parts[0].isdigit():
|
||||
findings["has_numeric_prefix"] = True
|
||||
if any(
|
||||
self.kb.is_known_lang_token(t.lower())
|
||||
for t in stem.replace("_", ".").split(".")
|
||||
):
|
||||
findings["has_lang_tokens"] = True
|
||||
else:
|
||||
findings["subs_strategy"] = "flat"
|
||||
findings["files_per_episode"] = len(sub_files)
|
||||
for f in sub_files:
|
||||
if any(
|
||||
self.kb.is_known_lang_token(t.lower())
|
||||
for t in f.stem.replace("_", ".").split(".")
|
||||
):
|
||||
findings["has_lang_tokens"] = True
|
||||
break
|
||||
findings["files_per_episode"] = len(sample_files)
|
||||
# Check naming conventions
|
||||
for f in sample_files:
|
||||
parts = f.stem.split("_")
|
||||
if parts[0].isdigit():
|
||||
findings["has_numeric_prefix"] = True
|
||||
if any(
|
||||
self.kb.is_known_lang_token(t.lower())
|
||||
for t in f.stem.replace("_", ".").split(".")
|
||||
):
|
||||
findings["has_lang_tokens"] = True
|
||||
else:
|
||||
findings["subs_strategy"] = "flat"
|
||||
findings["files_per_episode"] = len(sub_files)
|
||||
for f in sub_files:
|
||||
if any(
|
||||
self.kb.is_known_lang_token(t.lower())
|
||||
for t in f.stem.replace("_", ".").split(".")
|
||||
):
|
||||
findings["has_lang_tokens"] = True
|
||||
break
|
||||
|
||||
# Check adjacent subs (next to the video)
|
||||
if not findings["has_subs_folder"]:
|
||||
adjacent = [
|
||||
p
|
||||
for p in sample_video.parent.iterdir()
|
||||
if p.is_file() and p.suffix.lower() in known_exts
|
||||
e
|
||||
for e in self.scanner.scan_dir(sample_video.parent)
|
||||
if e.is_file and e.suffix.lower() in known_exts
|
||||
]
|
||||
if adjacent:
|
||||
findings["adjacent_subs"] = True
|
||||
@@ -221,6 +203,6 @@ class PatternDetector:
|
||||
parts.append("no external subtitle files found")
|
||||
|
||||
if findings.get("has_embedded"):
|
||||
parts.append("embedded tracks detected (ffprobe)")
|
||||
parts.append("embedded tracks detected")
|
||||
|
||||
return " — ".join(parts) if parts else "nothing found"
|
||||
|
||||
@@ -28,12 +28,11 @@ from __future__ import annotations
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from ..shared.media import AudioTrack, SubtitleTrack, track_lang_matches
|
||||
from ..shared.media import AudioTrack, MediaWithTracks, SubtitleTrack
|
||||
from ..shared.value_objects import (
|
||||
FilePath,
|
||||
FileSize,
|
||||
ImdbId,
|
||||
Language,
|
||||
to_dot_folder_name,
|
||||
)
|
||||
from .value_objects import (
|
||||
@@ -48,8 +47,8 @@ from .value_objects import (
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
|
||||
@dataclass
|
||||
class Episode:
|
||||
@dataclass(eq=False)
|
||||
class Episode(MediaWithTracks):
|
||||
"""
|
||||
A single episode of a TV show — leaf of the TVShow aggregate.
|
||||
|
||||
@@ -57,6 +56,11 @@ class Episode:
|
||||
(audio + subtitle). Track lists are populated by the ffprobe + subtitle
|
||||
scan pipeline; they may be empty when the episode is known but not yet
|
||||
scanned, or when no file is downloaded yet.
|
||||
|
||||
Equality is identity-based within the aggregate: two ``Episode`` instances
|
||||
are equal iff they share the same ``(season_number, episode_number)``,
|
||||
regardless of title/file/track contents. The root TVShow guarantees
|
||||
cross-show uniqueness.
|
||||
"""
|
||||
|
||||
season_number: SeasonNumber
|
||||
@@ -76,51 +80,19 @@ class Episode:
|
||||
if isinstance(self.episode_number, int):
|
||||
self.episode_number = EpisodeNumber(self.episode_number)
|
||||
|
||||
# ── File presence ──────────────────────────────────────────────────────
|
||||
def __eq__(self, other: object) -> bool:
|
||||
if not isinstance(other, Episode):
|
||||
return NotImplemented
|
||||
return (
|
||||
self.season_number == other.season_number
|
||||
and self.episode_number == other.episode_number
|
||||
)
|
||||
|
||||
def has_file(self) -> bool:
|
||||
"""True if a file path is set and the file actually exists on disk."""
|
||||
return self.file_path is not None and self.file_path.exists()
|
||||
def __hash__(self) -> int:
|
||||
return hash((self.season_number, self.episode_number))
|
||||
|
||||
def is_downloaded(self) -> bool:
|
||||
"""Alias of ``has_file()`` — reads better in collection-status contexts."""
|
||||
return self.has_file()
|
||||
|
||||
# ── Audio helpers ──────────────────────────────────────────────────────
|
||||
|
||||
def has_audio_in(self, lang: str | Language) -> bool:
|
||||
"""True if at least one audio track is in the given language."""
|
||||
return any(track_lang_matches(t.language, lang) for t in self.audio_tracks)
|
||||
|
||||
def audio_languages(self) -> list[str]:
|
||||
"""Unique audio languages across all tracks, in track order."""
|
||||
seen: set[str] = set()
|
||||
result: list[str] = []
|
||||
for t in self.audio_tracks:
|
||||
if t.language and t.language not in seen:
|
||||
seen.add(t.language)
|
||||
result.append(t.language)
|
||||
return result
|
||||
|
||||
# ── Subtitle helpers ───────────────────────────────────────────────────
|
||||
|
||||
def has_subtitles_in(self, lang: str | Language) -> bool:
|
||||
"""True if at least one subtitle track is in the given language."""
|
||||
return any(track_lang_matches(t.language, lang) for t in self.subtitle_tracks)
|
||||
|
||||
def has_forced_subs(self) -> bool:
|
||||
"""True if at least one subtitle track is flagged as forced."""
|
||||
return any(t.is_forced for t in self.subtitle_tracks)
|
||||
|
||||
def subtitle_languages(self) -> list[str]:
|
||||
"""Unique subtitle languages across all tracks, in track order."""
|
||||
seen: set[str] = set()
|
||||
result: list[str] = []
|
||||
for t in self.subtitle_tracks:
|
||||
if t.language and t.language not in seen:
|
||||
seen.add(t.language)
|
||||
result.append(t.language)
|
||||
return result
|
||||
# Track helpers (has_audio_in / audio_languages / has_subtitles_in /
|
||||
# has_forced_subs / subtitle_languages) come from MediaWithTracks.
|
||||
|
||||
# ── Naming ─────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@@ -57,27 +57,31 @@ def _parse(data: dict) -> MediaInfo:
|
||||
streams = data.get("streams", [])
|
||||
fmt = data.get("format", {})
|
||||
|
||||
info = MediaInfo()
|
||||
|
||||
# File-level duration/bitrate (ffprobe ``format`` block — independent of streams)
|
||||
duration_seconds: float | None = None
|
||||
bitrate_kbps: int | None = None
|
||||
if "duration" in fmt:
|
||||
try:
|
||||
info.duration_seconds = float(fmt["duration"])
|
||||
duration_seconds = float(fmt["duration"])
|
||||
except ValueError:
|
||||
pass
|
||||
if "bit_rate" in fmt:
|
||||
try:
|
||||
info.bitrate_kbps = int(fmt["bit_rate"]) // 1000
|
||||
bitrate_kbps = int(fmt["bit_rate"]) // 1000
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
video_tracks: list[VideoTrack] = []
|
||||
audio_tracks: list[AudioTrack] = []
|
||||
subtitle_tracks: list[SubtitleTrack] = []
|
||||
|
||||
for stream in streams:
|
||||
codec_type = stream.get("codec_type")
|
||||
|
||||
if codec_type == "video":
|
||||
info.video_tracks.append(
|
||||
video_tracks.append(
|
||||
VideoTrack(
|
||||
index=stream.get("index", len(info.video_tracks)),
|
||||
index=stream.get("index", len(video_tracks)),
|
||||
codec=stream.get("codec_name"),
|
||||
width=stream.get("width"),
|
||||
height=stream.get("height"),
|
||||
@@ -86,9 +90,9 @@ def _parse(data: dict) -> MediaInfo:
|
||||
)
|
||||
|
||||
elif codec_type == "audio":
|
||||
info.audio_tracks.append(
|
||||
audio_tracks.append(
|
||||
AudioTrack(
|
||||
index=stream.get("index", len(info.audio_tracks)),
|
||||
index=stream.get("index", len(audio_tracks)),
|
||||
codec=stream.get("codec_name"),
|
||||
channels=stream.get("channels"),
|
||||
channel_layout=stream.get("channel_layout"),
|
||||
@@ -98,9 +102,9 @@ def _parse(data: dict) -> MediaInfo:
|
||||
)
|
||||
|
||||
elif codec_type == "subtitle":
|
||||
info.subtitle_tracks.append(
|
||||
subtitle_tracks.append(
|
||||
SubtitleTrack(
|
||||
index=stream.get("index", len(info.subtitle_tracks)),
|
||||
index=stream.get("index", len(subtitle_tracks)),
|
||||
codec=stream.get("codec_name"),
|
||||
language=stream.get("tags", {}).get("language"),
|
||||
is_default=stream.get("disposition", {}).get("default", 0) == 1,
|
||||
@@ -108,4 +112,10 @@ def _parse(data: dict) -> MediaInfo:
|
||||
)
|
||||
)
|
||||
|
||||
return info
|
||||
return MediaInfo(
|
||||
video_tracks=tuple(video_tracks),
|
||||
audio_tracks=tuple(audio_tracks),
|
||||
subtitle_tracks=tuple(subtitle_tracks),
|
||||
duration_seconds=duration_seconds,
|
||||
bitrate_kbps=bitrate_kbps,
|
||||
)
|
||||
|
||||
@@ -0,0 +1,66 @@
|
||||
"""PathlibFilesystemScanner — FilesystemScanner adapter backed by pathlib."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from alfred.domain.shared.ports import FileEntry
|
||||
|
||||
logger = logging.getLogger(__name__)


class PathlibFilesystemScanner:
    """Read-only filesystem scanner using ``pathlib``.

    Implements :class:`alfred.domain.shared.ports.FilesystemScanner`
    structurally. Never raises — failures are logged and surfaced as
    empty results (``[]`` or ``None``).
    """

    def scan_dir(self, path: Path) -> list[FileEntry]:
        """Return entries for the direct children of *path*, sorted by path.

        Returns an empty list when *path* is not a directory or cannot be
        listed. Children that are neither a regular file nor a directory
        (for example broken symlinks) are omitted.
        """
        try:
            if not path.is_dir():
                return []
            children = sorted(path.iterdir())
        except OSError as e:
            logger.debug(
                "PathlibFilesystemScanner: scan_dir failed for %s: %s", path, e
            )
            return []

        candidates = (self._make_entry(child) for child in children)
        return [entry for entry in candidates if entry is not None]

    def stat(self, path: Path) -> FileEntry | None:
        """Return the entry for *path*, or ``None`` if it is not a file or directory."""
        return self._make_entry(path)

    def read_text(self, path: Path, encoding: str = "utf-8") -> str | None:
        """Return the decoded contents of *path*, or ``None`` on any failure.

        Undecodable bytes are replaced rather than raising
        (``errors="replace"``).
        """
        try:
            with open(path, encoding=encoding, errors="replace") as f:
                return f.read()
        # LookupError: unknown ``encoding`` name. ValueError: malformed path
        # (e.g. an embedded NUL byte). Both must be absorbed to honour the
        # class-level "never raises" contract, not only OSError.
        except (OSError, LookupError, ValueError) as e:
            logger.debug(
                "PathlibFilesystemScanner: read_text failed for %s: %s", path, e
            )
            return None

    # ------------------------------------------------------------------

    def _make_entry(self, path: Path) -> FileEntry | None:
        """Build a ``FileEntry`` for *path*.

        Returns ``None`` when *path* is neither a regular file nor a
        directory, or when probing it fails. ``size_kb`` is populated only
        for files and is left as ``None`` if the size cannot be read.
        """
        try:
            is_file = path.is_file()
            is_dir = path.is_dir()
        except OSError:
            return None
        if not (is_file or is_dir):
            return None

        size_kb: float | None = None
        if is_file:
            try:
                size_kb = path.stat().st_size / 1024
            except OSError:
                # The file may have vanished or become unreadable since the
                # is_file() check; keep the entry but without a size.
                size_kb = None

        return FileEntry(path=path, is_file=is_file, is_dir=is_dir, size_kb=size_kb)
|
||||
@@ -0,0 +1,6 @@
|
||||
"""Knowledge loaders — YAML I/O kept out of the domain layer.
|
||||
|
||||
Each submodule reads its YAML files from ``alfred/knowledge/`` (builtin,
|
||||
versioned) and ``data/knowledge/`` (learned, gitignored), and exposes plain
|
||||
Python values (sets, dicts, classes) for domain code to consume.
|
||||
"""
|
||||
+1
-1
@@ -13,7 +13,7 @@ import yaml
|
||||
|
||||
import alfred as _alfred_pkg
|
||||
|
||||
from ..value_objects import Language
|
||||
from alfred.domain.shared.value_objects import Language
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
+2
-2
@@ -2,8 +2,8 @@
|
||||
|
||||
import logging
|
||||
|
||||
from ...shared.knowledge.language_registry import LanguageRegistry
|
||||
from ..value_objects import (
|
||||
from alfred.infrastructure.knowledge.language_registry import LanguageRegistry
|
||||
from alfred.domain.subtitles.value_objects import (
|
||||
ScanStrategy,
|
||||
SubtitleFormat,
|
||||
SubtitleLanguage,
|
||||
@@ -0,0 +1,5 @@
|
||||
"""Media probing adapters — concrete implementations of MediaProber."""
|
||||
|
||||
from .ffprobe_prober import FfprobeMediaProber
|
||||
|
||||
__all__ = ["FfprobeMediaProber"]
|
||||
@@ -0,0 +1,65 @@
|
||||
"""FfprobeMediaProber — MediaProber adapter backed by the ffprobe CLI."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from alfred.domain.shared.ports import SubtitleStreamInfo
|
||||
|
||||
logger = logging.getLogger(__name__)

# Upper bound, in seconds, on a single ffprobe invocation.
_FFPROBE_TIMEOUT_SECONDS = 30


def _as_dict(value: object) -> dict:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


class FfprobeMediaProber:
    """Inspect media files by shelling out to ``ffprobe``.

    Implements :class:`alfred.domain.shared.ports.MediaProber` structurally.
    Never raises — failures are logged and surfaced as empty results.
    """

    def list_subtitle_streams(self, video: Path) -> list[SubtitleStreamInfo]:
        """Return one ``SubtitleStreamInfo`` per subtitle stream in *video*.

        Returns an empty list when *video* does not exist, when ``ffprobe``
        is missing, cannot be executed or times out, or when its output is
        not the expected JSON object.
        """
        try:
            # Kept inside the try: Path.exists() can itself raise OSError
            # (e.g. permission denied on a parent directory).
            if not video.exists():
                return []
            result = subprocess.run(
                [
                    "ffprobe",
                    "-v",
                    "quiet",
                    "-print_format",
                    "json",
                    "-show_streams",
                    "-select_streams",
                    "s",
                    str(video),
                ],
                capture_output=True,
                text=True,
                timeout=_FFPROBE_TIMEOUT_SECONDS,
                check=False,
            )
            data = json.loads(result.stdout)
        # OSError covers a missing binary (FileNotFoundError) as well as
        # permission/exec failures. ValueError covers json.JSONDecodeError
        # and the UnicodeDecodeError that text=True can raise on
        # undecodable output.
        except (subprocess.TimeoutExpired, OSError, ValueError) as e:
            logger.debug(
                "FfprobeMediaProber: ffprobe failed for %s: %s", video.name, e
            )
            return []

        # Valid JSON is not necessarily the expected shape; tolerate anything
        # that is not {"streams": [...]} instead of raising AttributeError.
        raw_streams = _as_dict(data).get("streams")
        if not isinstance(raw_streams, list):
            return []

        streams: list[SubtitleStreamInfo] = []
        for stream in raw_streams:
            if not isinstance(stream, dict):
                continue
            tags = _as_dict(stream.get("tags"))
            disposition = _as_dict(stream.get("disposition"))
            streams.append(
                SubtitleStreamInfo(
                    # Normalise an empty language tag to None.
                    language=tags.get("language") or None,
                    is_hearing_impaired=bool(disposition.get("hearing_impaired")),
                    is_forced=bool(disposition.get("forced")),
                )
            )
        return streams
|
||||
@@ -14,7 +14,7 @@ from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from alfred.domain.subtitles.entities import SubtitleCandidate
|
||||
from alfred.domain.subtitles.services.placer import PlacedTrack
|
||||
from alfred.application.subtitles.placer import PlacedTrack
|
||||
from alfred.infrastructure.metadata.store import MetadataStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -54,7 +54,7 @@ class RuleSetRepository:
|
||||
Build and return the resolved RuleSet chain.
|
||||
|
||||
If subtitle_preferences is provided, it seeds the global base rule set
|
||||
from LTM (overriding the hardcoded DEFAULT_RULES).
|
||||
from LTM (overriding the knowledge-base defaults at resolve time).
|
||||
Returns global default if no overrides exist.
|
||||
"""
|
||||
base = SubtitleRuleSet.global_default()
|
||||
|
||||
@@ -41,7 +41,7 @@ from alfred.application.filesystem.manage_subtitles import (
|
||||
_to_unresolved_dto,
|
||||
)
|
||||
from alfred.domain.subtitles.entities import MediaSubtitleMetadata, SubtitleCandidate
|
||||
from alfred.domain.subtitles.services.placer import PlacedTrack, PlaceResult
|
||||
from alfred.application.subtitles.placer import PlacedTrack, PlaceResult
|
||||
from alfred.domain.subtitles.value_objects import (
|
||||
ScanStrategy,
|
||||
SubtitleFormat,
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
"""Tests for ``alfred.domain.subtitles.services.placer.SubtitlePlacer``.
|
||||
"""Tests for ``alfred.application.subtitles.placer.SubtitlePlacer``.
|
||||
|
||||
The placer hard-links subtitle files next to a destination video, naming
|
||||
them ``{video_stem}.{lang}[.sdh|.forced].{ext}``.
|
||||
@@ -22,7 +22,7 @@ from unittest.mock import patch
|
||||
import pytest
|
||||
|
||||
from alfred.domain.subtitles.entities import SubtitleCandidate
|
||||
from alfred.domain.subtitles.services.placer import (
|
||||
from alfred.application.subtitles.placer import (
|
||||
PlacedTrack,
|
||||
PlaceResult,
|
||||
SubtitlePlacer,
|
||||
@@ -198,7 +198,7 @@ class TestOSError:
|
||||
video.write_bytes(b"")
|
||||
track = _track(src)
|
||||
with patch(
|
||||
"alfred.domain.subtitles.services.placer.os.link",
|
||||
"alfred.application.subtitles.placer.os.link",
|
||||
side_effect=OSError("cross-device link"),
|
||||
):
|
||||
result = placer.place([track], video)
|
||||
@@ -72,23 +72,6 @@ class TestFilePath:
|
||||
with pytest.raises(ValidationError):
|
||||
FilePath(123) # type: ignore
|
||||
|
||||
def test_exists_true(self, tmp_path):
|
||||
p = FilePath(tmp_path)
|
||||
assert p.exists()
|
||||
|
||||
def test_exists_false(self, tmp_path):
|
||||
p = FilePath(tmp_path / "nonexistent")
|
||||
assert not p.exists()
|
||||
|
||||
def test_is_file(self, tmp_path):
|
||||
f = tmp_path / "file.txt"
|
||||
f.write_text("x")
|
||||
assert FilePath(f).is_file()
|
||||
assert not FilePath(tmp_path).is_file()
|
||||
|
||||
def test_is_dir(self, tmp_path):
|
||||
assert FilePath(tmp_path).is_dir()
|
||||
|
||||
def test_str(self, tmp_path):
|
||||
p = FilePath(tmp_path)
|
||||
assert str(p) == str(tmp_path)
|
||||
|
||||
@@ -22,8 +22,8 @@ from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from alfred.domain.shared.ports import FileEntry
|
||||
from alfred.domain.subtitles.entities import SubtitleCandidate
|
||||
from alfred.domain.subtitles.knowledge.base import SubtitleKnowledgeBase
|
||||
from alfred.domain.subtitles.services.identifier import (
|
||||
SubtitleIdentifier,
|
||||
_count_entries,
|
||||
@@ -37,6 +37,19 @@ from alfred.domain.subtitles.value_objects import (
|
||||
SubtitleType,
|
||||
TypeDetectionMethod,
|
||||
)
|
||||
from alfred.infrastructure.filesystem.scanner import PathlibFilesystemScanner
|
||||
from alfred.infrastructure.knowledge.subtitles.base import SubtitleKnowledgeBase
|
||||
from alfred.infrastructure.probe.ffprobe_prober import FfprobeMediaProber
|
||||
|
||||
|
||||
def _file_entry(path) -> FileEntry:
    """Build a ``FileEntry`` snapshot of a real path for use in tests.

    ``size_kb`` is the on-disk size in kibibytes for regular files and
    ``None`` for anything else.
    """
    regular = path.is_file()
    directory = path.is_dir()
    if regular:
        kib = path.stat().st_size / 1024
    else:
        kib = None
    return FileEntry(path=path, is_file=regular, is_dir=directory, size_kb=kib)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
@@ -46,7 +59,7 @@ def kb():
|
||||
|
||||
@pytest.fixture
|
||||
def identifier(kb):
|
||||
return SubtitleIdentifier(kb)
|
||||
return SubtitleIdentifier(kb, FfprobeMediaProber(), PathlibFilesystemScanner())
|
||||
|
||||
|
||||
def _pattern(
|
||||
@@ -103,23 +116,19 @@ class TestTokenize:
|
||||
|
||||
|
||||
class TestCountEntries:
|
||||
def test_last_cue_number(self, tmp_path):
|
||||
srt = tmp_path / "x.srt"
|
||||
srt.write_text(
|
||||
def test_last_cue_number(self):
|
||||
text = (
|
||||
"1\n00:00:01,000 --> 00:00:02,000\nHello\n\n"
|
||||
"2\n00:00:03,000 --> 00:00:04,000\nWorld\n\n"
|
||||
"42\n00:00:05,000 --> 00:00:06,000\nLast\n",
|
||||
encoding="utf-8",
|
||||
"42\n00:00:05,000 --> 00:00:06,000\nLast\n"
|
||||
)
|
||||
assert _count_entries(srt) == 42
|
||||
assert _count_entries(text) == 42
|
||||
|
||||
def test_missing_file_returns_zero(self, tmp_path):
|
||||
assert _count_entries(tmp_path / "nope.srt") == 0
|
||||
def test_missing_file_returns_none(self):
|
||||
assert _count_entries(None) is None
|
||||
|
||||
def test_empty_file_returns_zero(self, tmp_path):
|
||||
f = tmp_path / "x.srt"
|
||||
f.write_text("")
|
||||
assert _count_entries(f) == 0
|
||||
def test_empty_file_returns_zero(self):
|
||||
assert _count_entries("") == 0
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
@@ -135,7 +144,7 @@ class TestEmbedded:
|
||||
video = tmp_path / "v.mkv"
|
||||
video.write_bytes(b"")
|
||||
with patch(
|
||||
"alfred.domain.subtitles.services.identifier.subprocess.run",
|
||||
"alfred.infrastructure.probe.ffprobe_prober.subprocess.run",
|
||||
side_effect=FileNotFoundError("no ffprobe"),
|
||||
):
|
||||
assert identifier._scan_embedded(video) == []
|
||||
@@ -156,7 +165,7 @@ class TestEmbedded:
|
||||
stdout = fake_output
|
||||
|
||||
with patch(
|
||||
"alfred.domain.subtitles.services.identifier.subprocess.run",
|
||||
"alfred.infrastructure.probe.ffprobe_prober.subprocess.run",
|
||||
return_value=FakeResult(),
|
||||
):
|
||||
tracks = identifier._scan_embedded(video)
|
||||
@@ -256,7 +265,7 @@ class TestClassify:
|
||||
def test_classifies_language_and_format(self, identifier, tmp_path):
|
||||
f = tmp_path / "Show.S01E01.English.srt"
|
||||
f.write_text("1\n00:00:01,000 --> 00:00:02,000\nHi\n")
|
||||
track = identifier._classify_single(f)
|
||||
track = identifier._classify_single(_file_entry(f))
|
||||
assert track.language.code == "eng"
|
||||
assert track.format.id == "srt"
|
||||
assert track.confidence > 0
|
||||
@@ -265,13 +274,13 @@ class TestClassify:
|
||||
def test_classifies_type_token(self, identifier, tmp_path):
|
||||
f = tmp_path / "Show.S01E01.English.sdh.srt"
|
||||
f.write_text("")
|
||||
track = identifier._classify_single(f)
|
||||
track = identifier._classify_single(_file_entry(f))
|
||||
assert track.subtitle_type == SubtitleType.SDH
|
||||
|
||||
def test_unknown_tokens_lower_confidence(self, identifier, tmp_path):
|
||||
f = tmp_path / "Show.S01E01.gibberish.srt"
|
||||
f.write_text("")
|
||||
track = identifier._classify_single(f)
|
||||
track = identifier._classify_single(_file_entry(f))
|
||||
# No lang/type recognized → confidence is 0 or very low.
|
||||
assert track.language is None
|
||||
assert track.confidence < 0.5
|
||||
@@ -279,7 +288,9 @@ class TestClassify:
|
||||
def test_episode_stem_prefix_stripped(self, identifier, tmp_path):
|
||||
f = tmp_path / "Show.S01E01.English.srt"
|
||||
f.write_text("")
|
||||
track = identifier._classify_single(f, episode_stem="Show.S01E01")
|
||||
track = identifier._classify_single(
|
||||
_file_entry(f), episode_stem="Show.S01E01"
|
||||
)
|
||||
# Only "english" remains as meaningful token → confidence == 1.0
|
||||
assert track.language.code == "eng"
|
||||
assert track.confidence == 1.0
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
"""Tests for ``alfred.domain.subtitles.knowledge`` (loader + base).
|
||||
"""Tests for ``alfred.infrastructure.knowledge.subtitles`` (loader + base).
|
||||
|
||||
Covers:
|
||||
|
||||
@@ -19,9 +19,9 @@ from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from alfred.domain.subtitles.knowledge import loader as loader_mod
|
||||
from alfred.domain.subtitles.knowledge.base import SubtitleKnowledgeBase
|
||||
from alfred.domain.subtitles.knowledge.loader import KnowledgeLoader, _merge
|
||||
from alfred.infrastructure.knowledge.subtitles import loader as loader_mod
|
||||
from alfred.infrastructure.knowledge.subtitles.base import SubtitleKnowledgeBase
|
||||
from alfred.infrastructure.knowledge.subtitles.loader import KnowledgeLoader, _merge
|
||||
from alfred.domain.subtitles.value_objects import (
|
||||
ScanStrategy,
|
||||
SubtitleType,
|
||||
|
||||
@@ -25,8 +25,10 @@ from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from alfred.domain.subtitles.knowledge.base import SubtitleKnowledgeBase
|
||||
from alfred.domain.subtitles.services.pattern_detector import PatternDetector
|
||||
from alfred.infrastructure.filesystem.scanner import PathlibFilesystemScanner
|
||||
from alfred.infrastructure.knowledge.subtitles.base import SubtitleKnowledgeBase
|
||||
from alfred.infrastructure.probe.ffprobe_prober import FfprobeMediaProber
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
@@ -36,7 +38,7 @@ def kb():
|
||||
|
||||
@pytest.fixture
|
||||
def detector(kb):
|
||||
return PatternDetector(kb)
|
||||
return PatternDetector(kb, FfprobeMediaProber(), PathlibFilesystemScanner())
|
||||
|
||||
|
||||
def _make_video(folder: Path, name: str = "Show.S01E01.mkv") -> Path:
|
||||
|
||||
@@ -1,232 +0,0 @@
|
||||
"""Tests for SubtitleScanner and _classify helper."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from alfred.domain.subtitles.scanner import (
|
||||
SubtitleCandidate,
|
||||
SubtitleScanner,
|
||||
_classify,
|
||||
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _classify — unit tests for the filename parser
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestClassify:
|
||||
def test_iso_lang_code_639_1_alias(self, tmp_path):
|
||||
# ``fr`` is an alias of the canonical ISO 639-2/B code ``fre``.
|
||||
p = tmp_path / "fr.srt"
|
||||
p.write_text("")
|
||||
lang, is_sdh, is_forced = _classify(p)
|
||||
assert lang == "fre"
|
||||
assert not is_sdh
|
||||
assert not is_forced
|
||||
|
||||
def test_english_keyword(self, tmp_path):
|
||||
p = tmp_path / "english.srt"
|
||||
p.write_text("")
|
||||
lang, _, _ = _classify(p)
|
||||
assert lang == "eng"
|
||||
|
||||
def test_french_keyword(self, tmp_path):
|
||||
p = tmp_path / "Show.S01E01.French.srt"
|
||||
p.write_text("")
|
||||
lang, _, _ = _classify(p)
|
||||
assert lang == "fre"
|
||||
|
||||
def test_vostfr_is_french(self, tmp_path):
|
||||
p = tmp_path / "Show.S01E01.VOSTFR.srt"
|
||||
p.write_text("")
|
||||
lang, _, _ = _classify(p)
|
||||
assert lang == "fre"
|
||||
|
||||
def test_sdh_token(self, tmp_path):
|
||||
p = tmp_path / "fre.sdh.srt"
|
||||
p.write_text("")
|
||||
lang, is_sdh, _ = _classify(p)
|
||||
assert lang == "fre"
|
||||
assert is_sdh
|
||||
|
||||
def test_hi_no_longer_marks_sdh(self, tmp_path):
|
||||
# ``hi`` is the ISO 639-1 alias for Hindi; it must not mark a file as
|
||||
# SDH any more (regression of the previous collision between SDH and
|
||||
# Hindi tokens). Use ``sdh`` / ``cc`` / ``hearing`` to flag SDH instead.
|
||||
p = tmp_path / "en.hi.srt"
|
||||
p.write_text("")
|
||||
lang, is_sdh, _ = _classify(p)
|
||||
assert lang == "eng"
|
||||
assert not is_sdh
|
||||
|
||||
def test_forced_token(self, tmp_path):
|
||||
p = tmp_path / "fre.forced.srt"
|
||||
p.write_text("")
|
||||
_, _, is_forced = _classify(p)
|
||||
assert is_forced
|
||||
|
||||
def test_unknown_language_returns_none(self, tmp_path):
|
||||
p = tmp_path / "Show.S01E01.720p.srt"
|
||||
p.write_text("")
|
||||
lang, _, _ = _classify(p)
|
||||
assert lang is None
|
||||
|
||||
def test_dot_separator(self, tmp_path):
|
||||
p = tmp_path / "fre.sdh.srt"
|
||||
p.write_text("")
|
||||
lang, is_sdh, _ = _classify(p)
|
||||
assert lang == "fre"
|
||||
assert is_sdh
|
||||
|
||||
def test_hyphen_separator(self, tmp_path):
|
||||
p = tmp_path / "fre-forced.srt"
|
||||
p.write_text("")
|
||||
lang, _, is_forced = _classify(p)
|
||||
assert lang == "fre"
|
||||
assert is_forced
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SubtitleCandidate.destination_name
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSubtitleCandidateDestinationName:
|
||||
def _make(self, lang="fre", is_sdh=False, is_forced=False, ext=".srt", path=None):
|
||||
return SubtitleCandidate(
|
||||
source_path=path or Path("/fake/fre.srt"),
|
||||
language=lang,
|
||||
is_sdh=is_sdh,
|
||||
is_forced=is_forced,
|
||||
extension=ext,
|
||||
)
|
||||
|
||||
def test_standard(self):
|
||||
assert self._make().destination_name == "fre.srt"
|
||||
|
||||
def test_sdh(self):
|
||||
assert self._make(is_sdh=True).destination_name == "fre.sdh.srt"
|
||||
|
||||
def test_forced(self):
|
||||
assert self._make(is_forced=True).destination_name == "fre.forced.srt"
|
||||
|
||||
def test_ass_extension(self):
|
||||
assert self._make(ext=".ass").destination_name == "fre.ass"
|
||||
|
||||
def test_english_standard(self):
|
||||
assert self._make(lang="eng").destination_name == "eng.srt"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SubtitleScanner — integration with real filesystem
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSubtitleScanner:
|
||||
def _scanner(self, languages=None, min_size_kb=0, keep_sdh=True, keep_forced=True):
|
||||
return SubtitleScanner(
|
||||
languages=languages or ["fre", "eng"],
|
||||
min_size_kb=min_size_kb,
|
||||
keep_sdh=keep_sdh,
|
||||
keep_forced=keep_forced,
|
||||
)
|
||||
|
||||
def _video(self, tmp_path):
|
||||
video = tmp_path / "Movie.mkv"
|
||||
video.write_bytes(b"video")
|
||||
return video
|
||||
|
||||
def test_finds_adjacent_subtitle(self, tmp_path):
|
||||
video = self._video(tmp_path)
|
||||
(tmp_path / "fre.srt").write_text("subtitle content")
|
||||
|
||||
candidates = self._scanner().scan(video)
|
||||
|
||||
assert len(candidates) == 1
|
||||
assert candidates[0].language == "fre"
|
||||
|
||||
def test_finds_adjacent_subtitle_legacy_639_1(self, tmp_path):
|
||||
# Reading existing media libraries: ``fr.srt`` is still recognized as
|
||||
# French and classified canonically as ``fre`` — covers user libraries
|
||||
# written before the ISO 639-2/B migration.
|
||||
video = self._video(tmp_path)
|
||||
(tmp_path / "fr.srt").write_text("subtitle content")
|
||||
|
||||
candidates = self._scanner().scan(video)
|
||||
|
||||
assert len(candidates) == 1
|
||||
assert candidates[0].language == "fre"
|
||||
|
||||
def test_finds_multiple_languages(self, tmp_path):
|
||||
video = self._video(tmp_path)
|
||||
(tmp_path / "fre.srt").write_text("fr subtitle")
|
||||
(tmp_path / "eng.srt").write_text("en subtitle")
|
||||
|
||||
candidates = self._scanner().scan(video)
|
||||
langs = {c.language for c in candidates}
|
||||
assert langs == {"fre", "eng"}
|
||||
|
||||
def test_scans_subs_subfolder(self, tmp_path):
|
||||
video = self._video(tmp_path)
|
||||
subs = tmp_path / "Subs"
|
||||
subs.mkdir()
|
||||
(subs / "fre.srt").write_text("subtitle")
|
||||
|
||||
candidates = self._scanner().scan(video)
|
||||
assert any(c.language == "fre" for c in candidates)
|
||||
|
||||
def test_filters_unknown_language(self, tmp_path):
|
||||
video = self._video(tmp_path)
|
||||
(tmp_path / "unknown.srt").write_text("subtitle")
|
||||
|
||||
candidates = self._scanner().scan(video)
|
||||
assert len(candidates) == 0
|
||||
|
||||
def test_filters_wrong_language(self, tmp_path):
|
||||
video = self._video(tmp_path)
|
||||
(tmp_path / "ger.srt").write_text("german subtitle")
|
||||
|
||||
candidates = self._scanner(languages=["fre"]).scan(video)
|
||||
assert len(candidates) == 0
|
||||
|
||||
def test_filters_too_small_file(self, tmp_path):
|
||||
video = self._video(tmp_path)
|
||||
small = tmp_path / "fre.srt"
|
||||
small.write_bytes(b"x") # 1 byte, well below any min_size_kb
|
||||
|
||||
candidates = self._scanner(min_size_kb=10).scan(video)
|
||||
assert len(candidates) == 0
|
||||
|
||||
def test_filters_sdh_when_not_wanted(self, tmp_path):
|
||||
video = self._video(tmp_path)
|
||||
(tmp_path / "fre.sdh.srt").write_text("sdh subtitle")
|
||||
|
||||
candidates = self._scanner(keep_sdh=False).scan(video)
|
||||
assert len(candidates) == 0
|
||||
|
||||
def test_filters_forced_when_not_wanted(self, tmp_path):
|
||||
video = self._video(tmp_path)
|
||||
(tmp_path / "fre.forced.srt").write_text("forced subtitle")
|
||||
|
||||
candidates = self._scanner(keep_forced=False).scan(video)
|
||||
assert len(candidates) == 0
|
||||
|
||||
def test_keeps_sdh_when_wanted(self, tmp_path):
|
||||
video = self._video(tmp_path)
|
||||
(tmp_path / "fre.sdh.srt").write_text("sdh subtitle")
|
||||
|
||||
candidates = self._scanner(keep_sdh=True).scan(video)
|
||||
assert len(candidates) == 1
|
||||
assert candidates[0].is_sdh
|
||||
|
||||
def test_ignores_non_subtitle_files(self, tmp_path):
|
||||
video = self._video(tmp_path)
|
||||
(tmp_path / "fre.nfo").write_text("nfo file")
|
||||
(tmp_path / "fre.jpg").write_bytes(b"image")
|
||||
|
||||
candidates = self._scanner().scan(video)
|
||||
assert len(candidates) == 0
|
||||
|
||||
def test_returns_empty_when_no_subtitles(self, tmp_path):
|
||||
video = self._video(tmp_path)
|
||||
candidates = self._scanner().scan(video)
|
||||
assert candidates == []
|
||||
@@ -30,9 +30,19 @@ from alfred.domain.subtitles.value_objects import (
|
||||
RuleScope,
|
||||
SubtitleFormat,
|
||||
SubtitleLanguage,
|
||||
SubtitleMatchingRules,
|
||||
SubtitleType,
|
||||
)
|
||||
|
||||
# Test fixture: stand-in for what the KB would provide at runtime.
|
||||
_DEFAULT_RULES = SubtitleMatchingRules(
|
||||
preferred_languages=["eng"],
|
||||
preferred_formats=["srt"],
|
||||
allowed_types=["standard"],
|
||||
format_priority=["srt", "ass"],
|
||||
min_confidence=0.7,
|
||||
)
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Value objects #
|
||||
# --------------------------------------------------------------------------- #
|
||||
@@ -230,18 +240,17 @@ class TestAvailableSubtitles:
|
||||
|
||||
|
||||
class TestSubtitleRuleSet:
|
||||
def test_global_default_uses_kb_defaults(self):
|
||||
def test_global_default_returns_injected_defaults(self):
|
||||
rs = SubtitleRuleSet.global_default()
|
||||
rules = rs.resolve()
|
||||
# Loaded from subtitles.yaml — defaults must be non-empty.
|
||||
assert rules.preferred_languages
|
||||
assert rules.preferred_formats
|
||||
assert 0 < rules.min_confidence <= 1
|
||||
rules = rs.resolve(_DEFAULT_RULES)
|
||||
assert rules.preferred_languages == _DEFAULT_RULES.preferred_languages
|
||||
assert rules.preferred_formats == _DEFAULT_RULES.preferred_formats
|
||||
assert rules.min_confidence == _DEFAULT_RULES.min_confidence
|
||||
|
||||
def test_override_persists(self):
|
||||
rs = SubtitleRuleSet.global_default()
|
||||
rs.override(languages=["eng"], min_confidence=0.9)
|
||||
rules = rs.resolve()
|
||||
rules = rs.resolve(_DEFAULT_RULES)
|
||||
assert rules.preferred_languages == ["eng"]
|
||||
assert rules.min_confidence == 0.9
|
||||
|
||||
@@ -252,10 +261,10 @@ class TestSubtitleRuleSet:
|
||||
parent=parent,
|
||||
)
|
||||
child.override(languages=["jpn"])
|
||||
rules = child.resolve()
|
||||
rules = child.resolve(_DEFAULT_RULES)
|
||||
assert rules.preferred_languages == ["jpn"]
|
||||
# min_confidence not overridden at child or parent → falls back to defaults
|
||||
assert rules.min_confidence == parent.resolve().min_confidence
|
||||
assert rules.min_confidence == parent.resolve(_DEFAULT_RULES).min_confidence
|
||||
|
||||
def test_to_dict_only_emits_set_deltas(self):
|
||||
rs = SubtitleRuleSet(scope=RuleScope(level="show", identifier="tt1"))
|
||||
@@ -286,4 +295,4 @@ class TestSubtitleRuleSet:
|
||||
# code uses `is not None` explicitly. Verify 0.0 doesn't fall back.
|
||||
rs = SubtitleRuleSet.global_default()
|
||||
rs.override(min_confidence=0.0)
|
||||
assert rs.resolve().min_confidence == 0.0
|
||||
assert rs.resolve(_DEFAULT_RULES).min_confidence == 0.0
|
||||
|
||||
@@ -157,10 +157,9 @@ class TestEpisode:
|
||||
assert filename.startswith("S01E05")
|
||||
assert "Gray.Matter" in filename
|
||||
|
||||
def test_has_file_false_when_no_path(self):
|
||||
def test_file_path_unset_by_default(self):
|
||||
e = self._ep()
|
||||
assert not e.has_file()
|
||||
assert not e.is_downloaded()
|
||||
assert e.file_path is None
|
||||
|
||||
def test_str_format(self):
|
||||
e = self._ep(season=2, episode=3, title="Bit by a Dead Bee")
|
||||
|
||||
@@ -17,6 +17,7 @@ from pathlib import Path
|
||||
|
||||
import yaml
|
||||
|
||||
from alfred.domain.subtitles.value_objects import SubtitleMatchingRules
|
||||
from alfred.infrastructure.persistence.memory.ltm.components.subtitle_preferences import (
|
||||
SubtitlePreferences,
|
||||
)
|
||||
@@ -25,6 +26,15 @@ from alfred.infrastructure.subtitle.rule_repository import (
|
||||
_filter_override,
|
||||
)
|
||||
|
||||
# Stand-in for KB defaults, injected at resolve().
|
||||
_DEFAULT_RULES = SubtitleMatchingRules(
|
||||
preferred_languages=["eng"],
|
||||
preferred_formats=["srt"],
|
||||
allowed_types=["standard"],
|
||||
format_priority=["srt", "ass"],
|
||||
min_confidence=0.7,
|
||||
)
|
||||
|
||||
|
||||
def _write(path: Path, data: dict) -> None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
@@ -71,17 +81,17 @@ class TestLoad:
|
||||
def test_no_files_returns_global_default(self, tmp_path):
|
||||
repo = RuleSetRepository(tmp_path)
|
||||
rs = repo.load()
|
||||
# Should resolve cleanly using the hardcoded defaults.
|
||||
rules = rs.resolve()
|
||||
assert rules.preferred_languages # non-empty
|
||||
assert rules.min_confidence > 0
|
||||
# With no overrides, resolve returns the injected defaults unchanged.
|
||||
rules = rs.resolve(_DEFAULT_RULES)
|
||||
assert rules.preferred_languages == _DEFAULT_RULES.preferred_languages
|
||||
assert rules.min_confidence == _DEFAULT_RULES.min_confidence
|
||||
|
||||
def test_subtitle_preferences_override_base(self, tmp_path):
|
||||
prefs = SubtitlePreferences(
|
||||
languages=["jpn"], formats=["ass"], types=["standard"]
|
||||
)
|
||||
repo = RuleSetRepository(tmp_path)
|
||||
rules = repo.load(subtitle_preferences=prefs).resolve()
|
||||
rules = repo.load(subtitle_preferences=prefs).resolve(_DEFAULT_RULES)
|
||||
assert rules.preferred_languages == ["jpn"]
|
||||
assert rules.preferred_formats == ["ass"]
|
||||
assert rules.allowed_types == ["standard"]
|
||||
@@ -92,7 +102,7 @@ class TestLoad:
|
||||
{"override": {"languages": ["spa"], "min_confidence": 0.95}},
|
||||
)
|
||||
repo = RuleSetRepository(tmp_path)
|
||||
rules = repo.load().resolve()
|
||||
rules = repo.load().resolve(_DEFAULT_RULES)
|
||||
assert rules.preferred_languages == ["spa"]
|
||||
assert rules.min_confidence == 0.95
|
||||
|
||||
@@ -102,7 +112,7 @@ class TestLoad:
|
||||
{"override": {"format_priority": ["ass", "srt"]}},
|
||||
)
|
||||
repo = RuleSetRepository(tmp_path)
|
||||
rules = repo.load(release_group="KONTRAST").resolve()
|
||||
rules = repo.load(release_group="KONTRAST").resolve(_DEFAULT_RULES)
|
||||
assert rules.format_priority == ["ass", "srt"]
|
||||
|
||||
def test_full_three_level_chain(self, tmp_path):
|
||||
@@ -119,7 +129,9 @@ class TestLoad:
|
||||
{"override": {"min_confidence": 0.99}},
|
||||
)
|
||||
repo = RuleSetRepository(tmp_path)
|
||||
rules = repo.load(release_group="GRP", subtitle_preferences=prefs).resolve()
|
||||
rules = repo.load(release_group="GRP", subtitle_preferences=prefs).resolve(
|
||||
_DEFAULT_RULES
|
||||
)
|
||||
# All three levels visible — local overrides on top
|
||||
assert rules.preferred_languages == ["jpn"]
|
||||
assert rules.format_priority == ["ass"]
|
||||
|
||||
@@ -17,7 +17,7 @@ from __future__ import annotations
|
||||
from pathlib import Path
|
||||
|
||||
from alfred.domain.subtitles.entities import SubtitleCandidate
|
||||
from alfred.domain.subtitles.services.placer import PlacedTrack
|
||||
from alfred.application.subtitles.placer import PlacedTrack
|
||||
from alfred.domain.subtitles.value_objects import (
|
||||
SubtitleFormat,
|
||||
SubtitleLanguage,
|
||||
|
||||
Reference in New Issue
Block a user