e45465d52d
Destination resolution
- Replace the single ResolveDestinationUseCase with four dedicated
functions, one per release type:
resolve_season_destination (pack season, folder move)
resolve_episode_destination (single episode, file move)
resolve_movie_destination (movie, file move)
resolve_series_destination (multi-season pack, folder move)
- Each returns a dedicated DTO carrying only the fields relevant to
that release type — no more polymorphic ResolvedDestination with
half the fields unused depending on the case.
- Looser series folder matching: exact computed-name match is reused
silently; any deviation (different group, multiple candidates) now
prompts the user with all options including the computed name.
Agent tools
- Four new tools wrapping the use cases above; old resolve_destination
removed from the registry.
- New move_to_destination tool: create_folder + move, chained — used
after a resolve_* call to perform the actual relocation.
- Low-level filesystem_operations module (create_folder, move via mv)
for instant same-FS renames (ZFS).
Prompt & persona
- New PromptBuilder (alfred/agent/prompt.py) replacing prompts.py:
identity + personality block, situational expressions, memory
schema, episodic/STM/config context, tool catalogue.
- Per-user expression system: knowledge/users/common.yaml +
{username}.yaml are merged at runtime; one phrase per situation
(greeting/success/error/...) is sampled into the system prompt.
qBittorrent integration
- Credentials now come from settings (qbittorrent_url/username/password)
instead of hardcoded defaults.
- New client methods: find_by_name, set_location, recheck — the trio
needed to update a torrent's save path and re-verify after a move.
- Host→container path translation settings (qbittorrent_host_path /
qbittorrent_container_path) for docker-mounted setups.
Subtitles
- Identifier: strip parenthesized qualifiers (simplified, brazil…) at
tokenization; new _tokenize_suffix used for the episode_subfolder
pattern so episode-stem tokens no longer pollute language detection.
- Placer: extract _build_dest_name so it can be reused by the new
dry_run path in ManageSubtitlesUseCase.
- Knowledge: add yue, ell, ind, msa, rus, vie, heb, tam, tel, tha,
hin, ukr; add 'fre' to fra; add 'simplified'/'traditional' to zho.
Misc
- LTM workspace: add 'trash' folder slot.
- Default LLM provider switched to deepseek.
- testing/debug_release.py: CLI to parse a release, hit TMDB, and
dry-run the destination resolution end-to-end.
336 lines
12 KiB
Python
336 lines
12 KiB
Python
"""
Tests for alfred.infrastructure.filesystem.file_manager.FileManager

Uses real temp filesystem. No mocks on os.link — we test the actual behavior.
"""
|
|
|
|
import os
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from alfred.infrastructure.filesystem.exceptions import PathTraversalError
|
|
from alfred.infrastructure.filesystem.file_manager import FileManager
|
|
|
|
|
|
@pytest.fixture
def fm():
    """Provide a freshly constructed FileManager to each test that requests it."""
    manager = FileManager()
    return manager
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# copy_file (hard-link)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCopyFile:
    """Checks for FileManager.copy_file, run against a real temporary directory."""

    def test_creates_hard_link(self, fm, tmp_path):
        """A successful copy produces a destination sharing the source's inode."""
        origin = tmp_path / "source.mkv"
        origin.write_bytes(b"video data")
        target = tmp_path / "dest.mkv"

        outcome = fm.copy_file(str(origin), str(target))

        assert outcome["status"] == "ok"
        assert target.exists()
        # Identical inode numbers mean both names point at the same file data.
        origin_inode = origin.stat().st_ino
        target_inode = target.stat().st_ino
        assert origin_inode == target_inode

    def test_returns_correct_metadata(self, fm, tmp_path):
        """The result reports the filename, size, source and destination."""
        origin = tmp_path / "movie.mkv"
        origin.write_bytes(b"x" * 1024)
        target = tmp_path / "movie_copy.mkv"

        outcome = fm.copy_file(str(origin), str(target))

        # Checked one field at a time, in the same order as before.
        expected_fields = {
            "filename": "movie_copy.mkv",
            "size": 1024,
            "source": str(origin),
            "destination": str(target),
        }
        for field, wanted in expected_fields.items():
            assert outcome[field] == wanted

    def test_source_not_found(self, fm, tmp_path):
        """A missing source is reported as a 'source_not_found' error."""
        missing = tmp_path / "nope.mkv"
        target = tmp_path / "dst.mkv"

        outcome = fm.copy_file(str(missing), str(target))

        assert outcome["status"] == "error"
        assert outcome["error"] == "source_not_found"

    def test_source_is_directory(self, fm, tmp_path):
        """Passing a directory as the source yields 'source_not_file'."""
        folder = tmp_path / "a_dir"
        folder.mkdir()
        target = tmp_path / "dst.mkv"

        outcome = fm.copy_file(str(folder), str(target))

        assert outcome["error"] == "source_not_file"

    def test_destination_already_exists(self, fm, tmp_path):
        """An existing destination file yields 'destination_exists'."""
        origin = tmp_path / "src.mkv"
        origin.write_bytes(b"data")
        target = tmp_path / "dst.mkv"
        target.write_bytes(b"other")

        outcome = fm.copy_file(str(origin), str(target))

        assert outcome["error"] == "destination_exists"

    def test_destination_dir_not_found(self, fm, tmp_path):
        """A destination inside a missing folder yields 'destination_dir_not_found'."""
        origin = tmp_path / "src.mkv"
        origin.write_bytes(b"data")
        target = tmp_path / "nonexistent" / "dst.mkv"

        outcome = fm.copy_file(str(origin), str(target))

        assert outcome["error"] == "destination_dir_not_found"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# move_file
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMoveFile:
    """Checks for FileManager.move_file, run against a real temporary directory."""

    def test_moves_file(self, fm, tmp_path):
        """A successful move leaves the file at the destination only."""
        origin = tmp_path / "episode.mkv"
        origin.write_bytes(b"video")
        library = tmp_path / "library"
        library.mkdir()
        target = library / "episode.mkv"

        outcome = fm.move_file(str(origin), str(target))

        assert outcome["status"] == "ok"
        assert target.exists()
        assert not origin.exists()

    def test_source_deleted_after_move(self, fm, tmp_path):
        """The source path no longer exists once the move has run."""
        origin = tmp_path / "src.mkv"
        origin.write_bytes(b"data")
        target = tmp_path / "dst.mkv"

        fm.move_file(str(origin), str(target))

        assert not origin.exists()

    def test_move_preserves_content(self, fm, tmp_path):
        """The destination holds exactly the bytes written to the source."""
        payload = b"important video content"
        origin = tmp_path / "src.mkv"
        origin.write_bytes(payload)
        target = tmp_path / "dst.mkv"

        fm.move_file(str(origin), str(target))

        moved_bytes = target.read_bytes()
        assert moved_bytes == payload

    def test_move_fails_if_source_missing(self, fm, tmp_path):
        """Moving a non-existent source reports an error status."""
        missing = tmp_path / "ghost.mkv"
        target = tmp_path / "dst.mkv"

        outcome = fm.move_file(str(missing), str(target))

        assert outcome["status"] == "error"

    def test_move_fails_if_destination_exists(self, fm, tmp_path):
        """An occupied destination aborts the move and keeps the source."""
        origin = tmp_path / "src.mkv"
        origin.write_bytes(b"a")
        target = tmp_path / "dst.mkv"
        target.write_bytes(b"b")

        outcome = fm.move_file(str(origin), str(target))

        assert outcome["status"] == "error"
        # The link step failed, so the source must still be in place.
        assert origin.exists()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# create_seed_links
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCreateSeedLinks:
    """Checks for FileManager.create_seed_links on a realistic release layout."""

    def _setup(self, tmp_path):
        """Build the download, library and torrents folders used by the tests.

        The download folder holds two text files and a ``Subs`` subfolder with
        one subtitle. Its video is hard-linked into the library folder and then
        removed from the download folder, mimicking the state after a move.

        Returns:
            A ``(lib_video, download, torrents)`` tuple of paths.
        """
        download = tmp_path.joinpath("downloads", "Oz.S01.1080p.WEBRip.x265-KONTRAST")
        download.mkdir(parents=True)
        downloaded_video = download / "Oz.S01E01.1080p.WEBRip.x265-KONTRAST.mp4"
        downloaded_video.write_bytes(b"video content")
        (download / "KONTRAST.txt").write_text("release info")
        (download / "[TGx]info.txt").write_text("tgx info")
        subs_dir = download.joinpath("Subs", "Oz.S01E01.1080p.WEBRip.x265-KONTRAST")
        subs_dir.mkdir(parents=True)
        (subs_dir / "2_eng.srt").write_text("subtitle content")

        library = tmp_path.joinpath(
            "tv",
            "Oz.1997.1080p.WEBRip.x265-KONTRAST",
            "Oz.S01.1080p.WEBRip.x265-KONTRAST",
        )
        library.mkdir(parents=True)
        lib_video = library / "Oz.S01E01.1080p.WEBRip.x265-KONTRAST.mp4"
        # Link first, then drop the original name: the state left by a move.
        os.link(downloaded_video, lib_video)
        downloaded_video.unlink()

        torrents = tmp_path / "torrents"
        torrents.mkdir()

        return lib_video, download, torrents

    def test_creates_torrent_subfolder(self, fm, tmp_path):
        """A folder named after the download folder appears under torrents."""
        lib_video, download, torrents = self._setup(tmp_path)

        outcome = fm.create_seed_links(str(lib_video), str(download), str(torrents))

        assert outcome["status"] == "ok"
        seed_dir = torrents / "Oz.S01.1080p.WEBRip.x265-KONTRAST"
        assert seed_dir.is_dir()

    def test_hard_links_library_video(self, fm, tmp_path):
        """The video in the torrent subfolder shares the library video's inode."""
        lib_video, download, torrents = self._setup(tmp_path)

        fm.create_seed_links(str(lib_video), str(download), str(torrents))

        seeded_video = torrents.joinpath(
            "Oz.S01.1080p.WEBRip.x265-KONTRAST", lib_video.name
        )
        assert seeded_video.exists()
        assert seeded_video.stat().st_ino == lib_video.stat().st_ino

    def test_copies_remaining_files(self, fm, tmp_path):
        """The text files from the download folder land in the torrent subfolder."""
        lib_video, download, torrents = self._setup(tmp_path)

        outcome = fm.create_seed_links(str(lib_video), str(download), str(torrents))

        assert outcome["status"] == "ok"
        seed_dir = torrents / "Oz.S01.1080p.WEBRip.x265-KONTRAST"
        # Both txt files are expected to have been copied across.
        assert (seed_dir / "KONTRAST.txt").exists()
        assert (seed_dir / "[TGx]info.txt").exists()

    def test_copies_subs_subdirectory(self, fm, tmp_path):
        """The nested Subs folder and its subtitle are reproduced."""
        lib_video, download, torrents = self._setup(tmp_path)

        fm.create_seed_links(str(lib_video), str(download), str(torrents))

        subtitle = torrents.joinpath(
            "Oz.S01.1080p.WEBRip.x265-KONTRAST",
            "Subs",
            "Oz.S01E01.1080p.WEBRip.x265-KONTRAST",
            "2_eng.srt",
        )
        assert subtitle.exists()

    def test_returns_copied_and_skipped(self, fm, tmp_path):
        """A first run reports at least three copies and nothing skipped."""
        lib_video, download, torrents = self._setup(tmp_path)

        outcome = fm.create_seed_links(str(lib_video), str(download), str(torrents))

        assert outcome["copied_count"] >= 3  # txt x2 + srt
        assert outcome["skipped"] == []

    def test_skips_already_existing_files(self, fm, tmp_path):
        """Files copied by an earlier run are listed as skipped on the next run."""
        lib_video, download, torrents = self._setup(tmp_path)
        # Initial run populates the torrent subfolder.
        fm.create_seed_links(str(lib_video), str(download), str(torrents))

        # Second run: a different library video plus one extra .nfo in the
        # download folder, so the earlier files already exist at the target.
        second_video = lib_video.parent / "Oz.S01E02.1080p.WEBRip.x265-KONTRAST.mp4"
        second_video.write_bytes(b"ep2")
        (download / "extra.nfo").write_text("nfo")
        second_outcome = fm.create_seed_links(
            str(second_video), str(download), str(torrents)
        )

        assert second_outcome["status"] == "ok"
        # Previously copied files are expected among the skipped paths.
        skipped_names = [Path(entry).name for entry in second_outcome["skipped"]]
        assert "KONTRAST.txt" in skipped_names

    def test_error_library_file_not_found(self, fm, tmp_path):
        """A missing library file yields 'library_file_not_found'."""
        download = tmp_path / "downloads" / "SomeShow.S01"
        download.mkdir(parents=True)
        torrents = tmp_path / "torrents"
        torrents.mkdir()
        missing_video = tmp_path / "ghost.mkv"

        outcome = fm.create_seed_links(
            str(missing_video), str(download), str(torrents)
        )

        assert outcome["status"] == "error"
        assert outcome["error"] == "library_file_not_found"

    def test_error_source_folder_not_found(self, fm, tmp_path):
        """A missing download folder yields 'source_folder_not_found'."""
        lib_video = tmp_path / "lib.mkv"
        lib_video.write_bytes(b"v")
        torrents = tmp_path / "torrents"
        torrents.mkdir()
        missing_folder = tmp_path / "ghost_folder"

        outcome = fm.create_seed_links(
            str(lib_video), str(missing_folder), str(torrents)
        )

        assert outcome["status"] == "error"
        assert outcome["error"] == "source_folder_not_found"

    def test_error_torrent_folder_not_found(self, fm, tmp_path):
        """A missing torrents folder yields 'torrent_folder_not_found'."""
        lib_video = tmp_path / "lib.mkv"
        lib_video.write_bytes(b"v")
        download = tmp_path / "dl"
        download.mkdir()
        missing_torrents = tmp_path / "no_torrents"

        outcome = fm.create_seed_links(
            str(lib_video), str(download), str(missing_torrents)
        )

        assert outcome["status"] == "error"
        assert outcome["error"] == "torrent_folder_not_found"

    def test_error_link_already_exists(self, fm, tmp_path):
        """Seeding the same video twice yields 'destination_exists'."""
        lib_video, download, torrents = self._setup(tmp_path)
        call_args = (str(lib_video), str(download), str(torrents))
        fm.create_seed_links(*call_args)

        # Same video again: the link target is already present.
        outcome = fm.create_seed_links(*call_args)

        assert outcome["status"] == "error"
        assert outcome["error"] == "destination_exists"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# list_folder
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestListFolder:
    """Checks for FileManager.list_folder.

    NOTE(review): the ``memory``, ``memory_configured`` and ``infra_temp``
    fixtures are defined outside this file; presumably they provide the
    folder configuration and on-disk content these tests rely on — confirm
    against conftest.
    """

    def test_lists_entries(self, fm, memory_configured, infra_temp):
        """Listing 'download' succeeds with a non-empty list of entries."""
        listing = fm.list_folder("download")

        assert listing["status"] == "ok"
        assert listing["count"] > 0
        assert isinstance(listing["entries"], list)

    def test_entries_are_sorted(self, fm, memory_configured, infra_temp):
        """Entries come back already in sorted order."""
        listing = fm.list_folder("download")

        entries = listing["entries"]
        assert entries == sorted(entries)

    def test_folder_not_set(self, fm, memory):
        """Listing 'tv_show' here yields 'folder_not_set'."""
        listing = fm.list_folder("tv_show")

        assert listing["status"] == "error"
        assert listing["error"] == "folder_not_set"

    def test_invalid_folder_type(self, fm, memory):
        """An unknown folder type is reported as an error."""
        listing = fm.list_folder("nonexistent_type")

        assert listing["status"] == "error"

    def test_relative_path_within_folder(self, fm, memory_configured, infra_temp):
        """A relative sub-path inside 'download' can be listed."""
        listing = fm.list_folder("download", "test_series")

        assert listing["status"] == "ok"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _sanitize_path
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSanitizePath:
    """Checks for FileManager._sanitize_path."""

    def test_normal_path(self, fm):
        """A plain relative path is returned unchanged."""
        cleaned = fm._sanitize_path("some/path")
        assert cleaned == "some/path"

    def test_dot_path(self, fm):
        """A single dot is returned unchanged."""
        cleaned = fm._sanitize_path(".")
        assert cleaned == "."

    def test_absolute_path_rejected(self, fm):
        """An absolute path raises PathTraversalError."""
        with pytest.raises(PathTraversalError):
            fm._sanitize_path("/etc/passwd")

    def test_parent_traversal_rejected(self, fm):
        """A path climbing out via '..' raises PathTraversalError."""
        with pytest.raises(PathTraversalError):
            fm._sanitize_path("../../etc/passwd")

    def test_null_byte_rejected(self, fm):
        """A path containing a NUL byte raises PathTraversalError."""
        with pytest.raises(PathTraversalError):
            fm._sanitize_path("some\x00path")

    def test_normalises_redundant_dots(self, fm):
        """A path with a './' segment yields a result free of '..'.

        NOTE(review): this only asserts that '..' is absent, not that the
        './' segment was collapsed — consider a stricter check once the
        exact normalised form is confirmed.
        """
        cleaned = fm._sanitize_path("some/./path")
        assert ".." not in cleaned
|