Files
francwa e45465d52d feat: split resolve_destination, persona-driven prompts, qBittorrent relocation
Destination resolution
- Replace the single ResolveDestinationUseCase with four dedicated
  functions, one per release type:
    resolve_season_destination    (pack season, folder move)
    resolve_episode_destination   (single episode, file move)
    resolve_movie_destination     (movie, file move)
    resolve_series_destination    (multi-season pack, folder move)
- Each returns a dedicated DTO carrying only the fields relevant to
  that release type — no more polymorphic ResolvedDestination with
  half the fields unused depending on the case.
- Looser series folder matching: exact computed-name match is reused
  silently; any deviation (different group, multiple candidates) now
  prompts the user with all options including the computed name.

Agent tools
- Four new tools wrapping the use cases above; old resolve_destination
  removed from the registry.
- New move_to_destination tool: create_folder + move, chained — used
  after a resolve_* call to perform the actual relocation.
- Low-level filesystem_operations module (create_folder, move via mv)
  for instant same-FS renames (ZFS).

Prompt & persona
- New PromptBuilder (alfred/agent/prompt.py) replacing prompts.py:
  identity + personality block, situational expressions, memory
  schema, episodic/STM/config context, tool catalogue.
- Per-user expression system: knowledge/users/common.yaml +
  {username}.yaml are merged at runtime; one phrase per situation
  (greeting/success/error/...) is sampled into the system prompt.

qBittorrent integration
- Credentials now come from settings (qbittorrent_url/username/password)
  instead of hardcoded defaults.
- New client methods: find_by_name, set_location, recheck — the trio
  needed to update a torrent's save path and re-verify after a move.
- Host→container path translation settings (qbittorrent_host_path /
  qbittorrent_container_path) for docker-mounted setups.

Subtitles
- Identifier: strip parenthesized qualifiers (simplified, brazil…) at
  tokenization; new _tokenize_suffix used for the episode_subfolder
  pattern so episode-stem tokens no longer pollute language detection.
- Placer: extract _build_dest_name so it can be reused by the new
  dry_run path in ManageSubtitlesUseCase.
- Knowledge: add yue, ell, ind, msa, rus, vie, heb, tam, tel, tha,
  hin, ukr; add 'fre' to fra; add 'simplified'/'traditional' to zho.

Misc
- LTM workspace: add 'trash' folder slot.
- Default LLM provider switched to deepseek.
- testing/debug_release.py: CLI to parse a release, hit TMDB, and
  dry-run the destination resolution end-to-end.
2026-05-14 05:01:59 +02:00

336 lines
12 KiB
Python

