249c5de76a
- Refactor memory system (episodic/STM/LTM with components) - Implement complete subtitle domain (scanner, matcher, placer) - Add YAML workflow infrastructure - Externalize knowledge base (patterns, release groups) - Add comprehensive testing suite - Create manual testing CLIs
326 lines
12 KiB
Python
326 lines
12 KiB
Python
"""
|
|
Tests for alfred.infrastructure.filesystem.file_manager.FileManager
|
|
|
|
Uses real temp filesystem. No mocks on os.link — we test the actual behavior.
|
|
"""
|
|
|
|
import os
|
|
import stat
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from alfred.infrastructure.filesystem.file_manager import FileManager
|
|
from alfred.infrastructure.filesystem.exceptions import PathTraversalError
|
|
|
|
|
|
@pytest.fixture
def fm():
    """Provide a freshly constructed ``FileManager`` to each test."""
    manager = FileManager()
    return manager
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# copy_file (hard-link)
# ---------------------------------------------------------------------------
|
|
|
|
class TestCopyFile:
    """Checks for ``FileManager.copy_file`` run against a real temp directory."""

    def test_creates_hard_link(self, fm, tmp_path):
        """A successful copy produces a destination sharing the source inode."""
        origin = tmp_path / "source.mkv"
        target = tmp_path / "dest.mkv"
        origin.write_bytes(b"video data")

        outcome = fm.copy_file(str(origin), str(target))

        assert outcome["status"] == "ok"
        assert target.exists()
        # Same inode = hard link
        assert origin.stat().st_ino == target.stat().st_ino

    def test_returns_correct_metadata(self, fm, tmp_path):
        """The result reports filename, size, source and destination."""
        origin = tmp_path / "movie.mkv"
        target = tmp_path / "movie_copy.mkv"
        origin.write_bytes(b"x" * 1024)

        outcome = fm.copy_file(str(origin), str(target))

        expected_fields = {
            "filename": "movie_copy.mkv",
            "size": 1024,
            "source": str(origin),
            "destination": str(target),
        }
        for key, value in expected_fields.items():
            assert outcome[key] == value

    def test_source_not_found(self, fm, tmp_path):
        """A missing source is reported as ``source_not_found``."""
        missing = tmp_path / "nope.mkv"
        target = tmp_path / "dst.mkv"

        outcome = fm.copy_file(str(missing), str(target))

        assert outcome["status"] == "error"
        assert outcome["error"] == "source_not_found"

    def test_source_is_directory(self, fm, tmp_path):
        """A directory passed as the source is reported as ``source_not_file``."""
        folder = tmp_path / "a_dir"
        folder.mkdir()
        target = tmp_path / "dst.mkv"

        outcome = fm.copy_file(str(folder), str(target))

        assert outcome["error"] == "source_not_file"

    def test_destination_already_exists(self, fm, tmp_path):
        """A pre-existing destination is reported as ``destination_exists``."""
        origin = tmp_path / "src.mkv"
        origin.write_bytes(b"data")
        target = tmp_path / "dst.mkv"
        target.write_bytes(b"other")

        outcome = fm.copy_file(str(origin), str(target))

        assert outcome["error"] == "destination_exists"

    def test_destination_dir_not_found(self, fm, tmp_path):
        """A destination inside a missing folder is reported as such."""
        origin = tmp_path / "src.mkv"
        origin.write_bytes(b"data")
        target = tmp_path / "nonexistent" / "dst.mkv"

        outcome = fm.copy_file(str(origin), str(target))

        assert outcome["error"] == "destination_dir_not_found"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# move_file
