e07c9ec77b
Several weeks of work accumulated without being committed. Grouped here for clarity; see CHANGELOG.md [Unreleased] for the user-facing summary. Highlights ---------- P1 #2 — ISO 639-2/B canonical migration - New Language VO + LanguageRegistry (alfred/domain/shared/knowledge/). - iso_languages.yaml as single source of truth for language codes. - SubtitleKnowledgeBase now delegates lookup to LanguageRegistry; subtitles.yaml only declares subtitle-specific tokens (vostfr, vf, vff, …). - SubtitlePreferences default → ["fre", "eng"]; subtitle filenames written as {iso639_2b}.srt (legacy fr.srt still read via alias). - Scanner: dropped _LANG_KEYWORDS / _SDH_TOKENS / _FORCED_TOKENS / SUBTITLE_EXTENSIONS hardcoded dicts. - Fixed: 'hi' token no longer marks SDH (conflicted with Hindi alias). - Added settings.min_movie_size_bytes (was a module constant). P1 #3 — Release parser unification + data-driven tokenizer - parse_release() is now the single source of truth for release-name parsing. - alfred/knowledge/release/separators.yaml declares the token separators used by the tokenizer (., space, [, ], (, ), _). New conventions can be added without code changes. - Tokenizer now splits on any configured separator instead of name.split('.'). Releases like 'The Father (2020) [1080p] [WEBRip] [5.1] [YTS.MX]' parse via the direct path without sanitization fallback. - Site-tag extraction always runs first; well-formedness only rejects truly forbidden chars. - _parse_season_episode() extended with NxNN / NxNNxNN alt forms. - Removed dead helpers: _sanitize, _normalize. Domain cleanup - Deleted fossil services with zero production callers: alfred/domain/movies/services.py alfred/domain/tv_shows/services.py alfred/domain/subtitles/services.py (replaced by subtitles/services/ package) alfred/domain/subtitles/repositories.py - Split monolithic subtitle services into a package (identifier, matcher, placer, pattern_detector, utils) + dedicated knowledge/ package. 
- MediaInfo split into dedicated package (alfred/domain/shared/media/: audio, video, subtitle, info, matching). Persistence cleanup - Removed dead JSON repositories (movie/subtitle/tvshow_repository.py). Tests - Major expansion of the test suite organized to mirror the source tree. - Removed obsolete *_edge_cases test files superseded by structured tests. - Suite: 990 passed, 8 skipped. Misc - .gitignore: exclude env_backup/ and *.bak. - Adjustments across agent/llm, app.py, application/filesystem, and infrastructure/filesystem to align with the new domain layout.
303 lines
11 KiB
Python
303 lines
11 KiB
Python
"""Tests for ``alfred.agent.agent.Agent`` — the LLM orchestration layer.
|
|
|
|
Covers the public agent surface used by the FastAPI handlers:
|
|
|
|
- **Construction** — ``Agent(settings, llm, max_tool_iterations)`` wires the
|
|
prompt builder, the tool registry, and the in-memory tool catalogue.
|
|
- **Tool execution** — ``_execute_tool_call`` parses an OpenAI-shaped
|
|
tool-call dict, validates the tool exists and is in scope for the current
|
|
workflow, executes it, and surfaces errors as structured dicts.
|
|
- **Step loop** — ``step(user_input)`` records the user message, builds the
|
|
system prompt, runs the LLM/tool loop up to ``max_tool_iterations``, and
|
|
returns the final assistant text.
|
|
|
|
These tests use the current component-based LTM API
|
|
(``memory.ltm.workspace.download``, ``memory.ltm.library_paths.set(...)``).
|
|
The legacy flat attributes (``download_folder``, ``movie_folder``, …) no
|
|
longer exist.
|
|
"""
|
|
|
|
from unittest.mock import Mock
|
|
|
|
from alfred.agent.agent import Agent
|
|
from alfred.infrastructure.persistence import get_memory
|
|
|
|
|
|
class TestAgentInit:
    """Tests for ``Agent`` construction.

    Every test requests the ``memory``, ``mock_settings`` and ``mock_llm``
    fixtures; ``memory`` is requested even where the test body never touches
    it directly.
    """

    def test_init(self, memory, mock_settings, mock_llm):
        """Should expose the LLM, tools, prompt builder and iteration cap."""
        agent = Agent(settings=mock_settings, llm=mock_llm, max_tool_iterations=10)

        assert agent.llm is mock_llm
        assert agent.tools is not None
        assert agent.prompt_builder is not None
        assert agent.max_tool_iterations == 10

    def test_init_custom_iterations(self, memory, mock_settings, mock_llm):
        """Should store whatever ``max_tool_iterations`` the caller passes.

        Two distinct values are checked so the test cannot pass by coincidence
        when the requested value happens to equal the constructor default.
        """
        for limit in (3, 10):
            agent = Agent(
                settings=mock_settings, llm=mock_llm, max_tool_iterations=limit
            )

            assert agent.max_tool_iterations == limit

    def test_tools_registered(self, memory, mock_settings, mock_llm):
        """Should register all tools."""
        agent = Agent(settings=mock_settings, llm=mock_llm)

        expected_tools = [
            "set_path_for_folder",
            "list_folder",
            "find_media_imdb_id",
            "find_torrent",
            "add_torrent_by_index",
            "add_torrent_to_qbittorrent",
            "get_torrent_by_index",
            "set_language",
        ]

        # Collect every absent name so one failure reports the full gap
        # instead of stopping at the first missing tool.
        missing = [name for name in expected_tools if name not in agent.tools]
        assert not missing, f"Tools not registered: {missing}"
