Files
alfred/tests/test_memory.py
T
francwa e07c9ec77b chore: sprint cleanup — language unification, parser unification, fossils removal
Several weeks of work accumulated without being committed. Grouped here for
clarity; see CHANGELOG.md [Unreleased] for the user-facing summary.

Highlights
----------

P1 #2 — ISO 639-2/B canonical migration
- New Language VO + LanguageRegistry (alfred/domain/shared/knowledge/).
- iso_languages.yaml as single source of truth for language codes.
- SubtitleKnowledgeBase now delegates lookup to LanguageRegistry; subtitles.yaml
  only declares subtitle-specific tokens (vostfr, vf, vff, …).
- SubtitlePreferences default → ["fre", "eng"]; subtitle filenames written as
  {iso639_2b}.srt (legacy fr.srt still read via alias).
- Scanner: dropped _LANG_KEYWORDS / _SDH_TOKENS / _FORCED_TOKENS /
  SUBTITLE_EXTENSIONS hardcoded dicts.
- Fixed: 'hi' token no longer marks SDH (conflicted with Hindi alias).
- Added settings.min_movie_size_bytes (was a module constant).

P1 #3 — Release parser unification + data-driven tokenizer
- parse_release() is now the single source of truth for release-name parsing.
- alfred/knowledge/release/separators.yaml declares the token separators used
  by the tokenizer (., space, [, ], (, ), _). New conventions can be added
  without code changes.
- Tokenizer now splits on any configured separator instead of name.split('.').
  Releases like 'The Father (2020) [1080p] [WEBRip] [5.1] [YTS.MX]' parse via
  the direct path without sanitization fallback.
- Site-tag extraction always runs first; the well-formedness check now only
  rejects names containing truly forbidden characters.
- _parse_season_episode() extended with NxNN / NxNNxNN alt forms.
- Removed dead helpers: _sanitize, _normalize.

Domain cleanup
- Deleted fossil services with zero production callers:
    alfred/domain/movies/services.py
    alfred/domain/tv_shows/services.py
    alfred/domain/subtitles/services.py (replaced by subtitles/services/ package)
    alfred/domain/subtitles/repositories.py
- Split monolithic subtitle services into a package (identifier, matcher,
  placer, pattern_detector, utils) + dedicated knowledge/ package.
- MediaInfo split into dedicated package (alfred/domain/shared/media/:
  audio, video, subtitle, info, matching).

Persistence cleanup
- Removed dead JSON repositories (movie/subtitle/tvshow_repository.py).

Tests
- Major expansion of the test suite organized to mirror the source tree.
- Removed obsolete *_edge_cases test files superseded by structured tests.
- Suite: 990 passed, 8 skipped.

Misc
- .gitignore: exclude env_backup/ and *.bak.
- Adjustments across agent/llm, app.py, application/filesystem, and
  infrastructure/filesystem to align with the new domain layout.
2026-05-17 23:38:00 +02:00

395 lines
14 KiB
Python

"""Tests for the three-tier memory system.

Covers the public API of the memory subsystem:

- ``LongTermMemory`` — persistent, component-based (workspace, library_paths,
  media_preferences, subtitle_preferences, library, following).
- ``ShortTermMemory`` — session-only conversation/workflow/entity state.
- ``EpisodicMemory`` — volatile event-driven state (search results, downloads,
  errors, pending questions, background events).
- ``Memory`` — unified manager (load/save LTM, clear session).
- Context functions — ``init_memory`` / ``get_memory`` / ``has_memory`` /
  ``reset_memory``.

These tests target the current component-based LTM (no legacy ``set_config`` /
``add_to_library`` / ``follow_show`` aliases) and assert on observable
behavior, not implementation details.
"""
from datetime import datetime
import pytest
from alfred.infrastructure.persistence import (
EpisodicMemory,
LongTermMemory,
Memory,
ShortTermMemory,
get_memory,
has_memory,
init_memory,
)
from alfred.infrastructure.persistence.context import reset_memory
def _is_iso_timestamp(value: str) -> bool:
"""Return True if ``value`` parses as an ISO-8601 datetime."""
if not isinstance(value, str):
return False
try:
datetime.fromisoformat(value.replace("Z", "+00:00"))
return True
except (ValueError, TypeError):
return False
# ---------------------------------------------------------------------------
# LongTermMemory
# ---------------------------------------------------------------------------
class TestLongTermMemoryDefaults:
    """State exposed by a ``LongTermMemory`` built with no arguments."""

    def test_workspace_paths_unset_by_default(self):
        """None of the three workspace paths is configured initially."""
        workspace = LongTermMemory().workspace

        assert workspace.download is None
        assert workspace.torrent is None
        assert workspace.trash is None

    def test_library_paths_empty_by_default(self):
        """The folder mapping starts empty and lookups yield ``None``."""
        library_paths = LongTermMemory().library_paths

        assert library_paths.folders == {}
        assert library_paths.get("movies") is None

    def test_media_preferences_defaults(self):
        """Default quality is 1080p and ``en`` is among the audio languages."""
        preferences = LongTermMemory().media_preferences

        assert preferences.quality == "1080p"
        assert "en" in preferences.audio_languages

    def test_following_empty_by_default(self):
        """No show is followed initially."""
        assert LongTermMemory().following.shows == []

    def test_library_empty_by_default(self):
        """Both library collections start empty."""
        library = LongTermMemory().library

        assert library.movies == []
        assert library.tv_shows == []