# ---------------------------------------------------------------------------
|
|
|
|
class TestMoveFile:
    """Checks for ``FileManager.move_file`` run against a real temp directory."""

    def test_moves_file(self, fm, tmp_path):
        """Moving into an existing folder creates the target and drops the source."""
        origin = tmp_path / "episode.mkv"
        origin.write_bytes(b"video")
        library = tmp_path / "library"
        library.mkdir()
        target = library / "episode.mkv"

        outcome = fm.move_file(str(origin), str(target))

        assert outcome["status"] == "ok"
        assert target.exists()
        assert not origin.exists()

    def test_source_deleted_after_move(self, fm, tmp_path):
        """The source path no longer exists once the move has run."""
        origin = tmp_path / "src.mkv"
        target = tmp_path / "dst.mkv"
        origin.write_bytes(b"data")

        fm.move_file(str(origin), str(target))

        assert not origin.exists()

    def test_move_preserves_content(self, fm, tmp_path):
        """The destination holds exactly the bytes written to the source."""
        payload = b"important video content"
        origin = tmp_path / "src.mkv"
        target = tmp_path / "dst.mkv"
        origin.write_bytes(payload)

        fm.move_file(str(origin), str(target))

        assert target.read_bytes() == payload

    def test_move_fails_if_source_missing(self, fm, tmp_path):
        """Moving a non-existent source yields an error status."""
        missing = tmp_path / "ghost.mkv"
        target = tmp_path / "dst.mkv"

        outcome = fm.move_file(str(missing), str(target))

        assert outcome["status"] == "error"

    def test_move_fails_if_destination_exists(self, fm, tmp_path):
        """An occupied destination yields an error and leaves the source alone."""
        origin = tmp_path / "src.mkv"
        origin.write_bytes(b"a")
        target = tmp_path / "dst.mkv"
        target.write_bytes(b"b")

        outcome = fm.move_file(str(origin), str(target))

        assert outcome["status"] == "error"
        # Source should NOT be deleted since the link failed
        assert origin.exists()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# create_seed_links
