6802933acd
- test_release.py / test_release_fixtures.py: module-level _KB = YamlReleaseKnowledge() + thin _parse(name) helper threading it into parse_release. test_show_folder_name_strips_windows_chars renamed to test_show_folder_name_uses_already_safe_title to reflect the Option B contract (caller sanitizes via kb.sanitize_for_fs). - test_detect_media_type.py: same _KB pattern, all detect_media_type(parsed, path) calls now pass kb. - test_filesystem_extras.py: find_video_file(path) calls now pass kb. - test_enrich_from_probe.py: _bare() helper adds the new title_sanitized field. - test_resolve_destination.py: drop _sanitize import + TestSanitize class (helper deleted), add tmdb_title_safe arg to _resolve_series_folder calls. 987 passed, 8 skipped.
388 lines
14 KiB
Python
388 lines
14 KiB
Python
"""Tests for the smaller ``alfred.infrastructure.filesystem`` helpers.

Covers four siblings of ``FileManager`` that had near-zero coverage:

- ``ffprobe.probe`` — wraps ``ffprobe`` JSON output into a ``MediaInfo``.
- ``filesystem_operations.create_folder`` / ``move`` — thin
  ``mkdir`` / ``mv`` wrappers returning dict-shaped responses.
- ``organizer.MediaOrganizer`` — computes destination paths for movies
  and TV episodes; creates folders for them.
- ``find_video.find_video_file`` — first-video lookup in a folder.

External commands (``ffprobe`` / ``mv``) are patched via ``subprocess.run``.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import subprocess
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
from alfred.domain.movies.entities import Movie
|
|
from alfred.domain.movies.value_objects import MovieTitle, Quality, ReleaseYear
|
|
from alfred.domain.shared.value_objects import ImdbId
|
|
from alfred.domain.tv_shows.entities import Episode, TVShow
|
|
from alfred.domain.tv_shows.value_objects import (
|
|
EpisodeNumber,
|
|
SeasonNumber,
|
|
ShowStatus,
|
|
)
|
|
from alfred.infrastructure.filesystem import ffprobe
|
|
from alfred.infrastructure.filesystem.filesystem_operations import (
|
|
create_folder,
|
|
move,
|
|
)
|
|
from alfred.infrastructure.filesystem.find_video import find_video_file
|
|
from alfred.infrastructure.filesystem.organizer import MediaOrganizer
|
|
from alfred.infrastructure.knowledge.release_kb import YamlReleaseKnowledge
|
|
|
|
# One shared knowledge-base instance for the whole module; it is the second
# argument of every ``find_video_file`` call in ``TestFindVideo``.
_KB = YamlReleaseKnowledge()
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# ffprobe.probe #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
def _ffprobe_result(returncode=0, stdout="{}", stderr="") -> MagicMock:
|
|
return MagicMock(returncode=returncode, stdout=stdout, stderr=stderr)
|
|
|
|
|
|
class TestFfprobe:
    """Exercise ``ffprobe.probe`` with ``subprocess.run`` patched out."""

    # Dotted path of the ``subprocess.run`` reference that every test patches.
    _RUN = "alfred.infrastructure.filesystem.ffprobe.subprocess.run"

    @classmethod
    def _probe(cls, tmp_path, **run_behaviour):
        """Probe an empty ``x.mkv`` while ``subprocess.run`` is mocked.

        ``run_behaviour`` is forwarded to ``patch`` (``return_value=...`` or
        ``side_effect=...``) and so decides what the fake ``subprocess.run``
        does when ``ffprobe.probe`` invokes it.
        """
        media = tmp_path / "x.mkv"
        media.write_bytes(b"")
        with patch(cls._RUN, **run_behaviour):
            return ffprobe.probe(media)

    @classmethod
    def _probe_payload(cls, tmp_path, payload):
        """Probe with the fake process exiting 0 and printing *payload* as JSON."""
        return cls._probe(
            tmp_path,
            return_value=_ffprobe_result(stdout=json.dumps(payload)),
        )

    def test_timeout_returns_none(self, tmp_path):
        """A ``TimeoutExpired`` from the subprocess yields ``None``."""
        timeout = subprocess.TimeoutExpired(cmd="ffprobe", timeout=30)
        assert self._probe(tmp_path, side_effect=timeout) is None

    def test_nonzero_returncode_returns_none(self, tmp_path):
        """A failing exit status yields ``None``."""
        failed = _ffprobe_result(returncode=1, stderr="not a media file")
        assert self._probe(tmp_path, return_value=failed) is None

    def test_invalid_json_returns_none(self, tmp_path):
        """Unparseable stdout yields ``None``."""
        broken = _ffprobe_result(stdout="not json {")
        assert self._probe(tmp_path, return_value=broken) is None

    def test_parses_format_duration_and_bitrate(self, tmp_path):
        """``format.duration`` / ``format.bit_rate`` land on the result."""
        payload = {
            "format": {"duration": "1234.5", "bit_rate": "5000000"},
            "streams": [],
        }
        info = self._probe_payload(tmp_path, payload)
        assert info is not None
        assert info.duration_seconds == 1234.5
        assert info.bitrate_kbps == 5000  # bit_rate // 1000

    def test_invalid_numeric_format_fields_skipped(self, tmp_path):
        """Non-numeric format fields are left as ``None``."""
        payload = {
            "format": {"duration": "garbage", "bit_rate": "also-bad"},
            "streams": [],
        }
        info = self._probe_payload(tmp_path, payload)
        assert info is not None
        assert info.duration_seconds is None
        assert info.bitrate_kbps is None

    def test_parses_streams(self, tmp_path):
        """Video, audio and subtitle streams are each mapped onto the result."""
        payload = {
            "format": {},
            "streams": [
                {
                    "index": 0,
                    "codec_type": "video",
                    "codec_name": "h264",
                    "width": 1920,
                    "height": 1080,
                },
                {
                    "index": 1,
                    "codec_type": "audio",
                    "codec_name": "ac3",
                    "channels": 6,
                    "channel_layout": "5.1",
                    "tags": {"language": "eng"},
                    "disposition": {"default": 1},
                },
                {
                    "index": 2,
                    "codec_type": "audio",
                    "codec_name": "aac",
                    "channels": 2,
                    "tags": {"language": "fra"},
                },
                {
                    "index": 3,
                    "codec_type": "subtitle",
                    "codec_name": "subrip",
                    "tags": {"language": "fra"},
                    "disposition": {"forced": 1},
                },
            ],
        }
        info = self._probe_payload(tmp_path, payload)
        # Guard first so a ``None`` result fails as an assertion, not as an
        # ``AttributeError`` on the next line.
        assert info is not None
        assert info.video_codec == "h264"
        assert info.width == 1920 and info.height == 1080
        assert len(info.audio_tracks) == 2
        eng = info.audio_tracks[0]
        assert eng.language == "eng"
        assert eng.is_default is True
        assert info.audio_tracks[1].is_default is False
        assert len(info.subtitle_tracks) == 1
        assert info.subtitle_tracks[0].is_forced is True

    def test_first_video_stream_wins(self, tmp_path):
        """With two video streams, the first one supplies codec and width."""
        # The implementation only fills video_codec on the FIRST video stream.
        payload = {
            "format": {},
            "streams": [
                {"codec_type": "video", "codec_name": "h264", "width": 1920},
                {"codec_type": "video", "codec_name": "hevc", "width": 3840},
            ],
        }
        info = self._probe_payload(tmp_path, payload)
        assert info is not None
        assert info.video_codec == "h264"
        assert info.width == 1920
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# filesystem_operations #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestCreateFolder:
    """``create_folder`` reports its outcome as a dict, including on failure."""

    def test_creates_nested(self, tmp_path):
        """Missing intermediate directories are created along with the leaf."""
        target = tmp_path / "a" / "b" / "c"
        out = create_folder(str(target))
        assert out == {"status": "ok", "path": str(target)}
        assert target.is_dir()

    def test_existing_is_ok(self, tmp_path):
        """Pointing at a directory that already exists is still a success."""
        out = create_folder(str(tmp_path))
        assert out["status"] == "ok"

    def test_os_error_wrapped(self, tmp_path):
        """An ``OSError`` from ``mkdir`` becomes an error dict carrying its message."""
        with patch(
            "alfred.infrastructure.filesystem.filesystem_operations.Path.mkdir",
            side_effect=OSError("readonly fs"),
        ):
            out = create_folder(str(tmp_path / "x"))
        assert out == {
            "status": "error",
            "error": "mkdir_failed",
            "message": "readonly fs",
        }
|
|
|
|
|
|
class TestMove:
    """``move`` validates both paths, then delegates to a patched subprocess."""

    # Dotted path of the ``subprocess.run`` reference patched by the tests
    # that get past the path checks.
    _RUN = "alfred.infrastructure.filesystem.filesystem_operations.subprocess.run"

    def test_source_not_found(self, tmp_path):
        """A missing source is reported as ``source_not_found``."""
        out = move(str(tmp_path / "ghost"), str(tmp_path / "dst"))
        assert out["status"] == "error"
        assert out["error"] == "source_not_found"

    def test_destination_exists(self, tmp_path):
        """An existing destination is reported as ``destination_exists``."""
        src = tmp_path / "src"
        src.write_text("x")
        dst = tmp_path / "dst"
        dst.write_text("y")
        out = move(str(src), str(dst))
        assert out["error"] == "destination_exists"

    def test_happy_path_returns_ok(self, tmp_path):
        """A zero exit status yields the ``ok`` dict echoing both paths."""
        src = tmp_path / "src"
        src.write_text("x")
        dst = tmp_path / "dst"
        # Patch subprocess so we don't actually shell out; pretend success.
        with patch(self._RUN, return_value=MagicMock(returncode=0, stderr="")):
            out = move(str(src), str(dst))
        assert out == {"status": "ok", "source": str(src), "destination": str(dst)}

    def test_mv_failure_wrapped(self, tmp_path):
        """A non-zero exit status surfaces stderr without its trailing newline."""
        src = tmp_path / "src"
        src.write_text("x")
        failed = MagicMock(returncode=1, stderr="cross-device link\n")
        with patch(self._RUN, return_value=failed):
            out = move(str(src), str(tmp_path / "dst"))
        assert out["error"] == "move_failed"
        assert out["message"] == "cross-device link"

    def test_os_error_wrapped(self, tmp_path):
        """An ``OSError`` raised while running the command maps to ``move_failed``."""
        src = tmp_path / "src"
        src.write_text("x")
        with patch(self._RUN, side_effect=OSError("ENOSPC")):
            out = move(str(src), str(tmp_path / "dst"))
        assert out["error"] == "move_failed"
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# find_video #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestFindVideo:
    """``find_video_file`` on single files and on folders."""

    def test_returns_file_directly_when_video(self, tmp_path):
        """A path that is itself a video file is returned unchanged."""
        f = tmp_path / "Movie.mkv"
        f.write_bytes(b"")
        assert find_video_file(f, _KB) == f

    def test_returns_none_when_file_is_not_video(self, tmp_path):
        """A non-video file yields ``None``."""
        f = tmp_path / "notes.txt"
        f.write_text("x")
        assert find_video_file(f, _KB) is None

    def test_returns_none_when_folder_has_no_video(self, tmp_path):
        """A folder holding only non-video files yields ``None``."""
        (tmp_path / "a.txt").write_text("x")
        assert find_video_file(tmp_path, _KB) is None

    def test_returns_first_sorted_video(self, tmp_path):
        """Among several videos, the one that sorts first by name is picked."""
        # Created out of order on purpose so creation order cannot pass the test.
        for name in ("B.mkv", "A.mkv", "C.mkv"):
            (tmp_path / name).write_bytes(b"")
        found = find_video_file(tmp_path, _KB)
        # Guard against ``None`` so a miss fails as an assertion rather than
        # an ``AttributeError``.
        assert found is not None and found.name == "A.mkv"

    def test_recurses_into_subfolders(self, tmp_path):
        """A video nested one directory down is still found."""
        sub = tmp_path / "sub"
        sub.mkdir()
        (sub / "X.mkv").write_bytes(b"")
        found = find_video_file(tmp_path, _KB)
        assert found is not None and found.name == "X.mkv"

    def test_case_insensitive_extension(self, tmp_path):
        """An upper-case ``.MKV`` suffix is recognised as video."""
        f = tmp_path / "Movie.MKV"
        f.write_bytes(b"")
        assert find_video_file(f, _KB) == f
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# MediaOrganizer #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
def _movie() -> Movie:
    """Return a fresh ``Movie`` fixture: "Inception" (2010), HD quality."""
    return Movie(
        imdb_id=ImdbId("tt1375666"),
        title=MovieTitle("Inception"),
        release_year=ReleaseYear(2010),
        quality=Quality.HD,
    )
|
|
|
|
|
|
def _show() -> TVShow:
    """Return a fresh ``TVShow`` fixture: "Dexter", 8 expected seasons, ended."""
    return TVShow(
        imdb_id=ImdbId("tt0773262"),
        title="Dexter",
        expected_seasons=8,
        status=ShowStatus.ENDED,
    )
|
|
|
|
|
|
def _episode() -> Episode:
    """Return a fresh ``Episode`` fixture: season 1, episode 1, titled "Dexter"."""
    return Episode(
        season_number=SeasonNumber(1),
        episode_number=EpisodeNumber(1),
        title="Dexter",
    )
|
|
|
|
|
|
class TestMediaOrganizer:
    """Destination-path computation and folder creation of ``MediaOrganizer``."""

    # Dotted path of the ``Path.mkdir`` reference patched to simulate failures.
    _MKDIR = "alfred.infrastructure.filesystem.organizer.Path.mkdir"

    @staticmethod
    def _organizer(tmp_path):
        """Build an organizer rooted at ``<tmp_path>/movies`` and ``<tmp_path>/tv``."""
        return MediaOrganizer(tmp_path / "movies", tmp_path / "tv")

    def test_get_movie_destination(self, tmp_path):
        """Movie files land in ``<movies root>/<movie folder>/``."""
        org = self._organizer(tmp_path)
        out = org.get_movie_destination(_movie(), "source.mkv")
        # Path: /movies/<folder>/<filename>.mkv
        assert out.suffix == ".mkv"
        assert out.parent.name == _movie().get_folder_name()
        assert out.parent.parent == tmp_path / "movies"

    def test_get_movie_destination_preserves_extension(self, tmp_path):
        """The source extension is carried over verbatim, case included."""
        org = self._organizer(tmp_path)
        out = org.get_movie_destination(_movie(), "source.MP4")
        assert out.suffix == ".MP4"

    def test_get_episode_destination(self, tmp_path):
        """Episode files land three levels below the TV root."""
        org = self._organizer(tmp_path)
        out = org.get_episode_destination(_show(), _episode(), "raw.mkv")
        # Path: /tv/<show>/<season>/<episode>.mkv
        assert out.suffix == ".mkv"
        assert out.parent.parent.parent == tmp_path / "tv"
        assert out.parent.parent.name == _show().get_folder_name()

    def test_create_movie_directory_creates_folder(self, tmp_path):
        """The movie folder exists on disk after a successful call."""
        org = self._organizer(tmp_path)
        assert org.create_movie_directory(_movie()) is True
        assert (tmp_path / "movies" / _movie().get_folder_name()).is_dir()

    def test_create_movie_directory_already_exists_ok(self, tmp_path):
        """Creating the same movie folder twice succeeds both times."""
        org = self._organizer(tmp_path)
        org.create_movie_directory(_movie())
        # Second call is also fine (parents=True, exist_ok=True).
        assert org.create_movie_directory(_movie()) is True

    def test_create_movie_directory_failure_returns_false(self, tmp_path):
        """A ``PermissionError`` from ``mkdir`` is reported as ``False``."""
        # Build the organizer before patching, so only the call under test
        # sees the failing ``mkdir``.
        org = self._organizer(tmp_path)
        with patch(self._MKDIR, side_effect=PermissionError("denied")):
            assert org.create_movie_directory(_movie()) is False

    def test_create_episode_directory_creates_season_folder(self, tmp_path):
        """Both the show folder and a child inside it are created."""
        org = self._organizer(tmp_path)
        assert org.create_episode_directory(_show(), 1) is True
        # /tv/<show>/<season> exists
        show_dir = tmp_path / "tv" / _show().get_folder_name()
        assert show_dir.is_dir()
        # At least one child (the season folder) was created.
        assert any(show_dir.iterdir())

    def test_create_episode_directory_failure_returns_false(self, tmp_path):
        """An ``OSError`` from ``mkdir`` is reported as ``False``."""
        org = self._organizer(tmp_path)
        with patch(self._MKDIR, side_effect=OSError("readonly")):
            assert org.create_episode_directory(_show(), 1) is False
|