|
|
|
|
|
|
class TestExecuteToolCall:
    """Tests for ``Agent._execute_tool_call``.

    Each test hands the agent a tool-call dict (``id`` plus a ``function``
    entry holding ``name`` and a JSON ``arguments`` string) and inspects the
    dict that comes back.
    """

    @staticmethod
    def _tool_call(name, arguments):
        """Build a tool-call dict for ``name`` with the raw ``arguments`` string."""
        return {
            "id": "call_123",
            "function": {"name": name, "arguments": arguments},
        }

    def test_execute_known_tool(self, memory, mock_settings, mock_llm, real_folder):
        """Should execute known tool."""
        agent = Agent(settings=mock_settings, llm=mock_llm)
        memory.ltm.workspace.download = str(real_folder["downloads"])

        request = self._tool_call("list_folder", '{"folder_type": "download"}')
        outcome = agent._execute_tool_call(request)

        assert outcome["status"] == "ok"

    def test_execute_unknown_tool(self, memory, mock_settings, mock_llm):
        """Should return error for unknown tool."""
        agent = Agent(settings=mock_settings, llm=mock_llm)

        outcome = agent._execute_tool_call(self._tool_call("unknown_tool", "{}"))

        assert outcome["error"] == "unknown_tool"
        assert "available_tools" in outcome

    def test_execute_with_bad_args(self, memory, mock_settings, mock_llm):
        """Should return error for bad arguments."""
        agent = Agent(settings=mock_settings, llm=mock_llm)

        # An empty argument object: nothing the tool needs is supplied.
        outcome = agent._execute_tool_call(
            self._tool_call("set_path_for_folder", "{}")
        )

        assert outcome["error"] == "bad_args"

    def test_execute_tracks_errors(self, memory, mock_settings, mock_llm):
        """Should track errors in episodic memory."""
        agent = Agent(settings=mock_settings, llm=mock_llm)

        # 123 is deliberately the wrong type for folder_name; the original
        # intent of this input is to make the tool call fail (TypeError).
        request = self._tool_call("set_path_for_folder", '{"folder_name": 123}')
        agent._execute_tool_call(request)

        # Read the memory back through the accessor rather than the fixture.
        assert len(get_memory().episodic.recent_errors) > 0

    def test_execute_with_invalid_json(self, memory, mock_settings, mock_llm):
        """Should handle invalid JSON arguments."""
        agent = Agent(settings=mock_settings, llm=mock_llm)

        request = self._tool_call("list_folder", "{invalid json}")
        outcome = agent._execute_tool_call(request)

        assert outcome["error"] == "bad_args"