class TestLibraryPaths:
    """``set`` / ``get`` behaviour of the LTM ``library_paths`` component."""

    def test_set_and_get_roundtrip(self):
        """A stored path is returned unchanged by ``get``."""
        paths = LongTermMemory().library_paths
        paths.set("movies", "/media/movies")

        assert paths.get("movies") == "/media/movies"

    def test_unknown_collection_returns_none(self):
        """Looking up a collection that was never set yields ``None``."""
        assert LongTermMemory().library_paths.get("anything") is None

    def test_set_overwrites_existing_value(self):
        """The most recent ``set`` for a collection wins."""
        paths = LongTermMemory().library_paths
        for location in ("/old/path", "/new/path"):
            paths.set("movies", location)

        assert paths.get("movies") == "/new/path"
class TestLibrary:
    """``add`` / ``get`` behaviour of the LTM ``library`` component."""

    def test_add_new_movie_is_recorded(self):
        """An added movie is retrievable and carries an ``added_at`` stamp."""
        library = LongTermMemory().library
        library.add("movies", {"imdb_id": "tt1375666", "title": "Inception"})

        stored = library.get("movies")
        assert len(stored) == 1
        entry = stored[0]
        assert entry["title"] == "Inception"
        assert _is_iso_timestamp(entry["added_at"])

    def test_add_is_idempotent_on_imdb_id(self):
        """Adding the same movie twice leaves a single entry."""
        library = LongTermMemory().library
        inception = {"imdb_id": "tt1375666", "title": "Inception"}
        for _ in range(2):
            library.add("movies", inception)

        assert len(library.get("movies")) == 1

    def test_get_unknown_media_type_returns_empty_list(self):
        """An unrecognised media type reads back as an empty list."""
        assert LongTermMemory().library.get("anything") == []

    def test_add_unknown_media_type_is_a_no_op(self):
        """Adding under an unrecognised media type neither raises nor stores."""
        library = LongTermMemory().library
        library.add("podcasts", {"imdb_id": "x", "title": "y"})

        # The call must not crash, and both known collections stay untouched.
        assert library.movies == []
        assert library.tv_shows == []
class TestFollowing:
    """``add`` behaviour of the LTM ``following`` component."""

    def test_add_show_records_timestamp(self):
        """A followed show is stored with a ``followed_at`` ISO timestamp."""
        following = LongTermMemory().following
        following.add({"imdb_id": "tt0944947", "title": "Game of Thrones"})

        assert len(following.shows) == 1
        entry = following.shows[0]
        assert entry["title"] == "Game of Thrones"
        assert _is_iso_timestamp(entry["followed_at"])

    def test_add_is_idempotent_on_imdb_id(self):
        """Following the same show twice leaves a single entry."""
        following = LongTermMemory().following
        game_of_thrones = {"imdb_id": "tt0944947", "title": "Game of Thrones"}
        for _ in range(2):
            following.add(game_of_thrones)

        assert len(following.shows) == 1
class TestLongTermMemorySerialization:
    """``to_dict`` / ``from_dict`` round-tripping and legacy-snapshot loading."""

    def test_roundtrip_preserves_state(self):
        """State written to a dict is recovered by ``from_dict``."""
        original = LongTermMemory()
        original.workspace.download = "/downloads"
        original.library_paths.set("movies", "/media/movies")
        original.library.add("movies", {"imdb_id": "tt1", "title": "Movie"})
        original.following.add({"imdb_id": "tt2", "title": "Show"})

        snapshot = original.to_dict()
        restored = LongTermMemory.from_dict(snapshot)

        assert restored.workspace.download == "/downloads"
        assert restored.library_paths.get("movies") == "/media/movies"
        assert restored.library.get("movies")[0]["title"] == "Movie"
        assert restored.following.shows[0]["title"] == "Show"

    def test_from_dict_handles_empty_dict(self):
        """An empty snapshot produces default workspace and library paths."""
        restored = LongTermMemory.from_dict({})

        assert restored.workspace.download is None
        assert restored.library_paths.folders == {}

    def test_from_dict_migrates_legacy_flat_workspace_keys(self):
        """Root-level ``download_folder`` / ``torrent_folder`` keys are honoured."""
        snapshot = {"download_folder": "/dl", "torrent_folder": "/tt"}

        workspace = LongTermMemory.from_dict(snapshot).workspace

        assert workspace.download == "/dl"
        assert workspace.torrent == "/tt"
