"""Tests for the three-tier memory system. Covers the public API of the memory subsystem: - ``LongTermMemory`` — persistent, component-based (workspace, library_paths, media_preferences, subtitle_preferences, library, following). - ``ShortTermMemory`` — session-only conversation/workflow/entity state. - ``EpisodicMemory`` — volatile event-driven state (search results, downloads, errors, pending questions, background events). - ``Memory`` — unified manager (load/save LTM, clear session). - Context functions — ``init_memory`` / ``get_memory`` / ``has_memory`` / ``reset_memory``. These tests target the current component-based LTM (no legacy ``set_config`` / ``add_to_library`` / ``follow_show`` aliases) and assert on observable behavior, not implementation details. """ from datetime import datetime import pytest from alfred.infrastructure.persistence import ( EpisodicMemory, LongTermMemory, Memory, ShortTermMemory, get_memory, has_memory, init_memory, ) from alfred.infrastructure.persistence.context import reset_memory def _is_iso_timestamp(value: str) -> bool: """Return True if ``value`` parses as an ISO-8601 datetime.""" if not isinstance(value, str): return False try: datetime.fromisoformat(value.replace("Z", "+00:00")) return True except (ValueError, TypeError): return False # --------------------------------------------------------------------------- # LongTermMemory # --------------------------------------------------------------------------- class TestLongTermMemoryDefaults: """Default-state guarantees for a freshly constructed LTM.""" def test_workspace_paths_unset_by_default(self): ltm = LongTermMemory() assert ltm.workspace.download is None assert ltm.workspace.torrent is None assert ltm.workspace.trash is None def test_library_paths_empty_by_default(self): ltm = LongTermMemory() assert ltm.library_paths.folders == {} assert ltm.library_paths.get("movies") is None def test_media_preferences_defaults(self): ltm = LongTermMemory() assert ltm.media_preferences.quality == "1080p" assert "en" in ltm.media_preferences.audio_languages def test_following_empty_by_default(self): ltm = LongTermMemory() assert ltm.following.shows == [] def test_library_empty_by_default(self): ltm = LongTermMemory() assert ltm.library.movies == [] assert ltm.library.tv_shows == [] class TestLibraryPaths: """LibraryPaths.set / get on the LTM component.""" def test_set_and_get_roundtrip(self): ltm = LongTermMemory() ltm.library_paths.set("movies", "/media/movies") assert ltm.library_paths.get("movies") == "/media/movies" def test_unknown_collection_returns_none(self): ltm = LongTermMemory() assert ltm.library_paths.get("anything") is None def test_set_overwrites_existing_value(self): ltm = LongTermMemory() ltm.library_paths.set("movies", "/old/path") ltm.library_paths.set("movies", "/new/path") assert ltm.library_paths.get("movies") == "/new/path" class TestLibrary: """Library.add / get on the LTM component.""" def test_add_new_movie_is_recorded(self): ltm = LongTermMemory() ltm.library.add("movies", {"imdb_id": "tt1375666", "title": "Inception"}) movies = ltm.library.get("movies") assert len(movies) == 1 assert movies[0]["title"] == "Inception" assert _is_iso_timestamp(movies[0]["added_at"]) def test_add_is_idempotent_on_imdb_id(self): ltm = LongTermMemory() movie = {"imdb_id": "tt1375666", "title": "Inception"} ltm.library.add("movies", movie) ltm.library.add("movies", movie) assert len(ltm.library.get("movies")) == 1 def test_get_unknown_media_type_returns_empty_list(self): ltm = LongTermMemory() assert ltm.library.get("anything") == [] def test_add_unknown_media_type_is_a_no_op(self): ltm = LongTermMemory() ltm.library.add("podcasts", {"imdb_id": "x", "title": "y"}) # Nothing crashes; library state unchanged. assert ltm.library.movies == [] assert ltm.library.tv_shows == [] class TestFollowing: """Following.add on the LTM component.""" def test_add_show_records_timestamp(self): ltm = LongTermMemory() ltm.following.add({"imdb_id": "tt0944947", "title": "Game of Thrones"}) assert len(ltm.following.shows) == 1 assert ltm.following.shows[0]["title"] == "Game of Thrones" assert _is_iso_timestamp(ltm.following.shows[0]["followed_at"]) def test_add_is_idempotent_on_imdb_id(self): ltm = LongTermMemory() show = {"imdb_id": "tt0944947", "title": "Game of Thrones"} ltm.following.add(show) ltm.following.add(show) assert len(ltm.following.shows) == 1 class TestLongTermMemorySerialization: """to_dict / from_dict roundtrip and legacy migration.""" def test_roundtrip_preserves_state(self): ltm = LongTermMemory() ltm.workspace.download = "/downloads" ltm.library_paths.set("movies", "/media/movies") ltm.library.add("movies", {"imdb_id": "tt1", "title": "Movie"}) ltm.following.add({"imdb_id": "tt2", "title": "Show"}) restored = LongTermMemory.from_dict(ltm.to_dict()) assert restored.workspace.download == "/downloads" assert restored.library_paths.get("movies") == "/media/movies" assert restored.library.get("movies")[0]["title"] == "Movie" assert restored.following.shows[0]["title"] == "Show" def test_from_dict_handles_empty_dict(self): ltm = LongTermMemory.from_dict({}) assert ltm.workspace.download is None assert ltm.library_paths.folders == {} def test_from_dict_migrates_legacy_flat_workspace_keys(self): """Legacy snapshots had ``download_folder`` / ``torrent_folder`` at root.""" legacy = {"download_folder": "/dl", "torrent_folder": "/tt"} ltm = LongTermMemory.from_dict(legacy) assert ltm.workspace.download == "/dl" assert ltm.workspace.torrent == "/tt" # --------------------------------------------------------------------------- # ShortTermMemory # --------------------------------------------------------------------------- class TestShortTermMemory: """Conversation, workflow, entity, and language state.""" def test_default_values(self): stm = ShortTermMemory() assert stm.conversation_history == [] assert stm.current_workflow is None assert stm.extracted_entities == {} assert stm.current_topic is None assert stm.language == "en" def test_add_message_records_timestamp(self): stm = ShortTermMemory() stm.add_message("user", "Hello") history = stm.conversation_history assert len(history) == 1 assert history[0]["role"] == "user" assert history[0]["content"] == "Hello" assert _is_iso_timestamp(history[0]["timestamp"]) def test_get_recent_history_caps_at_n(self): stm = ShortTermMemory() for i in range(10): stm.add_message("user", f"Message {i}") assert len(stm.get_recent_history(3)) == 3 def test_set_language_overrides_default(self): stm = ShortTermMemory() stm.set_language("fr") assert stm.language == "fr" def test_clear_resets_volatile_state(self): stm = ShortTermMemory() stm.add_message("user", "Hello") stm.set_language("fr") stm.set_entity("title", "Inception") stm.clear() assert stm.conversation_history == [] assert stm.extracted_entities == {} # Language is volatile session-state too; clear() resets it. assert stm.language == "en" def test_entity_set_get_roundtrip(self): stm = ShortTermMemory() stm.set_entity("title", "Inception") assert stm.get_entity("title") == "Inception" assert stm.get_entity("missing") is None assert stm.get_entity("missing", "fallback") == "fallback" def test_workflow_lifecycle(self): stm = ShortTermMemory() assert stm.current_workflow is None stm.start_workflow("organize_media", {"release_name": "X"}) assert stm.current_workflow is not None assert stm.current_workflow["name"] == "organize_media" assert stm.current_workflow["params"] == {"release_name": "X"} stm.update_workflow_stage("moving") assert stm.current_workflow["stage"] == "moving" stm.end_workflow() assert stm.current_workflow is None # --------------------------------------------------------------------------- # EpisodicMemory # --------------------------------------------------------------------------- class TestEpisodicMemorySearchResults: """Search-result storage and 1-based index retrieval.""" def test_store_records_timestamp_and_query(self): ep = EpisodicMemory() ep.store_search_results("Inception", [{"name": "r1"}]) last = ep.last_search_results assert last["query"] == "Inception" assert _is_iso_timestamp(last["timestamp"]) def test_get_result_by_index_is_one_based(self): ep = EpisodicMemory() ep.store_search_results("q", [{"name": "first"}, {"name": "second"}]) assert ep.get_result_by_index(1)["name"] == "first" assert ep.get_result_by_index(2)["name"] == "second" def test_get_result_by_out_of_range_index_returns_none(self): ep = EpisodicMemory() ep.store_search_results("q", [{"name": "only"}]) assert ep.get_result_by_index(0) is None assert ep.get_result_by_index(99) is None def test_get_result_by_index_with_no_search_returns_none(self): assert EpisodicMemory().get_result_by_index(1) is None class TestEpisodicMemoryErrors: """Recent error log with capped retention.""" def test_add_error_records_timestamp(self): ep = EpisodicMemory() ep.add_error("find_torrent", "API timeout") errors = ep.recent_errors assert len(errors) == 1 assert errors[0]["action"] == "find_torrent" assert errors[0]["error"] == "API timeout" assert _is_iso_timestamp(errors[0]["timestamp"]) def test_recent_errors_keep_latest_only(self): """When more errors are added than the limit, the oldest are dropped.""" ep = EpisodicMemory() for i in range(60): # well over any sane retention ep.add_error("action", f"Error {i}") errors = ep.recent_errors # Whatever the cap, the latest entry must always survive. assert errors[-1]["error"] == "Error 59" class TestEpisodicMemoryDownloads: """Active download tracking.""" def test_complete_download_moves_record_out(self): ep = EpisodicMemory() ep.add_active_download({"task_id": "t1", "name": "X"}) completed = ep.complete_download("t1", "/library/X.mkv") assert completed is not None assert completed["file_path"] == "/library/X.mkv" assert ep.get_active_downloads() == [] def test_complete_unknown_download_returns_none(self): ep = EpisodicMemory() assert ep.complete_download("missing", "/x") is None class TestEpisodicMemoryPendingQuestion: """Single-slot pending question.""" def test_set_and_resolve(self): ep = EpisodicMemory() ep.set_pending_question( question="Which one?", options=[ {"index": 1, "label": "A"}, {"index": 2, "label": "B"}, ], context={}, ) assert ep.get_pending_question() is not None resolved = ep.resolve_pending_question(answer_index=1) assert resolved == {"index": 1, "label": "A"} assert ep.get_pending_question() is None def test_resolve_without_pending_question_returns_none(self): assert EpisodicMemory().resolve_pending_question(answer_index=1) is None # --------------------------------------------------------------------------- # Memory manager # --------------------------------------------------------------------------- class TestMemoryManager: """Memory orchestrator — disk I/O and session reset.""" def test_init_creates_storage_directory(self, temp_dir): storage = temp_dir / "memory_data" Memory(storage_dir=str(storage)) assert storage.exists() def test_save_persists_ltm_across_instances(self, temp_dir): memory = Memory(storage_dir=str(temp_dir)) memory.ltm.workspace.download = "/dl" memory.ltm.library_paths.set("movies", "/media/movies") memory.save() reloaded = Memory(storage_dir=str(temp_dir)) assert reloaded.ltm.workspace.download == "/dl" assert reloaded.ltm.library_paths.get("movies") == "/media/movies" def test_clear_session_preserves_ltm(self, memory): memory.ltm.library_paths.set("movies", "/media/movies") memory.stm.add_message("user", "Hello") memory.episodic.add_error("action", "boom") memory.clear_session() assert memory.ltm.library_paths.get("movies") == "/media/movies" assert memory.stm.conversation_history == [] assert memory.episodic.recent_errors == [] # --------------------------------------------------------------------------- # Global memory singleton # --------------------------------------------------------------------------- class TestMemoryContext: """Global ``init_memory`` / ``get_memory`` / ``has_memory`` accessors.""" def test_get_memory_without_init_raises(self): reset_memory() with pytest.raises(RuntimeError, match="Memory not initialized"): get_memory() def test_init_memory_then_get_memory_returns_same_instance(self, temp_dir): reset_memory() memory = init_memory(str(temp_dir)) assert has_memory() assert get_memory() is memory