# ---------------------------------------------------------------------------
|
|
|
|
class TestCreateSeedLinks:
    """Checks for ``FileManager.create_seed_links`` on a real temp directory."""

    # Names shared by the fixture tree and the expectations below.
    _RELEASE_DIR = "Oz.S01.1080p.WEBRip.x265-KONTRAST"
    _EPISODE_STEM = "Oz.S01E01.1080p.WEBRip.x265-KONTRAST"

    def _setup(self, tmp_path):
        """Create realistic download + library + torrent structure.

        Returns a ``(library_video, download_folder, torrents_folder)`` tuple.
        The download folder keeps its text files and ``Subs`` tree, while the
        video itself lives only in the library, as it would after a move.
        """
        video_name = f"{self._EPISODE_STEM}.mp4"

        download = tmp_path / "downloads" / self._RELEASE_DIR
        download.mkdir(parents=True)
        video = download / video_name
        video.write_bytes(b"video content")
        (download / "KONTRAST.txt").write_text("release info")
        (download / "[TGx]info.txt").write_text("tgx info")
        subs = download / "Subs" / self._EPISODE_STEM
        subs.mkdir(parents=True)
        (subs / "2_eng.srt").write_text("subtitle content")

        library = (
            tmp_path
            / "tv"
            / "Oz.1997.1080p.WEBRip.x265-KONTRAST"
            / self._RELEASE_DIR
        )
        library.mkdir(parents=True)
        lib_video = library / video_name
        # Hard-link the video to simulate post-move state
        os.link(video, lib_video)
        video.unlink()  # simulate the move

        torrents = tmp_path / "torrents"
        torrents.mkdir()

        return lib_video, download, torrents

    def test_creates_torrent_subfolder(self, fm, tmp_path):
        """A folder named after the download is created under the torrents dir."""
        lib_video, download, torrents = self._setup(tmp_path)

        outcome = fm.create_seed_links(str(lib_video), str(download), str(torrents))

        assert outcome["status"] == "ok"
        torrent_sub = torrents / self._RELEASE_DIR
        assert torrent_sub.is_dir()

    def test_hard_links_library_video(self, fm, tmp_path):
        """The library video shows up in the torrent folder with the same inode."""
        lib_video, download, torrents = self._setup(tmp_path)

        fm.create_seed_links(str(lib_video), str(download), str(torrents))

        seeded = torrents / self._RELEASE_DIR / lib_video.name
        assert seeded.exists()
        assert seeded.stat().st_ino == lib_video.stat().st_ino

    def test_copies_remaining_files(self, fm, tmp_path):
        """Top-level text files of the download appear in the torrent folder."""
        lib_video, download, torrents = self._setup(tmp_path)

        outcome = fm.create_seed_links(str(lib_video), str(download), str(torrents))

        assert outcome["status"] == "ok"
        torrent_sub = torrents / self._RELEASE_DIR
        # txt files should be copied
        for name in ("KONTRAST.txt", "[TGx]info.txt"):
            assert (torrent_sub / name).exists()

    def test_copies_subs_subdirectory(self, fm, tmp_path):
        """The nested ``Subs`` tree is reproduced inside the torrent folder."""
        lib_video, download, torrents = self._setup(tmp_path)

        fm.create_seed_links(str(lib_video), str(download), str(torrents))

        subtitle = (
            torrents
            / self._RELEASE_DIR
            / "Subs"
            / self._EPISODE_STEM
            / "2_eng.srt"
        )
        assert subtitle.exists()

    def test_returns_copied_and_skipped(self, fm, tmp_path):
        """A first run reports at least three copies and nothing skipped."""
        lib_video, download, torrents = self._setup(tmp_path)

        outcome = fm.create_seed_links(str(lib_video), str(download), str(torrents))

        assert outcome["copied_count"] >= 3  # txt x2 + srt
        assert outcome["skipped"] == []

    def test_skips_already_existing_files(self, fm, tmp_path):
        """Files copied by an earlier run are listed as skipped on a later run."""
        lib_video, download, torrents = self._setup(tmp_path)
        # First call
        fm.create_seed_links(str(lib_video), str(download), str(torrents))

        # Add a new nfo file and call again with a second library video
        second_video = lib_video.parent / "Oz.S01E02.1080p.WEBRip.x265-KONTRAST.mp4"
        second_video.write_bytes(b"ep2")
        (download / "extra.nfo").write_text("nfo")
        second_outcome = fm.create_seed_links(
            str(second_video), str(download), str(torrents)
        )

        assert second_outcome["status"] == "ok"
        # The already-copied files should appear in skipped
        skipped_names = {Path(entry).name for entry in second_outcome["skipped"]}
        assert "KONTRAST.txt" in skipped_names

    def test_error_library_file_not_found(self, fm, tmp_path):
        """A missing library file is reported as ``library_file_not_found``."""
        download = tmp_path / "downloads" / "SomeShow.S01"
        download.mkdir(parents=True)
        torrents = tmp_path / "torrents"
        torrents.mkdir()
        missing_video = tmp_path / "ghost.mkv"

        outcome = fm.create_seed_links(
            str(missing_video), str(download), str(torrents)
        )

        assert outcome["status"] == "error"
        assert outcome["error"] == "library_file_not_found"

    def test_error_source_folder_not_found(self, fm, tmp_path):
        """A missing download folder is reported as ``source_folder_not_found``."""
        lib = tmp_path / "lib.mkv"
        lib.write_bytes(b"v")
        torrents = tmp_path / "torrents"
        torrents.mkdir()
        missing_folder = tmp_path / "ghost_folder"

        outcome = fm.create_seed_links(
            str(lib), str(missing_folder), str(torrents)
        )

        assert outcome["status"] == "error"
        assert outcome["error"] == "source_folder_not_found"

    def test_error_torrent_folder_not_found(self, fm, tmp_path):
        """A missing torrents folder is reported as ``torrent_folder_not_found``."""
        lib = tmp_path / "lib.mkv"
        lib.write_bytes(b"v")
        download = tmp_path / "dl"
        download.mkdir()
        missing_torrents = tmp_path / "no_torrents"

        outcome = fm.create_seed_links(
            str(lib), str(download), str(missing_torrents)
        )

        assert outcome["status"] == "error"
        assert outcome["error"] == "torrent_folder_not_found"

    def test_error_link_already_exists(self, fm, tmp_path):
        """Seeding the same video twice reports ``destination_exists``."""
        lib_video, download, torrents = self._setup(tmp_path)
        call_args = (str(lib_video), str(download), str(torrents))
        fm.create_seed_links(*call_args)

        # Second call with same video → link_dest already exists
        outcome = fm.create_seed_links(*call_args)

        assert outcome["status"] == "error"
        assert outcome["error"] == "destination_exists"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# list_folder