"""
Tests for alfred.infrastructure.filesystem.file_manager.FileManager
Uses real temp filesystem. No mocks on os.link — we test the actual behavior.
"""
import os
from pathlib import Path
import pytest
from alfred.infrastructure.filesystem.exceptions import PathTraversalError
from alfred.infrastructure.filesystem.file_manager import FileManager
@pytest.fixture
def fm():
return FileManager()
# ---------------------------------------------------------------------------
# copy_file (hard-link)
# ---------------------------------------------------------------------------
class TestCopyFile:
def test_creates_hard_link(self, fm, tmp_path):
src = tmp_path / "source.mkv"
src.write_bytes(b"video data")
dst = tmp_path / "dest.mkv"
result = fm.copy_file(str(src), str(dst))
assert result["status"] == "ok"
assert dst.exists()
# Same inode = hard link
assert src.stat().st_ino == dst.stat().st_ino
def test_returns_correct_metadata(self, fm, tmp_path):
src = tmp_path / "movie.mkv"
src.write_bytes(b"x" * 1024)
dst = tmp_path / "movie_copy.mkv"
result = fm.copy_file(str(src), str(dst))
assert result["filename"] == "movie_copy.mkv"
assert result["size"] == 1024
assert result["source"] == str(src)
assert result["destination"] == str(dst)
def test_source_not_found(self, fm, tmp_path):
result = fm.copy_file(str(tmp_path / "nope.mkv"), str(tmp_path / "dst.mkv"))
assert result["status"] == "error"
assert result["error"] == "source_not_found"
def test_source_is_directory(self, fm, tmp_path):
src_dir = tmp_path / "a_dir"
src_dir.mkdir()
result = fm.copy_file(str(src_dir), str(tmp_path / "dst.mkv"))
assert result["error"] == "source_not_file"
def test_destination_already_exists(self, fm, tmp_path):
src = tmp_path / "src.mkv"
src.write_bytes(b"data")
dst = tmp_path / "dst.mkv"
dst.write_bytes(b"other")
result = fm.copy_file(str(src), str(dst))
assert result["error"] == "destination_exists"
def test_destination_dir_not_found(self, fm, tmp_path):
src = tmp_path / "src.mkv"
src.write_bytes(b"data")
result = fm.copy_file(str(src), str(tmp_path / "nonexistent" / "dst.mkv"))
assert result["error"] == "destination_dir_not_found"
# ---------------------------------------------------------------------------
# move_file
# ---------------------------------------------------------------------------
class TestMoveFile:
def test_moves_file(self, fm, tmp_path):
src = tmp_path / "episode.mkv"
src.write_bytes(b"video")
dst_dir = tmp_path / "library"
dst_dir.mkdir()
dst = dst_dir / "episode.mkv"
result = fm.move_file(str(src), str(dst))
assert result["status"] == "ok"
assert dst.exists()
assert not src.exists()
def test_source_deleted_after_move(self, fm, tmp_path):
src = tmp_path / "src.mkv"
src.write_bytes(b"data")
dst = tmp_path / "dst.mkv"
fm.move_file(str(src), str(dst))
assert not src.exists()
def test_move_preserves_content(self, fm, tmp_path):
content = b"important video content"
src = tmp_path / "src.mkv"
src.write_bytes(content)
dst = tmp_path / "dst.mkv"
fm.move_file(str(src), str(dst))
assert dst.read_bytes() == content
def test_move_fails_if_source_missing(self, fm, tmp_path):
result = fm.move_file(str(tmp_path / "ghost.mkv"), str(tmp_path / "dst.mkv"))
assert result["status"] == "error"
def test_move_fails_if_destination_exists(self, fm, tmp_path):
src = tmp_path / "src.mkv"
src.write_bytes(b"a")
dst = tmp_path / "dst.mkv"
dst.write_bytes(b"b")
result = fm.move_file(str(src), str(dst))
assert result["status"] == "error"
# Source should NOT be deleted since the link failed
assert src.exists()
# ---------------------------------------------------------------------------
# create_seed_links
# ---------------------------------------------------------------------------
class TestCreateSeedLinks:
def _setup(self, tmp_path):
"""Create realistic download + library + torrent structure."""
download = tmp_path / "downloads" / "Oz.S01.1080p.WEBRip.x265-KONTRAST"
download.mkdir(parents=True)
video = download / "Oz.S01E01.1080p.WEBRip.x265-KONTRAST.mp4"
video.write_bytes(b"video content")
(download / "KONTRAST.txt").write_text("release info")
(download / "[TGx]info.txt").write_text("tgx info")
subs = download / "Subs" / "Oz.S01E01.1080p.WEBRip.x265-KONTRAST"
subs.mkdir(parents=True)
(subs / "2_eng.srt").write_text("subtitle content")
library = (
tmp_path
/ "tv"
/ "Oz.1997.1080p.WEBRip.x265-KONTRAST"
/ "Oz.S01.1080p.WEBRip.x265-KONTRAST"
)
library.mkdir(parents=True)
lib_video = library / "Oz.S01E01.1080p.WEBRip.x265-KONTRAST.mp4"
# Hard-link the video to simulate post-move state
os.link(video, lib_video)
video.unlink() # simulate the move
torrents = tmp_path / "torrents"
torrents.mkdir()
return lib_video, download, torrents
def test_creates_torrent_subfolder(self, fm, tmp_path):
lib_video, download, torrents = self._setup(tmp_path)
result = fm.create_seed_links(str(lib_video), str(download), str(torrents))
assert result["status"] == "ok"
expected = torrents / "Oz.S01.1080p.WEBRip.x265-KONTRAST"
assert expected.is_dir()
def test_hard_links_library_video(self, fm, tmp_path):
lib_video, download, torrents = self._setup(tmp_path)
fm.create_seed_links(str(lib_video), str(download), str(torrents))
linked = torrents / "Oz.S01.1080p.WEBRip.x265-KONTRAST" / lib_video.name
assert linked.exists()
assert linked.stat().st_ino == lib_video.stat().st_ino
def test_copies_remaining_files(self, fm, tmp_path):
lib_video, download, torrents = self._setup(tmp_path)
result = fm.create_seed_links(str(lib_video), str(download), str(torrents))
assert result["status"] == "ok"
torrent_sub = torrents / "Oz.S01.1080p.WEBRip.x265-KONTRAST"
# txt files should be copied
assert (torrent_sub / "KONTRAST.txt").exists()
assert (torrent_sub / "[TGx]info.txt").exists()
def test_copies_subs_subdirectory(self, fm, tmp_path):
lib_video, download, torrents = self._setup(tmp_path)
fm.create_seed_links(str(lib_video), str(download), str(torrents))
srt = (
torrents
/ "Oz.S01.1080p.WEBRip.x265-KONTRAST"
/ "Subs"
/ "Oz.S01E01.1080p.WEBRip.x265-KONTRAST"
/ "2_eng.srt"
)
assert srt.exists()
def test_returns_copied_and_skipped(self, fm, tmp_path):
lib_video, download, torrents = self._setup(tmp_path)
result = fm.create_seed_links(str(lib_video), str(download), str(torrents))
assert result["copied_count"] >= 3 # txt x2 + srt
assert result["skipped"] == []
def test_skips_already_existing_files(self, fm, tmp_path):
lib_video, download, torrents = self._setup(tmp_path)
# First call
fm.create_seed_links(str(lib_video), str(download), str(torrents))
# Add a new txt file and call again with a fresh lib_video copy
lib2 = lib_video.parent / "Oz.S01E02.1080p.WEBRip.x265-KONTRAST.mp4"
lib2.write_bytes(b"ep2")
(download / "extra.nfo").write_text("nfo")
result2 = fm.create_seed_links(str(lib2), str(download), str(torrents))
assert result2["status"] == "ok"
# The already-copied files should appear in skipped
skipped_names = [Path(s).name for s in result2["skipped"]]
assert "KONTRAST.txt" in skipped_names
def test_error_library_file_not_found(self, fm, tmp_path):
download = tmp_path / "downloads" / "SomeShow.S01"
download.mkdir(parents=True)
torrents = tmp_path / "torrents"
torrents.mkdir()
result = fm.create_seed_links(
str(tmp_path / "ghost.mkv"),
str(download),
str(torrents),
)
assert result["status"] == "error"
assert result["error"] == "library_file_not_found"
def test_error_source_folder_not_found(self, fm, tmp_path):
lib = tmp_path / "lib.mkv"
lib.write_bytes(b"v")
torrents = tmp_path / "torrents"
torrents.mkdir()
result = fm.create_seed_links(
str(lib),
str(tmp_path / "ghost_folder"),
str(torrents),
)
assert result["status"] == "error"
assert result["error"] == "source_folder_not_found"
def test_error_torrent_folder_not_found(self, fm, tmp_path):
lib = tmp_path / "lib.mkv"
lib.write_bytes(b"v")
download = tmp_path / "dl"
download.mkdir()
result = fm.create_seed_links(
str(lib),
str(download),
str(tmp_path / "no_torrents"),
)
assert result["status"] == "error"
assert result["error"] == "torrent_folder_not_found"
def test_error_link_already_exists(self, fm, tmp_path):
lib_video, download, torrents = self._setup(tmp_path)
fm.create_seed_links(str(lib_video), str(download), str(torrents))
# Second call with same video → link_dest already exists
result = fm.create_seed_links(str(lib_video), str(download), str(torrents))
assert result["status"] == "error"
assert result["error"] == "destination_exists"
# ---------------------------------------------------------------------------
# list_folder
# ---------------------------------------------------------------------------
class TestListFolder:
def test_lists_entries(self, fm, memory_configured, infra_temp):
result = fm.list_folder("download")
assert result["status"] == "ok"
assert result["count"] > 0
assert isinstance(result["entries"], list)
def test_entries_are_sorted(self, fm, memory_configured, infra_temp):
result = fm.list_folder("download")
assert result["entries"] == sorted(result["entries"])
def test_folder_not_set(self, fm, memory):
result = fm.list_folder("tv_show")
assert result["status"] == "error"
assert result["error"] == "folder_not_set"
def test_invalid_folder_type(self, fm, memory):
result = fm.list_folder("nonexistent_type")
assert result["status"] == "error"
def test_relative_path_within_folder(self, fm, memory_configured, infra_temp):
result = fm.list_folder("download", "test_series")
assert result["status"] == "ok"
# ---------------------------------------------------------------------------
# _sanitize_path
# ---------------------------------------------------------------------------
class TestSanitizePath:
def test_normal_path(self, fm):
assert fm._sanitize_path("some/path") == "some/path"
def test_dot_path(self, fm):
assert fm._sanitize_path(".") == "."
def test_absolute_path_rejected(self, fm):
with pytest.raises(PathTraversalError):
fm._sanitize_path("/etc/passwd")
def test_parent_traversal_rejected(self, fm):
with pytest.raises(PathTraversalError):
fm._sanitize_path("../../etc/passwd")
def test_null_byte_rejected(self, fm):
with pytest.raises(PathTraversalError):
fm._sanitize_path("some\x00path")
def test_normalises_redundant_dots(self, fm):
result = fm._sanitize_path("some/./path")
assert ".." not in result