feat(library): add rescan_show orchestrator + walker (Step 4)

Step 4 of specs/dot_alfred.md — rebuild a TVShow aggregate from disk
by reusing the existing release pipeline (inspect_release) on every
video file in a show folder, then persist via the .alfred repository.

- alfred/application/library/walker.py — pure structural walk
  (season folders detected via \bS\d{1,2}\b regex, video files
  filtered against kb.video_extensions, no recursion).
- alfred/application/library/rescan.py — orchestrator that ingests
  each season folder, infers PACK vs EPISODIC from on-disk file
  count + parser output, and assembles via TVShowBuilder. Episode
  paths stored relative to show_root. Logs + skips corrupt input
  (no season parsed, mixed season numbers, unparseable episodes).
- Season now inherits MediaWithTracks: PACK seasons carry
  season-level audio_tracks / subtitle_tracks; EPISODIC seasons
  leave them empty (tracks live per-episode). SeasonBuilder gains
  set_audio_tracks / set_subtitle_tracks; bridge writes/reads them
  in the PACK branch via shared _synth_* helpers.

Out of scope, tracked as tech debt: adjacent .srt capture, multi-
episode (episode_end), TMDB-driven PACK detection (the current
heuristic '1 file == PACK' is a placeholder until ShowTracker lands).