# ---------------------------------------------------------------------------
|
|
|
|
class TestListFolder:
    """Checks for ``FileManager.list_folder``.

    The ``memory``, ``memory_configured`` and ``infra_temp`` fixtures are
    defined outside this module; presumably they provide the folder
    configuration and on-disk content the listings rely on.
    """

    def test_lists_entries(self, fm, memory_configured, infra_temp):
        """Listing ``download`` succeeds with a non-empty list of entries."""
        listing = fm.list_folder("download")

        assert listing["status"] == "ok"
        assert listing["count"] > 0
        assert isinstance(listing["entries"], list)

    def test_entries_are_sorted(self, fm, memory_configured, infra_temp):
        """Entries come back already in sorted order."""
        listing = fm.list_folder("download")
        entries = listing["entries"]

        assert entries == sorted(entries)

    def test_folder_not_set(self, fm, memory):
        """Asking for ``tv_show`` here is reported as ``folder_not_set``."""
        listing = fm.list_folder("tv_show")

        assert listing["status"] == "error"
        assert listing["error"] == "folder_not_set"

    def test_invalid_folder_type(self, fm, memory):
        """An unknown folder type yields an error status."""
        listing = fm.list_folder("nonexistent_type")

        assert listing["status"] == "error"

    def test_relative_path_within_folder(self, fm, memory_configured, infra_temp):
        """A relative sub-path inside ``download`` can be listed."""
        listing = fm.list_folder("download", "test_series")

        assert listing["status"] == "ok"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# _sanitize_path
# ---------------------------------------------------------------------------
|
|
|
|
class TestSanitizePath:
    """Checks for ``FileManager._sanitize_path``.

    Valid relative paths are expected back as strings; absolute paths, parent
    traversal and embedded null bytes are expected to raise
    ``PathTraversalError``.
    """

    def test_normal_path(self, fm):
        """A plain relative path is returned unchanged."""
        assert fm._sanitize_path("some/path") == "some/path"

    def test_dot_path(self, fm):
        """The current-directory path ``.`` is returned unchanged."""
        assert fm._sanitize_path(".") == "."

    def test_absolute_path_rejected(self, fm):
        """An absolute path raises ``PathTraversalError``."""
        with pytest.raises(PathTraversalError):
            fm._sanitize_path("/etc/passwd")

    def test_parent_traversal_rejected(self, fm):
        """A path climbing out through ``..`` raises ``PathTraversalError``."""
        with pytest.raises(PathTraversalError):
            fm._sanitize_path("../../etc/passwd")

    def test_null_byte_rejected(self, fm):
        """A path containing a null byte raises ``PathTraversalError``."""
        with pytest.raises(PathTraversalError):
            fm._sanitize_path("some\x00path")

    def test_normalises_redundant_dots(self, fm):
        """Redundant ``.`` segments do not survive sanitisation.

        NOTE(review): assumes ``_sanitize_path`` collapses ``.`` segments, as
        the test name implies -- confirm against the implementation.
        """
        result = fm._sanitize_path("some/./path")

        assert ".." not in result
        # The input never contained "..", so the assertion above cannot fail
        # when normalisation is missing. Check the "." segment itself, with
        # separators unified so the check does not depend on the platform.
        segments = result.replace("\\", "/").split("/")
        assert "." not in segments
|