13 Commits

Author SHA1 Message Date
francwa 14941d47c0 Merge branch 'refactor/domain-io-extraction'
Extract all I/O (subprocess, filesystem, YAML loading) from the domain
layer via ports/adapters. domain/subtitles/ now has zero imports from
infrastructure/. The remaining domain → infra leak (release knowledge
loaded at import time) is documented in tech-debt for a dedicated branch.
2026-05-19 15:16:59 +02:00
francwa df798f55cc refactor(subtitles): introduce SubtitleKnowledge Protocol port
Domain services (SubtitleIdentifier, PatternDetector) used to import the
concrete SubtitleKnowledgeBase class directly from infrastructure for
their type hint. With this commit they depend on a structural Protocol
in alfred/domain/subtitles/ports/knowledge.py declaring just the 7
read-only query methods the domain actually consumes.

The concrete YAML-backed SubtitleKnowledgeBase in infrastructure remains
the sole adapter — no rename, no shim. With this change
alfred/domain/subtitles/ has zero imports from alfred/infrastructure/.

Also extend the changelog entry covering the full domain-io-extraction
branch.
2026-05-19 15:15:43 +02:00
francwa 535935cc73 docs(changelog): summarize refactor/domain-io-extraction work block 2026-05-19 15:11:17 +02:00
francwa 6e252d1e81 refactor(subtitles): inject default rules into SubtitleRuleSet.resolve()
aggregates.py used to call SubtitleKnowledgeBase().default_rules() via a
DEFAULT_RULES() helper, which silently pulled the infrastructure layer
(YAML loader) into the domain on every resolve.

Make the dependency explicit: resolve() now takes the default rules as
a parameter, and the caller (the ManageSubtitles use case) loads them
from the KB once and passes them in. Domain stays I/O-free.

- Drop DEFAULT_RULES helper and the SubtitleKnowledgeBase import from
  alfred/domain/subtitles/aggregates.py
- SubtitleRuleSet.resolve(default_rules: SubtitleMatchingRules)
- manage_subtitles use case passes kb.default_rules() at the call site
- Tests use a local SubtitleMatchingRules stand-in instead of relying
  on KB defaults
2026-05-19 15:10:06 +02:00
francwa 903e9e7117 refactor(subtitles): move SubtitlePlacer to application layer
The placer performs filesystem I/O (os.link) — it belongs in the
application layer, not the domain. Domain services should be pure.

- Move alfred/domain/subtitles/services/placer.py to
  alfred/application/subtitles/placer.py
- Move tests/domain/test_subtitle_placer.py to
  tests/application/test_subtitle_placer.py
- Update all callers (manage_subtitles use case, metadata store, tests)
- Drop placer re-exports from domain.subtitles.services.__init__
2026-05-19 15:07:39 +02:00
francwa 9556bf9e08 refactor(domain): strip live filesystem I/O from VOs and entities
DDD-pure cleanup — entities and value objects no longer query the world
at read time.

  FilePath: drop .exists() / .is_file() / .is_dir(). The VO is now a
    pure address; ask the injected FilesystemScanner for live state.
  Movie:    drop .has_file() / .is_downloaded(). Invariant: when the
    application sets file_path, it has already constated the file
    exists; downstream readers trust the snapshot.
  Episode:  same — drop .has_file() / .is_downloaded().
  SubtitlePlacer: drop the pre-check .exists() calls. The placer now
    attempts os.link() and reports FileNotFoundError / FileExistsError
    as skip reasons. Removes a TOCTOU race as a bonus.

Tests adjusted: the FilePath VO method tests are gone (the methods are
gone), test_has_file_false_when_no_path replaced by a plain assertion
on file_path is None. Placer tests are unchanged — the skip-reason
strings ('not found', 'already exists') match the new try/except paths.

The 'snapshot value objects' pattern (ProbedMediaInfo, TmdbMovieInfo)
that this cleanup enables is documented in refactor_domain_io.md, to
be applied when a future use case actually needs richer metadata —
not now, no speculative VOs.
2026-05-19 14:58:59 +02:00
francwa e6ee700825 refactor(subtitles): inject MediaProber/FilesystemScanner ports into domain services
Domain services no longer call subprocess or pathlib directly. Introduces
two Protocol ports in domain/shared/ports/:

  MediaProber.list_subtitle_streams(video) -> list[SubtitleStreamInfo]
  FilesystemScanner.scan_dir / stat / read_text  -> list[FileEntry] | ...

Concrete adapters live in infrastructure/:

  FfprobeMediaProber          (wraps subprocess + ffprobe + JSON)
  PathlibFilesystemScanner    (wraps pathlib + os reads)

SubtitleIdentifier and PatternDetector now take (kb, prober, scanner) at
construction time. Their internals work over FileEntry snapshots and
SubtitleStreamInfo records — no more ad-hoc Path.is_file/iterdir/stat or
embedded subprocess.run loops. _count_entries now takes raw SRT text
(returned by scanner.read_text) so SRT-only entry counting stays out of
the FS layer.

manage_subtitles use case instantiates the two adapters once and injects
them into both services. Tests pass real adapters and patch
`alfred.infrastructure.probe.ffprobe_prober.subprocess.run` for the
ffprobe-failure cases. _classify_single tests build FileEntry via a
small helper.

Domain is now free of subprocess / direct filesystem reads in the
subtitle pipeline. The only remaining I/O hooks are FilePath VO
convenience methods (exists/is_file/is_dir) which stay as a deliberate
affordance on the value object.
2026-05-19 14:52:24 +02:00
francwa ced72547f7 refactor(knowledge): extract YAML loaders from domain to infrastructure
The domain layer no longer reads YAML files. All knowledge loaders move
from `alfred/domain/*/knowledge/` to `alfred/infrastructure/knowledge/`:

  domain/release/knowledge.py
    → infrastructure/knowledge/release.py
  domain/shared/knowledge/language_registry.py
    → infrastructure/knowledge/language_registry.py
  domain/subtitles/knowledge/{loader,base}.py
    → infrastructure/knowledge/subtitles/{loader,base}.py