# ---------------------------------------------------------------------------
# ShortTermMemory
# ---------------------------------------------------------------------------
class TestShortTermMemory:
    """Session-level state: conversation, workflow, entities and language."""

    def test_default_values(self):
        """A fresh instance has empty collections and ``en`` as language."""
        session = ShortTermMemory()

        assert session.conversation_history == []
        assert session.current_workflow is None
        assert session.extracted_entities == {}
        assert session.current_topic is None
        assert session.language == "en"

    def test_add_message_records_timestamp(self):
        """A recorded message keeps role and content and gains a timestamp."""
        session = ShortTermMemory()
        session.add_message("user", "Hello")

        messages = session.conversation_history
        assert len(messages) == 1
        message = messages[0]
        assert message["role"] == "user"
        assert message["content"] == "Hello"
        assert _is_iso_timestamp(message["timestamp"])

    def test_get_recent_history_caps_at_n(self):
        """Requesting 3 entries out of 10 returns exactly 3."""
        session = ShortTermMemory()
        for i in range(10):
            session.add_message("user", f"Message {i}")

        recent = session.get_recent_history(3)
        assert len(recent) == 3

    def test_set_language_overrides_default(self):
        """``set_language`` replaces the default language."""
        session = ShortTermMemory()
        session.set_language("fr")

        assert session.language == "fr"

    def test_clear_resets_volatile_state(self):
        """``clear`` wipes history and entities and restores the language."""
        session = ShortTermMemory()
        session.add_message("user", "Hello")
        session.set_language("fr")
        session.set_entity("title", "Inception")

        session.clear()

        assert session.conversation_history == []
        assert session.extracted_entities == {}
        # The language belongs to the session as well, so clear() resets it.
        assert session.language == "en"

    def test_entity_set_get_roundtrip(self):
        """Entities read back; missing keys give ``None`` or the fallback."""
        session = ShortTermMemory()
        session.set_entity("title", "Inception")

        assert session.get_entity("title") == "Inception"
        assert session.get_entity("missing") is None
        assert session.get_entity("missing", "fallback") == "fallback"

    def test_workflow_lifecycle(self):
        """A workflow can be started, moved to a new stage, then ended."""
        session = ShortTermMemory()
        assert session.current_workflow is None

        session.start_workflow("organize_media", {"release_name": "X"})
        assert session.current_workflow is not None
        assert session.current_workflow["name"] == "organize_media"
        assert session.current_workflow["params"] == {"release_name": "X"}

        session.update_workflow_stage("moving")
        assert session.current_workflow["stage"] == "moving"

        session.end_workflow()
        assert session.current_workflow is None
# ---------------------------------------------------------------------------
# EpisodicMemory
# ---------------------------------------------------------------------------
class TestEpisodicMemorySearchResults:
    """Storing search results and fetching them by 1-based position."""

    def test_store_records_timestamp_and_query(self):
        """The stored record keeps the query and carries a timestamp."""
        episodic = EpisodicMemory()
        episodic.store_search_results("Inception", [{"name": "r1"}])

        record = episodic.last_search_results
        assert record["query"] == "Inception"
        assert _is_iso_timestamp(record["timestamp"])

    def test_get_result_by_index_is_one_based(self):
        """Index 1 is the first stored result, index 2 the second."""
        episodic = EpisodicMemory()
        episodic.store_search_results("q", [{"name": "first"}, {"name": "second"}])

        for position, expected_name in enumerate(("first", "second"), start=1):
            assert episodic.get_result_by_index(position)["name"] == expected_name

    def test_get_result_by_out_of_range_index_returns_none(self):
        """Positions below 1 or past the end yield ``None``."""
        episodic = EpisodicMemory()
        episodic.store_search_results("q", [{"name": "only"}])

        for position in (0, 99):
            assert episodic.get_result_by_index(position) is None

    def test_get_result_by_index_with_no_search_returns_none(self):
        """With nothing stored, any lookup yields ``None``."""
        episodic = EpisodicMemory()

        assert episodic.get_result_by_index(1) is None
