Files
alfred/tests/test_tools_edge_cases.py
francwa e07c9ec77b chore: sprint cleanup — language unification, parser unification, fossils removal
Several weeks of work accumulated without being committed. Grouped here for
clarity; see CHANGELOG.md [Unreleased] for the user-facing summary.

Highlights
----------

P1 #2 — ISO 639-2/B canonical migration
- New Language VO + LanguageRegistry (alfred/domain/shared/knowledge/).
- iso_languages.yaml as single source of truth for language codes.
- SubtitleKnowledgeBase now delegates lookup to LanguageRegistry; subtitles.yaml
  only declares subtitle-specific tokens (vostfr, vf, vff, …).
- SubtitlePreferences default → ["fre", "eng"]; subtitle filenames written as
  {iso639_2b}.srt (legacy fr.srt still read via alias).
- Scanner: dropped _LANG_KEYWORDS / _SDH_TOKENS / _FORCED_TOKENS /
  SUBTITLE_EXTENSIONS hardcoded dicts.
- Fixed: 'hi' token no longer marks SDH (conflicted with Hindi alias).
- Added settings.min_movie_size_bytes (was a module constant).

P1 #3 — Release parser unification + data-driven tokenizer
- parse_release() is now the single source of truth for release-name parsing.
- alfred/knowledge/release/separators.yaml declares the token separators used
  by the tokenizer (., space, [, ], (, ), _). New conventions can be added
  without code changes.
- Tokenizer now splits on any configured separator instead of name.split('.').
  Releases like 'The Father (2020) [1080p] [WEBRip] [5.1] [YTS.MX]' parse via
  the direct path without sanitization fallback.
- Site-tag extraction always runs first; well-formedness only rejects truly
  forbidden chars.
- _parse_season_episode() extended with NxNN / NxNNxNN alt forms.
- Removed dead helpers: _sanitize, _normalize.

Domain cleanup
- Deleted fossil services with zero production callers:
    alfred/domain/movies/services.py
    alfred/domain/tv_shows/services.py
    alfred/domain/subtitles/services.py (replaced by subtitles/services/ package)
    alfred/domain/subtitles/repositories.py
- Split monolithic subtitle services into a package (identifier, matcher,
  placer, pattern_detector, utils) + dedicated knowledge/ package.
- MediaInfo split into dedicated package (alfred/domain/shared/media/:
  audio, video, subtitle, info, matching).

Persistence cleanup
- Removed dead JSON repositories (movie/subtitle/tvshow_repository.py).

Tests
- Major expansion of the test suite organized to mirror the source tree.
- Removed obsolete *_edge_cases test files superseded by structured tests.
- Suite: 990 passed, 8 skipped.

Misc
- .gitignore: exclude env_backup/ and *.bak.
- Adjustments across agent/llm, app.py, application/filesystem, and
  infrastructure/filesystem to align with the new domain layout.
2026-05-17 23:38:00 +02:00

459 lines
16 KiB
Python

"""Edge-case tests for the agent tools.
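
Exercises pathological and adversarial inputs for the public tool surface:

- **TestFindTorrentEdgeCases** — wraps ``find_torrent`` (mocking the use
  case) to assert behavior on absent results, malformed responses, and
  unexpected exceptions.
- **TestGetTorrentByIndexEdgeCases** — calls ``get_torrent_by_index``
  against search results that are replaced between calls or that already
  carry an ``index`` field.
- **TestAddTorrentEdgeCases** — wraps ``add_torrent_to_qbittorrent``
  (mocking the use case) and ``add_torrent_by_index`` with invalid, empty,
  whitespace-only, and very long magnet links.
- **TestFilesystemEdgeCases** — pushes ``set_path_for_folder`` /
  ``list_folder`` through traversal attempts, null bytes, hidden files,
  broken/escaping symlinks, unicode, deep paths, and oversize inputs.
- **TestFindMediaImdbIdEdgeCases** — wraps ``find_media_imdb_id`` (mocking
  the use case) for same-name titles, special titles, and TV shows.

Uses the current LTM API (``memory.ltm.workspace.download``); the legacy
flat attribute ``download_folder`` no longer exists.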
"""
from unittest.mock import Mock, patch
import pytest
from alfred.agent.tools import api as api_tools
from alfred.agent.tools import filesystem as fs_tools
from alfred.infrastructure.persistence import get_memory
class TestFindTorrentEdgeCases:
    """Edge cases for ``find_torrent`` with ``SearchTorrentsUseCase`` patched."""

    @staticmethod
    def _stub_search(use_case_cls, payload):
        """Wire the patched class so ``execute().to_dict()`` yields *payload*."""
        response = Mock()
        response.to_dict.return_value = payload
        use_case = Mock()
        use_case.execute.return_value = response
        use_case_cls.return_value = use_case

    @staticmethod
    def _no_hits():
        """Return a fresh successful payload that contains no torrents."""
        return {
            "status": "ok",
            "torrents": [],
            "count": 0,
        }

    @patch("alfred.agent.tools.api.SearchTorrentsUseCase")
    def test_empty_query(self, mock_use_case_class, memory):
        """An empty query reports the error status produced by the use case."""
        self._stub_search(
            mock_use_case_class,
            {
                "status": "error",
                "error": "invalid_query",
            },
        )

        outcome = api_tools.find_torrent("")

        assert outcome["status"] == "error"

    @patch("alfred.agent.tools.api.SearchTorrentsUseCase")
    def test_very_long_query(self, mock_use_case_class, memory):
        """A 10,000-character query still produces a result with a status."""
        self._stub_search(mock_use_case_class, self._no_hits())
        oversized = "x" * 10000

        outcome = api_tools.find_torrent(oversized)

        # Only checks that the call completed and returned a status.
        assert "status" in outcome

    @patch("alfred.agent.tools.api.SearchTorrentsUseCase")
    def test_special_characters_in_query(self, mock_use_case_class, memory):
        """Brackets, braces and angle brackets in the query are accepted."""
        self._stub_search(mock_use_case_class, self._no_hits())
        punctuated = "Movie (2024) [1080p] {x265} <HDR>"

        outcome = api_tools.find_torrent(punctuated)

        assert "status" in outcome

    @patch("alfred.agent.tools.api.SearchTorrentsUseCase")
    def test_unicode_query(self, mock_use_case_class, memory):
        """A non-ASCII query is accepted."""
        self._stub_search(mock_use_case_class, self._no_hits())

        outcome = api_tools.find_torrent("日本語映画 2024")

        assert "status" in outcome

    @patch("alfred.agent.tools.api.SearchTorrentsUseCase")
    def test_results_with_missing_fields(self, mock_use_case_class, memory):
        """Sparse torrent dicts are still stored as the last search results."""
        sparse_hits = [
            {"name": "Torrent 1"},  # no seeders, magnet, etc.
            {},  # nothing at all
        ]
        self._stub_search(
            mock_use_case_class,
            {
                "status": "ok",
                "torrents": sparse_hits,
                "count": 2,
            },
        )

        outcome = api_tools.find_torrent("Test")

        assert outcome["status"] == "ok"
        stored = get_memory().episodic.last_search_results["results"]
        assert len(stored) == 2

    @patch("alfred.agent.tools.api.SearchTorrentsUseCase")
    def test_api_timeout(self, mock_use_case_class, memory):
        """A ``TimeoutError`` raised by the use case propagates to the caller."""
        failing = Mock()
        failing.execute.side_effect = TimeoutError("Connection timed out")
        mock_use_case_class.return_value = failing

        with pytest.raises(TimeoutError):
            api_tools.find_torrent("Test")
class TestGetTorrentByIndexEdgeCases:
    """Edge cases for ``get_torrent_by_index``."""

    def test_index_as_float(self, memory_with_search_results):
        """A float truncated with ``int()`` selects the matching entry."""
        # int(2.9) truncates to 2 before the call, so the tool itself only
        # ever receives an integer here.
        truncated = int(2.9)

        outcome = api_tools.get_torrent_by_index(truncated)

        assert outcome["status"] == "ok"
        assert outcome["torrent"]["index"] == 2

    def test_results_modified_between_calls(self, memory):
        """Index lookups follow the most recently stored search results."""
        memory.episodic.store_search_results("query1", [{"name": "Result 1"}])
        before = api_tools.get_torrent_by_index(1)
        assert before["status"] == "ok"

        # Replace the stored results; index 1 must now resolve to the new entry.
        memory.episodic.store_search_results("query2", [{"name": "New Result"}])
        after = api_tools.get_torrent_by_index(1)
        assert after["torrent"]["name"] == "New Result"

    def test_result_with_index_already_set(self, memory):
        """A stored result that already carries an ``index`` key is tolerated."""
        preindexed = [{"name": "Result", "index": 999}]
        memory.episodic.store_search_results("query", preindexed)

        outcome = api_tools.get_torrent_by_index(1)

        # Either outcome is accepted: the implementation may overwrite the
        # pre-existing index or report an error.
        assert outcome["status"] in ("ok", "error")
class TestAddTorrentEdgeCases:
    """Edge cases for ``add_torrent_to_qbittorrent`` and ``add_torrent_by_index``."""

    @staticmethod
    def _stub_add(use_case_cls, payload):
        """Wire the patched class so ``execute().to_dict()`` yields *payload*."""
        response = Mock()
        response.to_dict.return_value = payload
        use_case = Mock()
        use_case.execute.return_value = response
        use_case_cls.return_value = use_case

    @patch("alfred.agent.tools.api.AddTorrentUseCase")
    def test_invalid_magnet_link(self, mock_use_case_class, memory):
        """A string that is not a magnet link yields an error status."""
        self._stub_add(
            mock_use_case_class,
            {
                "status": "error",
                "error": "invalid_magnet",
            },
        )

        outcome = api_tools.add_torrent_to_qbittorrent("not a magnet link")

        assert outcome["status"] == "error"

    @patch("alfred.agent.tools.api.AddTorrentUseCase")
    def test_empty_magnet_link(self, mock_use_case_class, memory):
        """An empty magnet string yields an error status."""
        self._stub_add(
            mock_use_case_class,
            {
                "status": "error",
                "error": "empty_magnet",
            },
        )

        outcome = api_tools.add_torrent_to_qbittorrent("")

        assert outcome["status"] == "error"

    @patch("alfred.agent.tools.api.AddTorrentUseCase")
    def test_very_long_magnet_link(self, mock_use_case_class, memory):
        """A magnet link with a 10,000-character hash still returns a status."""
        self._stub_add(mock_use_case_class, {"status": "ok"})
        oversized = "magnet:?xt=urn:btih:" + "a" * 10000

        outcome = api_tools.add_torrent_to_qbittorrent(oversized)

        assert "status" in outcome

    @patch("alfred.agent.tools.api.AddTorrentUseCase")
    def test_qbittorrent_connection_refused(self, mock_use_case_class, memory):
        """A ``ConnectionRefusedError`` from the use case propagates."""
        failing = Mock()
        failing.execute.side_effect = ConnectionRefusedError()
        mock_use_case_class.return_value = failing

        with pytest.raises(ConnectionRefusedError):
            api_tools.add_torrent_to_qbittorrent("magnet:?xt=...")

    def test_add_by_index_with_empty_magnet(self, memory):
        """A stored torrent with an empty magnet is rejected as ``no_magnet``."""
        stored = [{"name": "Torrent", "magnet": ""}]
        memory.episodic.store_search_results("query", stored)

        outcome = api_tools.add_torrent_by_index(1)

        assert outcome["status"] == "error"
        assert outcome["error"] == "no_magnet"

    def test_add_by_index_with_whitespace_magnet(self, memory):
        """A whitespace-only magnet still produces a result with a status."""
        stored = [{"name": "Torrent", "magnet": " "}]
        memory.episodic.store_search_results("query", stored)

        outcome = api_tools.add_torrent_by_index(1)

        # Ideally treated like a missing magnet, but the exact outcome is
        # implementation-defined, so only the presence of a status is checked.
        assert "status" in outcome
class TestFilesystemEdgeCases:
    """Edge cases for ``set_path_for_folder`` and ``list_folder``.

    Listing tests first point ``memory.ltm.workspace.download`` at the
    ``downloads`` directory supplied by the ``real_folder`` fixture.
    """

    def test_set_path_with_trailing_slash(self, memory, real_folder):
        """A path ending in ``/`` is accepted."""
        path_with_slash = str(real_folder["downloads"]) + "/"

        result = fs_tools.set_path_for_folder("download", path_with_slash)

        assert result["status"] == "ok"

    def test_set_path_with_double_slashes(self, memory, real_folder):
        """A path whose separators are all doubled is accepted."""
        path_double = str(real_folder["downloads"]).replace("/", "//")

        result = fs_tools.set_path_for_folder("download", path_double)

        # The doubled separators are expected to be normalized away.
        assert result["status"] == "ok"

    def test_set_path_with_dot_segments(self, memory, real_folder):
        """A path with trailing ``.`` segments is accepted."""
        path_with_dots = str(real_folder["downloads"]) + "/./."

        result = fs_tools.set_path_for_folder("download", path_with_dots)

        assert result["status"] == "ok"

    def test_list_folder_with_hidden_files(self, memory, real_folder):
        """Dot-files are included in the listing."""
        hidden_file = real_folder["downloads"] / ".hidden"
        hidden_file.touch()
        memory.ltm.workspace.download = str(real_folder["downloads"])

        result = fs_tools.list_folder("download")

        assert ".hidden" in result["entries"]

    def test_list_folder_with_broken_symlink(self, memory, real_folder):
        """A symlink whose target does not exist is still listed."""
        broken_link = real_folder["downloads"] / "broken_link"
        try:
            broken_link.symlink_to("/nonexistent/target")
        except OSError:
            # Skip when the symlink cannot be created in this environment.
            pytest.skip("Cannot create symlinks")
        memory.ltm.workspace.download = str(real_folder["downloads"])

        result = fs_tools.list_folder("download")

        assert "broken_link" in result["entries"]

    def test_list_folder_with_permission_denied_file(self, memory, real_folder):
        """A file with all permission bits cleared is still listed."""
        import os

        no_read = real_folder["downloads"] / "no_read.txt"
        no_read.touch()
        try:
            os.chmod(no_read, 0o000)
            memory.ltm.workspace.download = str(real_folder["downloads"])
            result = fs_tools.list_folder("download")
            # Listing the directory does not require read permission on the
            # files inside it, so the entry must still show up.
            assert "no_read.txt" in result["entries"]
        finally:
            # Restore normal permissions regardless of the test outcome.
            os.chmod(no_read, 0o644)

    def test_list_folder_case_sensitivity(self, memory, real_folder):
        """The lowercase folder type ``download`` is accepted.

        Only the lowercase spelling is exercised; per the original note,
        ``"DOWNLOAD"`` would be rejected by folder-type validation.
        """
        memory.ltm.workspace.download = str(real_folder["downloads"])

        result_lower = fs_tools.list_folder("download")

        assert result_lower["status"] == "ok"

    def test_list_folder_with_spaces_in_path(self, memory, real_folder):
        """A sub-folder whose name contains spaces can be listed."""
        space_dir = real_folder["downloads"] / "folder with spaces"
        space_dir.mkdir()
        memory.ltm.workspace.download = str(real_folder["downloads"])

        result = fs_tools.list_folder("download", "folder with spaces")

        assert result["status"] == "ok"

    def test_path_traversal_with_encoded_chars(self, memory, real_folder):
        """URL-encoded traversal payloads never yield an unexpected error code."""
        memory.ltm.workspace.download = str(real_folder["downloads"])
        attempts = [
            "..%2f",
            "..%5c",
            "%2e%2e/",
            "..%252f",
        ]

        for attempt in attempts:
            result = fs_tools.list_folder("download", attempt)
            # Accepted outcomes: forbidden, not found, no error at all, or a
            # plain "ok". The message names the payload so a failure can be
            # traced to the exact encoding that caused it.
            assert (
                result.get("error") in ["forbidden", "not_found", None]
                or result.get("status") == "ok"
            ), f"unexpected result for traversal payload {attempt!r}: {result!r}"

    def test_path_with_null_byte(self, memory, real_folder):
        """A relative path containing a NUL byte is rejected as forbidden."""
        memory.ltm.workspace.download = str(real_folder["downloads"])

        result = fs_tools.list_folder("download", "file\x00.txt")

        assert result["error"] == "forbidden"

    def test_very_deep_path(self, memory, real_folder):
        """A 20-level nested directory can be listed via its relative path."""
        # Build the segment names once so the directory tree and the relative
        # path passed to the tool cannot drift apart.
        segments = [f"level{i}" for i in range(20)]
        deep_path = real_folder["downloads"]
        for segment in segments:
            deep_path = deep_path / segment
        deep_path.mkdir(parents=True)
        memory.ltm.workspace.download = str(real_folder["downloads"])

        result = fs_tools.list_folder("download", "/".join(segments))

        assert result["status"] == "ok"

    def test_folder_with_many_files(self, memory, real_folder):
        """A folder holding 1000 files is listed with a matching count."""
        for i in range(1000):
            (real_folder["downloads"] / f"file_{i:04d}.txt").touch()
        memory.ltm.workspace.download = str(real_folder["downloads"])

        result = fs_tools.list_folder("download")

        assert result["status"] == "ok"
        # ">=" because the fixture directory may already contain entries.
        assert result["count"] >= 1000
class TestFindMediaImdbIdEdgeCases:
    """Edge cases for ``find_media_imdb_id`` with ``SearchMovieUseCase`` patched."""

    @staticmethod
    def _stub_lookup(use_case_cls, payload):
        """Wire the patched class so ``execute().to_dict()`` yields *payload*."""
        response = Mock()
        response.to_dict.return_value = payload
        use_case = Mock()
        use_case.execute.return_value = response
        use_case_cls.return_value = use_case

    @patch("alfred.agent.tools.api.SearchMovieUseCase")
    def test_movie_with_same_name_different_years(self, mock_use_case_class, memory):
        """A title qualified by its year resolves successfully."""
        self._stub_lookup(
            mock_use_case_class,
            {
                "status": "ok",
                "imdb_id": "tt1234567",
                "title": "The Thing",
                "year": 1982,
            },
        )

        outcome = api_tools.find_media_imdb_id("The Thing 1982")

        assert outcome["status"] == "ok"

    @patch("alfred.agent.tools.api.SearchMovieUseCase")
    def test_movie_with_special_title(self, mock_use_case_class, memory):
        """A title mixing letters and digits resolves successfully."""
        self._stub_lookup(
            mock_use_case_class,
            {
                "status": "ok",
                "imdb_id": "tt1234567",
                "title": "Se7en",
            },
        )

        outcome = api_tools.find_media_imdb_id("Se7en")

        assert outcome["status"] == "ok"

    @patch("alfred.agent.tools.api.SearchMovieUseCase")
    def test_tv_show_vs_movie(self, mock_use_case_class, memory):
        """The ``media_type`` reported by the use case is passed through."""
        self._stub_lookup(
            mock_use_case_class,
            {
                "status": "ok",
                "imdb_id": "tt0944947",
                "title": "Game of Thrones",
                "media_type": "tv",
            },
        )

        outcome = api_tools.find_media_imdb_id("Game of Thrones")

        assert outcome["media_type"] == "tv"