e07c9ec77b
Several weeks of work accumulated without being committed. Grouped here for clarity; see CHANGELOG.md [Unreleased] for the user-facing summary. Highlights ---------- P1 #2 — ISO 639-2/B canonical migration - New Language VO + LanguageRegistry (alfred/domain/shared/knowledge/). - iso_languages.yaml as single source of truth for language codes. - SubtitleKnowledgeBase now delegates lookup to LanguageRegistry; subtitles.yaml only declares subtitle-specific tokens (vostfr, vf, vff, …). - SubtitlePreferences default → ["fre", "eng"]; subtitle filenames written as {iso639_2b}.srt (legacy fr.srt still read via alias). - Scanner: dropped _LANG_KEYWORDS / _SDH_TOKENS / _FORCED_TOKENS / SUBTITLE_EXTENSIONS hardcoded dicts. - Fixed: 'hi' token no longer marks SDH (conflicted with Hindi alias). - Added settings.min_movie_size_bytes (was a module constant). P1 #3 — Release parser unification + data-driven tokenizer - parse_release() is now the single source of truth for release-name parsing. - alfred/knowledge/release/separators.yaml declares the token separators used by the tokenizer (., space, [, ], (, ), _). New conventions can be added without code changes. - Tokenizer now splits on any configured separator instead of name.split('.'). Releases like 'The Father (2020) [1080p] [WEBRip] [5.1] [YTS.MX]' parse via the direct path without sanitization fallback. - Site-tag extraction always runs first; well-formedness only rejects truly forbidden chars. - _parse_season_episode() extended with NxNN / NxNNxNN alt forms. - Removed dead helpers: _sanitize, _normalize. Domain cleanup - Deleted fossil services with zero production callers: alfred/domain/movies/services.py alfred/domain/tv_shows/services.py alfred/domain/subtitles/services.py (replaced by subtitles/services/ package) alfred/domain/subtitles/repositories.py - Split monolithic subtitle services into a package (identifier, matcher, placer, pattern_detector, utils) + dedicated knowledge/ package. - MediaInfo split into dedicated package (alfred/domain/shared/media/: audio, video, subtitle, info, matching). Persistence cleanup - Removed dead JSON repositories (movie/subtitle/tvshow_repository.py). Tests - Major expansion of the test suite organized to mirror the source tree. - Removed obsolete *_edge_cases test files superseded by structured tests. - Suite: 990 passed, 8 skipped. Misc - .gitignore: exclude env_backup/ and *.bak. - Adjustments across agent/llm, app.py, application/filesystem, and infrastructure/filesystem to align with the new domain layout.
385 lines
14 KiB
Python
385 lines
14 KiB
Python
"""Tests for the smaller ``alfred.infrastructure.filesystem`` helpers.
|
|
|
|
Covers four siblings of ``FileManager`` that had near-zero coverage:
|
|
|
|
- ``ffprobe.probe`` — wraps ``ffprobe`` JSON output into a ``MediaInfo``.
|
|
- ``filesystem_operations.create_folder`` / ``move`` — thin
|
|
``mkdir`` / ``mv`` wrappers returning dict-shaped responses.
|
|
- ``organizer.MediaOrganizer`` — computes destination paths for movies
|
|
and TV episodes; creates folders for them.
|
|
- ``find_video.find_video_file`` — first-video lookup in a folder.
|
|
|
|
External commands (``ffprobe`` / ``mv``) are patched via ``subprocess.run``.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import subprocess
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
from alfred.domain.movies.entities import Movie
|
|
from alfred.domain.movies.value_objects import MovieTitle, Quality, ReleaseYear
|
|
from alfred.domain.shared.value_objects import ImdbId
|
|
from alfred.domain.tv_shows.entities import Episode, TVShow
|
|
from alfred.domain.tv_shows.value_objects import (
|
|
EpisodeNumber,
|
|
SeasonNumber,
|
|
ShowStatus,
|
|
)
|
|
from alfred.infrastructure.filesystem import ffprobe
|
|
from alfred.infrastructure.filesystem.filesystem_operations import (
|
|
create_folder,
|
|
move,
|
|
)
|
|
from alfred.infrastructure.filesystem.find_video import find_video_file
|
|
from alfred.infrastructure.filesystem.organizer import MediaOrganizer
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# ffprobe.probe #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
def _ffprobe_result(returncode=0, stdout="{}", stderr="") -> MagicMock:
|
|
return MagicMock(returncode=returncode, stdout=stdout, stderr=stderr)
|
|
|
|
|
|
class TestFfprobe:
|
|
def test_timeout_returns_none(self, tmp_path):
|
|
f = tmp_path / "x.mkv"
|
|
f.write_bytes(b"")
|
|
with patch(
|
|
"alfred.infrastructure.filesystem.ffprobe.subprocess.run",
|
|
side_effect=subprocess.TimeoutExpired(cmd="ffprobe", timeout=30),
|
|
):
|
|
assert ffprobe.probe(f) is None
|
|
|
|
def test_nonzero_returncode_returns_none(self, tmp_path):
|
|
f = tmp_path / "x.mkv"
|
|
f.write_bytes(b"")
|
|
with patch(
|
|
"alfred.infrastructure.filesystem.ffprobe.subprocess.run",
|
|
return_value=_ffprobe_result(returncode=1, stderr="not a media file"),
|
|
):
|
|
assert ffprobe.probe(f) is None
|
|
|
|
def test_invalid_json_returns_none(self, tmp_path):
|
|
f = tmp_path / "x.mkv"
|
|
f.write_bytes(b"")
|
|
with patch(
|
|
"alfred.infrastructure.filesystem.ffprobe.subprocess.run",
|
|
return_value=_ffprobe_result(stdout="not json {"),
|
|
):
|
|
assert ffprobe.probe(f) is None
|
|
|
|
def test_parses_format_duration_and_bitrate(self, tmp_path):
|
|
f = tmp_path / "x.mkv"
|
|
f.write_bytes(b"")
|
|
payload = {
|
|
"format": {"duration": "1234.5", "bit_rate": "5000000"},
|
|
"streams": [],
|
|
}
|
|
with patch(
|
|
"alfred.infrastructure.filesystem.ffprobe.subprocess.run",
|
|
return_value=_ffprobe_result(stdout=json.dumps(payload)),
|
|
):
|
|
info = ffprobe.probe(f)
|
|
assert info is not None
|
|
assert info.duration_seconds == 1234.5
|
|
assert info.bitrate_kbps == 5000 # bit_rate // 1000
|
|
|
|
def test_invalid_numeric_format_fields_skipped(self, tmp_path):
|
|
f = tmp_path / "x.mkv"
|
|
f.write_bytes(b"")
|
|
payload = {
|
|
"format": {"duration": "garbage", "bit_rate": "also-bad"},
|
|
"streams": [],
|
|
}
|
|
with patch(
|
|
"alfred.infrastructure.filesystem.ffprobe.subprocess.run",
|
|
return_value=_ffprobe_result(stdout=json.dumps(payload)),
|
|
):
|
|
info = ffprobe.probe(f)
|
|
assert info is not None
|
|
assert info.duration_seconds is None
|
|
assert info.bitrate_kbps is None
|
|
|
|
def test_parses_streams(self, tmp_path):
|
|
f = tmp_path / "x.mkv"
|
|
f.write_bytes(b"")
|
|
payload = {
|
|
"format": {},
|
|
"streams": [
|
|
{
|
|
"index": 0,
|
|
"codec_type": "video",
|
|
"codec_name": "h264",
|
|
"width": 1920,
|
|
"height": 1080,
|
|
},
|
|
{
|
|
"index": 1,
|
|
"codec_type": "audio",
|
|
"codec_name": "ac3",
|
|
"channels": 6,
|
|
"channel_layout": "5.1",
|
|
"tags": {"language": "eng"},
|
|
"disposition": {"default": 1},
|
|
},
|
|
{
|
|
"index": 2,
|
|
"codec_type": "audio",
|
|
"codec_name": "aac",
|
|
"channels": 2,
|
|
"tags": {"language": "fra"},
|
|
},
|
|
{
|
|
"index": 3,
|
|
"codec_type": "subtitle",
|
|
"codec_name": "subrip",
|
|
"tags": {"language": "fra"},
|
|
"disposition": {"forced": 1},
|
|
},
|
|
],
|
|
}
|
|
with patch(
|
|
"alfred.infrastructure.filesystem.ffprobe.subprocess.run",
|
|
return_value=_ffprobe_result(stdout=json.dumps(payload)),
|
|
):
|
|
info = ffprobe.probe(f)
|
|
assert info.video_codec == "h264"
|
|
assert info.width == 1920 and info.height == 1080
|
|
assert len(info.audio_tracks) == 2
|
|
eng = info.audio_tracks[0]
|
|
assert eng.language == "eng"
|
|
assert eng.is_default is True
|
|
assert info.audio_tracks[1].is_default is False
|
|
assert len(info.subtitle_tracks) == 1
|
|
assert info.subtitle_tracks[0].is_forced is True
|
|
|
|
def test_first_video_stream_wins(self, tmp_path):
|
|
# The implementation only fills video_codec on the FIRST video stream.
|
|
f = tmp_path / "x.mkv"
|
|
f.write_bytes(b"")
|
|
payload = {
|
|
"format": {},
|
|
"streams": [
|
|
{"codec_type": "video", "codec_name": "h264", "width": 1920},
|
|
{"codec_type": "video", "codec_name": "hevc", "width": 3840},
|
|
],
|
|
}
|
|
with patch(
|
|
"alfred.infrastructure.filesystem.ffprobe.subprocess.run",
|
|
return_value=_ffprobe_result(stdout=json.dumps(payload)),
|
|
):
|
|
info = ffprobe.probe(f)
|
|
assert info.video_codec == "h264"
|
|
assert info.width == 1920
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# filesystem_operations #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestCreateFolder:
|
|
def test_creates_nested(self, tmp_path):
|
|
target = tmp_path / "a" / "b" / "c"
|
|
out = create_folder(str(target))
|
|
assert out == {"status": "ok", "path": str(target)}
|
|
assert target.is_dir()
|
|
|
|
def test_existing_is_ok(self, tmp_path):
|
|
out = create_folder(str(tmp_path))
|
|
assert out["status"] == "ok"
|
|
|
|
def test_os_error_wrapped(self, tmp_path):
|
|
with patch(
|
|
"alfred.infrastructure.filesystem.filesystem_operations.Path.mkdir",
|
|
side_effect=OSError("readonly fs"),
|
|
):
|
|
out = create_folder(str(tmp_path / "x"))
|
|
assert out == {
|
|
"status": "error",
|
|
"error": "mkdir_failed",
|
|
"message": "readonly fs",
|
|
}
|
|
|
|
|
|
class TestMove:
|
|
def test_source_not_found(self, tmp_path):
|
|
out = move(str(tmp_path / "ghost"), str(tmp_path / "dst"))
|
|
assert out["status"] == "error"
|
|
assert out["error"] == "source_not_found"
|
|
|
|
def test_destination_exists(self, tmp_path):
|
|
src = tmp_path / "src"
|
|
src.write_text("x")
|
|
dst = tmp_path / "dst"
|
|
dst.write_text("y")
|
|
out = move(str(src), str(dst))
|
|
assert out["error"] == "destination_exists"
|
|
|
|
def test_happy_path_returns_ok(self, tmp_path):
|
|
src = tmp_path / "src"
|
|
src.write_text("x")
|
|
dst = tmp_path / "dst"
|
|
# Patch subprocess so we don't actually shell out; pretend success.
|
|
with patch(
|
|
"alfred.infrastructure.filesystem.filesystem_operations.subprocess.run",
|
|
return_value=MagicMock(returncode=0, stderr=""),
|
|
):
|
|
out = move(str(src), str(dst))
|
|
assert out == {"status": "ok", "source": str(src), "destination": str(dst)}
|
|
|
|
def test_mv_failure_wrapped(self, tmp_path):
|
|
src = tmp_path / "src"
|
|
src.write_text("x")
|
|
with patch(
|
|
"alfred.infrastructure.filesystem.filesystem_operations.subprocess.run",
|
|
return_value=MagicMock(returncode=1, stderr="cross-device link\n"),
|
|
):
|
|
out = move(str(src), str(tmp_path / "dst"))
|
|
assert out["error"] == "move_failed"
|
|
assert out["message"] == "cross-device link"
|
|
|
|
def test_os_error_wrapped(self, tmp_path):
|
|
src = tmp_path / "src"
|
|
src.write_text("x")
|
|
with patch(
|
|
"alfred.infrastructure.filesystem.filesystem_operations.subprocess.run",
|
|
side_effect=OSError("ENOSPC"),
|
|
):
|
|
out = move(str(src), str(tmp_path / "dst"))
|
|
assert out["error"] == "move_failed"
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# find_video #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestFindVideo:
|
|
def test_returns_file_directly_when_video(self, tmp_path):
|
|
f = tmp_path / "Movie.mkv"
|
|
f.write_bytes(b"")
|
|
assert find_video_file(f) == f
|
|
|
|
def test_returns_none_when_file_is_not_video(self, tmp_path):
|
|
f = tmp_path / "notes.txt"
|
|
f.write_text("x")
|
|
assert find_video_file(f) is None
|
|
|
|
def test_returns_none_when_folder_has_no_video(self, tmp_path):
|
|
(tmp_path / "a.txt").write_text("x")
|
|
assert find_video_file(tmp_path) is None
|
|
|
|
def test_returns_first_sorted_video(self, tmp_path):
|
|
(tmp_path / "B.mkv").write_bytes(b"")
|
|
(tmp_path / "A.mkv").write_bytes(b"")
|
|
(tmp_path / "C.mkv").write_bytes(b"")
|
|
found = find_video_file(tmp_path)
|
|
assert found.name == "A.mkv"
|
|
|
|
def test_recurses_into_subfolders(self, tmp_path):
|
|
sub = tmp_path / "sub"
|
|
sub.mkdir()
|
|
(sub / "X.mkv").write_bytes(b"")
|
|
found = find_video_file(tmp_path)
|
|
assert found is not None and found.name == "X.mkv"
|
|
|
|
def test_case_insensitive_extension(self, tmp_path):
|
|
f = tmp_path / "Movie.MKV"
|
|
f.write_bytes(b"")
|
|
assert find_video_file(f) == f
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# MediaOrganizer #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
def _movie() -> Movie:
|
|
return Movie(
|
|
imdb_id=ImdbId("tt1375666"),
|
|
title=MovieTitle("Inception"),
|
|
release_year=ReleaseYear(2010),
|
|
quality=Quality.HD,
|
|
)
|
|
|
|
|
|
def _show() -> TVShow:
|
|
return TVShow(
|
|
imdb_id=ImdbId("tt0773262"),
|
|
title="Dexter",
|
|
expected_seasons=8,
|
|
status=ShowStatus.ENDED,
|
|
)
|
|
|
|
|
|
def _episode() -> Episode:
|
|
return Episode(
|
|
season_number=SeasonNumber(1),
|
|
episode_number=EpisodeNumber(1),
|
|
title="Dexter",
|
|
)
|
|
|
|
|
|
class TestMediaOrganizer:
|
|
def test_get_movie_destination(self, tmp_path):
|
|
org = MediaOrganizer(tmp_path / "movies", tmp_path / "tv")
|
|
out = org.get_movie_destination(_movie(), "source.mkv")
|
|
# Path: /movies/<folder>/<filename>.mkv
|
|
assert out.suffix == ".mkv"
|
|
assert out.parent.name == _movie().get_folder_name()
|
|
assert out.parent.parent == tmp_path / "movies"
|
|
|
|
def test_get_movie_destination_preserves_extension(self, tmp_path):
|
|
org = MediaOrganizer(tmp_path / "movies", tmp_path / "tv")
|
|
out = org.get_movie_destination(_movie(), "source.MP4")
|
|
assert out.suffix == ".MP4"
|
|
|
|
def test_get_episode_destination(self, tmp_path):
|
|
org = MediaOrganizer(tmp_path / "movies", tmp_path / "tv")
|
|
out = org.get_episode_destination(_show(), _episode(), "raw.mkv")
|
|
# Path: /tv/<show>/<season>/<episode>.mkv
|
|
assert out.suffix == ".mkv"
|
|
assert out.parent.parent.parent == tmp_path / "tv"
|
|
assert out.parent.parent.name == _show().get_folder_name()
|
|
|
|
def test_create_movie_directory_creates_folder(self, tmp_path):
|
|
org = MediaOrganizer(tmp_path / "movies", tmp_path / "tv")
|
|
assert org.create_movie_directory(_movie()) is True
|
|
assert (tmp_path / "movies" / _movie().get_folder_name()).is_dir()
|
|
|
|
def test_create_movie_directory_already_exists_ok(self, tmp_path):
|
|
org = MediaOrganizer(tmp_path / "movies", tmp_path / "tv")
|
|
org.create_movie_directory(_movie())
|
|
# Second call is also fine (parents=True, exist_ok=True).
|
|
assert org.create_movie_directory(_movie()) is True
|
|
|
|
def test_create_movie_directory_failure_returns_false(self, tmp_path):
|
|
org = MediaOrganizer(tmp_path / "movies", tmp_path / "tv")
|
|
with patch(
|
|
"alfred.infrastructure.filesystem.organizer.Path.mkdir",
|
|
side_effect=PermissionError("denied"),
|
|
):
|
|
assert org.create_movie_directory(_movie()) is False
|
|
|
|
def test_create_episode_directory_creates_season_folder(self, tmp_path):
|
|
org = MediaOrganizer(tmp_path / "movies", tmp_path / "tv")
|
|
assert org.create_episode_directory(_show(), 1) is True
|
|
# /tv/<show>/<season> exists
|
|
show_dir = tmp_path / "tv" / _show().get_folder_name()
|
|
assert show_dir.is_dir()
|
|
# At least one child (the season folder) was created.
|
|
assert any(show_dir.iterdir())
|
|
|
|
def test_create_episode_directory_failure_returns_false(self, tmp_path):
|
|
org = MediaOrganizer(tmp_path / "movies", tmp_path / "tv")
|
|
with patch(
|
|
"alfred.infrastructure.filesystem.organizer.Path.mkdir",
|
|
side_effect=OSError("readonly"),
|
|
):
|
|
assert org.create_episode_directory(_show(), 1) is False
|