chore: sprint cleanup — language unification, parser unification, fossils removal
Several weeks of work accumulated without being committed. Grouped here for clarity; see CHANGELOG.md [Unreleased] for the user-facing summary. Highlights ---------- P1 #2 — ISO 639-2/B canonical migration - New Language VO + LanguageRegistry (alfred/domain/shared/knowledge/). - iso_languages.yaml as single source of truth for language codes. - SubtitleKnowledgeBase now delegates lookup to LanguageRegistry; subtitles.yaml only declares subtitle-specific tokens (vostfr, vf, vff, …). - SubtitlePreferences default → ["fre", "eng"]; subtitle filenames written as {iso639_2b}.srt (legacy fr.srt still read via alias). - Scanner: dropped _LANG_KEYWORDS / _SDH_TOKENS / _FORCED_TOKENS / SUBTITLE_EXTENSIONS hardcoded dicts. - Fixed: 'hi' token no longer marks SDH (conflicted with Hindi alias). - Added settings.min_movie_size_bytes (was a module constant). P1 #3 — Release parser unification + data-driven tokenizer - parse_release() is now the single source of truth for release-name parsing. - alfred/knowledge/release/separators.yaml declares the token separators used by the tokenizer (., space, [, ], (, ), _). New conventions can be added without code changes. - Tokenizer now splits on any configured separator instead of name.split('.'). Releases like 'The Father (2020) [1080p] [WEBRip] [5.1] [YTS.MX]' parse via the direct path without sanitization fallback. - Site-tag extraction always runs first; well-formedness only rejects truly forbidden chars. - _parse_season_episode() extended with NxNN / NxNNxNN alt forms. - Removed dead helpers: _sanitize, _normalize. Domain cleanup - Deleted fossil services with zero production callers: alfred/domain/movies/services.py alfred/domain/tv_shows/services.py alfred/domain/subtitles/services.py (replaced by subtitles/services/ package) alfred/domain/subtitles/repositories.py - Split monolithic subtitle services into a package (identifier, matcher, placer, pattern_detector, utils) + dedicated knowledge/ package. - MediaInfo split into dedicated package (alfred/domain/shared/media/: audio, video, subtitle, info, matching). Persistence cleanup - Removed dead JSON repositories (movie/subtitle/tvshow_repository.py). Tests - Major expansion of the test suite organized to mirror the source tree. - Removed obsolete *_edge_cases test files superseded by structured tests. - Suite: 990 passed, 8 skipped. Misc - .gitignore: exclude env_backup/ and *.bak. - Adjustments across agent/llm, app.py, application/filesystem, and infrastructure/filesystem to align with the new domain layout.
This commit is contained in:
+136
-106
@@ -1,4 +1,26 @@
|
||||
"""Tests for filesystem tools."""
|
||||
"""Tests for the filesystem agent tools (``alfred.agent.tools.filesystem``).
|
||||
|
||||
Three suites:
|
||||
|
||||
1. **TestSetPathForFolder** — Covers the ``set_path_for_folder`` tool, which
|
||||
routes ``download`` / ``torrent`` into ``LongTermMemory.workspace`` and any
|
||||
other folder name into ``LongTermMemory.library_paths``. Asserts on the
|
||||
returned status / error dict and on the actual mutation of the underlying
|
||||
memory.
|
||||
|
||||
2. **TestListFolder** — Covers ``list_folder``: success path, "folder not
|
||||
set" / "not found" / "not a directory" error codes, and the path-traversal
|
||||
defenses (`..`, absolute paths, encoded sequences).
|
||||
|
||||
3. **TestFileManagerSecurity** — Path-traversal and exotic-path defenses
|
||||
exercised through the same ``list_folder`` tool: null-byte injection,
|
||||
parent-escape, symlink-escape, special and unicode characters, oversize
|
||||
paths.
|
||||
|
||||
These tests target the *current* tool surface — there is no longer a
|
||||
``validation_failed`` error code for unknown folder names; unknown names are
|
||||
stored as library collections.
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
@@ -7,73 +29,76 @@ import pytest
|
||||
from alfred.agent.tools import filesystem as fs_tools
|
||||
from alfred.infrastructure.persistence import get_memory
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# set_path_for_folder
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSetPathForFolder:
|
||||
"""Tests for set_path_for_folder tool."""
|
||||
"""``set_path_for_folder`` writes to LTM workspace or library_paths."""
|
||||
|
||||
def test_success(self, memory, real_folder):
|
||||
"""Should set folder path successfully."""
|
||||
def test_success_returns_ok_status(self, memory, real_folder):
|
||||
result = fs_tools.set_path_for_folder("download", str(real_folder["downloads"]))
|
||||
|
||||
assert result["status"] == "ok"
|
||||
assert result["folder_name"] == "download"
|
||||
assert result["path"] == str(real_folder["downloads"])
|
||||
|
||||
def test_saves_to_ltm(self, memory, real_folder):
|
||||
"""Should save path to LTM config."""
|
||||
def test_download_persists_to_workspace(self, memory, real_folder):
|
||||
"""``download`` and ``torrent`` are workspace fields, not library entries."""
|
||||
fs_tools.set_path_for_folder("download", str(real_folder["downloads"]))
|
||||
|
||||
mem = get_memory()
|
||||
assert mem.ltm.download_folder == str(real_folder["downloads"])
|
||||
assert mem.ltm.workspace.download == str(real_folder["downloads"])
|
||||
# Should NOT have leaked into library_paths.
|
||||
assert mem.ltm.library_paths.get("download") is None
|
||||
|
||||
def test_all_folder_types(self, memory, real_folder):
|
||||
"""Should accept all valid folder types."""
|
||||
for folder_type in ["download", "movie", "tvshow", "torrent"]:
|
||||
result = fs_tools.set_path_for_folder(
|
||||
folder_type, str(real_folder["downloads"])
|
||||
)
|
||||
assert result["status"] == "ok"
|
||||
def test_torrent_persists_to_workspace(self, memory, real_folder):
|
||||
fs_tools.set_path_for_folder("torrent", str(real_folder["downloads"]))
|
||||
|
||||
def test_invalid_folder_type(self, memory, real_folder):
|
||||
"""Should reject invalid folder type."""
|
||||
result = fs_tools.set_path_for_folder("invalid", str(real_folder["downloads"]))
|
||||
mem = get_memory()
|
||||
assert mem.ltm.workspace.torrent == str(real_folder["downloads"])
|
||||
|
||||
assert result["error"] == "validation_failed"
|
||||
def test_library_collection_persists_to_library_paths(self, memory, real_folder):
|
||||
"""Any folder name other than download/torrent is a library collection."""
|
||||
fs_tools.set_path_for_folder("movies", str(real_folder["movies"]))
|
||||
|
||||
def test_path_not_exists(self, memory):
|
||||
"""Should reject non-existent path."""
|
||||
result = fs_tools.set_path_for_folder("download", "/nonexistent/path/12345")
|
||||
mem = get_memory()
|
||||
assert mem.ltm.library_paths.get("movies") == str(real_folder["movies"])
|
||||
|
||||
def test_path_not_exists_returns_invalid_path(self, memory):
|
||||
result = fs_tools.set_path_for_folder("download", "/nonexistent/12345/xyz")
|
||||
|
||||
assert result["status"] == "error"
|
||||
assert result["error"] == "invalid_path"
|
||||
assert "does not exist" in result["message"]
|
||||
|
||||
def test_path_is_file(self, memory, real_folder):
|
||||
"""Should reject file path."""
|
||||
def test_path_is_file_returns_invalid_path(self, memory, real_folder):
|
||||
file_path = real_folder["downloads"] / "test_movie.mkv"
|
||||
|
||||
result = fs_tools.set_path_for_folder("download", str(file_path))
|
||||
|
||||
assert result["status"] == "error"
|
||||
assert result["error"] == "invalid_path"
|
||||
assert "not a directory" in result["message"]
|
||||
|
||||
def test_resolves_path(self, memory, real_folder):
|
||||
"""Should resolve relative paths."""
|
||||
# Create a symlink or use relative path
|
||||
relative_path = real_folder["downloads"]
|
||||
def test_resolves_to_absolute_path(self, memory, real_folder):
|
||||
"""Whatever the input form, the stored path is absolute."""
|
||||
result = fs_tools.set_path_for_folder("download", str(real_folder["downloads"]))
|
||||
|
||||
result = fs_tools.set_path_for_folder("download", str(relative_path))
|
||||
|
||||
assert result["status"] == "ok"
|
||||
# Path should be absolute
|
||||
assert Path(result["path"]).is_absolute()
|
||||
|
||||
|
||||
class TestListFolder:
|
||||
"""Tests for list_folder tool."""
|
||||
# ---------------------------------------------------------------------------
|
||||
# list_folder
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_success(self, memory, real_folder):
|
||||
"""Should list folder contents."""
|
||||
memory.ltm.download_folder = str(real_folder["downloads"])
|
||||
|
||||
class TestListFolder:
|
||||
"""``list_folder`` enumerates entries under a configured folder."""
|
||||
|
||||
def test_lists_root_of_workspace_folder(self, memory, real_folder):
|
||||
memory.ltm.workspace.download = str(real_folder["downloads"])
|
||||
|
||||
result = fs_tools.list_folder("download")
|
||||
|
||||
@@ -82,73 +107,76 @@ class TestListFolder:
|
||||
assert "test_series" in result["entries"]
|
||||
assert result["count"] == 2
|
||||
|
||||
def test_subfolder(self, memory, real_folder):
|
||||
"""Should list subfolder contents."""
|
||||
memory.ltm.download_folder = str(real_folder["downloads"])
|
||||
def test_lists_subfolder(self, memory, real_folder):
|
||||
memory.ltm.workspace.download = str(real_folder["downloads"])
|
||||
|
||||
result = fs_tools.list_folder("download", "test_series")
|
||||
|
||||
assert result["status"] == "ok"
|
||||
assert "episode1.mkv" in result["entries"]
|
||||
|
||||
def test_folder_not_configured(self, memory):
|
||||
"""Should return error if folder not configured."""
|
||||
def test_lists_library_collection_folder(self, memory, real_folder):
|
||||
"""``list_folder`` also resolves arbitrary library collections."""
|
||||
memory.ltm.library_paths.set("movies", str(real_folder["movies"]))
|
||||
|
||||
result = fs_tools.list_folder("movies")
|
||||
|
||||
assert result["status"] == "ok"
|
||||
assert result["entries"] == []
|
||||
|
||||
def test_unconfigured_folder_returns_folder_not_set(self, memory):
|
||||
result = fs_tools.list_folder("download")
|
||||
|
||||
assert result["status"] == "error"
|
||||
assert result["error"] == "folder_not_set"
|
||||
|
||||
def test_invalid_folder_type(self, memory):
|
||||
"""Should reject invalid folder type."""
|
||||
result = fs_tools.list_folder("invalid")
|
||||
def test_unknown_folder_type_returns_folder_not_set(self, memory):
|
||||
"""Unknown library collections also surface as ``folder_not_set``."""
|
||||
result = fs_tools.list_folder("anything_unconfigured")
|
||||
|
||||
assert result["error"] == "validation_failed"
|
||||
assert result["status"] == "error"
|
||||
assert result["error"] == "folder_not_set"
|
||||
|
||||
def test_path_traversal_dotdot(self, memory, real_folder):
|
||||
"""Should block path traversal with .."""
|
||||
memory.ltm.download_folder = str(real_folder["downloads"])
|
||||
def test_path_traversal_dotdot_is_forbidden(self, memory, real_folder):
|
||||
memory.ltm.workspace.download = str(real_folder["downloads"])
|
||||
|
||||
result = fs_tools.list_folder("download", "../")
|
||||
|
||||
assert result["error"] == "forbidden"
|
||||
|
||||
def test_path_traversal_absolute(self, memory, real_folder):
|
||||
"""Should block absolute paths."""
|
||||
memory.ltm.download_folder = str(real_folder["downloads"])
|
||||
def test_absolute_path_is_forbidden(self, memory, real_folder):
|
||||
memory.ltm.workspace.download = str(real_folder["downloads"])
|
||||
|
||||
result = fs_tools.list_folder("download", "/etc/passwd")
|
||||
|
||||
assert result["error"] == "forbidden"
|
||||
|
||||
def test_path_traversal_encoded(self, memory, real_folder):
|
||||
"""Should block encoded traversal attempts."""
|
||||
memory.ltm.download_folder = str(real_folder["downloads"])
|
||||
def test_encoded_traversal_is_blocked(self, memory, real_folder):
|
||||
memory.ltm.workspace.download = str(real_folder["downloads"])
|
||||
|
||||
result = fs_tools.list_folder("download", "..%2F..%2Fetc")
|
||||
|
||||
# Should either be forbidden or not found (depending on normalization)
|
||||
assert result.get("error") in ["forbidden", "not_found"]
|
||||
# The sanitizer must not let a URL-encoded ".." escape; the path is
|
||||
# either rejected outright or simply not found inside the root.
|
||||
assert result["error"] in {"forbidden", "not_found"}
|
||||
|
||||
def test_path_not_exists(self, memory, real_folder):
|
||||
"""Should return error for non-existent path."""
|
||||
memory.ltm.download_folder = str(real_folder["downloads"])
|
||||
def test_missing_relative_path_returns_not_found(self, memory, real_folder):
|
||||
memory.ltm.workspace.download = str(real_folder["downloads"])
|
||||
|
||||
result = fs_tools.list_folder("download", "nonexistent_folder")
|
||||
result = fs_tools.list_folder("download", "missing_subfolder")
|
||||
|
||||
assert result["error"] == "not_found"
|
||||
|
||||
def test_path_is_file(self, memory, real_folder):
|
||||
"""Should return error if path is a file."""
|
||||
memory.ltm.download_folder = str(real_folder["downloads"])
|
||||
def test_path_is_file_returns_not_a_directory(self, memory, real_folder):
|
||||
memory.ltm.workspace.download = str(real_folder["downloads"])
|
||||
|
||||
result = fs_tools.list_folder("download", "test_movie.mkv")
|
||||
|
||||
assert result["error"] == "not_a_directory"
|
||||
|
||||
def test_empty_folder(self, memory, real_folder):
|
||||
"""Should handle empty folder."""
|
||||
empty_dir = real_folder["downloads"] / "empty"
|
||||
empty_dir.mkdir()
|
||||
memory.ltm.download_folder = str(real_folder["downloads"])
|
||||
(real_folder["downloads"] / "empty").mkdir()
|
||||
memory.ltm.workspace.download = str(real_folder["downloads"])
|
||||
|
||||
result = fs_tools.list_folder("download", "empty")
|
||||
|
||||
@@ -156,85 +184,87 @@ class TestListFolder:
|
||||
assert result["entries"] == []
|
||||
assert result["count"] == 0
|
||||
|
||||
def test_sorted_entries(self, memory, real_folder):
|
||||
"""Should return sorted entries."""
|
||||
# Create files with different names
|
||||
def test_entries_are_sorted(self, memory, real_folder):
|
||||
(real_folder["downloads"] / "zebra.txt").touch()
|
||||
(real_folder["downloads"] / "alpha.txt").touch()
|
||||
memory.ltm.download_folder = str(real_folder["downloads"])
|
||||
memory.ltm.workspace.download = str(real_folder["downloads"])
|
||||
|
||||
result = fs_tools.list_folder("download")
|
||||
|
||||
assert result["status"] == "ok"
|
||||
# Check that entries are sorted
|
||||
entries = result["entries"]
|
||||
assert entries == sorted(entries)
|
||||
assert result["entries"] == sorted(result["entries"])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Security — path traversal and exotic-path defenses
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFileManagerSecurity:
|
||||
"""Security-focused tests for FileManager."""
|
||||
"""Defenses against path-traversal and exotic-path inputs.
|
||||
|
||||
def test_null_byte_injection(self, memory, real_folder):
|
||||
"""Should block null byte injection."""
|
||||
memory.ltm.download_folder = str(real_folder["downloads"])
|
||||
Exercised via ``list_folder`` because it is the public surface — the
|
||||
underlying ``_sanitize_path`` / ``_is_safe_path`` logic is the same for
|
||||
other read operations.
|
||||
"""
|
||||
|
||||
def test_null_byte_injection_is_forbidden(self, memory, real_folder):
|
||||
memory.ltm.workspace.download = str(real_folder["downloads"])
|
||||
|
||||
result = fs_tools.list_folder("download", "test\x00.txt")
|
||||
|
||||
assert result["error"] == "forbidden"
|
||||
|
||||
def test_path_outside_root(self, memory, real_folder):
|
||||
"""Should block paths that escape root."""
|
||||
memory.ltm.download_folder = str(real_folder["downloads"])
|
||||
def test_path_escape_via_dotdot_chain_is_forbidden(self, memory, real_folder):
|
||||
memory.ltm.workspace.download = str(real_folder["downloads"])
|
||||
|
||||
# Try to access parent directory
|
||||
result = fs_tools.list_folder("download", "test_series/../../")
|
||||
|
||||
assert result["error"] == "forbidden"
|
||||
|
||||
def test_symlink_escape(self, memory, real_folder):
|
||||
"""Should handle symlinks that point outside root."""
|
||||
# Create a symlink pointing outside
|
||||
def test_symlink_does_not_crash(self, memory, real_folder):
|
||||
"""A symlink pointing outside the root must never crash the tool.
|
||||
|
||||
The policy here is implementation-defined (forbid, follow, or
|
||||
report ``not_found``). The contract is just: don't raise.
|
||||
"""
|
||||
symlink = real_folder["downloads"] / "escape_link"
|
||||
try:
|
||||
symlink.symlink_to("/tmp")
|
||||
except OSError:
|
||||
pytest.skip("Cannot create symlinks")
|
||||
pytest.skip("Filesystem does not support symlinks")
|
||||
|
||||
memory.ltm.download_folder = str(real_folder["downloads"])
|
||||
memory.ltm.workspace.download = str(real_folder["downloads"])
|
||||
|
||||
result = fs_tools.list_folder("download", "escape_link")
|
||||
|
||||
# Should either be forbidden or work (depending on policy)
|
||||
# The important thing is it doesn't crash
|
||||
assert "error" in result or "status" in result
|
||||
# Either an error code or a normal listing is acceptable; what
|
||||
# matters is that we got a dict back instead of an exception.
|
||||
assert isinstance(result, dict)
|
||||
assert "status" in result
|
||||
|
||||
def test_special_characters_in_path(self, memory, real_folder):
|
||||
"""Should handle special characters in path."""
|
||||
special_dir = real_folder["downloads"] / "special !@#$%"
|
||||
special_dir.mkdir()
|
||||
memory.ltm.download_folder = str(real_folder["downloads"])
|
||||
def test_special_characters_in_subfolder_name(self, memory, real_folder):
|
||||
special = real_folder["downloads"] / "special !@#$%"
|
||||
special.mkdir()
|
||||
memory.ltm.workspace.download = str(real_folder["downloads"])
|
||||
|
||||
result = fs_tools.list_folder("download", "special !@#$%")
|
||||
|
||||
assert result["status"] == "ok"
|
||||
|
||||
def test_unicode_path(self, memory, real_folder):
|
||||
"""Should handle unicode in path."""
|
||||
def test_unicode_subfolder_name(self, memory, real_folder):
|
||||
unicode_dir = real_folder["downloads"] / "日本語フォルダ"
|
||||
unicode_dir.mkdir()
|
||||
memory.ltm.download_folder = str(real_folder["downloads"])
|
||||
memory.ltm.workspace.download = str(real_folder["downloads"])
|
||||
|
||||
result = fs_tools.list_folder("download", "日本語フォルダ")
|
||||
|
||||
assert result["status"] == "ok"
|
||||
|
||||
def test_very_long_path(self, memory, real_folder):
|
||||
"""Should handle very long paths gracefully."""
|
||||
memory.ltm.download_folder = str(real_folder["downloads"])
|
||||
def test_oversize_path_does_not_crash(self, memory, real_folder):
|
||||
memory.ltm.workspace.download = str(real_folder["downloads"])
|
||||
|
||||
long_path = "a" * 1000
|
||||
result = fs_tools.list_folder("download", "a" * 1000)
|
||||
|
||||
result = fs_tools.list_folder("download", long_path)
|
||||
|
||||
# Should return an error, not crash
|
||||
# Must surface as an error of some kind, not as a stack trace.
|
||||
assert result["status"] == "error"
|
||||
assert "error" in result
|
||||
|
||||
Reference in New Issue
Block a user