18 new tests (11 walker + 7 rescan integration) on tmp_path with
the Foundation layout. Full suite: 1149 passed.
This commit is contained in:
2026-05-24 15:22:18 +02:00
parent 3622c95154
commit de7030fa9c
10 changed files with 823 additions and 18 deletions
+44
View File
@@ -17,6 +17,50 @@ callers).
### Added
- **`rescan_show` orchestrator
(`alfred/application/library/rescan.py`).** Step 4 of the
`specs/dot_alfred.md` plan. Walks an Alfred-managed show folder,
runs the existing `inspect_release` pipeline on every video file it
finds, and assembles a frozen `TVShow` aggregate persisted via the
injected `TVShowRepository`. Reuses the release parser + ffprobe
path verbatim — no duplicated parse/probe logic at the library
layer. PACK vs EPISODIC inferred per season folder from the
on-disk file count + parser output: a single video whose name
carries no `Exx` token becomes a PACK season (tracks lifted to the
season-level `audio_tracks` / `subtitle_tracks`), anything else
becomes EPISODIC (one `Episode` per file). Episode paths are
stored relative to the show root for portability. Files that fail
to parse a season/episode number, or seasons with mixed numbers,
are logged and skipped — the orchestrator never raises. Embedded
subtitle tracks are captured from `ffprobe`; adjacent `.srt`
files, multi-episode entries (`S01E01E02`), and TMDB-driven PACK
detection are tracked as tech debt for a dedicated subtitles /
ShowTracker session. 7 integration tests on `tmp_path` with the
Foundation layout (S01 EPISODIC + S02 PACK) cover the round-trip
through the real `.alfred` repository.
- **Show tree walker (`alfred/application/library/walker.py`).**
Step 4a foundation. `walk_show(show_root, scanner, kb)` returns a
`ShowTree(show_root, season_folders=tuple[SeasonFolder, ...])`
pure structural snapshot, no parsing, no probing. Season folders
are detected by a `\bS\d{1,2}\b` token anywhere in the directory
name (release-style naming, no Plex `Season 01` / `Specials`
conventions). Video files are filtered against
`kb.video_extensions`; no recursion into sub-sub-folders. 11 unit
tests on `tmp_path` cover detection (case-insensitive, in-word
rejection), filtering (subs, NFO, sample files), and edge cases
(empty / missing show root).
- **Season-level audio/subtitle tracks
(`alfred/domain/tv_shows/entities.py`,
`alfred/domain/tv_shows/builders.py`).** `Season` now inherits
from `MediaWithTracks` and carries `audio_tracks` /
`subtitle_tracks` tuples (empty by default). Populated only in
PACK mode (the single release covering the whole season); empty in
EPISODIC mode where tracks live per-episode. `SeasonBuilder`
gains `set_audio_tracks()` / `set_subtitle_tracks()` and forwards
them through `from_existing()`. The bridge writes / reads them in
the PACK branch via shared `_synth_audio_tracks` /
`_synth_subtitle_tracks` helpers used for episodes too.
- **`DotAlfredTVShowRepository` — filesystem-backed implementation of
the `TVShowRepository` port
(`alfred/infrastructure/persistence/dot_alfred/repository.py`).**
+13
View File
@@ -0,0 +1,13 @@
"""Library orchestrators — operate on the Alfred-managed library tree.
The library is a directory of show folders (one per TV show) where each
show holds season folders containing video files. Modules here walk
this tree and reconstruct domain aggregates by reusing the existing
release pipeline (``inspect_release``) rather than duplicating its
parse/probe logic.
"""
from .rescan import rescan_show
from .walker import SeasonFolder, ShowTree, walk_show
__all__ = ["SeasonFolder", "ShowTree", "rescan_show", "walk_show"]
+192
View File
@@ -0,0 +1,192 @@
"""``rescan_show`` — rebuild a TVShow aggregate from disk and persist it.
The orchestrator walks the show folder, runs the existing release
pipeline (``inspect_release``) on every video file it finds, and
assembles the result into a frozen :class:`TVShow` via
:class:`TVShowBuilder`. The aggregate is then handed to the
repository for atomic persistence as a ``.alfred`` sidecar.
Why reuse ``inspect_release``?
-------------------------------
The "fresh download" flow already parses release names, picks a main
video, runs ffprobe and refines media type. We want exactly the same
intelligence applied to library content — running it again here
keeps a single source of truth for parsing/probing rules. The
orchestrator just has to translate per-file :class:`InspectedResult`
into builder calls.
PACK vs EPISODIC
----------------
Detection lives in this layer (until the TMDB-driven
``ShowTracker`` arrives). The current rule:
* A season folder containing exactly one video whose parser yields
``season is not None`` and ``episode is None`` → **PACK**: the
season is recorded with empty ``episodes`` and tracks summarized
from the probe at the season level.
* Otherwise → **EPISODIC**: every file with a valid
``(season, episode)`` becomes an :class:`Episode`.
Files that fall outside both rules (no season parsed, mix of PACK +
episode files in the same folder, etc.) are logged and skipped — the
walker doesn't raise on corrupt input, and neither does the
orchestrator.
Out of scope (tracked as tech debt):
* Adjacent ``.srt`` files — only embedded subtitle tracks are captured.
* Multi-episode files (``S01E01E02``) — only the first episode is
recorded; ``episode_end`` is ignored.
* TMDB-driven PACK detection — for now PACK is inferred from the
on-disk file count + parser output.
"""
from __future__ import annotations
import logging
from pathlib import Path
from alfred.application.library.walker import SeasonFolder, walk_show
from alfred.application.release.inspect import inspect_release
from alfred.domain.release.ports import ReleaseKnowledge
from alfred.domain.shared.media import MediaInfo
from alfred.domain.shared.ports import FilesystemScanner, MediaProber
from alfred.domain.shared.value_objects import FilePath, ImdbId
from alfred.domain.tv_shows.builders import SeasonBuilder, TVShowBuilder
from alfred.domain.tv_shows.entities import Episode, TVShow
from alfred.domain.tv_shows.repositories import TVShowRepository
from alfred.domain.tv_shows.value_objects import EpisodeNumber, SeasonNumber
_LOG = logging.getLogger(__name__)
def rescan_show(
show_root: Path,
*,
imdb_id: ImdbId | str,
tmdb_id: int | None = None,
repository: TVShowRepository,
scanner: FilesystemScanner,
prober: MediaProber,
kb: ReleaseKnowledge,
) -> TVShow:
"""Rebuild and persist the :class:`TVShow` aggregate for ``show_root``.
The show's folder name is used as ``title`` (matching the
convention of :class:`DotAlfredTVShowRepository`). ``imdb_id`` and
``tmdb_id`` are supplied by the caller — the orchestrator does
not call TMDB.
Returns the rebuilt frozen aggregate (also written to disk by
``repository.save``).
"""
tree = walk_show(show_root, scanner=scanner, kb=kb)
builder = TVShowBuilder(
imdb_id=imdb_id, title=show_root.name, tmdb_id=tmdb_id
)
for season_folder in tree.season_folders:
_ingest_season(season_folder, show_root, builder, kb, prober)
show = builder.build()
repository.save(show)
return show
# --------------------------------------------------------------------------- #
# Per-season ingestion #
# --------------------------------------------------------------------------- #
def _ingest_season(
season_folder: SeasonFolder,
show_root: Path,
builder: TVShowBuilder,
kb: ReleaseKnowledge,
prober: MediaProber,
) -> None:
if not season_folder.video_files:
_LOG.warning(
"rescan_show: season folder %s contains no video file — skipping",
season_folder.season_dir,
)
return
# Inspect every video first; we need the whole batch to decide
# PACK vs EPISODIC before touching the builder.
inspected = []
for video_path in season_folder.video_files:
result = inspect_release(video_path.name, video_path, kb, prober)
inspected.append((video_path, result))
seasons = {r.parsed.season for _, r in inspected if r.parsed.season is not None}
if not seasons:
_LOG.warning(
"rescan_show: no season number parsed in %s — skipping",
season_folder.season_dir,
)
return
if len(seasons) > 1:
_LOG.warning(
"rescan_show: mixed season numbers %s in %s — skipping",
sorted(seasons),
season_folder.season_dir,
)
return
season_number = SeasonNumber(seasons.pop())
# Single video, no episode → PACK.
if len(inspected) == 1 and inspected[0][1].parsed.episode is None:
video_path, result = inspected[0]
_ingest_pack(season_number, result.media_info, builder)
return
# Otherwise treat every file as an EPISODIC entry. Files without
# a parseable episode number are logged and dropped.
for video_path, result in inspected:
if result.parsed.episode is None:
_LOG.warning(
"rescan_show: no episode number parsed for %s — skipping",
video_path,
)
continue
episode = _make_episode(
season_number=season_number,
episode_number=EpisodeNumber(result.parsed.episode),
video_path=video_path,
show_root=show_root,
media_info=result.media_info,
)
builder.add_episode(episode)
def _ingest_pack(
season_number: SeasonNumber,
media_info: MediaInfo | None,
builder: TVShowBuilder,
) -> None:
sb: SeasonBuilder = builder.season_builder(season_number)
if media_info is not None:
sb.set_audio_tracks(media_info.audio_tracks)
sb.set_subtitle_tracks(media_info.subtitle_tracks)
def _make_episode(
*,
season_number: SeasonNumber,
episode_number: EpisodeNumber,
video_path: Path,
show_root: Path,
media_info: MediaInfo | None,
) -> Episode:
rel_path = video_path.relative_to(show_root)
audio_tracks = media_info.audio_tracks if media_info else ()
subtitle_tracks = media_info.subtitle_tracks if media_info else ()
return Episode(
season_number=season_number,
episode_number=episode_number,
title="",
file_path=FilePath(str(rel_path)),
audio_tracks=audio_tracks,
subtitle_tracks=subtitle_tracks,
)
+98
View File
@@ -0,0 +1,98 @@
"""Show tree walker — minimal filesystem traversal of a TV show folder.
The walker is intentionally dumb: it lists season folders and the
video files inside them, nothing more. It does not parse release
names, run ffprobe, or classify subtitle files. All of that
intelligence lives in the existing release pipeline
(``inspect_release`` + downstream services); the walker just hands
the orchestrator the paths to feed into that pipeline.
Folder convention
-----------------
Inside an Alfred-managed library, a show root looks like::
Foundation/
Foundation.S01.1080p.WEB-DL.x265-GROUP/ ← season folder
Foundation.S01E01.1080p.WEB-DL.x265.mkv ← episode (EPISODIC)
Foundation.S01E02.1080p.WEB-DL.x265.mkv
Foundation.S02.Complete.1080p.WEB-DL-GROUP/ ← season folder
Foundation.S02.Complete.1080p.WEB-DL.mkv ← PACK (single video)
The walker recognizes a season folder by a ``Sxx`` token anywhere in
its name (case-insensitive). It does **not** care about Plex-style
names (``Season 01``, ``Specials``) — the Alfred library uses
release-style folder names only.
PACK vs EPISODIC detection happens **above** the walker, by the
orchestrator looking at how many video files came back and whether
they carry ``Exx`` tokens. The walker itself reports every video
file it sees, untouched.
"""
from __future__ import annotations
import re
from dataclasses import dataclass
from pathlib import Path
from alfred.domain.release.ports import ReleaseKnowledge
from alfred.domain.shared.ports import FilesystemScanner
# Matches any ``Sxx`` token (1-2 digits) bounded by non-alphanumerics.
# Examples that match: ``Foundation.S01.1080p`` , ``S2.Pack`` , ``BBC.s10.bluray``.
# Examples that don't: ``Sample`` , ``Soundtrack`` , ``2024.S0E1`` (no S+digits boundary).
_SEASON_TOKEN_RE = re.compile(r"(?<![A-Za-z0-9])s(\d{1,2})(?![A-Za-z0-9])", re.IGNORECASE)
@dataclass(frozen=True)
class SeasonFolder:
"""One season folder discovered inside a show root."""
season_dir: Path
video_files: tuple[Path, ...]
@dataclass(frozen=True)
class ShowTree:
"""The full structural snapshot of a show on disk."""
show_root: Path
season_folders: tuple[SeasonFolder, ...]
def walk_show(
show_root: Path,
*,
scanner: FilesystemScanner,
kb: ReleaseKnowledge,
) -> ShowTree:
"""Walk ``show_root`` and return its structural tree.
The walker:
* lists direct children of ``show_root``,
* keeps the directories whose name contains a ``Sxx`` token,
* lists each season directory's video files (one level deep —
no recursion into sub-sub-folders),
* sorts season folders by name and video files by name within
each folder.
Empty / unreadable directories surface as a ``SeasonFolder`` with
an empty ``video_files`` tuple. The orchestrator decides how to
react (skip, warn, fail) — the walker never raises.
"""
video_exts = {ext.lower() for ext in kb.video_extensions}
season_folders: list[SeasonFolder] = []
for entry in scanner.scan_dir(show_root):
if not entry.is_dir or not _SEASON_TOKEN_RE.search(entry.name):
continue
videos = tuple(
child.path
for child in scanner.scan_dir(entry.path)
if child.is_file and child.suffix.lower() in video_exts
)
season_folders.append(SeasonFolder(season_dir=entry.path, video_files=videos))
return ShowTree(
show_root=show_root, season_folders=tuple(season_folders)
)
+33 -2
View File
@@ -47,11 +47,11 @@ Invariants enforced at ``build()`` time:
from __future__ import annotations
from ..shared.media import AudioTrack, SubtitleTrack
from ..shared.value_objects import ImdbId
from .entities import Episode, Season, TVShow
from .value_objects import EpisodeNumber, SeasonNumber
# ════════════════════════════════════════════════════════════════════════════
# SeasonBuilder
# ════════════════════════════════════════════════════════════════════════════
@@ -71,6 +71,8 @@ class SeasonBuilder:
season_number = SeasonNumber(season_number)
self._season_number: SeasonNumber = season_number
self._episodes: dict[EpisodeNumber, Episode] = {}
self._audio_tracks: tuple[AudioTrack, ...] = ()
self._subtitle_tracks: tuple[SubtitleTrack, ...] = ()
@classmethod
def from_existing(cls, season: Season) -> SeasonBuilder:
@@ -78,12 +80,36 @@ class SeasonBuilder:
builder = cls(season.season_number)
for ep in season.episodes:
builder._episodes[ep.episode_number] = ep
builder._audio_tracks = season.audio_tracks
builder._subtitle_tracks = season.subtitle_tracks
return builder
@property
def season_number(self) -> SeasonNumber:
return self._season_number
def set_audio_tracks(
self, tracks: tuple[AudioTrack, ...]
) -> SeasonBuilder:
"""
Replace the season-level audio tracks (PACK mode only).
In EPISODIC mode these stay empty — tracks live per-episode.
"""
self._audio_tracks = tuple(tracks)
return self
def set_subtitle_tracks(
self, tracks: tuple[SubtitleTrack, ...]
) -> SeasonBuilder:
"""
Replace the season-level subtitle tracks (PACK mode only).
In EPISODIC mode these stay empty — tracks live per-episode.
"""
self._subtitle_tracks = tuple(tracks)
return self
def add_episode(self, episode: Episode) -> SeasonBuilder:
"""
Add or replace an episode in this season.
@@ -105,7 +131,12 @@ class SeasonBuilder:
ordered = tuple(
self._episodes[n] for n in sorted(self._episodes, key=lambda x: x.value)
)
return Season(season_number=self._season_number, episodes=ordered)
return Season(
season_number=self._season_number,
episodes=ordered,
audio_tracks=self._audio_tracks,
subtitle_tracks=self._subtitle_tracks,
)
# ════════════════════════════════════════════════════════════════════════════
+8 -1
View File
@@ -134,7 +134,7 @@ class Episode(MediaWithTracks):
@dataclass(frozen=True)
class Season:
class Season(MediaWithTracks):
"""
A season of a TV show — owned by ``TVShow``, frozen value.
@@ -146,10 +146,17 @@ class Season:
:class:`SeasonMode` (see ``mode`` property): a season with no episodes
is a PACK (single release covering the whole season), a season with
episodes is EPISODIC (currently airing, one release per episode).
In PACK mode, ``audio_tracks`` and ``subtitle_tracks`` describe the
single release as a whole (probed from the single video file). In
EPISODIC mode, those season-level tuples are typically empty — the
per-episode tuples on each :class:`Episode` hold the truth.
"""
season_number: SeasonNumber
episodes: tuple[Episode, ...] = ()
audio_tracks: tuple[AudioTrack, ...] = field(default_factory=tuple)
subtitle_tracks: tuple[SubtitleTrack, ...] = field(default_factory=tuple)
def __post_init__(self) -> None:
if not isinstance(self.season_number, SeasonNumber):
@@ -30,7 +30,7 @@ from ....domain.shared.media import AudioTrack, SubtitleTrack
from ....domain.shared.value_objects import FilePath
from ....domain.tv_shows.builders import SeasonBuilder, TVShowBuilder
from ....domain.tv_shows.entities import Episode, Season, TVShow
from ....domain.tv_shows.value_objects import EpisodeNumber, SeasonNumber
from ....domain.tv_shows.value_objects import SeasonNumber
from .sidecar import (
EpisodeSidecar,
SeasonSidecar,
@@ -70,14 +70,15 @@ def _season_to_sidecar(season: Season, path: str) -> SeasonSidecar:
episodes=tuple(_episode_to_sidecar(ep) for ep in season.episodes),
)
# PACK mode — no episodes, season-scoped tracks. The domain doesn't
# currently expose season-level tracks (they live on episodes), so a
# PACK season produced from the domain alone has empty tracks. The
# repository populates them from the probe when scanning a real
# folder; here we just relay what is on the season.
# PACK mode — season-scoped tracks (single release covering the whole
# season). Summarize the same way as for episodes.
return SeasonSidecar(
number=season.season_number,
path=path,
audio_languages=tuple(season.audio_languages()),
subtitles=tuple(
_subtitle_track_to_entry(t) for t in season.subtitle_tracks
),
)
@@ -130,13 +131,17 @@ def _season_from_sidecar(season: SeasonSidecar) -> Season:
sb = SeasonBuilder(season.number)
for ep in season.episodes:
sb.add_episode(_episode_from_sidecar(ep, season.number))
if not season.episodes:
# PACK mode — populate season-scoped tracks from the sidecar.
sb.set_audio_tracks(_synth_audio_tracks(season.audio_languages))
sb.set_subtitle_tracks(_synth_subtitle_tracks(season.subtitles))
return sb.build()
def _episode_from_sidecar(
episode: EpisodeSidecar, season_number: SeasonNumber
) -> Episode:
audio_tracks = tuple(
def _synth_audio_tracks(
languages: tuple[str, ...],
) -> tuple[AudioTrack, ...]:
return tuple(
AudioTrack(
index=i,
codec=None,
@@ -144,9 +149,14 @@ def _episode_from_sidecar(
channel_layout=None,
language=lang,
)
for i, lang in enumerate(episode.audio_languages)
for i, lang in enumerate(languages)
)
subtitle_tracks = tuple(
def _synth_subtitle_tracks(
entries: tuple[SubtitleEntry, ...],
) -> tuple[SubtitleTrack, ...]:
return tuple(
SubtitleTrack(
index=i,
codec=None,
@@ -154,15 +164,20 @@ def _episode_from_sidecar(
is_default=False,
is_forced=(entry.type == "forced"),
)
for i, entry in enumerate(episode.subtitles)
for i, entry in enumerate(entries)
)
def _episode_from_sidecar(
episode: EpisodeSidecar, season_number: SeasonNumber
) -> Episode:
return Episode(
season_number=season_number,
episode_number=episode.number,
title="",
file_path=FilePath(episode.path),
audio_tracks=audio_tracks,
subtitle_tracks=subtitle_tracks,
audio_tracks=_synth_audio_tracks(episode.audio_languages),
subtitle_tracks=_synth_subtitle_tracks(episode.subtitles),
)
+277
View File
@@ -0,0 +1,277 @@
"""Integration tests for ``rescan_show``.
Uses the real filesystem (``tmp_path``), the real release knowledge
base, and the real ``.alfred`` repository. Only the media prober is
stubbed — ffprobe needs real bytes and a binary.
"""
from __future__ import annotations
from pathlib import Path
from alfred.application.library import rescan_show
from alfred.domain.shared.media import (
AudioTrack,
MediaInfo,
SubtitleTrack,
VideoTrack,
)
from alfred.domain.shared.value_objects import ImdbId
from alfred.domain.tv_shows.value_objects import SeasonNumber
from alfred.infrastructure.filesystem.scanner import PathlibFilesystemScanner
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
from alfred.infrastructure.persistence.dot_alfred import (
DotAlfredTVShowRepository,
)
from alfred.infrastructure.persistence.dot_alfred.repository import (
SIDECAR_FILENAME,
)
_KB = YamlReleaseKnowledge()
_SCANNER = PathlibFilesystemScanner()
# --------------------------------------------------------------------------- #
# Helpers #
# --------------------------------------------------------------------------- #
_MISSING = object()
class _StubProber:
"""Return a canned MediaInfo for every probe call, regardless of path."""
def __init__(self, info=_MISSING) -> None:
self._info: MediaInfo | None = (
_default_info() if info is _MISSING else info # type: ignore[assignment]
)
self.calls: list[Path] = []
def probe(self, video: Path) -> MediaInfo | None:
self.calls.append(video)
return self._info
def list_subtitle_streams(self, video: Path): # pragma: no cover - unused
return []
def _default_info() -> MediaInfo:
return MediaInfo(
video_tracks=(VideoTrack(index=0, codec="hevc", width=1920, height=1080),),
audio_tracks=(
AudioTrack(
index=0,
codec="eac3",
channels=6,
channel_layout="5.1",
language="eng",
),
),
subtitle_tracks=(
SubtitleTrack(
index=0,
codec="subrip",
language="eng",
is_default=False,
is_forced=False,
),
),
)
def _make_foundation_library(
root: Path,
*,
episodic_episodes: tuple[int, ...] = (1, 2, 3),
include_pack_s2: bool = True,
) -> Path:
"""Build a fake Foundation show folder under ``root``.
Layout (matches the Foundation fixture naming):
root/Foundation/
Foundation.S01.1080p.x265-ELiTE/ ← EPISODIC
Foundation.S01E01.1080p.x265-ELiTE.mkv
Foundation.S01E02.1080p.x265-ELiTE.mkv
Foundation.S01E03.1080p.x265-ELiTE.mkv
Foundation.S02.1080p.x265-ELiTE/ ← PACK
Foundation.S02.1080p.x265-ELiTE.mkv
"""
show_root = root / "Foundation"
show_root.mkdir()
s1 = show_root / "Foundation.S01.1080p.x265-ELiTE"
s1.mkdir()
for n in episodic_episodes:
(s1 / f"Foundation.S01E{n:02d}.1080p.x265-ELiTE.mkv").write_bytes(b"")
if include_pack_s2:
s2 = show_root / "Foundation.S02.1080p.x265-ELiTE"
s2.mkdir()
(s2 / "Foundation.S02.1080p.x265-ELiTE.mkv").write_bytes(b"")
return show_root
def _library_with_show(tmp_path: Path) -> tuple[Path, Path]:
"""Return (library_root, show_root) prepared for the repository."""
library = tmp_path / "library"
library.mkdir()
show_root = _make_foundation_library(library)
return library, show_root
# --------------------------------------------------------------------------- #
# Happy path #
# --------------------------------------------------------------------------- #
class TestHappyPath:
def test_builds_show_with_episodic_and_pack_seasons(self, tmp_path):
library, show_root = _library_with_show(tmp_path)
repo = DotAlfredTVShowRepository(library)
prober = _StubProber()
show = rescan_show(
show_root,
imdb_id="tt0804484",
tmdb_id=84958,
repository=repo,
scanner=_SCANNER,
prober=prober,
kb=_KB,
)
assert show.imdb_id == ImdbId("tt0804484")
assert show.tmdb_id == 84958
assert show.title == "Foundation"
assert show.seasons_count == 2
s1 = show.get_season(SeasonNumber(1))
s2 = show.get_season(SeasonNumber(2))
assert s1 is not None and s2 is not None
# S01 EPISODIC: three episodes, season-level tracks empty.
assert s1.episode_count == 3
assert s1.audio_tracks == ()
assert s1.subtitle_tracks == ()
# S02 PACK: no episodes, season-level tracks populated.
assert s2.episode_count == 0
assert len(s2.audio_tracks) == 1
assert s2.audio_tracks[0].language == "eng"
assert len(s2.subtitle_tracks) == 1
def test_episode_paths_are_relative_to_show_root(self, tmp_path):
library, show_root = _library_with_show(tmp_path)
repo = DotAlfredTVShowRepository(library)
show = rescan_show(
show_root,
imdb_id="tt0804484",
repository=repo,
scanner=_SCANNER,
prober=_StubProber(),
kb=_KB,
)
s1 = show.get_season(SeasonNumber(1))
assert s1 is not None
for ep in s1.episodes:
assert ep.file_path is not None
path_str = str(ep.file_path)
# Must NOT be absolute and must start with the season folder.
assert not Path(path_str).is_absolute()
assert path_str.startswith("Foundation.S01.1080p.x265-ELiTE/")
def test_persists_sidecar_on_disk(self, tmp_path):
library, show_root = _library_with_show(tmp_path)
repo = DotAlfredTVShowRepository(library)
rescan_show(
show_root,
imdb_id="tt0804484",
repository=repo,
scanner=_SCANNER,
prober=_StubProber(),
kb=_KB,
)
assert (show_root / SIDECAR_FILENAME).is_file()
# Round-trip via the repo.
recovered = repo.find_by_imdb_id(ImdbId("tt0804484"))
assert recovered is not None
assert recovered.seasons_count == 2
def test_probe_called_once_per_video(self, tmp_path):
library, show_root = _library_with_show(tmp_path)
repo = DotAlfredTVShowRepository(library)
prober = _StubProber()
rescan_show(
show_root,
imdb_id="tt0804484",
repository=repo,
scanner=_SCANNER,
prober=prober,
kb=_KB,
)
# 3 episodes + 1 pack video = 4 probes.
assert len(prober.calls) == 4
# --------------------------------------------------------------------------- #
# Edge cases #
# --------------------------------------------------------------------------- #
class TestEdgeCases:
def test_empty_show_root_yields_empty_show(self, tmp_path):
library = tmp_path / "library"
library.mkdir()
show_root = library / "Empty"
show_root.mkdir()
repo = DotAlfredTVShowRepository(library)
show = rescan_show(
show_root,
imdb_id="tt0000001",
repository=repo,
scanner=_SCANNER,
prober=_StubProber(),
kb=_KB,
)
assert show.seasons_count == 0
assert (show_root / SIDECAR_FILENAME).is_file()
def test_season_folder_without_videos_is_skipped(self, tmp_path, caplog):
library = tmp_path / "library"
library.mkdir()
show_root = library / "Foundation"
show_root.mkdir()
(show_root / "Foundation.S01.WEB").mkdir() # empty season folder
repo = DotAlfredTVShowRepository(library)
with caplog.at_level("WARNING"):
show = rescan_show(
show_root,
imdb_id="tt0804484",
repository=repo,
scanner=_SCANNER,
prober=_StubProber(),
kb=_KB,
)
assert show.seasons_count == 0
assert any("no video file" in r.message for r in caplog.records)
def test_prober_returning_none_still_produces_episodes(self, tmp_path):
library = tmp_path / "library"
library.mkdir()
show_root = _make_foundation_library(
library, episodic_episodes=(1,), include_pack_s2=False
)
repo = DotAlfredTVShowRepository(library)
# Prober returns None — inspect_release skips enrichment, tracks empty.
show = rescan_show(
show_root,
imdb_id="tt0804484",
repository=repo,
scanner=_SCANNER,
prober=_StubProber(info=None),
kb=_KB,
)
s1 = show.get_season(SeasonNumber(1))
assert s1 is not None
assert s1.episode_count == 1
ep = s1.episodes[0]
assert ep.audio_tracks == ()
assert ep.subtitle_tracks == ()
+128
View File
@@ -0,0 +1,128 @@
"""Tests for the show-tree walker."""
from __future__ import annotations
from alfred.application.library.walker import walk_show
from alfred.infrastructure.filesystem.scanner import PathlibFilesystemScanner
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
_KB = YamlReleaseKnowledge()
_SCANNER = PathlibFilesystemScanner()
def _episode(dir_path, name: str) -> None:
(dir_path / name).write_bytes(b"")
class TestSeasonFolderDetection:
def test_keeps_folders_with_season_token(self, tmp_path):
show = tmp_path / "Foundation"
show.mkdir()
(show / "Foundation.S01.1080p.WEB-DL.x265-GROUP").mkdir()
(show / "Foundation.S02.Complete.1080p.WEB-DL-GROUP").mkdir()
tree = walk_show(show, scanner=_SCANNER, kb=_KB)
names = sorted(s.season_dir.name for s in tree.season_folders)
assert names == [
"Foundation.S01.1080p.WEB-DL.x265-GROUP",
"Foundation.S02.Complete.1080p.WEB-DL-GROUP",
]
def test_skips_folders_without_season_token(self, tmp_path):
show = tmp_path / "Foundation"
show.mkdir()
(show / "Sample").mkdir()
(show / "Soundtrack").mkdir()
(show / "Foundation.S01.WEB").mkdir()
tree = walk_show(show, scanner=_SCANNER, kb=_KB)
assert [s.season_dir.name for s in tree.season_folders] == [
"Foundation.S01.WEB"
]
def test_season_token_is_case_insensitive(self, tmp_path):
show = tmp_path / "Foundation"
show.mkdir()
(show / "Foundation.s01.WEB").mkdir()
tree = walk_show(show, scanner=_SCANNER, kb=_KB)
assert len(tree.season_folders) == 1
def test_season_token_rejects_in_word(self, tmp_path):
# "Season01" without separator must not match (we want a real
# Sxx token, not any letter S followed by digits).
show = tmp_path / "WeirdShow"
show.mkdir()
(show / "Pass100").mkdir()
(show / "ABS01XYZ").mkdir()
tree = walk_show(show, scanner=_SCANNER, kb=_KB)
assert tree.season_folders == ()
def test_ignores_files_at_show_root(self, tmp_path):
show = tmp_path / "Foundation"
show.mkdir()
(show / "Foundation.S01.mkv").write_bytes(b"") # file, not a folder
(show / "Foundation.S01.WEB").mkdir()
tree = walk_show(show, scanner=_SCANNER, kb=_KB)
assert [s.season_dir.name for s in tree.season_folders] == [
"Foundation.S01.WEB"
]
class TestVideoFileCollection:
def test_keeps_only_video_extensions(self, tmp_path):
show = tmp_path / "Foundation"
show.mkdir()
season = show / "Foundation.S01.WEB"
season.mkdir()
_episode(season, "Foundation.S01E01.mkv")
_episode(season, "Foundation.S01E02.mp4")
_episode(season, "Foundation.S01E01.eng.srt") # not a video
_episode(season, "info.nfo") # not a video
tree = walk_show(show, scanner=_SCANNER, kb=_KB)
(folder,) = tree.season_folders
suffixes = sorted(p.suffix for p in folder.video_files)
assert suffixes == [".mkv", ".mp4"]
def test_empty_season_folder_surfaces(self, tmp_path):
show = tmp_path / "Foundation"
show.mkdir()
(show / "Foundation.S01.WEB").mkdir()
tree = walk_show(show, scanner=_SCANNER, kb=_KB)
(folder,) = tree.season_folders
assert folder.video_files == ()
def test_single_pack_video_is_kept(self, tmp_path):
show = tmp_path / "Foundation"
show.mkdir()
season = show / "Foundation.S01.Complete.WEB"
season.mkdir()
_episode(season, "Foundation.S01.Complete.WEB.mkv")
tree = walk_show(show, scanner=_SCANNER, kb=_KB)
(folder,) = tree.season_folders
assert [p.name for p in folder.video_files] == [
"Foundation.S01.Complete.WEB.mkv"
]
def test_does_not_recurse_into_sub_subfolders(self, tmp_path):
show = tmp_path / "Foundation"
show.mkdir()
season = show / "Foundation.S01.WEB"
season.mkdir()
nested = season / "Extras"
nested.mkdir()
_episode(nested, "Foundation.S01E01.mkv")
tree = walk_show(show, scanner=_SCANNER, kb=_KB)
(folder,) = tree.season_folders
assert folder.video_files == ()
class TestEdgeCases:
def test_empty_show_root(self, tmp_path):
show = tmp_path / "Empty"
show.mkdir()
tree = walk_show(show, scanner=_SCANNER, kb=_KB)
assert tree.show_root == show
assert tree.season_folders == ()
def test_missing_show_root_returns_empty_tree(self, tmp_path):
# Scanner returns [] for non-existent paths; walker must not raise.
tree = walk_show(tmp_path / "Nope", scanner=_SCANNER, kb=_KB)
assert tree.season_folders == ()