Merge branch 'refactor/domain-io-extraction'

Extract all I/O (subprocess, filesystem, YAML loading) from the domain
layer via ports/adapters. domain/subtitles/ now has zero imports from
infrastructure/. The remaining domain → infra leak (release knowledge
loaded at import time) is documented in tech-debt for a dedicated branch.
This commit is contained in:
2026-05-19 15:16:59 +02:00
41 changed files with 603 additions and 329 deletions
+42
View File
@@ -112,6 +112,48 @@ callers).
### Internal
- **Domain I/O extraction** (`refactor/domain-io-extraction`): the domain
layer no longer performs subprocess calls, filesystem scans, or YAML
loading. Achieved in a series of focused commits:
- **Knowledge YAML loaders moved to infrastructure**:
`alfred/domain/release/knowledge.py`,
`alfred/domain/shared/knowledge/language_registry.py`, and
`alfred/domain/subtitles/knowledge/{base,loader}.py` relocated to
`alfred/infrastructure/knowledge/`. Re-exports were dropped — callers
import directly from the new location.
- **`MediaProber` and `FilesystemScanner` Protocol ports** introduced at
`alfred/domain/shared/ports/` with frozen-dataclass DTOs
(`SubtitleStreamInfo`, `FileEntry`). `SubtitleIdentifier` and
`PatternDetector` are now constructor-injected with concrete adapters
(`FfprobeMediaProber` wrapping `subprocess.run(ffprobe)` and
`PathlibFilesystemScanner` wrapping `pathlib`). No more direct
`subprocess`/`pathlib` usage from the subtitle domain services.
- **Live filesystem methods removed from VOs and entities**:
`FilePath.exists()` / `.is_file()` / `.is_dir()` deleted —
`FilePath` is now a pure address VO. `Movie.has_file()` and
`Episode.is_downloaded()` dropped. Callers either rely on a prior
detection step or use try/except over pre-checks (eliminates
TOCTOU races).
- **`SubtitlePlacer` moved to the application layer** at
`alfred/application/subtitles/placer.py` — it performs `os.link`
I/O, which doesn't belong in the domain. Pre-checks replaced with
try/except for `FileNotFoundError`/`FileExistsError`.
- **`SubtitleRuleSet.resolve()` no longer reaches into the knowledge
base**: the implicit `DEFAULT_RULES()` helper is gone, replaced by
an explicit `default_rules: SubtitleMatchingRules` parameter. The
`ManageSubtitles` use case loads defaults from the KB once and
passes them in.
- **`SubtitleKnowledge` Protocol port** at
`alfred/domain/subtitles/ports/knowledge.py` declares the read-only
query surface domain services consume (7 methods:
`known_extensions`, `format_for_extension`, `language_for_token`,
`is_known_lang_token`, `type_for_token`, `is_known_type_token`,
`patterns`). `SubtitleIdentifier` and `PatternDetector` depend on
this Protocol instead of the concrete `SubtitleKnowledgeBase` from
infrastructure — `domain/subtitles/` now has zero imports from
`infrastructure/`. The remaining domain → infra leak
(`domain/release/` loading separator YAML at import-time) is
documented in tech-debt and scheduled for its own branch.
- **`to_dot_folder_name(title)` helper** in
`alfred/domain/shared/value_objects.py` — extracts the
`re.sub(r"[^\w\s\.\-]", "", title).replace(" ", ".")` pattern that was
@@ -5,19 +5,21 @@ from pathlib import Path
from alfred.domain.shared.value_objects import ImdbId
from alfred.domain.subtitles.entities import SubtitleCandidate
from alfred.domain.subtitles.knowledge.base import SubtitleKnowledgeBase
from alfred.domain.subtitles.knowledge.loader import KnowledgeLoader
from alfred.domain.subtitles.services.identifier import SubtitleIdentifier
from alfred.domain.subtitles.services.matcher import SubtitleMatcher
from alfred.domain.subtitles.services.pattern_detector import PatternDetector
from alfred.domain.subtitles.services.placer import (
from alfred.application.subtitles.placer import (
PlacedTrack,
SubtitlePlacer,
_build_dest_name,
)
from alfred.domain.subtitles.services.utils import available_subtitles
from alfred.domain.subtitles.value_objects import ScanStrategy
from alfred.infrastructure.filesystem.scanner import PathlibFilesystemScanner
from alfred.infrastructure.knowledge.subtitles.base import SubtitleKnowledgeBase
from alfred.infrastructure.knowledge.subtitles.loader import KnowledgeLoader
from alfred.infrastructure.persistence.context import get_memory
from alfred.infrastructure.probe.ffprobe_prober import FfprobeMediaProber
from alfred.infrastructure.subtitle.metadata_store import SubtitleMetadataStore
from alfred.infrastructure.subtitle.rule_repository import RuleSetRepository
@@ -91,13 +93,21 @@ class ManageSubtitlesUseCase:
)
kb = SubtitleKnowledgeBase(KnowledgeLoader())
prober = FfprobeMediaProber()
scanner = PathlibFilesystemScanner()
library_root = _infer_library_root(dest_path, media_type)
store = SubtitleMetadataStore(library_root)
repo = RuleSetRepository(library_root)
# --- Pattern resolution ---
pattern = self._resolve_pattern(
kb, store, source_path, confirmed_pattern_id, release_group
kb,
prober,
scanner,
store,
source_path,
confirmed_pattern_id,
release_group,
)
if pattern is None:
return ManageSubtitlesResponse(
@@ -108,7 +118,7 @@ class ManageSubtitlesUseCase:
# --- Identify ---
media_id = _to_imdb_id(imdb_id)
identifier = SubtitleIdentifier(kb)
identifier = SubtitleIdentifier(kb, prober, scanner)
metadata = identifier.identify(
video_path=source_path,
pattern=pattern,
@@ -153,7 +163,7 @@ class ManageSubtitlesUseCase:
subtitle_prefs = memory.ltm.subtitle_preferences
except Exception:
pass
rules = repo.load(release_group, subtitle_prefs).resolve()
rules = repo.load(release_group, subtitle_prefs).resolve(kb.default_rules())
matcher = SubtitleMatcher()
matched, unresolved = matcher.match(metadata.external_tracks, rules)
@@ -228,6 +238,8 @@ class ManageSubtitlesUseCase:
def _resolve_pattern(
self,
kb: SubtitleKnowledgeBase,
prober: FfprobeMediaProber,
scanner: PathlibFilesystemScanner,
store: SubtitleMetadataStore,
source_path: Path,
confirmed_pattern_id: str | None,
@@ -250,7 +262,7 @@ class ManageSubtitlesUseCase:
# 3. Auto-detect
release_root = source_path.parent
detector = PatternDetector(kb)
detector = PatternDetector(kb, prober, scanner)
result = detector.detect(release_root, source_path)
if result["detected"] and result["confidence"] >= 0.6:
@@ -5,8 +5,8 @@ import os
from dataclasses import dataclass
from pathlib import Path
from ..entities import SubtitleCandidate
from ..value_objects import SubtitleType
from alfred.domain.subtitles.entities import SubtitleCandidate
from alfred.domain.subtitles.value_objects import SubtitleType
logger = logging.getLogger(__name__)
@@ -78,8 +78,8 @@ class SubtitlePlacer:
skipped.append((track, "embedded — no file to place"))
continue
if not track.file_path or not track.file_path.exists():
skipped.append((track, "source file not found"))
if not track.file_path:
skipped.append((track, "source file not set"))
continue
try:
@@ -90,11 +90,6 @@ class SubtitlePlacer:
dest_path = dest_dir / dest_name
if dest_path.exists():
logger.debug(f"SubtitlePlacer: skip {dest_name} — already exists")
skipped.append((track, "destination already exists"))
continue
try:
os.link(track.file_path, dest_path)
placed.append(
@@ -105,6 +100,11 @@ class SubtitlePlacer:
)
)
logger.info(f"SubtitlePlacer: placed {dest_name}")
except FileNotFoundError:
skipped.append((track, "source file not found"))
except FileExistsError:
logger.debug(f"SubtitlePlacer: skip {dest_name} — already exists")
skipped.append((track, "destination already exists"))
except OSError as e:
logger.warning(f"SubtitlePlacer: failed to place {dest_name}: {e}")
skipped.append((track, str(e)))
-8
View File
@@ -65,14 +65,6 @@ class Movie(MediaWithTracks):
def __hash__(self) -> int:
return hash(self.imdb_id)
def has_file(self) -> bool:
"""Check if the movie has an associated file."""
return self.file_path is not None and self.file_path.exists()
def is_downloaded(self) -> bool:
"""Check if the movie is downloaded (has a file)."""
return self.has_file()
# Track helpers (has_audio_in / audio_languages / has_subtitles_in /
# has_forced_subs / subtitle_languages) come from MediaWithTracks.
+1 -1
View File
@@ -4,7 +4,7 @@ from __future__ import annotations
import re
from .knowledge import load_separators
from alfred.infrastructure.knowledge.release import load_separators
from .value_objects import (
_AUDIO,
_CODECS,
+1 -1
View File
@@ -6,7 +6,7 @@ from dataclasses import dataclass, field
from enum import Enum
from ..shared.exceptions import ValidationError
from .knowledge import (
from alfred.infrastructure.knowledge.release import (
load_audio,
load_codecs,
load_editions,
@@ -1,5 +0,0 @@
"""Shared knowledge loaders (cross-domain)."""
from .language_registry import LanguageRegistry
__all__ = ["LanguageRegistry"]
+17
View File
@@ -0,0 +1,17 @@
"""Ports — Protocol interfaces the domain depends on.
Adapters live in ``alfred/infrastructure/`` and implement these protocols.
Domain code never imports infrastructure; it accepts a port via constructor
injection and calls it. Tests can pass in-memory fakes that satisfy the
Protocol without going through real I/O.
"""
from .filesystem_scanner import FileEntry, FilesystemScanner
from .media_prober import MediaProber, SubtitleStreamInfo
__all__ = [
"FileEntry",
"FilesystemScanner",
"MediaProber",
"SubtitleStreamInfo",
]
@@ -0,0 +1,59 @@
"""FilesystemScanner port — abstracts filesystem inspection.
The domain never calls ``Path.iterdir``, ``Path.is_file``, ``Path.stat`` or
``open()`` directly. It asks the scanner for a ``FileEntry`` snapshot and
reasons from there. One scan = one I/O round-trip; no callbacks back to disk.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Protocol
@dataclass(frozen=True)
class FileEntry:
"""Frozen snapshot of one filesystem entry, taken at scan time.
The entry carries enough metadata for the domain to classify and order
files without re-querying the OS. ``size_kb`` is ``None`` for directories
and for files whose size could not be read.
"""
path: Path
is_file: bool
is_dir: bool
size_kb: float | None
@property
def name(self) -> str:
return self.path.name
@property
def stem(self) -> str:
return self.path.stem
@property
def suffix(self) -> str:
return self.path.suffix
class FilesystemScanner(Protocol):
"""Read-only filesystem inspection."""
def scan_dir(self, path: Path) -> list[FileEntry]:
"""Return sorted entries directly inside ``path``.
Returns an empty list when ``path`` is not a directory or is
unreadable. Adapters must not raise.
"""
...
def stat(self, path: Path) -> FileEntry | None:
"""Stat a single path; ``None`` when it doesn't exist or is unreadable."""
...
def read_text(self, path: Path, encoding: str = "utf-8") -> str | None:
"""Read a text file in one go; ``None`` on any error."""
...
@@ -0,0 +1,39 @@
"""MediaProber port — abstracts media stream inspection (e.g. ffprobe).
The adapter (typically wrapping ffprobe) maps low-level container metadata
into the small set of stream attributes the domain reasons about. Replacing
ffprobe with another tool only requires a new adapter — domain stays put.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Protocol
@dataclass(frozen=True)
class SubtitleStreamInfo:
"""A single embedded subtitle stream, as seen by the prober.
``language`` is the raw language tag emitted by the container (typically
ISO 639-2 like ``"fre"``, ``"eng"``); may be empty/None when the stream
has no language tag. The domain resolves it to a canonical ``Language``
via the knowledge base.
"""
language: str | None
is_hearing_impaired: bool
is_forced: bool
class MediaProber(Protocol):
"""Inspect a media file's stream metadata."""
def list_subtitle_streams(self, video: Path) -> list[SubtitleStreamInfo]:
"""Return all subtitle streams in ``video``.
Returns an empty list when the file is missing, unreadable, or has
no subtitle streams. Adapters must not raise.
"""
...
-12
View File
@@ -67,18 +67,6 @@ class FilePath:
# Use object.__setattr__ because dataclass is frozen
object.__setattr__(self, "value", path_obj)
def exists(self) -> bool:
"""Check if the path exists."""
return self.value.exists()
def is_file(self) -> bool:
"""Check if the path is a file."""
return self.value.is_file()
def is_dir(self) -> bool:
"""Check if the path is a directory."""
return self.value.is_dir()
def __str__(self) -> str:
return str(self.value)
-3
View File
@@ -3,7 +3,6 @@
from .aggregates import SubtitleRuleSet
from .entities import MediaSubtitleMetadata, SubtitleCandidate
from .exceptions import SubtitleNotFound
from .knowledge import KnowledgeLoader, SubtitleKnowledgeBase
from .services import PatternDetector, SubtitleIdentifier, SubtitleMatcher
from .value_objects import (
RuleScope,
@@ -20,8 +19,6 @@ __all__ = [
"SubtitleCandidate",
"MediaSubtitleMetadata",
"SubtitleRuleSet",
"SubtitleKnowledgeBase",
"KnowledgeLoader",
"SubtitleIdentifier",
"SubtitleMatcher",
"PatternDetector",
+9 -9
View File
@@ -4,15 +4,9 @@ from dataclasses import dataclass, field
from typing import Any
from ..shared.value_objects import ImdbId
from .knowledge.base import SubtitleKnowledgeBase
from .value_objects import RuleScope, SubtitleMatchingRules
def DEFAULT_RULES() -> SubtitleMatchingRules:
"""Load default matching rules from subtitles.yaml (defaults section)."""
return SubtitleKnowledgeBase().default_rules()
@dataclass
class SubtitleRuleSet:
"""
@@ -36,12 +30,18 @@ class SubtitleRuleSet:
_format_priority: list[str] | None = field(default=None, repr=False)
_min_confidence: float | None = field(default=None, repr=False)
def resolve(self) -> SubtitleMatchingRules:
def resolve(self, default_rules: SubtitleMatchingRules) -> SubtitleMatchingRules:
"""
Walk the parent chain and merge deltas into effective rules.
Falls back to DEFAULT_RULES at the top of the chain.
``default_rules`` seeds the top of the chain — it is the caller's
responsibility to load these from the knowledge base (infrastructure).
Keeping the default rules as a parameter keeps the domain free of
any I/O dependency.
"""
base = self.parent.resolve() if self.parent else DEFAULT_RULES()
base = (
self.parent.resolve(default_rules) if self.parent else default_rules
)
return SubtitleMatchingRules(
preferred_languages=self._languages or base.preferred_languages,
preferred_formats=self._formats or base.preferred_formats,
@@ -0,0 +1,6 @@
"""Domain ports for the subtitles domain — Protocol-based abstractions
that decouple domain services from concrete infrastructure adapters."""
from .knowledge import SubtitleKnowledge
__all__ = ["SubtitleKnowledge"]
@@ -0,0 +1,38 @@
"""SubtitleKnowledge port — the query surface domain services need from the
subtitle knowledge base, expressed as a Protocol so the domain never imports
the infrastructure adapter that backs it.
The concrete implementation lives in
``alfred/infrastructure/knowledge/subtitles/base.py`` (the YAML-backed
``SubtitleKnowledgeBase``). Tests can supply any object that satisfies this
structural contract.
"""
from __future__ import annotations
from typing import Protocol
from ..value_objects import SubtitleFormat, SubtitleLanguage, SubtitlePattern, SubtitleType
class SubtitleKnowledge(Protocol):
"""Read-only query surface for subtitle knowledge consumed by the domain.
Only the methods that domain services actually call belong here — anything
else (defaults loading, reload, pattern groups, raw dicts) stays on the
concrete class and is reserved for the application layer.
"""
def known_extensions(self) -> set[str]: ...
def format_for_extension(self, ext: str) -> SubtitleFormat | None: ...
def language_for_token(self, token: str) -> SubtitleLanguage | None: ...
def is_known_lang_token(self, token: str) -> bool: ...
def type_for_token(self, token: str) -> SubtitleType | None: ...
def is_known_type_token(self, token: str) -> bool: ...
def patterns(self) -> dict[str, SubtitlePattern]: ...
@@ -1,13 +1,9 @@
from .identifier import SubtitleIdentifier
from .matcher import SubtitleMatcher
from .pattern_detector import PatternDetector
from .placer import PlacedTrack, PlaceResult, SubtitlePlacer
__all__ = [
"SubtitleIdentifier",
"SubtitleMatcher",
"PatternDetector",
"SubtitlePlacer",
"PlacedTrack",
"PlaceResult",
]
+74 -111
View File
@@ -1,14 +1,13 @@
"""SubtitleIdentifier — finds and classifies all subtitle tracks for a video file."""
import json
import logging
import re
import subprocess
from pathlib import Path
from ...shared.ports import FilesystemScanner, MediaProber
from ..ports import SubtitleKnowledge
from ...shared.value_objects import ImdbId
from ..entities import MediaSubtitleMetadata, SubtitleCandidate
from ..knowledge.base import SubtitleKnowledgeBase
from ..value_objects import ScanStrategy, SubtitlePattern, SubtitleType
logger = logging.getLogger(__name__)
@@ -37,17 +36,14 @@ def _tokenize_suffix(stem: str, episode_stem: str) -> list[str]:
return _tokenize(stem)
def _count_entries(path: Path) -> int:
"""Return the entry count of an SRT file by finding the last cue number."""
try:
with open(path, encoding="utf-8", errors="replace") as f:
lines = f.read().splitlines()
for line in reversed(lines):
if line.strip().isdigit():
return int(line.strip())
return 0
except Exception:
return 0
def _count_entries(text: str | None) -> int | None:
"""Return the entry count of an SRT body by finding the last cue number."""
if text is None:
return None
for line in reversed(text.splitlines()):
if line.strip().isdigit():
return int(line.strip())
return 0
class SubtitleIdentifier:
@@ -60,8 +56,15 @@ class SubtitleIdentifier:
the caller (use case) decides whether to ask the user for clarification.
"""
def __init__(self, kb: SubtitleKnowledgeBase):
def __init__(
self,
kb: SubtitleKnowledge,
prober: MediaProber,
scanner: FilesystemScanner,
):
self.kb = kb
self.prober = prober
self.scanner = scanner
def identify(
self,
@@ -88,52 +91,21 @@ class SubtitleIdentifier:
return metadata
# ------------------------------------------------------------------
# Embedded tracks — ffprobe
# Embedded tracks — via MediaProber
# ------------------------------------------------------------------
def _scan_embedded(self, video_path: Path) -> list[SubtitleCandidate]:
if not video_path.exists():
return []
try:
result = subprocess.run(
[
"ffprobe",
"-v",
"quiet",
"-print_format",
"json",
"-show_streams",
"-select_streams",
"s",
str(video_path),
],
capture_output=True,
text=True,
timeout=30,
check=False,
)
data = json.loads(result.stdout)
except (
subprocess.TimeoutExpired,
json.JSONDecodeError,
FileNotFoundError,
) as e:
logger.debug(
f"SubtitleIdentifier: ffprobe failed for {video_path.name}: {e}"
)
return []
streams = self.prober.list_subtitle_streams(video_path)
tracks = []
for stream in data.get("streams", []):
tags = stream.get("tags", {})
disposition = stream.get("disposition", {})
lang_code = tags.get("language", "")
for stream in streams:
lang = (
self.kb.language_for_token(stream.language) if stream.language else None
)
lang = self.kb.language_for_token(lang_code) if lang_code else None
if disposition.get("hearing_impaired"):
if stream.is_hearing_impaired:
stype = SubtitleType.SDH
elif disposition.get("forced"):
elif stream.is_forced:
stype = SubtitleType.FORCED
else:
stype = SubtitleType.STANDARD
@@ -144,7 +116,7 @@ class SubtitleIdentifier:
format=None,
subtitle_type=stype,
is_embedded=True,
raw_tokens=[lang_code] if lang_code else [],
raw_tokens=[stream.language] if stream.language else [],
)
)
@@ -176,57 +148,47 @@ class SubtitleIdentifier:
return self._classify_files(candidates, pattern, episode_stem=episode_stem)
def _find_adjacent(self, video_path: Path) -> list[Path]:
def _find_adjacent(self, video_path: Path) -> list:
known = self.kb.known_extensions()
return [
p
for p in sorted(video_path.parent.iterdir())
if p.is_file()
and p.suffix.lower() in self.kb.known_extensions()
and p.stem != video_path.stem
entry
for entry in self.scanner.scan_dir(video_path.parent)
if entry.is_file
and entry.suffix.lower() in known
and entry.stem != video_path.stem
]
def _find_flat(self, video_path: Path, root_folder: str) -> list[Path]:
subs_dir = video_path.parent / root_folder
if not subs_dir.is_dir():
# Also look at release root (one level up)
subs_dir = video_path.parent.parent / root_folder
if not subs_dir.is_dir():
return []
return [
p
for p in sorted(subs_dir.iterdir())
if p.is_file() and p.suffix.lower() in self.kb.known_extensions()
]
def _find_flat(self, video_path: Path, root_folder: str) -> list:
known = self.kb.known_extensions()
# Adjacent first, then release root (one level up)
for subs_dir in (
video_path.parent / root_folder,
video_path.parent.parent / root_folder,
):
entries = self.scanner.scan_dir(subs_dir)
if entries:
return [
e for e in entries if e.is_file and e.suffix.lower() in known
]
return []
def _find_episode_subfolder(
self, video_path: Path, root_folder: str
) -> tuple[list[Path], str]:
"""
Look for Subs/{episode_stem}/*.srt
Checks two locations:
1. Adjacent to the video: video_path.parent / root_folder / video_path.stem
2. Release root (one level up): video_path.parent.parent / root_folder / video_path.stem
Returns (files, episode_stem) so the classifier can strip the prefix.
"""
) -> tuple[list, str]:
"""Look for Subs/{episode_stem}/*.srt — adjacent or one level up."""
episode_stem = video_path.stem
candidates_dirs = [
known = self.kb.known_extensions()
for subs_dir in (
video_path.parent / root_folder / episode_stem,
video_path.parent.parent / root_folder / episode_stem,
]
for subs_dir in candidates_dirs:
if subs_dir.is_dir():
files = [
p
for p in sorted(subs_dir.iterdir())
if p.is_file() and p.suffix.lower() in self.kb.known_extensions()
]
if files:
logger.debug(
f"SubtitleIdentifier: found {len(files)} file(s) in {subs_dir}"
)
return files, episode_stem
):
entries = self.scanner.scan_dir(subs_dir)
files = [e for e in entries if e.is_file and e.suffix.lower() in known]
if files:
logger.debug(
f"SubtitleIdentifier: found {len(files)} file(s) in {subs_dir}"
)
return files, episode_stem
return [], episode_stem
# ------------------------------------------------------------------
@@ -235,14 +197,13 @@ class SubtitleIdentifier:
def _classify_files(
self,
paths: list[Path],
entries: list,
pattern: SubtitlePattern,
episode_stem: str | None = None,
) -> list[SubtitleCandidate]:
tracks = []
for path in paths:
track = self._classify_single(path, episode_stem=episode_stem)
tracks.append(track)
tracks = [
self._classify_single(entry, episode_stem=episode_stem) for entry in entries
]
# Post-process: if multiple tracks share same language but type is ambiguous,
# apply size_and_count disambiguation
@@ -252,13 +213,13 @@ class SubtitleIdentifier:
return tracks
def _classify_single(
self, path: Path, episode_stem: str | None = None
self, entry, episode_stem: str | None = None
) -> SubtitleCandidate:
fmt = self.kb.format_for_extension(path.suffix)
fmt = self.kb.format_for_extension(entry.suffix)
tokens = (
_tokenize_suffix(path.stem, episode_stem)
_tokenize_suffix(entry.stem, episode_stem)
if episode_stem
else _tokenize(path.stem)
else _tokenize(entry.stem)
)
language = None
@@ -284,19 +245,21 @@ class SubtitleIdentifier:
if unknown_tokens:
logger.debug(
f"SubtitleIdentifier: unknown tokens in '{path.name}': {unknown_tokens}"
f"SubtitleIdentifier: unknown tokens in '{entry.name}': {unknown_tokens}"
)
size_kb = path.stat().st_size / 1024 if path.exists() else None
entry_count = _count_entries(path) if path.exists() else None
# Entry count: only meaningful for SRT files; read text on demand.
entry_count: int | None = None
if entry.suffix.lower() == ".srt":
entry_count = _count_entries(self.scanner.read_text(entry.path))
return SubtitleCandidate(
language=language,
format=fmt,
subtitle_type=subtitle_type,
is_embedded=False,
file_path=path,
file_size_kb=size_kb,
file_path=entry.path,
file_size_kb=entry.size_kb,
entry_count=entry_count,
confidence=confidence,
raw_tokens=tokens,
@@ -1,11 +1,10 @@
"""PatternDetector — discovers the subtitle structure of a release folder."""
import json
import logging
import subprocess
from pathlib import Path
from ..knowledge.base import SubtitleKnowledgeBase
from ...shared.ports import FilesystemScanner, MediaProber
from ..ports import SubtitleKnowledge
from ..value_objects import ScanStrategy, SubtitlePattern
logger = logging.getLogger(__name__)
@@ -20,8 +19,15 @@ class PatternDetector:
a release follows. The result is proposed to the user for confirmation.
"""
def __init__(self, kb: SubtitleKnowledgeBase):
def __init__(
self,
kb: SubtitleKnowledge,
prober: MediaProber,
scanner: FilesystemScanner,
):
self.kb = kb
self.prober = prober
self.scanner = scanner
def detect(self, release_root: Path, sample_video: Path) -> dict:
"""
@@ -45,29 +51,7 @@ class PatternDetector:
}
def _has_embedded_subtitles(self, video_path: Path) -> bool:
"""Run ffprobe to check whether the video has embedded subtitle streams."""
try:
result = subprocess.run(
[
"ffprobe",
"-v",
"quiet",
"-print_format",
"json",
"-show_streams",
"-select_streams",
"s",
str(video_path),
],
capture_output=True,
text=True,
timeout=30,
check=False,
)
data = json.loads(result.stdout)
return len(data.get("streams", [])) > 0
except (subprocess.TimeoutExpired, json.JSONDecodeError, FileNotFoundError):
return False
return len(self.prober.list_subtitle_streams(video_path)) > 0
def _inspect(self, release_root: Path, sample_video: Path) -> dict:
"""Gather structural facts about the release."""
@@ -84,61 +68,59 @@ class PatternDetector:
}
# Check for Subs/ folder — adjacent or at release root
for subs_candidate in [
for subs_candidate in (
sample_video.parent / "Subs",
release_root / "Subs",
]:
if subs_candidate.is_dir():
findings["has_subs_folder"] = True
findings["subs_root"] = str(subs_candidate)
):
children = self.scanner.scan_dir(subs_candidate)
if not children:
continue
# Is it flat or episode_subfolder?
children = list(subs_candidate.iterdir())
sub_files = [
c
for c in children
if c.is_file() and c.suffix.lower() in known_exts
findings["has_subs_folder"] = True
findings["subs_root"] = str(subs_candidate)
# Is it flat or episode_subfolder?
sub_files = [
c for c in children if c.is_file and c.suffix.lower() in known_exts
]
sub_dirs = [c for c in children if c.is_dir]
if sub_dirs and not sub_files:
findings["subs_strategy"] = "episode_subfolder"
# Count files in a sample subfolder
sample_files = [
f
for f in self.scanner.scan_dir(sub_dirs[0].path)
if f.is_file and f.suffix.lower() in known_exts
]
sub_dirs = [c for c in children if c.is_dir()]
if sub_dirs and not sub_files:
findings["subs_strategy"] = "episode_subfolder"
# Count files in a sample subfolder
sample_sub = sub_dirs[0]
sample_files = [
f
for f in sample_sub.iterdir()
if f.is_file() and f.suffix.lower() in known_exts
]
findings["files_per_episode"] = len(sample_files)
# Check naming conventions
for f in sample_files:
stem = f.stem
parts = stem.split("_")
if parts[0].isdigit():
findings["has_numeric_prefix"] = True
if any(
self.kb.is_known_lang_token(t.lower())
for t in stem.replace("_", ".").split(".")
):
findings["has_lang_tokens"] = True
else:
findings["subs_strategy"] = "flat"
findings["files_per_episode"] = len(sub_files)
for f in sub_files:
if any(
self.kb.is_known_lang_token(t.lower())
for t in f.stem.replace("_", ".").split(".")
):
findings["has_lang_tokens"] = True
break
findings["files_per_episode"] = len(sample_files)
# Check naming conventions
for f in sample_files:
parts = f.stem.split("_")
if parts[0].isdigit():
findings["has_numeric_prefix"] = True
if any(
self.kb.is_known_lang_token(t.lower())
for t in f.stem.replace("_", ".").split(".")
):
findings["has_lang_tokens"] = True
else:
findings["subs_strategy"] = "flat"
findings["files_per_episode"] = len(sub_files)
for f in sub_files:
if any(
self.kb.is_known_lang_token(t.lower())
for t in f.stem.replace("_", ".").split(".")
):
findings["has_lang_tokens"] = True
break
# Check adjacent subs (next to the video)
if not findings["has_subs_folder"]:
adjacent = [
p
for p in sample_video.parent.iterdir()
if p.is_file() and p.suffix.lower() in known_exts
e
for e in self.scanner.scan_dir(sample_video.parent)
if e.is_file and e.suffix.lower() in known_exts
]
if adjacent:
findings["adjacent_subs"] = True
@@ -221,6 +203,6 @@ class PatternDetector:
parts.append("no external subtitle files found")
if findings.get("has_embedded"):
parts.append("embedded tracks detected (ffprobe)")
parts.append("embedded tracks detected")
return "".join(parts) if parts else "nothing found"
-10
View File
@@ -91,16 +91,6 @@ class Episode(MediaWithTracks):
def __hash__(self) -> int:
return hash((self.season_number, self.episode_number))
# ── File presence ──────────────────────────────────────────────────────
def has_file(self) -> bool:
"""True if a file path is set and the file actually exists on disk."""
return self.file_path is not None and self.file_path.exists()
def is_downloaded(self) -> bool:
"""Alias of ``has_file()`` — reads better in collection-status contexts."""
return self.has_file()
# Track helpers (has_audio_in / audio_languages / has_subtitles_in /
# has_forced_subs / subtitle_languages) come from MediaWithTracks.
@@ -0,0 +1,66 @@
"""PathlibFilesystemScanner — FilesystemScanner adapter backed by pathlib."""
from __future__ import annotations
import logging
from pathlib import Path
from alfred.domain.shared.ports import FileEntry
logger = logging.getLogger(__name__)
class PathlibFilesystemScanner:
"""Read-only filesystem scanner using ``pathlib``.
Implements :class:`alfred.domain.shared.ports.FilesystemScanner`
structurally. Never raises — failures are logged and surfaced as
empty results.
"""
def scan_dir(self, path: Path) -> list[FileEntry]:
try:
if not path.is_dir():
return []
children = sorted(path.iterdir())
except OSError as e:
logger.debug(f"PathlibFilesystemScanner: scan_dir failed for {path}: {e}")
return []
entries: list[FileEntry] = []
for child in children:
entry = self._make_entry(child)
if entry is not None:
entries.append(entry)
return entries
def stat(self, path: Path) -> FileEntry | None:
return self._make_entry(path)
def read_text(self, path: Path, encoding: str = "utf-8") -> str | None:
try:
with open(path, encoding=encoding, errors="replace") as f:
return f.read()
except OSError as e:
logger.debug(f"PathlibFilesystemScanner: read_text failed for {path}: {e}")
return None
# ------------------------------------------------------------------
def _make_entry(self, path: Path) -> FileEntry | None:
try:
is_file = path.is_file()
is_dir = path.is_dir()
except OSError:
return None
if not (is_file or is_dir):
return None
size_kb: float | None = None
if is_file:
try:
size_kb = path.stat().st_size / 1024
except OSError:
size_kb = None
return FileEntry(path=path, is_file=is_file, is_dir=is_dir, size_kb=size_kb)
@@ -0,0 +1,6 @@
"""Knowledge loaders — YAML I/O kept out of the domain layer.
Each submodule reads its YAML files from ``alfred/knowledge/`` (builtin,
versioned) and ``data/knowledge/`` (learned, gitignored), and exposes plain
Python values (sets, dicts, classes) for domain code to consume.
"""
@@ -13,7 +13,7 @@ import yaml
import alfred as _alfred_pkg
from ..value_objects import Language
from alfred.domain.shared.value_objects import Language
logger = logging.getLogger(__name__)
@@ -2,8 +2,8 @@
import logging
from ...shared.knowledge.language_registry import LanguageRegistry
from ..value_objects import (
from alfred.infrastructure.knowledge.language_registry import LanguageRegistry
from alfred.domain.subtitles.value_objects import (
ScanStrategy,
SubtitleFormat,
SubtitleLanguage,
+5
View File
@@ -0,0 +1,5 @@
"""Media probing adapters — concrete implementations of MediaProber."""
from .ffprobe_prober import FfprobeMediaProber
__all__ = ["FfprobeMediaProber"]
@@ -0,0 +1,65 @@
"""FfprobeMediaProber — MediaProber adapter backed by the ffprobe CLI."""
from __future__ import annotations
import json
import logging
import subprocess
from pathlib import Path
from alfred.domain.shared.ports import SubtitleStreamInfo
logger = logging.getLogger(__name__)
_FFPROBE_TIMEOUT_SECONDS = 30
class FfprobeMediaProber:
"""Inspect media files by shelling out to ``ffprobe``.
Implements :class:`alfred.domain.shared.ports.MediaProber` structurally.
Never raises — failures are logged and surfaced as empty results.
"""
def list_subtitle_streams(self, video: Path) -> list[SubtitleStreamInfo]:
if not video.exists():
return []
try:
result = subprocess.run(
[
"ffprobe",
"-v",
"quiet",
"-print_format",
"json",
"-show_streams",
"-select_streams",
"s",
str(video),
],
capture_output=True,
text=True,
timeout=_FFPROBE_TIMEOUT_SECONDS,
check=False,
)
data = json.loads(result.stdout)
except (
subprocess.TimeoutExpired,
json.JSONDecodeError,
FileNotFoundError,
) as e:
logger.debug(f"FfprobeMediaProber: ffprobe failed for {video.name}: {e}")
return []
streams: list[SubtitleStreamInfo] = []
for stream in data.get("streams", []):
tags = stream.get("tags", {}) or {}
disposition = stream.get("disposition", {}) or {}
streams.append(
SubtitleStreamInfo(
language=tags.get("language") or None,
is_hearing_impaired=bool(disposition.get("hearing_impaired")),
is_forced=bool(disposition.get("forced")),
)
)
return streams
@@ -14,7 +14,7 @@ from pathlib import Path
from typing import Any
from alfred.domain.subtitles.entities import SubtitleCandidate
from alfred.domain.subtitles.services.placer import PlacedTrack
from alfred.application.subtitles.placer import PlacedTrack
from alfred.infrastructure.metadata.store import MetadataStore
logger = logging.getLogger(__name__)
@@ -54,7 +54,7 @@ class RuleSetRepository:
Build and return the resolved RuleSet chain.
If subtitle_preferences is provided, it seeds the global base rule set
from LTM (overriding the hardcoded DEFAULT_RULES).
from LTM (overriding the knowledge-base defaults at resolve time).
Returns global default if no overrides exist.
"""
base = SubtitleRuleSet.global_default()
+1 -1
View File
@@ -41,7 +41,7 @@ from alfred.application.filesystem.manage_subtitles import (
_to_unresolved_dto,
)
from alfred.domain.subtitles.entities import MediaSubtitleMetadata, SubtitleCandidate
from alfred.domain.subtitles.services.placer import PlacedTrack, PlaceResult
from alfred.application.subtitles.placer import PlacedTrack, PlaceResult
from alfred.domain.subtitles.value_objects import (
ScanStrategy,
SubtitleFormat,
@@ -1,4 +1,4 @@
"""Tests for ``alfred.domain.subtitles.services.placer.SubtitlePlacer``.
"""Tests for ``alfred.application.subtitles.placer.SubtitlePlacer``.
The placer hard-links subtitle files next to a destination video, naming
them ``{video_stem}.{lang}[.sdh|.forced].{ext}``.
@@ -22,7 +22,7 @@ from unittest.mock import patch
import pytest
from alfred.domain.subtitles.entities import SubtitleCandidate
from alfred.domain.subtitles.services.placer import (
from alfred.application.subtitles.placer import (
PlacedTrack,
PlaceResult,
SubtitlePlacer,
@@ -198,7 +198,7 @@ class TestOSError:
video.write_bytes(b"")
track = _track(src)
with patch(
"alfred.domain.subtitles.services.placer.os.link",
"alfred.application.subtitles.placer.os.link",
side_effect=OSError("cross-device link"),
):
result = placer.place([track], video)
-17
View File
@@ -72,23 +72,6 @@ class TestFilePath:
with pytest.raises(ValidationError):
FilePath(123) # type: ignore
def test_exists_true(self, tmp_path):
p = FilePath(tmp_path)
assert p.exists()
def test_exists_false(self, tmp_path):
p = FilePath(tmp_path / "nonexistent")
assert not p.exists()
def test_is_file(self, tmp_path):
f = tmp_path / "file.txt"
f.write_text("x")
assert FilePath(f).is_file()
assert not FilePath(tmp_path).is_file()
def test_is_dir(self, tmp_path):
assert FilePath(tmp_path).is_dir()
def test_str(self, tmp_path):
p = FilePath(tmp_path)
assert str(p) == str(tmp_path)
+31 -20
View File
@@ -22,8 +22,8 @@ from unittest.mock import patch
import pytest
from alfred.domain.shared.ports import FileEntry
from alfred.domain.subtitles.entities import SubtitleCandidate
from alfred.domain.subtitles.knowledge.base import SubtitleKnowledgeBase
from alfred.domain.subtitles.services.identifier import (
SubtitleIdentifier,
_count_entries,
@@ -37,6 +37,19 @@ from alfred.domain.subtitles.value_objects import (
SubtitleType,
TypeDetectionMethod,
)
from alfred.infrastructure.filesystem.scanner import PathlibFilesystemScanner
from alfred.infrastructure.knowledge.subtitles.base import SubtitleKnowledgeBase
from alfred.infrastructure.probe.ffprobe_prober import FfprobeMediaProber
def _file_entry(path) -> FileEntry:
"""Helper: build a FileEntry from a real tmp_path Path."""
return FileEntry(
path=path,
is_file=path.is_file(),
is_dir=path.is_dir(),
size_kb=(path.stat().st_size / 1024) if path.is_file() else None,
)
@pytest.fixture(scope="module")
@@ -46,7 +59,7 @@ def kb():
@pytest.fixture
def identifier(kb):
return SubtitleIdentifier(kb)
return SubtitleIdentifier(kb, FfprobeMediaProber(), PathlibFilesystemScanner())
def _pattern(
@@ -103,23 +116,19 @@ class TestTokenize:
class TestCountEntries:
def test_last_cue_number(self, tmp_path):
srt = tmp_path / "x.srt"
srt.write_text(
def test_last_cue_number(self):
text = (
"1\n00:00:01,000 --> 00:00:02,000\nHello\n\n"
"2\n00:00:03,000 --> 00:00:04,000\nWorld\n\n"
"42\n00:00:05,000 --> 00:00:06,000\nLast\n",
encoding="utf-8",
"42\n00:00:05,000 --> 00:00:06,000\nLast\n"
)
assert _count_entries(srt) == 42
assert _count_entries(text) == 42
def test_missing_file_returns_zero(self, tmp_path):
assert _count_entries(tmp_path / "nope.srt") == 0
def test_missing_file_returns_none(self):
assert _count_entries(None) is None
def test_empty_file_returns_zero(self, tmp_path):
f = tmp_path / "x.srt"
f.write_text("")
assert _count_entries(f) == 0
def test_empty_file_returns_zero(self):
assert _count_entries("") == 0
# --------------------------------------------------------------------------- #
@@ -135,7 +144,7 @@ class TestEmbedded:
video = tmp_path / "v.mkv"
video.write_bytes(b"")
with patch(
"alfred.domain.subtitles.services.identifier.subprocess.run",
"alfred.infrastructure.probe.ffprobe_prober.subprocess.run",
side_effect=FileNotFoundError("no ffprobe"),
):
assert identifier._scan_embedded(video) == []
@@ -156,7 +165,7 @@ class TestEmbedded:
stdout = fake_output
with patch(
"alfred.domain.subtitles.services.identifier.subprocess.run",
"alfred.infrastructure.probe.ffprobe_prober.subprocess.run",
return_value=FakeResult(),
):
tracks = identifier._scan_embedded(video)
@@ -256,7 +265,7 @@ class TestClassify:
def test_classifies_language_and_format(self, identifier, tmp_path):
f = tmp_path / "Show.S01E01.English.srt"
f.write_text("1\n00:00:01,000 --> 00:00:02,000\nHi\n")
track = identifier._classify_single(f)
track = identifier._classify_single(_file_entry(f))
assert track.language.code == "eng"
assert track.format.id == "srt"
assert track.confidence > 0
@@ -265,13 +274,13 @@ class TestClassify:
def test_classifies_type_token(self, identifier, tmp_path):
f = tmp_path / "Show.S01E01.English.sdh.srt"
f.write_text("")
track = identifier._classify_single(f)
track = identifier._classify_single(_file_entry(f))
assert track.subtitle_type == SubtitleType.SDH
def test_unknown_tokens_lower_confidence(self, identifier, tmp_path):
f = tmp_path / "Show.S01E01.gibberish.srt"
f.write_text("")
track = identifier._classify_single(f)
track = identifier._classify_single(_file_entry(f))
# No lang/type recognized → confidence is 0 or very low.
assert track.language is None
assert track.confidence < 0.5
@@ -279,7 +288,9 @@ class TestClassify:
def test_episode_stem_prefix_stripped(self, identifier, tmp_path):
f = tmp_path / "Show.S01E01.English.srt"
f.write_text("")
track = identifier._classify_single(f, episode_stem="Show.S01E01")
track = identifier._classify_single(
_file_entry(f), episode_stem="Show.S01E01"
)
# Only "english" remains as meaningful token → confidence == 1.0
assert track.language.code == "eng"
assert track.confidence == 1.0
+4 -4
View File
@@ -1,4 +1,4 @@
"""Tests for ``alfred.domain.subtitles.knowledge`` (loader + base).
"""Tests for ``alfred.infrastructure.knowledge.subtitles`` (loader + base).
Covers:
@@ -19,9 +19,9 @@ from pathlib import Path
import pytest
from alfred.domain.subtitles.knowledge import loader as loader_mod
from alfred.domain.subtitles.knowledge.base import SubtitleKnowledgeBase
from alfred.domain.subtitles.knowledge.loader import KnowledgeLoader, _merge
from alfred.infrastructure.knowledge.subtitles import loader as loader_mod
from alfred.infrastructure.knowledge.subtitles.base import SubtitleKnowledgeBase
from alfred.infrastructure.knowledge.subtitles.loader import KnowledgeLoader, _merge
from alfred.domain.subtitles.value_objects import (
ScanStrategy,
SubtitleType,
@@ -25,8 +25,10 @@ from unittest.mock import patch
import pytest
from alfred.domain.subtitles.knowledge.base import SubtitleKnowledgeBase
from alfred.domain.subtitles.services.pattern_detector import PatternDetector
from alfred.infrastructure.filesystem.scanner import PathlibFilesystemScanner
from alfred.infrastructure.knowledge.subtitles.base import SubtitleKnowledgeBase
from alfred.infrastructure.probe.ffprobe_prober import FfprobeMediaProber
@pytest.fixture(scope="module")
@@ -36,7 +38,7 @@ def kb():
@pytest.fixture
def detector(kb):
return PatternDetector(kb)
return PatternDetector(kb, FfprobeMediaProber(), PathlibFilesystemScanner())
def _make_video(folder: Path, name: str = "Show.S01E01.mkv") -> Path:
+19 -10
View File
@@ -30,9 +30,19 @@ from alfred.domain.subtitles.value_objects import (
RuleScope,
SubtitleFormat,
SubtitleLanguage,
SubtitleMatchingRules,
SubtitleType,
)
# Test fixture: stand-in for what the KB would provide at runtime.
_DEFAULT_RULES = SubtitleMatchingRules(
preferred_languages=["eng"],
preferred_formats=["srt"],
allowed_types=["standard"],
format_priority=["srt", "ass"],
min_confidence=0.7,
)
# --------------------------------------------------------------------------- #
# Value objects #
# --------------------------------------------------------------------------- #
@@ -230,18 +240,17 @@ class TestAvailableSubtitles:
class TestSubtitleRuleSet:
def test_global_default_uses_kb_defaults(self):
def test_global_default_returns_injected_defaults(self):
rs = SubtitleRuleSet.global_default()
rules = rs.resolve()
# Loaded from subtitles.yaml — defaults must be non-empty.
assert rules.preferred_languages
assert rules.preferred_formats
assert 0 < rules.min_confidence <= 1
rules = rs.resolve(_DEFAULT_RULES)
assert rules.preferred_languages == _DEFAULT_RULES.preferred_languages
assert rules.preferred_formats == _DEFAULT_RULES.preferred_formats
assert rules.min_confidence == _DEFAULT_RULES.min_confidence
def test_override_persists(self):
rs = SubtitleRuleSet.global_default()
rs.override(languages=["eng"], min_confidence=0.9)
rules = rs.resolve()
rules = rs.resolve(_DEFAULT_RULES)
assert rules.preferred_languages == ["eng"]
assert rules.min_confidence == 0.9
@@ -252,10 +261,10 @@ class TestSubtitleRuleSet:
parent=parent,
)
child.override(languages=["jpn"])
rules = child.resolve()
rules = child.resolve(_DEFAULT_RULES)
assert rules.preferred_languages == ["jpn"]
# min_confidence not overridden at child or parent → falls back to defaults
assert rules.min_confidence == parent.resolve().min_confidence
assert rules.min_confidence == parent.resolve(_DEFAULT_RULES).min_confidence
def test_to_dict_only_emits_set_deltas(self):
rs = SubtitleRuleSet(scope=RuleScope(level="show", identifier="tt1"))
@@ -286,4 +295,4 @@ class TestSubtitleRuleSet:
# code uses `is not None` explicitly. Verify 0.0 doesn't fall back.
rs = SubtitleRuleSet.global_default()
rs.override(min_confidence=0.0)
assert rs.resolve().min_confidence == 0.0
assert rs.resolve(_DEFAULT_RULES).min_confidence == 0.0
+2 -3
View File
@@ -157,10 +157,9 @@ class TestEpisode:
assert filename.startswith("S01E05")
assert "Gray.Matter" in filename
def test_has_file_false_when_no_path(self):
def test_file_path_unset_by_default(self):
e = self._ep()
assert not e.has_file()
assert not e.is_downloaded()
assert e.file_path is None
def test_str_format(self):
e = self._ep(season=2, episode=3, title="Bit by a Dead Bee")
+20 -8
View File
@@ -17,6 +17,7 @@ from pathlib import Path
import yaml
from alfred.domain.subtitles.value_objects import SubtitleMatchingRules
from alfred.infrastructure.persistence.memory.ltm.components.subtitle_preferences import (
SubtitlePreferences,
)
@@ -25,6 +26,15 @@ from alfred.infrastructure.subtitle.rule_repository import (
_filter_override,
)
# Stand-in for KB defaults, injected at resolve().
_DEFAULT_RULES = SubtitleMatchingRules(
preferred_languages=["eng"],
preferred_formats=["srt"],
allowed_types=["standard"],
format_priority=["srt", "ass"],
min_confidence=0.7,
)
def _write(path: Path, data: dict) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
@@ -71,17 +81,17 @@ class TestLoad:
def test_no_files_returns_global_default(self, tmp_path):
repo = RuleSetRepository(tmp_path)
rs = repo.load()
# Should resolve cleanly using the hardcoded defaults.
rules = rs.resolve()
assert rules.preferred_languages # non-empty
assert rules.min_confidence > 0
# With no overrides, resolve returns the injected defaults unchanged.
rules = rs.resolve(_DEFAULT_RULES)
assert rules.preferred_languages == _DEFAULT_RULES.preferred_languages
assert rules.min_confidence == _DEFAULT_RULES.min_confidence
def test_subtitle_preferences_override_base(self, tmp_path):
prefs = SubtitlePreferences(
languages=["jpn"], formats=["ass"], types=["standard"]
)
repo = RuleSetRepository(tmp_path)
rules = repo.load(subtitle_preferences=prefs).resolve()
rules = repo.load(subtitle_preferences=prefs).resolve(_DEFAULT_RULES)
assert rules.preferred_languages == ["jpn"]
assert rules.preferred_formats == ["ass"]
assert rules.allowed_types == ["standard"]
@@ -92,7 +102,7 @@ class TestLoad:
{"override": {"languages": ["spa"], "min_confidence": 0.95}},
)
repo = RuleSetRepository(tmp_path)
rules = repo.load().resolve()
rules = repo.load().resolve(_DEFAULT_RULES)
assert rules.preferred_languages == ["spa"]
assert rules.min_confidence == 0.95
@@ -102,7 +112,7 @@ class TestLoad:
{"override": {"format_priority": ["ass", "srt"]}},
)
repo = RuleSetRepository(tmp_path)
rules = repo.load(release_group="KONTRAST").resolve()
rules = repo.load(release_group="KONTRAST").resolve(_DEFAULT_RULES)
assert rules.format_priority == ["ass", "srt"]
def test_full_three_level_chain(self, tmp_path):
@@ -119,7 +129,9 @@ class TestLoad:
{"override": {"min_confidence": 0.99}},
)
repo = RuleSetRepository(tmp_path)
rules = repo.load(release_group="GRP", subtitle_preferences=prefs).resolve()
rules = repo.load(release_group="GRP", subtitle_preferences=prefs).resolve(
_DEFAULT_RULES
)
# All three levels visible — local overrides on top
assert rules.preferred_languages == ["jpn"]
assert rules.format_priority == ["ass"]
@@ -17,7 +17,7 @@ from __future__ import annotations
from pathlib import Path
from alfred.domain.subtitles.entities import SubtitleCandidate
from alfred.domain.subtitles.services.placer import PlacedTrack
from alfred.application.subtitles.placer import PlacedTrack
from alfred.domain.subtitles.value_objects import (
SubtitleFormat,
SubtitleLanguage,