Callers in domain/release/{services,value_objects}.py,
domain/subtitles/{aggregates,services/*}.py, and
application/filesystem/manage_subtitles.py updated to absolute imports.
Re-exports of KnowledgeLoader/SubtitleKnowledgeBase from
domain/subtitles/__init__.py dropped (no shim per project convention).
Tests follow the moved targets.
2026-05-19 14:35:18 +02:00
francwa f338b08706 refactor(release): type media_type/parse_path as true enums
ParsedRelease.media_type is now MediaTypeToken (not str) and parse_path
is ParsePath (not str). __post_init__ keeps a tolerant constructor that
coerces raw strings via the enum, so callers passing 'movie'/'direct'
still work transparently. Since both enums inherit from str, existing
string comparisons and JSON serialization remain unchanged.
2026-05-19 14:21:27 +02:00
francwa da484d7474 refactor(release): typed enums + __post_init__ validation on ParsedRelease
ParsedRelease accepted any string for media_type/parse_path and had no
validation on numeric ranges (season=-5 was silently accepted). Tighten
both ends:

- New str-backed Enums MediaTypeToken and ParsePath. Inherit from str so
  every existing comparison ('== "movie"'), JSON serialization, and TMDB
  DTO interop keeps working unchanged.
- ParsedRelease.__post_init__ now validates: raw/group non-empty, year in
  1888-2100, season 0-100, episode 0-9999, episode_end >= episode,
  media_type/parse_path against the enum allowlist.
- services.py uses the enum .value members everywhere instead of bare
  string literals — kills the typo risk.
2026-05-19 14:17:56 +02:00
francwa 481eeb5afd refactor(domain): identity-based equality + dedup track helpers
Two related DDD fixes for Movie and Episode entities:

- Identity equality: @dataclass(eq=False) with custom __eq__/__hash__.
  Movie is identified by imdb_id, Episode by (season, episode) within
  the TVShow aggregate. Auto-generated field-by-field equality was
  incorrectly making two Movie instances with the same imdb_id but
  different audio_tracks compare unequal — breaks dedup/caching.

- MediaWithTracks mixin: the 5 audio/subtitle helpers
  (has_audio_in / audio_languages / has_subtitles_in / has_forced_subs /
  subtitle_languages) were duplicated verbatim between Movie and Episode.
  Extracted to shared/media/tracks_mixin.py; both entities now inherit.

Bonus: dropped the object.__setattr__ coercion dance in Movie.__post_init__
— the class isn't frozen so plain assignment is the right call.
2026-05-19 14:17:47 +02:00
francwa 7cd24f3a31 refactor(domain): freeze media track value objects
AudioTrack, VideoTrack, SubtitleTrack and MediaInfo are snapshots of a
single ffprobe run — model them as proper immutable value objects.

- @dataclass(frozen=True) on all four
- MediaInfo track collections become tuple[...] instead of list[...]
- ffprobe adapter rewritten to build tuples up-front instead of
  appending/setattr'ing on a constructed instance
2026-05-19 14:17:27 +02:00
francwa eb8995cfc3 refactor(subtitles): drop dead scanner module
SubtitleScanner was an earlier iteration superseded by SubtitleIdentifier
and never imported in production code (only by its own tests). Removing
both keeps the bounded context clean and shrinks the surface.
2026-05-19 14:17:15 +02:00
50 changed files with 844 additions and 878 deletions
+42
View File
@@ -112,6 +112,48 @@ callers).
### Internal ### Internal
- **Domain I/O extraction** (`refactor/domain-io-extraction`): the domain
layer no longer performs subprocess calls, filesystem scans, or YAML
loading. Achieved in a series of focused commits:
- **Knowledge YAML loaders moved to infrastructure**:
`alfred/domain/release/knowledge.py`,
`alfred/domain/shared/knowledge/language_registry.py`, and
`alfred/domain/subtitles/knowledge/{base,loader}.py` relocated to
`alfred/infrastructure/knowledge/`. Re-exports were dropped — callers
import directly from the new location.
- **`MediaProber` and `FilesystemScanner` Protocol ports** introduced at
`alfred/domain/shared/ports/` with frozen-dataclass DTOs
(`SubtitleStreamInfo`, `FileEntry`). `SubtitleIdentifier` and
`PatternDetector` are now constructor-injected with concrete adapters
(`FfprobeMediaProber` wrapping `subprocess.run(ffprobe)` and
`PathlibFilesystemScanner` wrapping `pathlib`). No more direct
`subprocess`/`pathlib` usage from the subtitle domain services.
- **Live filesystem methods removed from VOs and entities**:
`FilePath.exists()` / `.is_file()` / `.is_dir()` deleted —
`FilePath` is now a pure address VO. `Movie.has_file()` and
`Episode.is_downloaded()` dropped. Callers either rely on a prior
detection step or use try/except over pre-checks (eliminates
TOCTOU races).
- **`SubtitlePlacer` moved to the application layer** at
`alfred/application/subtitles/placer.py` — it performs `os.link`
I/O, which doesn't belong in the domain. Pre-checks replaced with
try/except for `FileNotFoundError`/`FileExistsError`.
- **`SubtitleRuleSet.resolve()` no longer reaches into the knowledge
base**: the implicit `DEFAULT_RULES()` helper is gone, replaced by
an explicit `default_rules: SubtitleMatchingRules` parameter. The
`ManageSubtitles` use case loads defaults from the KB once and
passes them in.
- **`SubtitleKnowledge` Protocol port** at
`alfred/domain/subtitles/ports/knowledge.py` declares the read-only
query surface domain services consume (7 methods:
`known_extensions`, `format_for_extension`, `language_for_token`,
`is_known_lang_token`, `type_for_token`, `is_known_type_token`,
`patterns`). `SubtitleIdentifier` and `PatternDetector` depend on
this Protocol instead of the concrete `SubtitleKnowledgeBase` from
infrastructure — `domain/subtitles/` now has zero imports from
`infrastructure/`. The remaining domain → infra leak
(`domain/release/` loading separator YAML at import-time) is
documented in tech-debt and scheduled for its own branch.
- **`to_dot_folder_name(title)` helper** in - **`to_dot_folder_name(title)` helper** in
`alfred/domain/shared/value_objects.py` — extracts the `alfred/domain/shared/value_objects.py` — extracts the
`re.sub(r"[^\w\s\.\-]", "", title).replace(" ", ".")` pattern that was `re.sub(r"[^\w\s\.\-]", "", title).replace(" ", ".")` pattern that was
@@ -5,19 +5,21 @@ from pathlib import Path
from alfred.domain.shared.value_objects import ImdbId from alfred.domain.shared.value_objects import ImdbId
from alfred.domain.subtitles.entities import SubtitleCandidate from alfred.domain.subtitles.entities import SubtitleCandidate
from alfred.domain.subtitles.knowledge.base import SubtitleKnowledgeBase
from alfred.domain.subtitles.knowledge.loader import KnowledgeLoader
from alfred.domain.subtitles.services.identifier import SubtitleIdentifier from alfred.domain.subtitles.services.identifier import SubtitleIdentifier
from alfred.domain.subtitles.services.matcher import SubtitleMatcher from alfred.domain.subtitles.services.matcher import SubtitleMatcher
from alfred.domain.subtitles.services.pattern_detector import PatternDetector from alfred.domain.subtitles.services.pattern_detector import PatternDetector
from alfred.domain.subtitles.services.placer import ( from alfred.application.subtitles.placer import (
PlacedTrack, PlacedTrack,
SubtitlePlacer, SubtitlePlacer,
_build_dest_name, _build_dest_name,
) )
from alfred.domain.subtitles.services.utils import available_subtitles from alfred.domain.subtitles.services.utils import available_subtitles
from alfred.domain.subtitles.value_objects import ScanStrategy from alfred.domain.subtitles.value_objects import ScanStrategy
from alfred.infrastructure.filesystem.scanner import PathlibFilesystemScanner
from alfred.infrastructure.knowledge.subtitles.base import SubtitleKnowledgeBase
from alfred.infrastructure.knowledge.subtitles.loader import KnowledgeLoader
from alfred.infrastructure.persistence.context import get_memory from alfred.infrastructure.persistence.context import get_memory
from alfred.infrastructure.probe.ffprobe_prober import FfprobeMediaProber
from alfred.infrastructure.subtitle.metadata_store import SubtitleMetadataStore from alfred.infrastructure.subtitle.metadata_store import SubtitleMetadataStore
from alfred.infrastructure.subtitle.rule_repository import RuleSetRepository from alfred.infrastructure.subtitle.rule_repository import RuleSetRepository
@@ -91,13 +93,21 @@ class ManageSubtitlesUseCase:
) )
kb = SubtitleKnowledgeBase(KnowledgeLoader()) kb = SubtitleKnowledgeBase(KnowledgeLoader())
prober = FfprobeMediaProber()
scanner = PathlibFilesystemScanner()
library_root = _infer_library_root(dest_path, media_type) library_root = _infer_library_root(dest_path, media_type)
store = SubtitleMetadataStore(library_root) store = SubtitleMetadataStore(library_root)
repo = RuleSetRepository(library_root) repo = RuleSetRepository(library_root)
# --- Pattern resolution --- # --- Pattern resolution ---
pattern = self._resolve_pattern( pattern = self._resolve_pattern(
kb, store, source_path, confirmed_pattern_id, release_group kb,
prober,
scanner,
store,
source_path,
confirmed_pattern_id,
release_group,
) )
if pattern is None: if pattern is None:
return ManageSubtitlesResponse( return ManageSubtitlesResponse(
@@ -108,7 +118,7 @@ class ManageSubtitlesUseCase:
# --- Identify --- # --- Identify ---
media_id = _to_imdb_id(imdb_id) media_id = _to_imdb_id(imdb_id)
identifier = SubtitleIdentifier(kb) identifier = SubtitleIdentifier(kb, prober, scanner)
metadata = identifier.identify( metadata = identifier.identify(
video_path=source_path, video_path=source_path,
pattern=pattern, pattern=pattern,
@@ -153,7 +163,7 @@ class ManageSubtitlesUseCase:
subtitle_prefs = memory.ltm.subtitle_preferences subtitle_prefs = memory.ltm.subtitle_preferences
except Exception: except Exception:
pass pass
rules = repo.load(release_group, subtitle_prefs).resolve() rules = repo.load(release_group, subtitle_prefs).resolve(kb.default_rules())
matcher = SubtitleMatcher() matcher = SubtitleMatcher()
matched, unresolved = matcher.match(metadata.external_tracks, rules) matched, unresolved = matcher.match(metadata.external_tracks, rules)
@@ -228,6 +238,8 @@ class ManageSubtitlesUseCase:
def _resolve_pattern( def _resolve_pattern(
self, self,
kb: SubtitleKnowledgeBase, kb: SubtitleKnowledgeBase,
prober: FfprobeMediaProber,
scanner: PathlibFilesystemScanner,
store: SubtitleMetadataStore, store: SubtitleMetadataStore,
source_path: Path, source_path: Path,
confirmed_pattern_id: str | None, confirmed_pattern_id: str | None,
@@ -250,7 +262,7 @@ class ManageSubtitlesUseCase:
# 3. Auto-detect # 3. Auto-detect
release_root = source_path.parent release_root = source_path.parent
detector = PatternDetector(kb) detector = PatternDetector(kb, prober, scanner)
result = detector.detect(release_root, source_path) result = detector.detect(release_root, source_path)
if result["detected"] and result["confidence"] >= 0.6: if result["detected"] and result["confidence"] >= 0.6:
@@ -5,8 +5,8 @@ import os
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from ..entities import SubtitleCandidate from alfred.domain.subtitles.entities import SubtitleCandidate
from ..value_objects import SubtitleType from alfred.domain.subtitles.value_objects import SubtitleType
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -78,8 +78,8 @@ class SubtitlePlacer:
skipped.append((track, "embedded — no file to place")) skipped.append((track, "embedded — no file to place"))
continue continue
if not track.file_path or not track.file_path.exists(): if not track.file_path:
skipped.append((track, "source file not found")) skipped.append((track, "source file not set"))
continue continue
try: try:
@@ -90,11 +90,6 @@ class SubtitlePlacer:
dest_path = dest_dir / dest_name dest_path = dest_dir / dest_name
if dest_path.exists():
logger.debug(f"SubtitlePlacer: skip {dest_name} — already exists")
skipped.append((track, "destination already exists"))
continue
try: try:
os.link(track.file_path, dest_path) os.link(track.file_path, dest_path)
placed.append( placed.append(
@@ -105,6 +100,11 @@ class SubtitlePlacer:
) )
) )
logger.info(f"SubtitlePlacer: placed {dest_name}") logger.info(f"SubtitlePlacer: placed {dest_name}")
except FileNotFoundError:
skipped.append((track, "source file not found"))
except FileExistsError:
logger.debug(f"SubtitlePlacer: skip {dest_name} — already exists")
skipped.append((track, "destination already exists"))
except OSError as e: except OSError as e:
logger.warning(f"SubtitlePlacer: failed to place {dest_name}: {e}") logger.warning(f"SubtitlePlacer: failed to place {dest_name}: {e}")
skipped.append((track, str(e))) skipped.append((track, str(e)))
+18 -47
View File
@@ -3,13 +3,13 @@
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime from datetime import datetime
from ..shared.media import AudioTrack, SubtitleTrack, track_lang_matches from ..shared.media import AudioTrack, MediaWithTracks, SubtitleTrack
from ..shared.value_objects import FilePath, FileSize, ImdbId, Language from ..shared.value_objects import FilePath, FileSize, ImdbId
from .value_objects import MovieTitle, Quality, ReleaseYear from .value_objects import MovieTitle, Quality, ReleaseYear
@dataclass @dataclass(eq=False)
class Movie: class Movie(MediaWithTracks):
""" """
Movie aggregate root for the movies domain. Movie aggregate root for the movies domain.
@@ -20,6 +20,10 @@ class Movie:
Track helpers follow the same "C+" contract as ``Episode``: pass a Track helpers follow the same "C+" contract as ``Episode``: pass a
``Language`` for cross-format matching, or a ``str`` for case-insensitive ``Language`` for cross-format matching, or a ``str`` for case-insensitive
direct comparison. direct comparison.
Equality is identity-based: two ``Movie`` instances are equal iff they
share the same ``imdb_id``, regardless of file/track contents. This is
the DDD aggregate invariant — the aggregate is identified by its root id.
""" """
imdb_id: ImdbId imdb_id: ImdbId
@@ -38,7 +42,7 @@ class Movie:
# Ensure ImdbId is actually an ImdbId instance # Ensure ImdbId is actually an ImdbId instance
if not isinstance(self.imdb_id, ImdbId): if not isinstance(self.imdb_id, ImdbId):
if isinstance(self.imdb_id, str): if isinstance(self.imdb_id, str):
object.__setattr__(self, "imdb_id", ImdbId(self.imdb_id)) self.imdb_id = ImdbId(self.imdb_id)
else: else:
raise ValueError( raise ValueError(
f"imdb_id must be ImdbId or str, got {type(self.imdb_id)}" f"imdb_id must be ImdbId or str, got {type(self.imdb_id)}"
@@ -47,55 +51,22 @@ class Movie:
# Ensure MovieTitle is actually a MovieTitle instance # Ensure MovieTitle is actually a MovieTitle instance
if not isinstance(self.title, MovieTitle): if not isinstance(self.title, MovieTitle):
if isinstance(self.title, str): if isinstance(self.title, str):
object.__setattr__(self, "title", MovieTitle(self.title)) self.title = MovieTitle(self.title)
else: else:
raise ValueError( raise ValueError(
f"title must be MovieTitle or str, got {type(self.title)}" f"title must be MovieTitle or str, got {type(self.title)}"
) )
def has_file(self) -> bool: def __eq__(self, other: object) -> bool:
"""Check if the movie has an associated file.""" if not isinstance(other, Movie):
return self.file_path is not None and self.file_path.exists() return NotImplemented
return self.imdb_id == other.imdb_id
def is_downloaded(self) -> bool: def __hash__(self) -> int:
"""Check if the movie is downloaded (has a file).""" return hash(self.imdb_id)
return self.has_file()
# ── Audio helpers ────────────────────────────────────────────────────── # Track helpers (has_audio_in / audio_languages / has_subtitles_in /
# has_forced_subs / subtitle_languages) come from MediaWithTracks.
def has_audio_in(self, lang: str | Language) -> bool:
"""True if at least one audio track is in the given language."""
return any(track_lang_matches(t.language, lang) for t in self.audio_tracks)
def audio_languages(self) -> list[str]:
"""Unique audio languages across all tracks, in track order."""
seen: set[str] = set()
result: list[str] = []
for t in self.audio_tracks:
if t.language and t.language not in seen:
seen.add(t.language)
result.append(t.language)
return result
# ── Subtitle helpers ───────────────────────────────────────────────────
def has_subtitles_in(self, lang: str | Language) -> bool:
"""True if at least one subtitle track is in the given language."""
return any(track_lang_matches(t.language, lang) for t in self.subtitle_tracks)
def has_forced_subs(self) -> bool:
"""True if at least one subtitle track is flagged as forced."""
return any(t.is_forced for t in self.subtitle_tracks)
def subtitle_languages(self) -> list[str]:
"""Unique subtitle languages across all tracks, in track order."""
seen: set[str] = set()
result: list[str] = []
for t in self.subtitle_tracks:
if t.language and t.language not in seen:
seen.add(t.language)
result.append(t.language)
return result
def get_folder_name(self) -> str: def get_folder_name(self) -> str:
""" """
+13 -11
View File
@@ -4,7 +4,7 @@ from __future__ import annotations
import re import re
from .knowledge import load_separators from alfred.infrastructure.knowledge.release import load_separators
from .value_objects import ( from .value_objects import (
_AUDIO, _AUDIO,
_CODECS, _CODECS,
@@ -16,7 +16,9 @@ from .value_objects import (
_RESOLUTIONS, _RESOLUTIONS,
_SOURCES, _SOURCES,
_VIDEO_META, _VIDEO_META,
MediaTypeToken,
ParsedRelease, ParsedRelease,
ParsePath,
) )
@@ -39,12 +41,12 @@ def parse_release(name: str) -> ParsedRelease:
and run token-level matchers (season/episode, tech, languages, audio, and run token-level matchers (season/episode, tech, languages, audio,
video, edition, title, year). video, edition, title, year).
""" """
parse_path = "direct" parse_path = ParsePath.DIRECT.value
# Always try to extract a bracket-enclosed site tag first. # Always try to extract a bracket-enclosed site tag first.
clean, site_tag = _strip_site_tag(name) clean, site_tag = _strip_site_tag(name)
if site_tag is not None: if site_tag is not None:
parse_path = "sanitized" parse_path = ParsePath.SANITIZED.value
if not _is_well_formed(clean): if not _is_well_formed(clean):
return ParsedRelease( return ParsedRelease(
@@ -60,9 +62,9 @@ def parse_release(name: str) -> ParsedRelease:
codec=None, codec=None,
group="UNKNOWN", group="UNKNOWN",
tech_string="", tech_string="",
media_type="unknown", media_type=MediaTypeToken.UNKNOWN.value,
site_tag=site_tag, site_tag=site_tag,
parse_path="ai", parse_path=ParsePath.AI.value,
) )
name = clean name = clean
@@ -137,19 +139,19 @@ def _infer_media_type(
integrale_tokens = {t.upper() for t in _MEDIA_TYPE_TOKENS.get("integrale", [])} integrale_tokens = {t.upper() for t in _MEDIA_TYPE_TOKENS.get("integrale", [])}
if upper_tokens & doc_tokens: if upper_tokens & doc_tokens:
return "documentary" return MediaTypeToken.DOCUMENTARY.value
if upper_tokens & concert_tokens: if upper_tokens & concert_tokens:
return "concert" return MediaTypeToken.CONCERT.value
if ( if (
edition in {"COMPLETE", "INTEGRALE", "COLLECTION"} edition in {"COMPLETE", "INTEGRALE", "COLLECTION"}
or upper_tokens & integrale_tokens or upper_tokens & integrale_tokens
) and season is None: ) and season is None:
return "tv_complete" return MediaTypeToken.TV_COMPLETE.value
if season is not None: if season is not None:
return "tv_show" return MediaTypeToken.TV_SHOW.value
if any([quality, source, codec, year]): if any([quality, source, codec, year]):
return "movie" return MediaTypeToken.MOVIE.value
return "unknown" return MediaTypeToken.UNKNOWN.value
def _is_well_formed(name: str) -> bool: def _is_well_formed(name: str) -> bool:
+82 -5
View File
@@ -3,8 +3,10 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass, field from dataclasses import dataclass, field
from enum import Enum
from .knowledge import ( from ..shared.exceptions import ValidationError
from alfred.infrastructure.knowledge.release import (
load_audio, load_audio,
load_codecs, load_codecs,
load_editions, load_editions,
@@ -50,6 +52,38 @@ def _sanitize_for_fs(text: str) -> str:
return text.translate(_WIN_FORBIDDEN_TABLE) return text.translate(_WIN_FORBIDDEN_TABLE)
class MediaTypeToken(str, Enum):
"""
Canonical values for ``ParsedRelease.media_type``.
Inherits from ``str`` so existing string-based comparisons (``== "movie"``,
JSON serialization, TMDB DTO interop) keep working unchanged. The enum
serves both as documentation and as the set of valid values for
``__post_init__`` validation.
"""
MOVIE = "movie"
TV_SHOW = "tv_show"
TV_COMPLETE = "tv_complete"
DOCUMENTARY = "documentary"
CONCERT = "concert"
OTHER = "other"
UNKNOWN = "unknown"
class ParsePath(str, Enum):
"""How a ``ParsedRelease`` was produced. ``str``-backed for the same
reasons as :class:`MediaTypeToken`."""
DIRECT = "direct"
SANITIZED = "sanitized"
AI = "ai"
_VALID_MEDIA_TYPES: frozenset[str] = frozenset(m.value for m in MediaTypeToken)
_VALID_PARSE_PATHS: frozenset[str] = frozenset(p.value for p in ParsePath)
def _strip_episode_from_normalized(normalized: str) -> str: def _strip_episode_from_normalized(normalized: str) -> str:
""" """
Remove all episode parts (Exx) from a normalized release name, keeping Sxx. Remove all episode parts (Exx) from a normalized release name, keeping Sxx.
@@ -85,13 +119,11 @@ class ParsedRelease:
codec: str | None # x265, HEVC, … codec: str | None # x265, HEVC, …
group: str # release group, "UNKNOWN" if missing group: str # release group, "UNKNOWN" if missing
tech_string: str # quality.source.codec joined with dots tech_string: str # quality.source.codec joined with dots
media_type: str = ( media_type: MediaTypeToken = MediaTypeToken.UNKNOWN
"unknown" # "movie" | "tv_show" | "tv_complete" | "other" | "unknown"
)
site_tag: str | None = ( site_tag: str | None = (
None # site watermark stripped from name, e.g. "TGx", "OxTorrent.vc" None # site watermark stripped from name, e.g. "TGx", "OxTorrent.vc"
) )
parse_path: str = "direct" # "direct" | "sanitized" | "ai" parse_path: ParsePath = ParsePath.DIRECT
languages: list[str] = field(default_factory=list) # ["MULTI", "VFF"], ["FRENCH"], … languages: list[str] = field(default_factory=list) # ["MULTI", "VFF"], ["FRENCH"], …
audio_codec: str | None = None # "DTS-HD.MA", "DDP", "EAC3", … audio_codec: str | None = None # "DTS-HD.MA", "DDP", "EAC3", …
audio_channels: str | None = None # "5.1", "7.1", "2.0", … audio_channels: str | None = None # "5.1", "7.1", "2.0", …
@@ -99,6 +131,51 @@ class ParsedRelease:
hdr_format: str | None = None # "DV", "HDR10", "DV.HDR10", … hdr_format: str | None = None # "DV", "HDR10", "DV.HDR10", …
edition: str | None = None # "UNRATED", "EXTENDED", "DIRECTORS.CUT", … edition: str | None = None # "UNRATED", "EXTENDED", "DIRECTORS.CUT", …
def __post_init__(self) -> None:
if not self.raw:
raise ValidationError("ParsedRelease.raw cannot be empty")
if not self.group:
raise ValidationError("ParsedRelease.group cannot be empty")
if self.year is not None and not (1888 <= self.year <= 2100):
raise ValidationError(
f"ParsedRelease.year out of range: {self.year}"
)
if self.season is not None and not (0 <= self.season <= 100):
raise ValidationError(
f"ParsedRelease.season out of range: {self.season}"
)
if self.episode is not None and not (0 <= self.episode <= 9999):
raise ValidationError(
f"ParsedRelease.episode out of range: {self.episode}"
)
if self.episode_end is not None:
if not (0 <= self.episode_end <= 9999):
raise ValidationError(
f"ParsedRelease.episode_end out of range: {self.episode_end}"
)
if self.episode is not None and self.episode_end < self.episode:
raise ValidationError(
f"ParsedRelease.episode_end ({self.episode_end}) < "
f"episode ({self.episode})"
)
# Coerce raw strings into their enum form (tolerant constructor).
if not isinstance(self.media_type, MediaTypeToken):
try:
self.media_type = MediaTypeToken(self.media_type)
except ValueError:
raise ValidationError(
f"ParsedRelease.media_type invalid: {self.media_type!r} "
f"(expected one of {sorted(_VALID_MEDIA_TYPES)})"
) from None
if not isinstance(self.parse_path, ParsePath):
try:
self.parse_path = ParsePath(self.parse_path)
except ValueError:
raise ValidationError(
f"ParsedRelease.parse_path invalid: {self.parse_path!r} "
f"(expected one of {sorted(_VALID_PARSE_PATHS)})"
) from None
@property @property
def is_season_pack(self) -> bool: def is_season_pack(self) -> bool:
return self.season is not None and self.episode is None return self.season is not None and self.episode is None
@@ -1,5 +0,0 @@
"""Shared knowledge loaders (cross-domain)."""
from .language_registry import LanguageRegistry
__all__ = ["LanguageRegistry"]
+2
View File
@@ -8,11 +8,13 @@ from .audio import AudioTrack
from .info import MediaInfo from .info import MediaInfo
from .matching import track_lang_matches from .matching import track_lang_matches
from .subtitle import SubtitleTrack from .subtitle import SubtitleTrack
from .tracks_mixin import MediaWithTracks
from .video import VideoTrack from .video import VideoTrack
__all__ = [ __all__ = [
"AudioTrack", "AudioTrack",
"MediaInfo", "MediaInfo",
"MediaWithTracks",
"SubtitleTrack", "SubtitleTrack",
"VideoTrack", "VideoTrack",
"track_lang_matches", "track_lang_matches",
+1 -1
View File
@@ -5,7 +5,7 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
@dataclass @dataclass(frozen=True)
class AudioTrack: class AudioTrack:
"""A single audio track as reported by ffprobe.""" """A single audio track as reported by ffprobe."""
+8 -6
View File
@@ -9,19 +9,21 @@ from .subtitle import SubtitleTrack
from .video import VideoTrack from .video import VideoTrack
@dataclass @dataclass(frozen=True)
class MediaInfo: class MediaInfo:
""" """
File-level media metadata extracted by ffprobe. File-level media metadata extracted by ffprobe — immutable snapshot.
Symmetric design: every stream type is a list of typed track objects. Symmetric design: every stream type is a tuple of typed track objects
(immutable on purpose — a MediaInfo is a frozen view of one ffprobe run,
not a mutable collection to append to).
Backwards-compatible flat accessors (``resolution``, ``width``, …) read Backwards-compatible flat accessors (``resolution``, ``width``, …) read
from the first video track when present. from the first video track when present.
""" """
video_tracks: list[VideoTrack] = field(default_factory=list) video_tracks: tuple[VideoTrack, ...] = field(default_factory=tuple)
audio_tracks: list[AudioTrack] = field(default_factory=list) audio_tracks: tuple[AudioTrack, ...] = field(default_factory=tuple)
subtitle_tracks: list[SubtitleTrack] = field(default_factory=list) subtitle_tracks: tuple[SubtitleTrack, ...] = field(default_factory=tuple)
# File-level (from ffprobe ``format`` block, not from any single stream) # File-level (from ffprobe ``format`` block, not from any single stream)
duration_seconds: float | None = None duration_seconds: float | None = None
+1 -1
View File
@@ -14,7 +14,7 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
@dataclass @dataclass(frozen=True)
class SubtitleTrack: class SubtitleTrack:
"""A single embedded subtitle track as reported by ffprobe.""" """A single embedded subtitle track as reported by ffprobe."""
@@ -0,0 +1,77 @@
"""Mixin shared by entities that carry audio + subtitle tracks.
Both ``Movie`` and ``Episode`` carry a ``list[AudioTrack]`` plus a
``list[SubtitleTrack]`` and answer the same 5 queries about them (language
presence, unique languages, forced flag). Keep that behavior in one place so a
fix in one is a fix in both.
The mixin is plain Python (no dataclass machinery) so it composes cleanly with
``@dataclass`` entities — it only reads ``self.audio_tracks`` and
``self.subtitle_tracks`` which the host class provides as fields.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from ..value_objects import Language
from .matching import track_lang_matches
if TYPE_CHECKING:
from .audio import AudioTrack
from .subtitle import SubtitleTrack
class MediaWithTracks:
"""
Mixin providing audio/subtitle helpers for entities with track collections.
Hosts must expose two attributes:
* ``audio_tracks: list[AudioTrack]``
* ``subtitle_tracks: list[SubtitleTrack]``
The helpers follow the "C+" matching contract: pass a :class:`Language`
for cross-format matching, or a ``str`` for case-insensitive comparison.
"""
# These attributes are provided by the host entity (Movie, Episode, …).
# Declared here only for type-checkers and to make the contract explicit.
audio_tracks: list["AudioTrack"]
subtitle_tracks: list["SubtitleTrack"]
# ── Audio helpers ──────────────────────────────────────────────────────
def has_audio_in(self, lang: str | Language) -> bool:
"""True if at least one audio track is in the given language."""
return any(track_lang_matches(t.language, lang) for t in self.audio_tracks)
def audio_languages(self) -> list[str]:
"""Unique audio languages across all tracks, in track order."""
seen: set[str] = set()
result: list[str] = []
for t in self.audio_tracks:
if t.language and t.language not in seen:
seen.add(t.language)
result.append(t.language)
return result
# ── Subtitle helpers ───────────────────────────────────────────────────
def has_subtitles_in(self, lang: str | Language) -> bool:
"""True if at least one subtitle track is in the given language."""
return any(track_lang_matches(t.language, lang) for t in self.subtitle_tracks)
def has_forced_subs(self) -> bool:
"""True if at least one subtitle track is flagged as forced."""
return any(t.is_forced for t in self.subtitle_tracks)
def subtitle_languages(self) -> list[str]:
"""Unique subtitle languages across all tracks, in track order."""
seen: set[str] = set()
result: list[str] = []
for t in self.subtitle_tracks:
if t.language and t.language not in seen:
seen.add(t.language)
result.append(t.language)
return result
+1 -1
View File
@@ -5,7 +5,7 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
@dataclass @dataclass(frozen=True)
class VideoTrack: class VideoTrack:
"""A single video track as reported by ffprobe. """A single video track as reported by ffprobe.
+17
View File
@@ -0,0 +1,17 @@
"""Ports — Protocol interfaces the domain depends on.
Adapters live in ``alfred/infrastructure/`` and implement these protocols.
Domain code never imports infrastructure; it accepts a port via constructor
injection and calls it. Tests can pass in-memory fakes that satisfy the
Protocol without going through real I/O.
"""
from .filesystem_scanner import FileEntry, FilesystemScanner
from .media_prober import MediaProber, SubtitleStreamInfo
__all__ = [
"FileEntry",
"FilesystemScanner",
"MediaProber",
"SubtitleStreamInfo",
]
@@ -0,0 +1,59 @@
"""FilesystemScanner port — abstracts filesystem inspection.
The domain never calls ``Path.iterdir``, ``Path.is_file``, ``Path.stat`` or
``open()`` directly. It asks the scanner for a ``FileEntry`` snapshot and
reasons from there. One scan = one I/O round-trip; no callbacks back to disk.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Protocol
@dataclass(frozen=True)
class FileEntry:
"""Frozen snapshot of one filesystem entry, taken at scan time.
The entry carries enough metadata for the domain to classify and order
files without re-querying the OS. ``size_kb`` is ``None`` for directories
and for files whose size could not be read.
"""
path: Path
is_file: bool
is_dir: bool
size_kb: float | None
@property
def name(self) -> str:
return self.path.name
@property
def stem(self) -> str:
return self.path.stem
@property
def suffix(self) -> str:
return self.path.suffix
class FilesystemScanner(Protocol):
"""Read-only filesystem inspection."""
def scan_dir(self, path: Path) -> list[FileEntry]:
"""Return sorted entries directly inside ``path``.
Returns an empty list when ``path`` is not a directory or is
unreadable. Adapters must not raise.
"""
...
def stat(self, path: Path) -> FileEntry | None:
"""Stat a single path; ``None`` when it doesn't exist or is unreadable."""
...
def read_text(self, path: Path, encoding: str = "utf-8") -> str | None:
"""Read a text file in one go; ``None`` on any error."""
...
@@ -0,0 +1,39 @@
"""MediaProber port — abstracts media stream inspection (e.g. ffprobe).
The adapter (typically wrapping ffprobe) maps low-level container metadata
into the small set of stream attributes the domain reasons about. Replacing
ffprobe with another tool only requires a new adapter — domain stays put.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Protocol
@dataclass(frozen=True)
class SubtitleStreamInfo:
"""A single embedded subtitle stream, as seen by the prober.
``language`` is the raw language tag emitted by the container (typically
ISO 639-2 like ``"fre"``, ``"eng"``); may be empty/None when the stream
has no language tag. The domain resolves it to a canonical ``Language``
via the knowledge base.
"""
language: str | None
is_hearing_impaired: bool
is_forced: bool
class MediaProber(Protocol):
"""Inspect a media file's stream metadata."""
def list_subtitle_streams(self, video: Path) -> list[SubtitleStreamInfo]:
"""Return all subtitle streams in ``video``.
Returns an empty list when the file is missing, unreadable, or has
no subtitle streams. Adapters must not raise.
"""
...
-12
View File
@@ -67,18 +67,6 @@ class FilePath:
# Use object.__setattr__ because dataclass is frozen # Use object.__setattr__ because dataclass is frozen
object.__setattr__(self, "value", path_obj) object.__setattr__(self, "value", path_obj)
def exists(self) -> bool:
"""Check if the path exists."""
return self.value.exists()
def is_file(self) -> bool:
"""Check if the path is a file."""
return self.value.is_file()
def is_dir(self) -> bool:
"""Check if the path is a directory."""
return self.value.is_dir()
def __str__(self) -> str: def __str__(self) -> str:
return str(self.value) return str(self.value)
-3
View File
@@ -3,7 +3,6 @@
from .aggregates import SubtitleRuleSet from .aggregates import SubtitleRuleSet
from .entities import MediaSubtitleMetadata, SubtitleCandidate from .entities import MediaSubtitleMetadata, SubtitleCandidate
from .exceptions import SubtitleNotFound from .exceptions import SubtitleNotFound
from .knowledge import KnowledgeLoader, SubtitleKnowledgeBase
from .services import PatternDetector, SubtitleIdentifier, SubtitleMatcher from .services import PatternDetector, SubtitleIdentifier, SubtitleMatcher
from .value_objects import ( from .value_objects import (
RuleScope, RuleScope,
@@ -20,8 +19,6 @@ __all__ = [
"SubtitleCandidate", "SubtitleCandidate",
"MediaSubtitleMetadata", "MediaSubtitleMetadata",
"SubtitleRuleSet", "SubtitleRuleSet",
"SubtitleKnowledgeBase",
"KnowledgeLoader",
"SubtitleIdentifier", "SubtitleIdentifier",
"SubtitleMatcher", "SubtitleMatcher",
"PatternDetector", "PatternDetector",
+9 -9
View File
@@ -4,15 +4,9 @@ from dataclasses import dataclass, field
from typing import Any from typing import Any
from ..shared.value_objects import ImdbId from ..shared.value_objects import ImdbId
from .knowledge.base import SubtitleKnowledgeBase
from .value_objects import RuleScope, SubtitleMatchingRules from .value_objects import RuleScope, SubtitleMatchingRules
def DEFAULT_RULES() -> SubtitleMatchingRules:
"""Load default matching rules from subtitles.yaml (defaults section)."""
return SubtitleKnowledgeBase().default_rules()
@dataclass @dataclass
class SubtitleRuleSet: class SubtitleRuleSet:
""" """
@@ -36,12 +30,18 @@ class SubtitleRuleSet:
_format_priority: list[str] | None = field(default=None, repr=False) _format_priority: list[str] | None = field(default=None, repr=False)
_min_confidence: float | None = field(default=None, repr=False) _min_confidence: float | None = field(default=None, repr=False)
def resolve(self) -> SubtitleMatchingRules: def resolve(self, default_rules: SubtitleMatchingRules) -> SubtitleMatchingRules:
""" """
Walk the parent chain and merge deltas into effective rules. Walk the parent chain and merge deltas into effective rules.
Falls back to DEFAULT_RULES at the top of the chain.
``default_rules`` seeds the top of the chain — it is the caller's
responsibility to load these from the knowledge base (infrastructure).
Keeping the default rules as a parameter keeps the domain free of
any I/O dependency.
""" """
base = self.parent.resolve() if self.parent else DEFAULT_RULES() base = (
self.parent.resolve(default_rules) if self.parent else default_rules
)
return SubtitleMatchingRules( return SubtitleMatchingRules(
preferred_languages=self._languages or base.preferred_languages, preferred_languages=self._languages or base.preferred_languages,
preferred_formats=self._formats or base.preferred_formats, preferred_formats=self._formats or base.preferred_formats,
@@ -0,0 +1,6 @@
"""Domain ports for the subtitles domain — Protocol-based abstractions
that decouple domain services from concrete infrastructure adapters."""
from .knowledge import SubtitleKnowledge
__all__ = ["SubtitleKnowledge"]
@@ -0,0 +1,38 @@
"""SubtitleKnowledge port — the query surface domain services need from the
subtitle knowledge base, expressed as a Protocol so the domain never imports
the infrastructure adapter that backs it.
The concrete implementation lives in
``alfred/infrastructure/knowledge/subtitles/base.py`` (the YAML-backed
``SubtitleKnowledgeBase``). Tests can supply any object that satisfies this
structural contract.
"""
from __future__ import annotations
from typing import Protocol
from ..value_objects import SubtitleFormat, SubtitleLanguage, SubtitlePattern, SubtitleType
class SubtitleKnowledge(Protocol):
"""Read-only query surface for subtitle knowledge consumed by the domain.
Only the methods that domain services actually call belong here — anything
else (defaults loading, reload, pattern groups, raw dicts) stays on the
concrete class and is reserved for the application layer.
"""
def known_extensions(self) -> set[str]: ...
def format_for_extension(self, ext: str) -> SubtitleFormat | None: ...
def language_for_token(self, token: str) -> SubtitleLanguage | None: ...
def is_known_lang_token(self, token: str) -> bool: ...
def type_for_token(self, token: str) -> SubtitleType | None: ...
def is_known_type_token(self, token: str) -> bool: ...
def patterns(self) -> dict[str, SubtitlePattern]: ...
-207
View File
@@ -1,207 +0,0 @@
"""SubtitleScanner — inspects local subtitle files and filters them per user preferences.
Given a video file path, the scanner:
1. Looks for subtitle files in the same directory as the video.
2. Optionally also inspects a Subs/ subfolder adjacent to the video.
3. Classifies each file (language, SDH, forced) from its filename, delegating
all token knowledge to SubtitleKnowledgeBase (which itself merges
LanguageRegistry + subtitle-specific tokens from subtitles.yaml).
4. Filters according to SubtitlePreferences (languages, min_size_kb, keep_sdh,
keep_forced).
5. Returns a list of SubtitleCandidate — one per file that passes the filter,
with the destination filename already computed.
Filename classification heuristics
-----------------------------------
We parse the stem of each subtitle file looking for known patterns:
fre.srt → lang=fre, sdh=False, forced=False
fre.sdh.srt → lang=fre, sdh=True
fre.forced.srt → lang=fre, forced=True
Breaking.Bad.S01E01.French.srt → lang=fre (alias match via LanguageRegistry)
Breaking.Bad.S01E01.VOSTFR.srt → lang=fre (subtitle-specific token)
ISO 639-2/B codes are used throughout (matching the project-wide canonical form
from iso_languages.yaml — what ffprobe emits).
Output naming convention (matches SubtitlePreferences docstring):
{lang}.srt
{lang}.sdh.srt
{lang}.forced.srt
"""
import logging
import re
from dataclasses import dataclass
from pathlib import Path
from .knowledge.base import SubtitleKnowledgeBase
from .value_objects import SubtitleType
logger = logging.getLogger(__name__)
_TOKEN_SPLIT = re.compile(r"[\.\s_\-]+")
@dataclass
class SubtitleCandidate:
"""A subtitle file that passed the filter, ready to be placed."""
source_path: Path
language: str # ISO 639-2/B code, e.g. "fre"
is_sdh: bool
is_forced: bool
extension: str # e.g. ".srt"
@property
def destination_name(self) -> str:
"""
Compute the destination filename per naming convention:
{lang}.srt
{lang}.sdh.srt
{lang}.forced.srt
"""
ext = self.extension.lstrip(".")
parts = [self.language]
if self.is_sdh:
parts.append("sdh")
elif self.is_forced:
parts.append("forced")
return ".".join(parts) + "." + ext
# Module-level KB instance — built lazily on first use to avoid loading YAML at import.
_KB: SubtitleKnowledgeBase | None = None
def _kb() -> SubtitleKnowledgeBase:
global _KB # noqa: PLW0603 — intentional lazy module-level cache
if _KB is None:
_KB = SubtitleKnowledgeBase()
return _KB
def _classify(path: Path) -> tuple[str | None, bool, bool]:
"""
Parse a subtitle filename and return (language_code, is_sdh, is_forced).
``language_code`` is the ISO 639-2/B canonical code (e.g. ``"fre"``).
Returns (None, False, False) if the language cannot be determined.
"""
stem = path.stem.lower()
tokens = _TOKEN_SPLIT.split(stem)
kb = _kb()
language: str | None = None
is_sdh = False
is_forced = False
for token in tokens:
if not token:
continue
if language is None:
lang = kb.language_for_token(token)
if lang is not None:
language = lang.code
continue
stype = kb.type_for_token(token)
if stype is SubtitleType.SDH:
is_sdh = True
elif stype is SubtitleType.FORCED:
is_forced = True
return language, is_sdh, is_forced
class SubtitleScanner:
"""
Scans subtitle files next to a video and filters them per SubtitlePreferences.
Usage:
scanner = SubtitleScanner(prefs)
candidates = scanner.scan(video_path)
# Each candidate has .source_path and .destination_name
"""
def __init__(
self, languages: list[str], min_size_kb: int, keep_sdh: bool, keep_forced: bool
):
self.languages = [lang.lower() for lang in languages]
self.min_size_kb = min_size_kb
self.keep_sdh = keep_sdh
self.keep_forced = keep_forced
self._kb = _kb()
self._subtitle_extensions = {e.lower() for e in self._kb.known_extensions()}
def scan(self, video_path: Path) -> list[SubtitleCandidate]:
"""
Return all subtitle candidates found next to the video that pass the filter.
Scans:
- Same directory as the video (flat siblings)
- Subs/ subfolder if present
"""
candidates: list[SubtitleCandidate] = []
search_dirs = [video_path.parent]
subs_dir = video_path.parent / "Subs"
if subs_dir.is_dir():
search_dirs.append(subs_dir)
logger.debug(f"SubtitleScanner: found Subs/ folder at {subs_dir}")
for directory in search_dirs:
for path in sorted(directory.iterdir()):
if not path.is_file():
continue
if path.suffix.lower() not in self._subtitle_extensions:
continue
candidate = self._evaluate(path)
if candidate is not None:
candidates.append(candidate)
logger.info(
f"SubtitleScanner: {len(candidates)} candidate(s) found for {video_path.name}"
)
return candidates
def _evaluate(self, path: Path) -> SubtitleCandidate | None:
"""Apply all filters to a single subtitle file. Returns None if it should be dropped."""
# Size filter
size_kb = path.stat().st_size / 1024
if size_kb < self.min_size_kb:
logger.debug(
f"SubtitleScanner: skip {path.name} (too small: {size_kb:.1f} KB)"
)
return None
language, is_sdh, is_forced = _classify(path)
# Language filter
if language is None:
logger.debug(f"SubtitleScanner: skip {path.name} (language unknown)")
return None
if language not in self.languages:
logger.debug(
f"SubtitleScanner: skip {path.name} (language '{language}' not in prefs)"
)
return None
# SDH filter
if is_sdh and not self.keep_sdh:
logger.debug(f"SubtitleScanner: skip {path.name} (SDH not wanted)")
return None
# Forced filter
if is_forced and not self.keep_forced:
logger.debug(f"SubtitleScanner: skip {path.name} (forced not wanted)")
return None
return SubtitleCandidate(
source_path=path,
language=language,
is_sdh=is_sdh,
is_forced=is_forced,
extension=path.suffix.lower(),
)
@@ -1,13 +1,9 @@
from .identifier import SubtitleIdentifier from .identifier import SubtitleIdentifier
from .matcher import SubtitleMatcher from .matcher import SubtitleMatcher
from .pattern_detector import PatternDetector from .pattern_detector import PatternDetector
from .placer import PlacedTrack, PlaceResult, SubtitlePlacer
__all__ = [ __all__ = [
"SubtitleIdentifier", "SubtitleIdentifier",
"SubtitleMatcher", "SubtitleMatcher",
"PatternDetector", "PatternDetector",
"SubtitlePlacer",
"PlacedTrack",
"PlaceResult",
] ]
+64 -101
View File
@@ -1,14 +1,13 @@
"""SubtitleIdentifier — finds and classifies all subtitle tracks for a video file.""" """SubtitleIdentifier — finds and classifies all subtitle tracks for a video file."""
import json
import logging import logging
import re import re
import subprocess
from pathlib import Path from pathlib import Path
from ...shared.ports import FilesystemScanner, MediaProber
from ..ports import SubtitleKnowledge
from ...shared.value_objects import ImdbId from ...shared.value_objects import ImdbId
from ..entities import MediaSubtitleMetadata, SubtitleCandidate from ..entities import MediaSubtitleMetadata, SubtitleCandidate
from ..knowledge.base import SubtitleKnowledgeBase
from ..value_objects import ScanStrategy, SubtitlePattern, SubtitleType from ..value_objects import ScanStrategy, SubtitlePattern, SubtitleType
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -37,17 +36,14 @@ def _tokenize_suffix(stem: str, episode_stem: str) -> list[str]:
return _tokenize(stem) return _tokenize(stem)
def _count_entries(path: Path) -> int: def _count_entries(text: str | None) -> int | None:
"""Return the entry count of an SRT file by finding the last cue number.""" """Return the entry count of an SRT body by finding the last cue number."""
try: if text is None:
with open(path, encoding="utf-8", errors="replace") as f: return None
lines = f.read().splitlines() for line in reversed(text.splitlines()):
for line in reversed(lines):
if line.strip().isdigit(): if line.strip().isdigit():
return int(line.strip()) return int(line.strip())
return 0 return 0
except Exception:
return 0
class SubtitleIdentifier: class SubtitleIdentifier:
@@ -60,8 +56,15 @@ class SubtitleIdentifier:
the caller (use case) decides whether to ask the user for clarification. the caller (use case) decides whether to ask the user for clarification.
""" """
def __init__(self, kb: SubtitleKnowledgeBase): def __init__(
self,
kb: SubtitleKnowledge,
prober: MediaProber,
scanner: FilesystemScanner,
):
self.kb = kb self.kb = kb
self.prober = prober
self.scanner = scanner
def identify( def identify(
self, self,
@@ -88,52 +91,21 @@ class SubtitleIdentifier:
return metadata return metadata
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Embedded tracks — ffprobe # Embedded tracks — via MediaProber
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def _scan_embedded(self, video_path: Path) -> list[SubtitleCandidate]: def _scan_embedded(self, video_path: Path) -> list[SubtitleCandidate]:
if not video_path.exists(): streams = self.prober.list_subtitle_streams(video_path)
return []
try:
result = subprocess.run(
[
"ffprobe",
"-v",
"quiet",
"-print_format",
"json",
"-show_streams",
"-select_streams",
"s",
str(video_path),
],
capture_output=True,
text=True,
timeout=30,
check=False,
)
data = json.loads(result.stdout)
except (
subprocess.TimeoutExpired,
json.JSONDecodeError,
FileNotFoundError,
) as e:
logger.debug(
f"SubtitleIdentifier: ffprobe failed for {video_path.name}: {e}"
)
return []
tracks = [] tracks = []
for stream in data.get("streams", []): for stream in streams:
tags = stream.get("tags", {}) lang = (
disposition = stream.get("disposition", {}) self.kb.language_for_token(stream.language) if stream.language else None
lang_code = tags.get("language", "") )
lang = self.kb.language_for_token(lang_code) if lang_code else None if stream.is_hearing_impaired:
if disposition.get("hearing_impaired"):
stype = SubtitleType.SDH stype = SubtitleType.SDH
elif disposition.get("forced"): elif stream.is_forced:
stype = SubtitleType.FORCED stype = SubtitleType.FORCED
else: else:
stype = SubtitleType.STANDARD stype = SubtitleType.STANDARD
@@ -144,7 +116,7 @@ class SubtitleIdentifier:
format=None, format=None,
subtitle_type=stype, subtitle_type=stype,
is_embedded=True, is_embedded=True,
raw_tokens=[lang_code] if lang_code else [], raw_tokens=[stream.language] if stream.language else [],
) )
) )
@@ -176,52 +148,42 @@ class SubtitleIdentifier:
return self._classify_files(candidates, pattern, episode_stem=episode_stem) return self._classify_files(candidates, pattern, episode_stem=episode_stem)
def _find_adjacent(self, video_path: Path) -> list[Path]: def _find_adjacent(self, video_path: Path) -> list:
known = self.kb.known_extensions()
return [ return [
p entry
for p in sorted(video_path.parent.iterdir()) for entry in self.scanner.scan_dir(video_path.parent)
if p.is_file() if entry.is_file
and p.suffix.lower() in self.kb.known_extensions() and entry.suffix.lower() in known
and p.stem != video_path.stem and entry.stem != video_path.stem
] ]
def _find_flat(self, video_path: Path, root_folder: str) -> list[Path]: def _find_flat(self, video_path: Path, root_folder: str) -> list:
subs_dir = video_path.parent / root_folder known = self.kb.known_extensions()
if not subs_dir.is_dir(): # Adjacent first, then release root (one level up)
# Also look at release root (one level up) for subs_dir in (
subs_dir = video_path.parent.parent / root_folder video_path.parent / root_folder,
if not subs_dir.is_dir(): video_path.parent.parent / root_folder,
return [] ):
entries = self.scanner.scan_dir(subs_dir)
if entries:
return [ return [
p e for e in entries if e.is_file and e.suffix.lower() in known
for p in sorted(subs_dir.iterdir())
if p.is_file() and p.suffix.lower() in self.kb.known_extensions()
] ]
return []
def _find_episode_subfolder( def _find_episode_subfolder(
self, video_path: Path, root_folder: str self, video_path: Path, root_folder: str
) -> tuple[list[Path], str]: ) -> tuple[list, str]:
""" """Look for Subs/{episode_stem}/*.srt — adjacent or one level up."""
Look for Subs/{episode_stem}/*.srt
Checks two locations:
1. Adjacent to the video: video_path.parent / root_folder / video_path.stem
2. Release root (one level up): video_path.parent.parent / root_folder / video_path.stem
Returns (files, episode_stem) so the classifier can strip the prefix.
"""
episode_stem = video_path.stem episode_stem = video_path.stem
candidates_dirs = [ known = self.kb.known_extensions()
for subs_dir in (
video_path.parent / root_folder / episode_stem, video_path.parent / root_folder / episode_stem,
video_path.parent.parent / root_folder / episode_stem, video_path.parent.parent / root_folder / episode_stem,
] ):
for subs_dir in candidates_dirs: entries = self.scanner.scan_dir(subs_dir)
if subs_dir.is_dir(): files = [e for e in entries if e.is_file and e.suffix.lower() in known]
files = [
p
for p in sorted(subs_dir.iterdir())
if p.is_file() and p.suffix.lower() in self.kb.known_extensions()
]
if files: if files:
logger.debug( logger.debug(
f"SubtitleIdentifier: found {len(files)} file(s) in {subs_dir}" f"SubtitleIdentifier: found {len(files)} file(s) in {subs_dir}"
@@ -235,14 +197,13 @@ class SubtitleIdentifier:
def _classify_files( def _classify_files(
self, self,
paths: list[Path], entries: list,
pattern: SubtitlePattern, pattern: SubtitlePattern,
episode_stem: str | None = None, episode_stem: str | None = None,
) -> list[SubtitleCandidate]: ) -> list[SubtitleCandidate]:
tracks = [] tracks = [
for path in paths: self._classify_single(entry, episode_stem=episode_stem) for entry in entries
track = self._classify_single(path, episode_stem=episode_stem) ]
tracks.append(track)
# Post-process: if multiple tracks share same language but type is ambiguous, # Post-process: if multiple tracks share same language but type is ambiguous,
# apply size_and_count disambiguation # apply size_and_count disambiguation
@@ -252,13 +213,13 @@ class SubtitleIdentifier:
return tracks return tracks
def _classify_single( def _classify_single(
self, path: Path, episode_stem: str | None = None self, entry, episode_stem: str | None = None
) -> SubtitleCandidate: ) -> SubtitleCandidate:
fmt = self.kb.format_for_extension(path.suffix) fmt = self.kb.format_for_extension(entry.suffix)
tokens = ( tokens = (
_tokenize_suffix(path.stem, episode_stem) _tokenize_suffix(entry.stem, episode_stem)
if episode_stem if episode_stem
else _tokenize(path.stem) else _tokenize(entry.stem)
) )
language = None language = None
@@ -284,19 +245,21 @@ class SubtitleIdentifier:
if unknown_tokens: if unknown_tokens:
logger.debug( logger.debug(
f"SubtitleIdentifier: unknown tokens in '{path.name}': {unknown_tokens}" f"SubtitleIdentifier: unknown tokens in '{entry.name}': {unknown_tokens}"
) )
size_kb = path.stat().st_size / 1024 if path.exists() else None # Entry count: only meaningful for SRT files; read text on demand.
entry_count = _count_entries(path) if path.exists() else None entry_count: int | None = None
if entry.suffix.lower() == ".srt":
entry_count = _count_entries(self.scanner.read_text(entry.path))
return SubtitleCandidate( return SubtitleCandidate(
language=language, language=language,
format=fmt, format=fmt,
subtitle_type=subtitle_type, subtitle_type=subtitle_type,
is_embedded=False, is_embedded=False,
file_path=path, file_path=entry.path,
file_size_kb=size_kb, file_size_kb=entry.size_kb,
entry_count=entry_count, entry_count=entry_count,
confidence=confidence, confidence=confidence,
raw_tokens=tokens, raw_tokens=tokens,
@@ -1,11 +1,10 @@
"""PatternDetector — discovers the subtitle structure of a release folder.""" """PatternDetector — discovers the subtitle structure of a release folder."""
import json
import logging import logging
import subprocess
from pathlib import Path from pathlib import Path
from ..knowledge.base import SubtitleKnowledgeBase from ...shared.ports import FilesystemScanner, MediaProber
from ..ports import SubtitleKnowledge
from ..value_objects import ScanStrategy, SubtitlePattern from ..value_objects import ScanStrategy, SubtitlePattern
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -20,8 +19,15 @@ class PatternDetector:
a release follows. The result is proposed to the user for confirmation. a release follows. The result is proposed to the user for confirmation.
""" """
def __init__(self, kb: SubtitleKnowledgeBase): def __init__(
self,
kb: SubtitleKnowledge,
prober: MediaProber,
scanner: FilesystemScanner,
):
self.kb = kb self.kb = kb
self.prober = prober
self.scanner = scanner
def detect(self, release_root: Path, sample_video: Path) -> dict: def detect(self, release_root: Path, sample_video: Path) -> dict:
""" """
@@ -45,29 +51,7 @@ class PatternDetector:
} }
def _has_embedded_subtitles(self, video_path: Path) -> bool: def _has_embedded_subtitles(self, video_path: Path) -> bool:
"""Run ffprobe to check whether the video has embedded subtitle streams.""" return len(self.prober.list_subtitle_streams(video_path)) > 0
try:
result = subprocess.run(
[
"ffprobe",
"-v",
"quiet",
"-print_format",
"json",
"-show_streams",
"-select_streams",
"s",
str(video_path),
],
capture_output=True,
text=True,
timeout=30,
check=False,
)
data = json.loads(result.stdout)
return len(data.get("streams", [])) > 0
except (subprocess.TimeoutExpired, json.JSONDecodeError, FileNotFoundError):
return False
def _inspect(self, release_root: Path, sample_video: Path) -> dict: def _inspect(self, release_root: Path, sample_video: Path) -> dict:
"""Gather structural facts about the release.""" """Gather structural facts about the release."""
@@ -84,42 +68,40 @@ class PatternDetector:
} }
# Check for Subs/ folder — adjacent or at release root # Check for Subs/ folder — adjacent or at release root
for subs_candidate in [ for subs_candidate in (
sample_video.parent / "Subs", sample_video.parent / "Subs",
release_root / "Subs", release_root / "Subs",
]: ):
if subs_candidate.is_dir(): children = self.scanner.scan_dir(subs_candidate)
if not children:
continue
findings["has_subs_folder"] = True findings["has_subs_folder"] = True
findings["subs_root"] = str(subs_candidate) findings["subs_root"] = str(subs_candidate)
# Is it flat or episode_subfolder? # Is it flat or episode_subfolder?
children = list(subs_candidate.iterdir())
sub_files = [ sub_files = [
c c for c in children if c.is_file and c.suffix.lower() in known_exts
for c in children
if c.is_file() and c.suffix.lower() in known_exts
] ]
sub_dirs = [c for c in children if c.is_dir()] sub_dirs = [c for c in children if c.is_dir]
if sub_dirs and not sub_files: if sub_dirs and not sub_files:
findings["subs_strategy"] = "episode_subfolder" findings["subs_strategy"] = "episode_subfolder"
# Count files in a sample subfolder # Count files in a sample subfolder
sample_sub = sub_dirs[0]
sample_files = [ sample_files = [
f f
for f in sample_sub.iterdir() for f in self.scanner.scan_dir(sub_dirs[0].path)
if f.is_file() and f.suffix.lower() in known_exts if f.is_file and f.suffix.lower() in known_exts
] ]
findings["files_per_episode"] = len(sample_files) findings["files_per_episode"] = len(sample_files)
# Check naming conventions # Check naming conventions
for f in sample_files: for f in sample_files:
stem = f.stem parts = f.stem.split("_")
parts = stem.split("_")
if parts[0].isdigit(): if parts[0].isdigit():
findings["has_numeric_prefix"] = True findings["has_numeric_prefix"] = True
if any( if any(
self.kb.is_known_lang_token(t.lower()) self.kb.is_known_lang_token(t.lower())
for t in stem.replace("_", ".").split(".") for t in f.stem.replace("_", ".").split(".")
): ):
findings["has_lang_tokens"] = True findings["has_lang_tokens"] = True
else: else:
@@ -136,9 +118,9 @@ class PatternDetector:
# Check adjacent subs (next to the video) # Check adjacent subs (next to the video)
if not findings["has_subs_folder"]: if not findings["has_subs_folder"]:
adjacent = [ adjacent = [
p e
for p in sample_video.parent.iterdir() for e in self.scanner.scan_dir(sample_video.parent)
if p.is_file() and p.suffix.lower() in known_exts if e.is_file and e.suffix.lower() in known_exts
] ]
if adjacent: if adjacent:
findings["adjacent_subs"] = True findings["adjacent_subs"] = True
@@ -221,6 +203,6 @@ class PatternDetector:
parts.append("no external subtitle files found") parts.append("no external subtitle files found")
if findings.get("has_embedded"): if findings.get("has_embedded"):
parts.append("embedded tracks detected (ffprobe)") parts.append("embedded tracks detected")
return "".join(parts) if parts else "nothing found" return "".join(parts) if parts else "nothing found"
+19 -47
View File
@@ -28,12 +28,11 @@ from __future__ import annotations
import re import re
from dataclasses import dataclass, field from dataclasses import dataclass, field
from ..shared.media import AudioTrack, SubtitleTrack, track_lang_matches from ..shared.media import AudioTrack, MediaWithTracks, SubtitleTrack
from ..shared.value_objects import ( from ..shared.value_objects import (
FilePath, FilePath,
FileSize, FileSize,
ImdbId, ImdbId,
Language,
to_dot_folder_name, to_dot_folder_name,
) )
from .value_objects import ( from .value_objects import (
@@ -48,8 +47,8 @@ from .value_objects import (
# ════════════════════════════════════════════════════════════════════════════ # ════════════════════════════════════════════════════════════════════════════
@dataclass @dataclass(eq=False)
class Episode: class Episode(MediaWithTracks):
""" """
A single episode of a TV show — leaf of the TVShow aggregate. A single episode of a TV show — leaf of the TVShow aggregate.
@@ -57,6 +56,11 @@ class Episode:
(audio + subtitle). Track lists are populated by the ffprobe + subtitle (audio + subtitle). Track lists are populated by the ffprobe + subtitle
scan pipeline; they may be empty when the episode is known but not yet scan pipeline; they may be empty when the episode is known but not yet
scanned, or when no file is downloaded yet. scanned, or when no file is downloaded yet.
Equality is identity-based within the aggregate: two ``Episode`` instances
are equal iff they share the same ``(season_number, episode_number)``,
regardless of title/file/track contents. The root TVShow guarantees
cross-show uniqueness.
""" """
season_number: SeasonNumber season_number: SeasonNumber
@@ -76,51 +80,19 @@ class Episode:
if isinstance(self.episode_number, int): if isinstance(self.episode_number, int):
self.episode_number = EpisodeNumber(self.episode_number) self.episode_number = EpisodeNumber(self.episode_number)
# ── File presence ────────────────────────────────────────────────────── def __eq__(self, other: object) -> bool:
if not isinstance(other, Episode):
return NotImplemented
return (
self.season_number == other.season_number
and self.episode_number == other.episode_number
)
def has_file(self) -> bool: def __hash__(self) -> int:
"""True if a file path is set and the file actually exists on disk.""" return hash((self.season_number, self.episode_number))
return self.file_path is not None and self.file_path.exists()
def is_downloaded(self) -> bool: # Track helpers (has_audio_in / audio_languages / has_subtitles_in /
"""Alias of ``has_file()`` — reads better in collection-status contexts.""" # has_forced_subs / subtitle_languages) come from MediaWithTracks.
return self.has_file()
# ── Audio helpers ──────────────────────────────────────────────────────
def has_audio_in(self, lang: str | Language) -> bool:
"""True if at least one audio track is in the given language."""
return any(track_lang_matches(t.language, lang) for t in self.audio_tracks)
def audio_languages(self) -> list[str]:
"""Unique audio languages across all tracks, in track order."""
seen: set[str] = set()
result: list[str] = []
for t in self.audio_tracks:
if t.language and t.language not in seen:
seen.add(t.language)
result.append(t.language)
return result
# ── Subtitle helpers ───────────────────────────────────────────────────
def has_subtitles_in(self, lang: str | Language) -> bool:
"""True if at least one subtitle track is in the given language."""
return any(track_lang_matches(t.language, lang) for t in self.subtitle_tracks)
def has_forced_subs(self) -> bool:
"""True if at least one subtitle track is flagged as forced."""
return any(t.is_forced for t in self.subtitle_tracks)
def subtitle_languages(self) -> list[str]:
"""Unique subtitle languages across all tracks, in track order."""
seen: set[str] = set()
result: list[str] = []
for t in self.subtitle_tracks:
if t.language and t.language not in seen:
seen.add(t.language)
result.append(t.language)
return result
# ── Naming ───────────────────────────────────────────────────────────── # ── Naming ─────────────────────────────────────────────────────────────
+21 -11
View File
@@ -57,27 +57,31 @@ def _parse(data: dict) -> MediaInfo:
streams = data.get("streams", []) streams = data.get("streams", [])
fmt = data.get("format", {}) fmt = data.get("format", {})
info = MediaInfo()
# File-level duration/bitrate (ffprobe ``format`` block — independent of streams) # File-level duration/bitrate (ffprobe ``format`` block — independent of streams)
duration_seconds: float | None = None
bitrate_kbps: int | None = None
if "duration" in fmt: if "duration" in fmt:
try: try:
info.duration_seconds = float(fmt["duration"]) duration_seconds = float(fmt["duration"])
except ValueError: except ValueError:
pass pass
if "bit_rate" in fmt: if "bit_rate" in fmt:
try: try:
info.bitrate_kbps = int(fmt["bit_rate"]) // 1000 bitrate_kbps = int(fmt["bit_rate"]) // 1000
except ValueError: except ValueError:
pass pass
video_tracks: list[VideoTrack] = []
audio_tracks: list[AudioTrack] = []
subtitle_tracks: list[SubtitleTrack] = []
for stream in streams: for stream in streams:
codec_type = stream.get("codec_type") codec_type = stream.get("codec_type")
if codec_type == "video": if codec_type == "video":
info.video_tracks.append( video_tracks.append(
VideoTrack( VideoTrack(
index=stream.get("index", len(info.video_tracks)), index=stream.get("index", len(video_tracks)),
codec=stream.get("codec_name"), codec=stream.get("codec_name"),
width=stream.get("width"), width=stream.get("width"),
height=stream.get("height"), height=stream.get("height"),
@@ -86,9 +90,9 @@ def _parse(data: dict) -> MediaInfo:
) )
elif codec_type == "audio": elif codec_type == "audio":
info.audio_tracks.append( audio_tracks.append(
AudioTrack( AudioTrack(
index=stream.get("index", len(info.audio_tracks)), index=stream.get("index", len(audio_tracks)),
codec=stream.get("codec_name"), codec=stream.get("codec_name"),
channels=stream.get("channels"), channels=stream.get("channels"),
channel_layout=stream.get("channel_layout"), channel_layout=stream.get("channel_layout"),
@@ -98,9 +102,9 @@ def _parse(data: dict) -> MediaInfo:
) )
elif codec_type == "subtitle": elif codec_type == "subtitle":
info.subtitle_tracks.append( subtitle_tracks.append(
SubtitleTrack( SubtitleTrack(
index=stream.get("index", len(info.subtitle_tracks)), index=stream.get("index", len(subtitle_tracks)),
codec=stream.get("codec_name"), codec=stream.get("codec_name"),
language=stream.get("tags", {}).get("language"), language=stream.get("tags", {}).get("language"),
is_default=stream.get("disposition", {}).get("default", 0) == 1, is_default=stream.get("disposition", {}).get("default", 0) == 1,
@@ -108,4 +112,10 @@ def _parse(data: dict) -> MediaInfo:
) )
) )
return info return MediaInfo(
video_tracks=tuple(video_tracks),
audio_tracks=tuple(audio_tracks),
subtitle_tracks=tuple(subtitle_tracks),
duration_seconds=duration_seconds,
bitrate_kbps=bitrate_kbps,
)
@@ -0,0 +1,66 @@
"""PathlibFilesystemScanner — FilesystemScanner adapter backed by pathlib."""
from __future__ import annotations
import logging
from pathlib import Path
from alfred.domain.shared.ports import FileEntry
logger = logging.getLogger(__name__)
class PathlibFilesystemScanner:
"""Read-only filesystem scanner using ``pathlib``.
Implements :class:`alfred.domain.shared.ports.FilesystemScanner`
structurally. Never raises — failures are logged and surfaced as
empty results.
"""
def scan_dir(self, path: Path) -> list[FileEntry]:
try:
if not path.is_dir():
return []
children = sorted(path.iterdir())
except OSError as e:
logger.debug(f"PathlibFilesystemScanner: scan_dir failed for {path}: {e}")
return []
entries: list[FileEntry] = []
for child in children:
entry = self._make_entry(child)
if entry is not None:
entries.append(entry)
return entries
def stat(self, path: Path) -> FileEntry | None:
return self._make_entry(path)
def read_text(self, path: Path, encoding: str = "utf-8") -> str | None:
try:
with open(path, encoding=encoding, errors="replace") as f:
return f.read()
except OSError as e:
logger.debug(f"PathlibFilesystemScanner: read_text failed for {path}: {e}")
return None
# ------------------------------------------------------------------
def _make_entry(self, path: Path) -> FileEntry | None:
try:
is_file = path.is_file()
is_dir = path.is_dir()
except OSError:
return None
if not (is_file or is_dir):
return None
size_kb: float | None = None
if is_file:
try:
size_kb = path.stat().st_size / 1024
except OSError:
size_kb = None
return FileEntry(path=path, is_file=is_file, is_dir=is_dir, size_kb=size_kb)
@@ -0,0 +1,6 @@
"""Knowledge loaders — YAML I/O kept out of the domain layer.
Each submodule reads its YAML files from ``alfred/knowledge/`` (builtin,
versioned) and ``data/knowledge/`` (learned, gitignored), and exposes plain
Python values (sets, dicts, classes) for domain code to consume.
"""
@@ -13,7 +13,7 @@ import yaml
import alfred as _alfred_pkg import alfred as _alfred_pkg
from ..value_objects import Language from alfred.domain.shared.value_objects import Language
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -2,8 +2,8 @@
import logging import logging
from ...shared.knowledge.language_registry import LanguageRegistry from alfred.infrastructure.knowledge.language_registry import LanguageRegistry
from ..value_objects import ( from alfred.domain.subtitles.value_objects import (
ScanStrategy, ScanStrategy,
SubtitleFormat, SubtitleFormat,
SubtitleLanguage, SubtitleLanguage,
+5
View File
@@ -0,0 +1,5 @@
"""Media probing adapters — concrete implementations of MediaProber."""
from .ffprobe_prober import FfprobeMediaProber
__all__ = ["FfprobeMediaProber"]
@@ -0,0 +1,65 @@
"""FfprobeMediaProber — MediaProber adapter backed by the ffprobe CLI."""
from __future__ import annotations
import json
import logging
import subprocess
from pathlib import Path
from alfred.domain.shared.ports import SubtitleStreamInfo
logger = logging.getLogger(__name__)
_FFPROBE_TIMEOUT_SECONDS = 30
class FfprobeMediaProber:
"""Inspect media files by shelling out to ``ffprobe``.
Implements :class:`alfred.domain.shared.ports.MediaProber` structurally.
Never raises — failures are logged and surfaced as empty results.
"""
def list_subtitle_streams(self, video: Path) -> list[SubtitleStreamInfo]:
if not video.exists():
return []
try:
result = subprocess.run(
[
"ffprobe",
"-v",
"quiet",
"-print_format",
"json",
"-show_streams",
"-select_streams",
"s",
str(video),
],
capture_output=True,
text=True,
timeout=_FFPROBE_TIMEOUT_SECONDS,
check=False,
)
data = json.loads(result.stdout)
except (
subprocess.TimeoutExpired,
json.JSONDecodeError,
FileNotFoundError,
) as e:
logger.debug(f"FfprobeMediaProber: ffprobe failed for {video.name}: {e}")
return []
streams: list[SubtitleStreamInfo] = []
for stream in data.get("streams", []):
tags = stream.get("tags", {}) or {}
disposition = stream.get("disposition", {}) or {}
streams.append(
SubtitleStreamInfo(
language=tags.get("language") or None,
is_hearing_impaired=bool(disposition.get("hearing_impaired")),
is_forced=bool(disposition.get("forced")),
)
)
return streams
@@ -14,7 +14,7 @@ from pathlib import Path
from typing import Any from typing import Any
from alfred.domain.subtitles.entities import SubtitleCandidate from alfred.domain.subtitles.entities import SubtitleCandidate
from alfred.domain.subtitles.services.placer import PlacedTrack from alfred.application.subtitles.placer import PlacedTrack
from alfred.infrastructure.metadata.store import MetadataStore from alfred.infrastructure.metadata.store import MetadataStore
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -54,7 +54,7 @@ class RuleSetRepository:
Build and return the resolved RuleSet chain. Build and return the resolved RuleSet chain.
If subtitle_preferences is provided, it seeds the global base rule set If subtitle_preferences is provided, it seeds the global base rule set
from LTM (overriding the hardcoded DEFAULT_RULES). from LTM (overriding the knowledge-base defaults at resolve time).
Returns global default if no overrides exist. Returns global default if no overrides exist.
""" """
base = SubtitleRuleSet.global_default() base = SubtitleRuleSet.global_default()
+1 -1
View File
@@ -41,7 +41,7 @@ from alfred.application.filesystem.manage_subtitles import (
_to_unresolved_dto, _to_unresolved_dto,
) )
from alfred.domain.subtitles.entities import MediaSubtitleMetadata, SubtitleCandidate from alfred.domain.subtitles.entities import MediaSubtitleMetadata, SubtitleCandidate
from alfred.domain.subtitles.services.placer import PlacedTrack, PlaceResult from alfred.application.subtitles.placer import PlacedTrack, PlaceResult
from alfred.domain.subtitles.value_objects import ( from alfred.domain.subtitles.value_objects import (
ScanStrategy, ScanStrategy,
SubtitleFormat, SubtitleFormat,
@@ -1,4 +1,4 @@
"""Tests for ``alfred.domain.subtitles.services.placer.SubtitlePlacer``. """Tests for ``alfred.application.subtitles.placer.SubtitlePlacer``.
The placer hard-links subtitle files next to a destination video, naming The placer hard-links subtitle files next to a destination video, naming
them ``{video_stem}.{lang}[.sdh|.forced].{ext}``. them ``{video_stem}.{lang}[.sdh|.forced].{ext}``.
@@ -22,7 +22,7 @@ from unittest.mock import patch
import pytest import pytest
from alfred.domain.subtitles.entities import SubtitleCandidate from alfred.domain.subtitles.entities import SubtitleCandidate
from alfred.domain.subtitles.services.placer import ( from alfred.application.subtitles.placer import (
PlacedTrack, PlacedTrack,
PlaceResult, PlaceResult,
SubtitlePlacer, SubtitlePlacer,
@@ -198,7 +198,7 @@ class TestOSError:
video.write_bytes(b"") video.write_bytes(b"")
track = _track(src) track = _track(src)
with patch( with patch(
"alfred.domain.subtitles.services.placer.os.link", "alfred.application.subtitles.placer.os.link",
side_effect=OSError("cross-device link"), side_effect=OSError("cross-device link"),
): ):
result = placer.place([track], video) result = placer.place([track], video)
-17
View File
@@ -72,23 +72,6 @@ class TestFilePath:
with pytest.raises(ValidationError): with pytest.raises(ValidationError):
FilePath(123) # type: ignore FilePath(123) # type: ignore
def test_exists_true(self, tmp_path):
p = FilePath(tmp_path)
assert p.exists()
def test_exists_false(self, tmp_path):
p = FilePath(tmp_path / "nonexistent")
assert not p.exists()
def test_is_file(self, tmp_path):
f = tmp_path / "file.txt"
f.write_text("x")
assert FilePath(f).is_file()
assert not FilePath(tmp_path).is_file()
def test_is_dir(self, tmp_path):
assert FilePath(tmp_path).is_dir()
def test_str(self, tmp_path): def test_str(self, tmp_path):
p = FilePath(tmp_path) p = FilePath(tmp_path)
assert str(p) == str(tmp_path) assert str(p) == str(tmp_path)
+31 -20
View File
@@ -22,8 +22,8 @@ from unittest.mock import patch
import pytest import pytest
from alfred.domain.shared.ports import FileEntry
from alfred.domain.subtitles.entities import SubtitleCandidate from alfred.domain.subtitles.entities import SubtitleCandidate
from alfred.domain.subtitles.knowledge.base import SubtitleKnowledgeBase
from alfred.domain.subtitles.services.identifier import ( from alfred.domain.subtitles.services.identifier import (
SubtitleIdentifier, SubtitleIdentifier,
_count_entries, _count_entries,
@@ -37,6 +37,19 @@ from alfred.domain.subtitles.value_objects import (
SubtitleType, SubtitleType,
TypeDetectionMethod, TypeDetectionMethod,
) )
from alfred.infrastructure.filesystem.scanner import PathlibFilesystemScanner
from alfred.infrastructure.knowledge.subtitles.base import SubtitleKnowledgeBase
from alfred.infrastructure.probe.ffprobe_prober import FfprobeMediaProber
def _file_entry(path) -> FileEntry:
"""Helper: build a FileEntry from a real tmp_path Path."""
return FileEntry(
path=path,
is_file=path.is_file(),
is_dir=path.is_dir(),
size_kb=(path.stat().st_size / 1024) if path.is_file() else None,
)
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
@@ -46,7 +59,7 @@ def kb():
@pytest.fixture @pytest.fixture
def identifier(kb): def identifier(kb):
return SubtitleIdentifier(kb) return SubtitleIdentifier(kb, FfprobeMediaProber(), PathlibFilesystemScanner())
def _pattern( def _pattern(
@@ -103,23 +116,19 @@ class TestTokenize:
class TestCountEntries: class TestCountEntries:
def test_last_cue_number(self, tmp_path): def test_last_cue_number(self):
srt = tmp_path / "x.srt" text = (
srt.write_text(
"1\n00:00:01,000 --> 00:00:02,000\nHello\n\n" "1\n00:00:01,000 --> 00:00:02,000\nHello\n\n"
"2\n00:00:03,000 --> 00:00:04,000\nWorld\n\n" "2\n00:00:03,000 --> 00:00:04,000\nWorld\n\n"
"42\n00:00:05,000 --> 00:00:06,000\nLast\n", "42\n00:00:05,000 --> 00:00:06,000\nLast\n"
encoding="utf-8",
) )
assert _count_entries(srt) == 42 assert _count_entries(text) == 42
def test_missing_file_returns_zero(self, tmp_path): def test_missing_file_returns_none(self):
assert _count_entries(tmp_path / "nope.srt") == 0 assert _count_entries(None) is None
def test_empty_file_returns_zero(self, tmp_path): def test_empty_file_returns_zero(self):
f = tmp_path / "x.srt" assert _count_entries("") == 0
f.write_text("")
assert _count_entries(f) == 0
# --------------------------------------------------------------------------- # # --------------------------------------------------------------------------- #
@@ -135,7 +144,7 @@ class TestEmbedded:
video = tmp_path / "v.mkv" video = tmp_path / "v.mkv"
video.write_bytes(b"") video.write_bytes(b"")
with patch( with patch(
"alfred.domain.subtitles.services.identifier.subprocess.run", "alfred.infrastructure.probe.ffprobe_prober.subprocess.run",
side_effect=FileNotFoundError("no ffprobe"), side_effect=FileNotFoundError("no ffprobe"),
): ):
assert identifier._scan_embedded(video) == [] assert identifier._scan_embedded(video) == []
@@ -156,7 +165,7 @@ class TestEmbedded:
stdout = fake_output stdout = fake_output
with patch( with patch(
"alfred.domain.subtitles.services.identifier.subprocess.run", "alfred.infrastructure.probe.ffprobe_prober.subprocess.run",
return_value=FakeResult(), return_value=FakeResult(),
): ):
tracks = identifier._scan_embedded(video) tracks = identifier._scan_embedded(video)
@@ -256,7 +265,7 @@ class TestClassify:
def test_classifies_language_and_format(self, identifier, tmp_path): def test_classifies_language_and_format(self, identifier, tmp_path):
f = tmp_path / "Show.S01E01.English.srt" f = tmp_path / "Show.S01E01.English.srt"
f.write_text("1\n00:00:01,000 --> 00:00:02,000\nHi\n") f.write_text("1\n00:00:01,000 --> 00:00:02,000\nHi\n")
track = identifier._classify_single(f) track = identifier._classify_single(_file_entry(f))
assert track.language.code == "eng" assert track.language.code == "eng"
assert track.format.id == "srt" assert track.format.id == "srt"
assert track.confidence > 0 assert track.confidence > 0
@@ -265,13 +274,13 @@ class TestClassify:
def test_classifies_type_token(self, identifier, tmp_path): def test_classifies_type_token(self, identifier, tmp_path):
f = tmp_path / "Show.S01E01.English.sdh.srt" f = tmp_path / "Show.S01E01.English.sdh.srt"
f.write_text("") f.write_text("")
track = identifier._classify_single(f) track = identifier._classify_single(_file_entry(f))
assert track.subtitle_type == SubtitleType.SDH assert track.subtitle_type == SubtitleType.SDH
def test_unknown_tokens_lower_confidence(self, identifier, tmp_path): def test_unknown_tokens_lower_confidence(self, identifier, tmp_path):
f = tmp_path / "Show.S01E01.gibberish.srt" f = tmp_path / "Show.S01E01.gibberish.srt"
f.write_text("") f.write_text("")
track = identifier._classify_single(f) track = identifier._classify_single(_file_entry(f))
# No lang/type recognized → confidence is 0 or very low. # No lang/type recognized → confidence is 0 or very low.
assert track.language is None assert track.language is None
assert track.confidence < 0.5 assert track.confidence < 0.5
@@ -279,7 +288,9 @@ class TestClassify:
def test_episode_stem_prefix_stripped(self, identifier, tmp_path): def test_episode_stem_prefix_stripped(self, identifier, tmp_path):
f = tmp_path / "Show.S01E01.English.srt" f = tmp_path / "Show.S01E01.English.srt"
f.write_text("") f.write_text("")
track = identifier._classify_single(f, episode_stem="Show.S01E01") track = identifier._classify_single(
_file_entry(f), episode_stem="Show.S01E01"
)
# Only "english" remains as meaningful token → confidence == 1.0 # Only "english" remains as meaningful token → confidence == 1.0
assert track.language.code == "eng" assert track.language.code == "eng"
assert track.confidence == 1.0 assert track.confidence == 1.0
+4 -4
View File
@@ -1,4 +1,4 @@
"""Tests for ``alfred.domain.subtitles.knowledge`` (loader + base). """Tests for ``alfred.infrastructure.knowledge.subtitles`` (loader + base).
Covers: Covers:
@@ -19,9 +19,9 @@ from pathlib import Path
import pytest import pytest
from alfred.domain.subtitles.knowledge import loader as loader_mod from alfred.infrastructure.knowledge.subtitles import loader as loader_mod
from alfred.domain.subtitles.knowledge.base import SubtitleKnowledgeBase from alfred.infrastructure.knowledge.subtitles.base import SubtitleKnowledgeBase
from alfred.domain.subtitles.knowledge.loader import KnowledgeLoader, _merge from alfred.infrastructure.knowledge.subtitles.loader import KnowledgeLoader, _merge
from alfred.domain.subtitles.value_objects import ( from alfred.domain.subtitles.value_objects import (
ScanStrategy, ScanStrategy,
SubtitleType, SubtitleType,
@@ -25,8 +25,10 @@ from unittest.mock import patch
import pytest import pytest
from alfred.domain.subtitles.knowledge.base import SubtitleKnowledgeBase
from alfred.domain.subtitles.services.pattern_detector import PatternDetector from alfred.domain.subtitles.services.pattern_detector import PatternDetector
from alfred.infrastructure.filesystem.scanner import PathlibFilesystemScanner
from alfred.infrastructure.knowledge.subtitles.base import SubtitleKnowledgeBase
from alfred.infrastructure.probe.ffprobe_prober import FfprobeMediaProber
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
@@ -36,7 +38,7 @@ def kb():
@pytest.fixture @pytest.fixture
def detector(kb): def detector(kb):
return PatternDetector(kb) return PatternDetector(kb, FfprobeMediaProber(), PathlibFilesystemScanner())
def _make_video(folder: Path, name: str = "Show.S01E01.mkv") -> Path: def _make_video(folder: Path, name: str = "Show.S01E01.mkv") -> Path:
-232
View File
@@ -1,232 +0,0 @@
"""Tests for SubtitleScanner and _classify helper."""
from pathlib import Path
from alfred.domain.subtitles.scanner import (
SubtitleCandidate,
SubtitleScanner,
_classify,
)
# ---------------------------------------------------------------------------
# _classify — unit tests for the filename parser
# ---------------------------------------------------------------------------
class TestClassify:
def test_iso_lang_code_639_1_alias(self, tmp_path):
# ``fr`` is an alias of the canonical ISO 639-2/B code ``fre``.
p = tmp_path / "fr.srt"
p.write_text("")
lang, is_sdh, is_forced = _classify(p)
assert lang == "fre"
assert not is_sdh
assert not is_forced
def test_english_keyword(self, tmp_path):
p = tmp_path / "english.srt"
p.write_text("")
lang, _, _ = _classify(p)
assert lang == "eng"
def test_french_keyword(self, tmp_path):
p = tmp_path / "Show.S01E01.French.srt"
p.write_text("")
lang, _, _ = _classify(p)
assert lang == "fre"
def test_vostfr_is_french(self, tmp_path):
p = tmp_path / "Show.S01E01.VOSTFR.srt"
p.write_text("")
lang, _, _ = _classify(p)
assert lang == "fre"
def test_sdh_token(self, tmp_path):
p = tmp_path / "fre.sdh.srt"
p.write_text("")
lang, is_sdh, _ = _classify(p)
assert lang == "fre"
assert is_sdh
def test_hi_no_longer_marks_sdh(self, tmp_path):
# ``hi`` is the ISO 639-1 alias for Hindi; it must not mark a file as
# SDH any more (regression of the previous collision between SDH and
# Hindi tokens). Use ``sdh`` / ``cc`` / ``hearing`` to flag SDH instead.
p = tmp_path / "en.hi.srt"
p.write_text("")
lang, is_sdh, _ = _classify(p)
assert lang == "eng"
assert not is_sdh
def test_forced_token(self, tmp_path):
p = tmp_path / "fre.forced.srt"
p.write_text("")
_, _, is_forced = _classify(p)
assert is_forced
def test_unknown_language_returns_none(self, tmp_path):
p = tmp_path / "Show.S01E01.720p.srt"
p.write_text("")
lang, _, _ = _classify(p)
assert lang is None
def test_dot_separator(self, tmp_path):
p = tmp_path / "fre.sdh.srt"
p.write_text("")
lang, is_sdh, _ = _classify(p)
assert lang == "fre"
assert is_sdh
def test_hyphen_separator(self, tmp_path):
p = tmp_path / "fre-forced.srt"
p.write_text("")
lang, _, is_forced = _classify(p)
assert lang == "fre"
assert is_forced
# ---------------------------------------------------------------------------
# SubtitleCandidate.destination_name
# ---------------------------------------------------------------------------
class TestSubtitleCandidateDestinationName:
def _make(self, lang="fre", is_sdh=False, is_forced=False, ext=".srt", path=None):
return SubtitleCandidate(
source_path=path or Path("/fake/fre.srt"),
language=lang,
is_sdh=is_sdh,
is_forced=is_forced,
extension=ext,
)
def test_standard(self):
assert self._make().destination_name == "fre.srt"
def test_sdh(self):
assert self._make(is_sdh=True).destination_name == "fre.sdh.srt"
def test_forced(self):
assert self._make(is_forced=True).destination_name == "fre.forced.srt"
def test_ass_extension(self):
assert self._make(ext=".ass").destination_name == "fre.ass"
def test_english_standard(self):
assert self._make(lang="eng").destination_name == "eng.srt"
# ---------------------------------------------------------------------------
# SubtitleScanner — integration with real filesystem
# ---------------------------------------------------------------------------
class TestSubtitleScanner:
def _scanner(self, languages=None, min_size_kb=0, keep_sdh=True, keep_forced=True):
return SubtitleScanner(
languages=languages or ["fre", "eng"],
min_size_kb=min_size_kb,
keep_sdh=keep_sdh,
keep_forced=keep_forced,
)
def _video(self, tmp_path):
video = tmp_path / "Movie.mkv"
video.write_bytes(b"video")
return video
def test_finds_adjacent_subtitle(self, tmp_path):
video = self._video(tmp_path)
(tmp_path / "fre.srt").write_text("subtitle content")
candidates = self._scanner().scan(video)
assert len(candidates) == 1
assert candidates[0].language == "fre"
def test_finds_adjacent_subtitle_legacy_639_1(self, tmp_path):
# Reading existing media libraries: ``fr.srt`` is still recognized as
# French and classified canonically as ``fre`` — covers user libraries
# written before the ISO 639-2/B migration.
video = self._video(tmp_path)
(tmp_path / "fr.srt").write_text("subtitle content")
candidates = self._scanner().scan(video)
assert len(candidates) == 1
assert candidates[0].language == "fre"
def test_finds_multiple_languages(self, tmp_path):
video = self._video(tmp_path)
(tmp_path / "fre.srt").write_text("fr subtitle")
(tmp_path / "eng.srt").write_text("en subtitle")
candidates = self._scanner().scan(video)
langs = {c.language for c in candidates}
assert langs == {"fre", "eng"}
def test_scans_subs_subfolder(self, tmp_path):
video = self._video(tmp_path)
subs = tmp_path / "Subs"
subs.mkdir()
(subs / "fre.srt").write_text("subtitle")
candidates = self._scanner().scan(video)
assert any(c.language == "fre" for c in candidates)
def test_filters_unknown_language(self, tmp_path):
video = self._video(tmp_path)
(tmp_path / "unknown.srt").write_text("subtitle")
candidates = self._scanner().scan(video)
assert len(candidates) == 0
def test_filters_wrong_language(self, tmp_path):
video = self._video(tmp_path)
(tmp_path / "ger.srt").write_text("german subtitle")
candidates = self._scanner(languages=["fre"]).scan(video)
assert len(candidates) == 0
def test_filters_too_small_file(self, tmp_path):
video = self._video(tmp_path)
small = tmp_path / "fre.srt"
small.write_bytes(b"x") # 1 byte, well below any min_size_kb
candidates = self._scanner(min_size_kb=10).scan(video)
assert len(candidates) == 0
def test_filters_sdh_when_not_wanted(self, tmp_path):
video = self._video(tmp_path)
(tmp_path / "fre.sdh.srt").write_text("sdh subtitle")
candidates = self._scanner(keep_sdh=False).scan(video)
assert len(candidates) == 0
def test_filters_forced_when_not_wanted(self, tmp_path):
video = self._video(tmp_path)
(tmp_path / "fre.forced.srt").write_text("forced subtitle")
candidates = self._scanner(keep_forced=False).scan(video)
assert len(candidates) == 0
def test_keeps_sdh_when_wanted(self, tmp_path):
video = self._video(tmp_path)
(tmp_path / "fre.sdh.srt").write_text("sdh subtitle")
candidates = self._scanner(keep_sdh=True).scan(video)
assert len(candidates) == 1
assert candidates[0].is_sdh
def test_ignores_non_subtitle_files(self, tmp_path):
video = self._video(tmp_path)
(tmp_path / "fre.nfo").write_text("nfo file")
(tmp_path / "fre.jpg").write_bytes(b"image")
candidates = self._scanner().scan(video)
assert len(candidates) == 0
def test_returns_empty_when_no_subtitles(self, tmp_path):
video = self._video(tmp_path)
candidates = self._scanner().scan(video)
assert candidates == []
+19 -10
View File
@@ -30,9 +30,19 @@ from alfred.domain.subtitles.value_objects import (
RuleScope, RuleScope,
SubtitleFormat, SubtitleFormat,
SubtitleLanguage, SubtitleLanguage,
SubtitleMatchingRules,
SubtitleType, SubtitleType,
) )
# Test fixture: stand-in for what the KB would provide at runtime.
_DEFAULT_RULES = SubtitleMatchingRules(
preferred_languages=["eng"],
preferred_formats=["srt"],
allowed_types=["standard"],
format_priority=["srt", "ass"],
min_confidence=0.7,
)
# --------------------------------------------------------------------------- # # --------------------------------------------------------------------------- #
# Value objects # # Value objects #
# --------------------------------------------------------------------------- # # --------------------------------------------------------------------------- #
@@ -230,18 +240,17 @@ class TestAvailableSubtitles:
class TestSubtitleRuleSet: class TestSubtitleRuleSet:
def test_global_default_uses_kb_defaults(self): def test_global_default_returns_injected_defaults(self):
rs = SubtitleRuleSet.global_default() rs = SubtitleRuleSet.global_default()
rules = rs.resolve() rules = rs.resolve(_DEFAULT_RULES)
# Loaded from subtitles.yaml — defaults must be non-empty. assert rules.preferred_languages == _DEFAULT_RULES.preferred_languages
assert rules.preferred_languages assert rules.preferred_formats == _DEFAULT_RULES.preferred_formats
assert rules.preferred_formats assert rules.min_confidence == _DEFAULT_RULES.min_confidence
assert 0 < rules.min_confidence <= 1
def test_override_persists(self): def test_override_persists(self):
rs = SubtitleRuleSet.global_default() rs = SubtitleRuleSet.global_default()
rs.override(languages=["eng"], min_confidence=0.9) rs.override(languages=["eng"], min_confidence=0.9)
rules = rs.resolve() rules = rs.resolve(_DEFAULT_RULES)
assert rules.preferred_languages == ["eng"] assert rules.preferred_languages == ["eng"]
assert rules.min_confidence == 0.9 assert rules.min_confidence == 0.9
@@ -252,10 +261,10 @@ class TestSubtitleRuleSet:
parent=parent, parent=parent,
) )
child.override(languages=["jpn"]) child.override(languages=["jpn"])
rules = child.resolve() rules = child.resolve(_DEFAULT_RULES)
assert rules.preferred_languages == ["jpn"] assert rules.preferred_languages == ["jpn"]
# min_confidence not overridden at child or parent → falls back to defaults # min_confidence not overridden at child or parent → falls back to defaults
assert rules.min_confidence == parent.resolve().min_confidence assert rules.min_confidence == parent.resolve(_DEFAULT_RULES).min_confidence
def test_to_dict_only_emits_set_deltas(self): def test_to_dict_only_emits_set_deltas(self):
rs = SubtitleRuleSet(scope=RuleScope(level="show", identifier="tt1")) rs = SubtitleRuleSet(scope=RuleScope(level="show", identifier="tt1"))
@@ -286,4 +295,4 @@ class TestSubtitleRuleSet:
# code uses `is not None` explicitly. Verify 0.0 doesn't fall back. # code uses `is not None` explicitly. Verify 0.0 doesn't fall back.
rs = SubtitleRuleSet.global_default() rs = SubtitleRuleSet.global_default()
rs.override(min_confidence=0.0) rs.override(min_confidence=0.0)
assert rs.resolve().min_confidence == 0.0 assert rs.resolve(_DEFAULT_RULES).min_confidence == 0.0
+2 -3
View File
@@ -157,10 +157,9 @@ class TestEpisode:
assert filename.startswith("S01E05") assert filename.startswith("S01E05")
assert "Gray.Matter" in filename assert "Gray.Matter" in filename
def test_has_file_false_when_no_path(self): def test_file_path_unset_by_default(self):
e = self._ep() e = self._ep()
assert not e.has_file() assert e.file_path is None
assert not e.is_downloaded()
def test_str_format(self): def test_str_format(self):
e = self._ep(season=2, episode=3, title="Bit by a Dead Bee") e = self._ep(season=2, episode=3, title="Bit by a Dead Bee")
+20 -8
View File
@@ -17,6 +17,7 @@ from pathlib import Path
import yaml import yaml
from alfred.domain.subtitles.value_objects import SubtitleMatchingRules
from alfred.infrastructure.persistence.memory.ltm.components.subtitle_preferences import ( from alfred.infrastructure.persistence.memory.ltm.components.subtitle_preferences import (
SubtitlePreferences, SubtitlePreferences,
) )
@@ -25,6 +26,15 @@ from alfred.infrastructure.subtitle.rule_repository import (
_filter_override, _filter_override,
) )
# Stand-in for KB defaults, injected at resolve().
_DEFAULT_RULES = SubtitleMatchingRules(
preferred_languages=["eng"],
preferred_formats=["srt"],
allowed_types=["standard"],
format_priority=["srt", "ass"],
min_confidence=0.7,
)
def _write(path: Path, data: dict) -> None: def _write(path: Path, data: dict) -> None:
path.parent.mkdir(parents=True, exist_ok=True) path.parent.mkdir(parents=True, exist_ok=True)
@@ -71,17 +81,17 @@ class TestLoad:
def test_no_files_returns_global_default(self, tmp_path): def test_no_files_returns_global_default(self, tmp_path):
repo = RuleSetRepository(tmp_path) repo = RuleSetRepository(tmp_path)
rs = repo.load() rs = repo.load()
# Should resolve cleanly using the hardcoded defaults. # With no overrides, resolve returns the injected defaults unchanged.
rules = rs.resolve() rules = rs.resolve(_DEFAULT_RULES)
assert rules.preferred_languages # non-empty assert rules.preferred_languages == _DEFAULT_RULES.preferred_languages
assert rules.min_confidence > 0 assert rules.min_confidence == _DEFAULT_RULES.min_confidence
def test_subtitle_preferences_override_base(self, tmp_path): def test_subtitle_preferences_override_base(self, tmp_path):
prefs = SubtitlePreferences( prefs = SubtitlePreferences(
languages=["jpn"], formats=["ass"], types=["standard"] languages=["jpn"], formats=["ass"], types=["standard"]
) )
repo = RuleSetRepository(tmp_path) repo = RuleSetRepository(tmp_path)
rules = repo.load(subtitle_preferences=prefs).resolve() rules = repo.load(subtitle_preferences=prefs).resolve(_DEFAULT_RULES)
assert rules.preferred_languages == ["jpn"] assert rules.preferred_languages == ["jpn"]
assert rules.preferred_formats == ["ass"] assert rules.preferred_formats == ["ass"]
assert rules.allowed_types == ["standard"] assert rules.allowed_types == ["standard"]
@@ -92,7 +102,7 @@ class TestLoad:
{"override": {"languages": ["spa"], "min_confidence": 0.95}}, {"override": {"languages": ["spa"], "min_confidence": 0.95}},
) )
repo = RuleSetRepository(tmp_path) repo = RuleSetRepository(tmp_path)
rules = repo.load().resolve() rules = repo.load().resolve(_DEFAULT_RULES)
assert rules.preferred_languages == ["spa"] assert rules.preferred_languages == ["spa"]
assert rules.min_confidence == 0.95 assert rules.min_confidence == 0.95
@@ -102,7 +112,7 @@ class TestLoad:
{"override": {"format_priority": ["ass", "srt"]}}, {"override": {"format_priority": ["ass", "srt"]}},
) )
repo = RuleSetRepository(tmp_path) repo = RuleSetRepository(tmp_path)
rules = repo.load(release_group="KONTRAST").resolve() rules = repo.load(release_group="KONTRAST").resolve(_DEFAULT_RULES)
assert rules.format_priority == ["ass", "srt"] assert rules.format_priority == ["ass", "srt"]
def test_full_three_level_chain(self, tmp_path): def test_full_three_level_chain(self, tmp_path):
@@ -119,7 +129,9 @@ class TestLoad:
{"override": {"min_confidence": 0.99}}, {"override": {"min_confidence": 0.99}},
) )
repo = RuleSetRepository(tmp_path) repo = RuleSetRepository(tmp_path)
rules = repo.load(release_group="GRP", subtitle_preferences=prefs).resolve() rules = repo.load(release_group="GRP", subtitle_preferences=prefs).resolve(
_DEFAULT_RULES
)
# All three levels visible — local overrides on top # All three levels visible — local overrides on top
assert rules.preferred_languages == ["jpn"] assert rules.preferred_languages == ["jpn"]
assert rules.format_priority == ["ass"] assert rules.format_priority == ["ass"]
@@ -17,7 +17,7 @@ from __future__ import annotations
from pathlib import Path from pathlib import Path
from alfred.domain.subtitles.entities import SubtitleCandidate from alfred.domain.subtitles.entities import SubtitleCandidate
from alfred.domain.subtitles.services.placer import PlacedTrack from alfred.application.subtitles.placer import PlacedTrack
from alfred.domain.subtitles.value_objects import ( from alfred.domain.subtitles.value_objects import (
SubtitleFormat, SubtitleFormat,
SubtitleLanguage, SubtitleLanguage,