|
|
|
|
|
|
class TestStep:
    """Tests for ``Agent.step``: replies, history, tool loop and prompt content."""

    def test_step_text_response(self, memory, mock_settings, mock_llm):
        """Should return text response when no tool call."""
        agent = Agent(settings=mock_settings, llm=mock_llm)

        reply = agent.step("Hello")

        assert reply == "I found what you're looking for!"

    def test_step_saves_to_history(self, memory, mock_settings, mock_llm):
        """Should save conversation to STM history."""
        agent = Agent(settings=mock_settings, llm=mock_llm)

        agent.step("Hi there")

        history = get_memory().stm.get_recent_history(10)
        assert len(history) == 2

        # Exactly two entries: the user turn followed by the assistant turn.
        user_entry, assistant_entry = history
        assert user_entry["role"] == "user"
        assert user_entry["content"] == "Hi there"
        assert assistant_entry["role"] == "assistant"

    def test_step_with_tool_call(
        self, memory, mock_settings, mock_llm_with_tool_call, real_folder
    ):
        """Should execute tool and continue."""
        memory.ltm.workspace.download = str(real_folder["downloads"])

        llm = mock_llm_with_tool_call
        agent = Agent(settings=mock_settings, llm=llm)

        reply = agent.step("List my downloads")

        lowered = reply.lower()
        assert "found" in reply.lower() or "torrent" in lowered
        assert llm.complete.call_count == 2

        # CRITICAL: Verify tools were passed to LLM
        _, first_kwargs = llm.complete.call_args_list[0]
        assert first_kwargs["tools"] is not None, "Tools not passed to LLM!"
        assert len(first_kwargs["tools"]) > 0, "Tools list is empty!"

    def test_step_max_iterations(self, memory, mock_settings, mock_llm):
        """Should stop after max iterations."""
        max_iterations = 3
        calls = 0

        def mock_complete(messages, tools=None):
            nonlocal calls
            calls += 1

            if calls > max_iterations:
                # Forced final call: answer with plain text, no tool request.
                return {
                    "role": "assistant",
                    "content": "I couldn't complete the task.",
                }

            # CRITICAL: Verify tools are passed (except on forced final call)
            assert tools is not None, f"Tools not passed on call {calls}!"

            # Keep asking for the same tool so the loop never ends on its own.
            return {
                "role": "assistant",
                "content": None,
                "tool_calls": [
                    {
                        "id": f"call_{calls}",
                        "function": {
                            "name": "list_folder",
                            "arguments": '{"folder_type": "download"}',
                        },
                    }
                ],
            }

        mock_llm.complete = Mock(side_effect=mock_complete)
        agent = Agent(
            settings=mock_settings, llm=mock_llm, max_tool_iterations=max_iterations
        )

        agent.step("Do something")

        # Three tool-requesting rounds plus one final completion.
        assert calls == max_iterations + 1

    def test_step_includes_history(self, memory_with_history, mock_settings, mock_llm):
        """Should include conversation history in prompt."""
        agent = Agent(settings=mock_settings, llm=mock_llm)

        agent.step("New message")

        # The message list is the first positional argument of the last call.
        positional, _ = mock_llm.complete.call_args
        sent_messages = positional[0]
        assert any("Hello" in str(m.get("content", "")) for m in sent_messages)

    def test_step_includes_events(self, memory, mock_settings, mock_llm):
        """Should include unread events in prompt."""
        memory.episodic.add_background_event("download_complete", {"name": "Movie.mkv"})
        agent = Agent(settings=mock_settings, llm=mock_llm)

        agent.step("What's new?")

        positional, _ = mock_llm.complete.call_args
        sent_messages = positional[0]
        assert any(
            "download" in str(m.get("content", "")).lower() for m in sent_messages
        )

    def test_step_saves_ltm(self, memory, mock_settings, mock_llm, temp_dir):
        """Should save LTM after step."""
        agent = Agent(settings=mock_settings, llm=mock_llm)

        agent.step("Hello")

        assert (temp_dir / "ltm.json").exists()
|
|
|
|
|
|
class TestAgentIntegration:
    """Integration tests for Agent."""

    def test_multiple_tool_calls(self, memory, mock_settings, mock_llm, real_folder):
        """Should handle multiple tool calls in sequence.

        The scripted LLM requests ``list_folder`` twice and then answers with
        text, so ``step`` is expected to call it exactly three times.
        """
        memory.ltm.workspace.download = str(real_folder["downloads"])
        memory.ltm.library_paths.set("movies", str(real_folder["movies"]))

        calls = 0

        def list_folder_request(call_id, arguments):
            """Assistant reply asking for one ``list_folder`` tool call."""
            return {
                "role": "assistant",
                "content": None,
                "tool_calls": [
                    {
                        "id": call_id,
                        "function": {
                            "name": "list_folder",
                            "arguments": arguments,
                        },
                    }
                ],
            }

        def mock_complete(messages, tools=None):
            nonlocal calls
            calls += 1

            # CRITICAL: Verify tools are passed on every call
            assert tools is not None, f"Tools not passed on call {calls}!"

            if calls == 1:
                return list_folder_request("call_1", '{"folder_type": "download"}')

            if calls == 2:
                # CRITICAL: Verify tool result was sent back
                tool_messages = [m for m in messages if m.get("role") == "tool"]
                assert len(tool_messages) > 0, "Tool result not sent back to LLM!"

                return list_folder_request("call_2", '{"folder_type": "movie"}')

            return {
                "role": "assistant",
                "content": "I listed both folders for you.",
            }

        mock_llm.complete = Mock(side_effect=mock_complete)
        agent = Agent(settings=mock_settings, llm=mock_llm)

        agent.step("List my downloads and movies")

        assert calls == 3
|