class TestEpisodicMemoryErrors:
    """The recent-error log of ``EpisodicMemory``."""

    def test_add_error_records_timestamp(self):
        """A logged error keeps its action and message and gains a timestamp."""
        episodic = EpisodicMemory()
        episodic.add_error("find_torrent", "API timeout")

        logged = episodic.recent_errors
        assert len(logged) == 1
        entry = logged[0]
        assert entry["action"] == "find_torrent"
        assert entry["error"] == "API timeout"
        assert _is_iso_timestamp(entry["timestamp"])

    def test_recent_errors_keep_latest_only(self):
        """After many additions, the newest error is still the last entry."""
        episodic = EpisodicMemory()
        # 60 entries: intended to exceed whatever retention limit is in place.
        for i in range(60):
            episodic.add_error("action", f"Error {i}")

        # The exact cap is deliberately not asserted; only that the most
        # recent entry is retained at the tail.
        newest = episodic.recent_errors[-1]
        assert newest["error"] == "Error 59"
class TestEpisodicMemoryDownloads:
    """Tracking of active downloads in ``EpisodicMemory``."""

    def test_complete_download_moves_record_out(self):
        """Completing a download returns it and empties the active list."""
        episodic = EpisodicMemory()
        episodic.add_active_download({"task_id": "t1", "name": "X"})

        finished = episodic.complete_download("t1", "/library/X.mkv")

        assert finished is not None
        assert finished["file_path"] == "/library/X.mkv"
        assert episodic.get_active_downloads() == []

    def test_complete_unknown_download_returns_none(self):
        """Completing a task id that was never added yields ``None``."""
        outcome = EpisodicMemory().complete_download("missing", "/x")

        assert outcome is None
class TestEpisodicMemoryPendingQuestion:
    """Setting and resolving the pending question of ``EpisodicMemory``."""

    def test_set_and_resolve(self):
        """Resolving by answer index returns that option and clears the slot."""
        choices = [
            {"index": 1, "label": "A"},
            {"index": 2, "label": "B"},
        ]
        episodic = EpisodicMemory()
        episodic.set_pending_question(
            question="Which one?",
            options=choices,
            context={},
        )
        assert episodic.get_pending_question() is not None

        picked = episodic.resolve_pending_question(answer_index=1)

        assert picked == {"index": 1, "label": "A"}
        assert episodic.get_pending_question() is None

    def test_resolve_without_pending_question_returns_none(self):
        """Resolving when nothing is pending yields ``None``."""
        outcome = EpisodicMemory().resolve_pending_question(answer_index=1)

        assert outcome is None
# ---------------------------------------------------------------------------
# Memory manager
# ---------------------------------------------------------------------------
class TestMemoryManager:
    """``Memory`` manager: storage directory, LTM persistence, session reset.

    The ``temp_dir`` and ``memory`` fixtures are provided outside this file.
    """

    def test_init_creates_storage_directory(self, temp_dir):
        """Constructing ``Memory`` creates its storage directory."""
        target = temp_dir / "memory_data"

        Memory(storage_dir=str(target))

        assert target.exists()

    def test_save_persists_ltm_across_instances(self, temp_dir):
        """LTM saved by one instance is visible to a new one on the same dir."""
        location = str(temp_dir)
        writer = Memory(storage_dir=location)
        writer.ltm.workspace.download = "/dl"
        writer.ltm.library_paths.set("movies", "/media/movies")
        writer.save()

        reader = Memory(storage_dir=location)

        assert reader.ltm.workspace.download == "/dl"
        assert reader.ltm.library_paths.get("movies") == "/media/movies"

    def test_clear_session_preserves_ltm(self, memory):
        """``clear_session`` empties STM and episodic state but keeps LTM."""
        memory.ltm.library_paths.set("movies", "/media/movies")
        memory.stm.add_message("user", "Hello")
        memory.episodic.add_error("action", "boom")

        memory.clear_session()

        assert memory.ltm.library_paths.get("movies") == "/media/movies"
        assert memory.stm.conversation_history == []
        assert memory.episodic.recent_errors == []
# ---------------------------------------------------------------------------
# Global memory singleton
# ---------------------------------------------------------------------------
class TestMemoryContext:
    """Global ``init_memory`` / ``get_memory`` / ``has_memory`` accessors.

    These tests mutate the process-wide memory context. Each test resets it
    up front to establish its precondition, and the autouse fixture below
    resets it again afterwards so an initialized context never leaks into
    tests that run later.
    """

    @pytest.fixture(autouse=True)
    def _reset_global_memory_after_test(self):
        """Reset the global memory context once the test has finished.

        The teardown runs whether the test passes or fails.
        """
        yield
        reset_memory()

    def test_get_memory_without_init_raises(self):
        """``get_memory`` raises ``RuntimeError`` when nothing is initialized."""
        reset_memory()

        with pytest.raises(RuntimeError, match="Memory not initialized"):
            get_memory()

    def test_init_memory_then_get_memory_returns_same_instance(self, temp_dir):
        """``get_memory`` hands back the object returned by ``init_memory``."""
        reset_memory()

        memory = init_memory(str(temp_dir))

        assert has_memory()
        assert get_memory() is memory