refactor(rescan): Phase 4 Step 1 — rescan_show on v2 release repo
Rewrite rescan_show to build a SeriesRelease (Phase 1 v2 aggregate)
and persist it via DotAlfredSeriesReleaseRepository. The orchestrator
keeps reusing inspect_release as the single source of parse/probe
truth — only the assembly target changes (SeriesRelease/SeasonRelease/
EpisodeRelease instead of TVShow/Season/Episode).
New signature
rescan_show(
show_root,
*,
tmdb_id: TmdbId,
imdb_id: ImdbId | None = None,
series_repo: DotAlfredSeriesReleaseRepository,
scanner,
prober,
kb,
) -> SeriesRelease
Identity is TMDB-anchored (tmdb_id required, no coercion); imdb_id is
optional. No TMDB call from rescan — the library index auto-heals
from the new sidecar on its next read.
PACK vs EPISODIC
* Single-video + season-parsed + no-episode → SeasonRelease(
mode=PACK, folder=<season folder>, episodes=()). The slot map stays
empty until the Phase 5 TMDB sync supplies episode_count. We do
not fabricate an EpisodeRange we cannot prove on disk.
* Otherwise → EPISODIC: every file with (season, episode) becomes an
EpisodeRelease with EpisodeRange(start, end) = (E, E). Multi-episode
files (S01E01E02) still record only the first slot — Parser does
not yet expose episode_end (existing tech debt, unchanged).
Package move
The orchestrator moves from alfred/application/library/ to
alfred/application/tv_shows/ for symmetry with alfred/application/
movies/ (Step 2). walker.py + its tests move with it. The empty
library/ package is deleted.
Tests
tests/application/tv_shows/test_rescan.py rewritten end-to-end on
the real v2 repository, real KB, real scanner, stubbed prober.
9 happy-path + edge-case scenarios cover EPISODIC track flattening,
PACK empty-episodes semantics, sidecar round-trip, imdb_id optional,
empty show root, season folder with no videos, prober returning None.
test_walker.py moved verbatim (import path updated).
Full suite: 1214 passed / 10 skipped / 4 xfailed. The three v1
dot_alfred quarantines from Phase 3 stay in place until Step 3.
This commit is contained in:
@@ -0,0 +1,317 @@
|
||||
"""Integration tests for the v2 ``rescan_show``.
|
||||
|
||||
Uses the real filesystem (``tmp_path``), the real release knowledge
|
||||
base, the real v2 ``.alfred`` series repository, and the real scanner.
|
||||
Only the media prober is stubbed — ffprobe needs real bytes and a
|
||||
binary.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from alfred.application.tv_shows import rescan_show
|
||||
from alfred.domain.releases.value_objects import ReleaseMode
|
||||
from alfred.domain.shared.media import (
|
||||
AudioTrack,
|
||||
MediaInfo,
|
||||
SubtitleTrack,
|
||||
VideoTrack,
|
||||
)
|
||||
from alfred.domain.shared.value_objects import ImdbId, TmdbId
|
||||
from alfred.domain.tv_shows.value_objects import SeasonNumber
|
||||
from alfred.infrastructure.filesystem.scanner import PathlibFilesystemScanner
|
||||
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
|
||||
from alfred.infrastructure.persistence.dot_alfred.v2.repository import (
|
||||
SIDECAR_FILENAME,
|
||||
DotAlfredSeriesReleaseRepository,
|
||||
)
|
||||
|
||||
_KB = YamlReleaseKnowledge()
|
||||
_SCANNER = PathlibFilesystemScanner()
|
||||
_TMDB_ID = TmdbId(84958)
|
||||
_IMDB_ID = ImdbId("tt0804484")
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Helpers #
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
|
||||
_MISSING = object()
|
||||
|
||||
|
||||
class _StubProber:
|
||||
"""Return a canned MediaInfo for every probe call, regardless of path."""
|
||||
|
||||
def __init__(self, info=_MISSING) -> None:
|
||||
self._info: MediaInfo | None = (
|
||||
_default_info() if info is _MISSING else info # type: ignore[assignment]
|
||||
)
|
||||
self.calls: list[Path] = []
|
||||
|
||||
def probe(self, video: Path) -> MediaInfo | None:
|
||||
self.calls.append(video)
|
||||
return self._info
|
||||
|
||||
def list_subtitle_streams(self, video: Path): # pragma: no cover - unused
|
||||
return []
|
||||
|
||||
|
||||
def _default_info() -> MediaInfo:
|
||||
return MediaInfo(
|
||||
video_tracks=(VideoTrack(index=0, codec="hevc", width=1920, height=1080),),
|
||||
audio_tracks=(
|
||||
AudioTrack(
|
||||
index=0,
|
||||
codec="eac3",
|
||||
channels=6,
|
||||
channel_layout="5.1",
|
||||
language="eng",
|
||||
),
|
||||
),
|
||||
subtitle_tracks=(
|
||||
SubtitleTrack(
|
||||
index=0,
|
||||
codec="subrip",
|
||||
language="eng",
|
||||
is_default=False,
|
||||
is_forced=False,
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def _make_foundation_library(
|
||||
root: Path,
|
||||
*,
|
||||
episodic_episodes: tuple[int, ...] = (1, 2, 3),
|
||||
include_pack_s2: bool = True,
|
||||
) -> Path:
|
||||
"""Build a fake Foundation show folder under ``root``.
|
||||
|
||||
Layout::
|
||||
|
||||
root/Foundation/
|
||||
Foundation.S01.1080p.x265-ELiTE/ ← EPISODIC
|
||||
Foundation.S01E01.1080p.x265-ELiTE.mkv
|
||||
Foundation.S01E02.1080p.x265-ELiTE.mkv
|
||||
Foundation.S01E03.1080p.x265-ELiTE.mkv
|
||||
Foundation.S02.1080p.x265-ELiTE/ ← PACK
|
||||
Foundation.S02.1080p.x265-ELiTE.mkv
|
||||
"""
|
||||
show_root = root / "Foundation"
|
||||
show_root.mkdir()
|
||||
s1 = show_root / "Foundation.S01.1080p.x265-ELiTE"
|
||||
s1.mkdir()
|
||||
for n in episodic_episodes:
|
||||
(s1 / f"Foundation.S01E{n:02d}.1080p.x265-ELiTE.mkv").write_bytes(b"")
|
||||
if include_pack_s2:
|
||||
s2 = show_root / "Foundation.S02.1080p.x265-ELiTE"
|
||||
s2.mkdir()
|
||||
(s2 / "Foundation.S02.1080p.x265-ELiTE.mkv").write_bytes(b"")
|
||||
return show_root
|
||||
|
||||
|
||||
def _library_with_show(tmp_path: Path) -> tuple[Path, Path]:
|
||||
"""Return (library_root, show_root) prepared for the repository."""
|
||||
library = tmp_path / "library"
|
||||
library.mkdir()
|
||||
show_root = _make_foundation_library(library)
|
||||
return library, show_root
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Happy path #
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
|
||||
class TestHappyPath:
|
||||
def test_builds_release_with_episodic_and_pack_seasons(self, tmp_path):
|
||||
library, show_root = _library_with_show(tmp_path)
|
||||
repo = DotAlfredSeriesReleaseRepository(library)
|
||||
prober = _StubProber()
|
||||
|
||||
release = rescan_show(
|
||||
show_root,
|
||||
tmdb_id=_TMDB_ID,
|
||||
imdb_id=_IMDB_ID,
|
||||
series_repo=repo,
|
||||
scanner=_SCANNER,
|
||||
prober=prober,
|
||||
kb=_KB,
|
||||
)
|
||||
|
||||
assert release.tmdb_id == _TMDB_ID
|
||||
assert release.imdb_id == _IMDB_ID
|
||||
assert len(release.seasons) == 2
|
||||
|
||||
s1 = release.get_season(SeasonNumber(1))
|
||||
s2 = release.get_season(SeasonNumber(2))
|
||||
assert s1 is not None and s2 is not None
|
||||
|
||||
# S01 EPISODIC: three single-episode releases, each carrying
|
||||
# its own TrackProfile.
|
||||
assert s1.mode is ReleaseMode.EPISODIC
|
||||
assert s1.episode_count() == 3
|
||||
for ep in s1.episodes:
|
||||
assert ep.episodes.is_single()
|
||||
assert len(ep.tracks.audio_tracks) == 1
|
||||
assert ep.tracks.audio_tracks[0].language == "eng"
|
||||
assert len(ep.tracks.subtitle_tracks) == 1
|
||||
|
||||
# S02 PACK: empty episodes tuple. Phase 5 (TMDB sync) will
|
||||
# populate the slot map once episode_count is known.
|
||||
assert s2.mode is ReleaseMode.PACK
|
||||
assert s2.episodes == ()
|
||||
|
||||
def test_episode_paths_are_relative_to_show_root(self, tmp_path):
|
||||
library, show_root = _library_with_show(tmp_path)
|
||||
repo = DotAlfredSeriesReleaseRepository(library)
|
||||
release = rescan_show(
|
||||
show_root,
|
||||
tmdb_id=_TMDB_ID,
|
||||
imdb_id=_IMDB_ID,
|
||||
series_repo=repo,
|
||||
scanner=_SCANNER,
|
||||
prober=_StubProber(),
|
||||
kb=_KB,
|
||||
)
|
||||
s1 = release.get_season(SeasonNumber(1))
|
||||
assert s1 is not None
|
||||
for ep in s1.episodes:
|
||||
path_str = str(ep.file_path)
|
||||
# Must NOT be absolute and must start with the season folder.
|
||||
assert not Path(path_str).is_absolute()
|
||||
assert path_str.startswith("Foundation.S01.1080p.x265-ELiTE/")
|
||||
|
||||
def test_season_folder_name_recorded(self, tmp_path):
|
||||
library, show_root = _library_with_show(tmp_path)
|
||||
repo = DotAlfredSeriesReleaseRepository(library)
|
||||
release = rescan_show(
|
||||
show_root,
|
||||
tmdb_id=_TMDB_ID,
|
||||
imdb_id=_IMDB_ID,
|
||||
series_repo=repo,
|
||||
scanner=_SCANNER,
|
||||
prober=_StubProber(),
|
||||
kb=_KB,
|
||||
)
|
||||
s1 = release.get_season(SeasonNumber(1))
|
||||
s2 = release.get_season(SeasonNumber(2))
|
||||
assert s1 is not None and s2 is not None
|
||||
assert s1.folder == "Foundation.S01.1080p.x265-ELiTE"
|
||||
assert s2.folder == "Foundation.S02.1080p.x265-ELiTE"
|
||||
|
||||
def test_persists_sidecar_on_disk(self, tmp_path):
|
||||
library, show_root = _library_with_show(tmp_path)
|
||||
repo = DotAlfredSeriesReleaseRepository(library)
|
||||
rescan_show(
|
||||
show_root,
|
||||
tmdb_id=_TMDB_ID,
|
||||
imdb_id=_IMDB_ID,
|
||||
series_repo=repo,
|
||||
scanner=_SCANNER,
|
||||
prober=_StubProber(),
|
||||
kb=_KB,
|
||||
)
|
||||
assert (show_root / SIDECAR_FILENAME).is_file()
|
||||
# Round-trip via the repo.
|
||||
recovered = repo.find_by_tmdb_id(_TMDB_ID)
|
||||
assert recovered is not None
|
||||
assert len(recovered.seasons) == 2
|
||||
|
||||
def test_probe_called_once_per_video(self, tmp_path):
|
||||
library, show_root = _library_with_show(tmp_path)
|
||||
repo = DotAlfredSeriesReleaseRepository(library)
|
||||
prober = _StubProber()
|
||||
rescan_show(
|
||||
show_root,
|
||||
tmdb_id=_TMDB_ID,
|
||||
imdb_id=_IMDB_ID,
|
||||
series_repo=repo,
|
||||
scanner=_SCANNER,
|
||||
prober=prober,
|
||||
kb=_KB,
|
||||
)
|
||||
# 3 episodes + 1 pack video = 4 probes.
|
||||
assert len(prober.calls) == 4
|
||||
|
||||
def test_imdb_id_optional(self, tmp_path):
|
||||
library, show_root = _library_with_show(tmp_path)
|
||||
repo = DotAlfredSeriesReleaseRepository(library)
|
||||
release = rescan_show(
|
||||
show_root,
|
||||
tmdb_id=_TMDB_ID,
|
||||
series_repo=repo,
|
||||
scanner=_SCANNER,
|
||||
prober=_StubProber(),
|
||||
kb=_KB,
|
||||
)
|
||||
assert release.imdb_id is None
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Edge cases #
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
|
||||
class TestEdgeCases:
|
||||
def test_empty_show_root_yields_empty_release(self, tmp_path):
|
||||
library = tmp_path / "library"
|
||||
library.mkdir()
|
||||
show_root = library / "Empty"
|
||||
show_root.mkdir()
|
||||
repo = DotAlfredSeriesReleaseRepository(library)
|
||||
release = rescan_show(
|
||||
show_root,
|
||||
tmdb_id=_TMDB_ID,
|
||||
series_repo=repo,
|
||||
scanner=_SCANNER,
|
||||
prober=_StubProber(),
|
||||
kb=_KB,
|
||||
)
|
||||
assert release.seasons == ()
|
||||
assert (show_root / SIDECAR_FILENAME).is_file()
|
||||
|
||||
def test_season_folder_without_videos_is_skipped(self, tmp_path, caplog):
|
||||
library = tmp_path / "library"
|
||||
library.mkdir()
|
||||
show_root = library / "Foundation"
|
||||
show_root.mkdir()
|
||||
(show_root / "Foundation.S01.WEB").mkdir() # empty season folder
|
||||
repo = DotAlfredSeriesReleaseRepository(library)
|
||||
with caplog.at_level("WARNING"):
|
||||
release = rescan_show(
|
||||
show_root,
|
||||
tmdb_id=_TMDB_ID,
|
||||
series_repo=repo,
|
||||
scanner=_SCANNER,
|
||||
prober=_StubProber(),
|
||||
kb=_KB,
|
||||
)
|
||||
assert release.seasons == ()
|
||||
assert any("no video file" in r.message for r in caplog.records)
|
||||
|
||||
def test_prober_returning_none_still_produces_episodes(self, tmp_path):
|
||||
library = tmp_path / "library"
|
||||
library.mkdir()
|
||||
show_root = _make_foundation_library(
|
||||
library, episodic_episodes=(1,), include_pack_s2=False
|
||||
)
|
||||
repo = DotAlfredSeriesReleaseRepository(library)
|
||||
# Prober returns None — inspect_release skips enrichment, tracks empty.
|
||||
release = rescan_show(
|
||||
show_root,
|
||||
tmdb_id=_TMDB_ID,
|
||||
series_repo=repo,
|
||||
scanner=_SCANNER,
|
||||
prober=_StubProber(info=None),
|
||||
kb=_KB,
|
||||
)
|
||||
s1 = release.get_season(SeasonNumber(1))
|
||||
assert s1 is not None
|
||||
assert s1.episode_count() == 1
|
||||
ep = s1.episodes[0]
|
||||
assert ep.tracks.audio_tracks == ()
|
||||
assert ep.tracks.subtitle_tracks == ()
|
||||
@@ -0,0 +1,128 @@
|
||||
"""Tests for the show-tree walker."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from alfred.application.tv_shows.walker import walk_show
|
||||
from alfred.infrastructure.filesystem.scanner import PathlibFilesystemScanner
|
||||
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
|
||||
|
||||
_KB = YamlReleaseKnowledge()
|
||||
_SCANNER = PathlibFilesystemScanner()
|
||||
|
||||
|
||||
def _episode(dir_path, name: str) -> None:
|
||||
(dir_path / name).write_bytes(b"")
|
||||
|
||||
|
||||
class TestSeasonFolderDetection:
|
||||
def test_keeps_folders_with_season_token(self, tmp_path):
|
||||
show = tmp_path / "Foundation"
|
||||
show.mkdir()
|
||||
(show / "Foundation.S01.1080p.WEB-DL.x265-GROUP").mkdir()
|
||||
(show / "Foundation.S02.Complete.1080p.WEB-DL-GROUP").mkdir()
|
||||
tree = walk_show(show, scanner=_SCANNER, kb=_KB)
|
||||
names = sorted(s.season_dir.name for s in tree.season_folders)
|
||||
assert names == [
|
||||
"Foundation.S01.1080p.WEB-DL.x265-GROUP",
|
||||
"Foundation.S02.Complete.1080p.WEB-DL-GROUP",
|
||||
]
|
||||
|
||||
def test_skips_folders_without_season_token(self, tmp_path):
|
||||
show = tmp_path / "Foundation"
|
||||
show.mkdir()
|
||||
(show / "Sample").mkdir()
|
||||
(show / "Soundtrack").mkdir()
|
||||
(show / "Foundation.S01.WEB").mkdir()
|
||||
tree = walk_show(show, scanner=_SCANNER, kb=_KB)
|
||||
assert [s.season_dir.name for s in tree.season_folders] == [
|
||||
"Foundation.S01.WEB"
|
||||
]
|
||||
|
||||
def test_season_token_is_case_insensitive(self, tmp_path):
|
||||
show = tmp_path / "Foundation"
|
||||
show.mkdir()
|
||||
(show / "Foundation.s01.WEB").mkdir()
|
||||
tree = walk_show(show, scanner=_SCANNER, kb=_KB)
|
||||
assert len(tree.season_folders) == 1
|
||||
|
||||
def test_season_token_rejects_in_word(self, tmp_path):
|
||||
# "Season01" without separator must not match (we want a real
|
||||
# Sxx token, not any letter S followed by digits).
|
||||
show = tmp_path / "WeirdShow"
|
||||
show.mkdir()
|
||||
(show / "Pass100").mkdir()
|
||||
(show / "ABS01XYZ").mkdir()
|
||||
tree = walk_show(show, scanner=_SCANNER, kb=_KB)
|
||||
assert tree.season_folders == ()
|
||||
|
||||
def test_ignores_files_at_show_root(self, tmp_path):
|
||||
show = tmp_path / "Foundation"
|
||||
show.mkdir()
|
||||
(show / "Foundation.S01.mkv").write_bytes(b"") # file, not a folder
|
||||
(show / "Foundation.S01.WEB").mkdir()
|
||||
tree = walk_show(show, scanner=_SCANNER, kb=_KB)
|
||||
assert [s.season_dir.name for s in tree.season_folders] == [
|
||||
"Foundation.S01.WEB"
|
||||
]
|
||||
|
||||
|
||||
class TestVideoFileCollection:
|
||||
def test_keeps_only_video_extensions(self, tmp_path):
|
||||
show = tmp_path / "Foundation"
|
||||
show.mkdir()
|
||||
season = show / "Foundation.S01.WEB"
|
||||
season.mkdir()
|
||||
_episode(season, "Foundation.S01E01.mkv")
|
||||
_episode(season, "Foundation.S01E02.mp4")
|
||||
_episode(season, "Foundation.S01E01.eng.srt") # not a video
|
||||
_episode(season, "info.nfo") # not a video
|
||||
tree = walk_show(show, scanner=_SCANNER, kb=_KB)
|
||||
(folder,) = tree.season_folders
|
||||
suffixes = sorted(p.suffix for p in folder.video_files)
|
||||
assert suffixes == [".mkv", ".mp4"]
|
||||
|
||||
def test_empty_season_folder_surfaces(self, tmp_path):
|
||||
show = tmp_path / "Foundation"
|
||||
show.mkdir()
|
||||
(show / "Foundation.S01.WEB").mkdir()
|
||||
tree = walk_show(show, scanner=_SCANNER, kb=_KB)
|
||||
(folder,) = tree.season_folders
|
||||
assert folder.video_files == ()
|
||||
|
||||
def test_single_pack_video_is_kept(self, tmp_path):
|
||||
show = tmp_path / "Foundation"
|
||||
show.mkdir()
|
||||
season = show / "Foundation.S01.Complete.WEB"
|
||||
season.mkdir()
|
||||
_episode(season, "Foundation.S01.Complete.WEB.mkv")
|
||||
tree = walk_show(show, scanner=_SCANNER, kb=_KB)
|
||||
(folder,) = tree.season_folders
|
||||
assert [p.name for p in folder.video_files] == [
|
||||
"Foundation.S01.Complete.WEB.mkv"
|
||||
]
|
||||
|
||||
def test_does_not_recurse_into_sub_subfolders(self, tmp_path):
|
||||
show = tmp_path / "Foundation"
|
||||
show.mkdir()
|
||||
season = show / "Foundation.S01.WEB"
|
||||
season.mkdir()
|
||||
nested = season / "Extras"
|
||||
nested.mkdir()
|
||||
_episode(nested, "Foundation.S01E01.mkv")
|
||||
tree = walk_show(show, scanner=_SCANNER, kb=_KB)
|
||||
(folder,) = tree.season_folders
|
||||
assert folder.video_files == ()
|
||||
|
||||
|
||||
class TestEdgeCases:
|
||||
def test_empty_show_root(self, tmp_path):
|
||||
show = tmp_path / "Empty"
|
||||
show.mkdir()
|
||||
tree = walk_show(show, scanner=_SCANNER, kb=_KB)
|
||||
assert tree.show_root == show
|
||||
assert tree.season_folders == ()
|
||||
|
||||
def test_missing_show_root_returns_empty_tree(self, tmp_path):
|
||||
# Scanner returns [] for non-existent paths; walker must not raise.
|
||||
tree = walk_show(tmp_path / "Nope", scanner=_SCANNER, kb=_KB)
|
||||
assert tree.season_folders == ()
|
||||
Reference in New Issue
Block a user