e07c9ec77b
Several weeks of work accumulated without being committed. Grouped here for
clarity; see CHANGELOG.md [Unreleased] for the user-facing summary.

Highlights
----------

P1 #2 — ISO 639-2/B canonical migration
- New Language VO + LanguageRegistry (alfred/domain/shared/knowledge/).
- iso_languages.yaml as single source of truth for language codes.
- SubtitleKnowledgeBase now delegates lookup to LanguageRegistry;
  subtitles.yaml only declares subtitle-specific tokens (vostfr, vf, vff, …).
- SubtitlePreferences default → ["fre", "eng"]; subtitle filenames written
  as {iso639_2b}.srt (legacy fr.srt still read via alias).
- Scanner: dropped _LANG_KEYWORDS / _SDH_TOKENS / _FORCED_TOKENS /
  SUBTITLE_EXTENSIONS hardcoded dicts.
- Fixed: 'hi' token no longer marks SDH (conflicted with Hindi alias).
- Added settings.min_movie_size_bytes (was a module constant).

P1 #3 — Release parser unification + data-driven tokenizer
- parse_release() is now the single source of truth for release-name
  parsing.
- alfred/knowledge/release/separators.yaml declares the token separators
  used by the tokenizer (., space, [, ], (, ), _). New conventions can be
  added without code changes.
- Tokenizer now splits on any configured separator instead of
  name.split('.'). Releases like
  'The Father (2020) [1080p] [WEBRip] [5.1] [YTS.MX]' parse via the direct
  path without sanitization fallback.
- Site-tag extraction always runs first; well-formedness only rejects truly
  forbidden chars.
- _parse_season_episode() extended with NxNN / NxNNxNN alt forms.
- Removed dead helpers: _sanitize, _normalize.

Domain cleanup
- Deleted fossil services with zero production callers:
    alfred/domain/movies/services.py
    alfred/domain/tv_shows/services.py
    alfred/domain/subtitles/services.py (replaced by subtitles/services/ package)
    alfred/domain/subtitles/repositories.py
- Split monolithic subtitle services into a package (identifier, matcher,
  placer, pattern_detector, utils) + dedicated knowledge/ package.
- MediaInfo split into dedicated package (alfred/domain/shared/media/:
  audio, video, subtitle, info, matching).

Persistence cleanup
- Removed dead JSON repositories (movie/subtitle/tvshow_repository.py).

Tests
- Major expansion of the test suite organized to mirror the source tree.
- Removed obsolete *_edge_cases test files superseded by structured tests.
- Suite: 990 passed, 8 skipped.

Misc
- .gitignore: exclude env_backup/ and *.bak.
- Adjustments across agent/llm, app.py, application/filesystem, and
  infrastructure/filesystem to align with the new domain layout.
393 lines
13 KiB
Python
393 lines
13 KiB
Python
"""Edge-case tests for ``alfred.agent.agent.Agent``.
|
|
|
|
Covers pathological tool-call inputs and unusual control flow:
|
|
|
|
- **TestExecuteToolCallEdgeCases** — malformed JSON arguments, unknown
|
|
tools, extra/wrong-typed args, and propagation of ``KeyboardInterrupt``
|
|
(must not be swallowed by the tool executor).
|
|
- **TestStepEdgeCases** — empty input, oversize input, unicode input.
|
|
- **TestAgentConcurrencyEdgeCases** — mid-step memory mutations through
|
|
``set_path_for_folder``.
|
|
- **TestAgentErrorRecovery** — recovery from tool errors during the loop.
|
|
|
|
The KeyboardInterrupt test patches ``visible_tool_names`` so the injected
|
|
test tool is in scope; otherwise the agent's workflow-scope guard would
|
|
short-circuit before ``tool.func()`` runs and the exception would never be
|
|
raised.
|
|
"""
|
|
|
|
from unittest.mock import Mock
|
|
|
|
import pytest
|
|
|
|
from alfred.agent.agent import Agent
|
|
from alfred.infrastructure.persistence import get_memory
|
|
from alfred.settings import settings
|
|
|
|
|
|
class TestExecuteToolCallEdgeCases:
    """Edge cases for ``Agent._execute_tool_call``.

    Every test builds a raw tool-call payload (an ``id`` plus a ``function``
    entry whose ``arguments`` value is a JSON string) and hands it directly
    to the executor.
    """

    def test_tool_returns_none(self, memory, mock_llm, monkeypatch):
        """A tool that returns ``None`` must actually run and not crash."""
        from alfred.agent.registry import Tool

        agent = Agent(settings=settings, llm=mock_llm)

        # Record invocations so the test proves the tool body really ran.
        invocations = []

        def return_none():
            invocations.append("called")
            return None

        agent.tools["test_tool"] = Tool(
            name="test_tool", description="Test", func=return_none, parameters={}
        )
        # Same visibility patch as the KeyboardInterrupt test: without it the
        # scope guard (``visible_tool_names``) short-circuits the call before
        # ``tool.func()`` runs, and the ``None`` return path is never reached.
        monkeypatch.setattr(
            agent.prompt_builder, "visible_tool_names", lambda: ["test_tool"]
        )

        tool_call = {
            "id": "call_123",
            "function": {"name": "test_tool", "arguments": "{}"},
        }
        result = agent._execute_tool_call(tool_call)

        assert invocations, "the injected tool was never executed"
        assert result is None or isinstance(result, dict)

    def test_tool_raises_keyboard_interrupt(self, memory, mock_llm, monkeypatch):
        """KeyboardInterrupt raised by a tool must propagate up, not be swallowed."""
        from alfred.agent.registry import Tool

        agent = Agent(settings=settings, llm=mock_llm)

        def raise_interrupt():
            raise KeyboardInterrupt()

        agent.tools["test_tool"] = Tool(
            name="test_tool", description="Test", func=raise_interrupt, parameters={}
        )
        # The scope guard (``visible_tool_names``) would otherwise short-circuit
        # the call before reaching ``tool.func()``. Make our injected tool
        # visible to reach the exception path under test.
        monkeypatch.setattr(
            agent.prompt_builder, "visible_tool_names", lambda: ["test_tool"]
        )

        tool_call = {
            "id": "call_123",
            "function": {"name": "test_tool", "arguments": "{}"},
        }

        with pytest.raises(KeyboardInterrupt):
            agent._execute_tool_call(tool_call)

    def test_tool_with_extra_args(self, memory, mock_llm, real_folder):
        """An unexpected extra argument is reported as a ``bad_args`` error."""
        agent = Agent(settings=settings, llm=mock_llm)
        memory.ltm.workspace.download = str(real_folder["downloads"])

        tool_call = {
            "id": "call_123",
            "function": {
                "name": "list_folder",
                "arguments": '{"folder_type": "download", "extra_arg": "ignored"}',
            },
        }

        result = agent._execute_tool_call(tool_call)

        assert result.get("error") == "bad_args"

    def test_tool_with_wrong_type_args(self, memory, mock_llm):
        """A wrongly-typed argument yields a result with ``error`` or ``status``."""
        agent = Agent(settings=settings, llm=mock_llm)

        tool_call = {
            "id": "call_123",
            "function": {
                "name": "get_torrent_by_index",
                "arguments": '{"index": "not an int"}',
            },
        }

        result = agent._execute_tool_call(tool_call)

        assert "error" in result or "status" in result
|
|
|
|
|
|
class TestStepEdgeCases:
    """Boundary conditions for ``Agent.step``."""

    def test_step_with_empty_input(self, memory, mock_llm):
        """An empty user message still produces a response."""
        agent = Agent(settings=settings, llm=mock_llm)

        reply = agent.step("")

        assert reply is not None

    def test_step_with_very_long_input(self, memory, mock_llm):
        """A 100 000-character user message still produces a response."""
        agent = Agent(settings=settings, llm=mock_llm)

        oversized = "x" * 100_000
        reply = agent.step(oversized)

        assert reply is not None

    def test_step_with_unicode_input(self, memory, mock_llm):
        """Non-ASCII content returned by the LLM comes back unchanged."""
        mock_llm.complete = Mock(
            side_effect=lambda messages, tools=None: {
                "role": "assistant",
                "content": "日本語の応答",
            }
        )
        agent = Agent(settings=settings, llm=mock_llm)

        reply = agent.step("日本語の質問")

        assert reply == "日本語の応答"

    def test_step_llm_returns_empty(self, memory, mock_llm):
        """An empty LLM reply is returned as an empty string."""
        mock_llm.complete = Mock(
            side_effect=lambda messages, tools=None: {
                "role": "assistant",
                "content": "",
            }
        )
        agent = Agent(settings=settings, llm=mock_llm)

        reply = agent.step("Hello")

        assert reply == ""

    def test_step_llm_raises_exception(self, memory, mock_llm):
        """An exception raised by the LLM surfaces from ``step``."""
        mock_llm.complete.side_effect = Exception("LLM Error")
        agent = Agent(settings=settings, llm=mock_llm)

        with pytest.raises(Exception, match="LLM Error"):
            agent.step("Hello")

    def test_step_tool_loop_with_same_tool(self, memory, mock_llm):
        """Three identical tool rounds are followed by one final LLM call."""
        rounds = 0

        def scripted_llm(messages, tools=None):
            nonlocal rounds
            rounds += 1
            if rounds > 3:
                return {"role": "assistant", "content": "Done looping"}
            # The first three calls each request the same tool again.
            requested = {
                "id": f"call_{rounds}",
                "function": {
                    "name": "list_folder",
                    "arguments": '{"folder_type": "download"}',
                },
            }
            return {
                "role": "assistant",
                "content": None,
                "tool_calls": [requested],
            }

        mock_llm.complete = Mock(side_effect=scripted_llm)
        agent = Agent(settings=settings, llm=mock_llm, max_tool_iterations=3)

        agent.step("Loop test")

        assert rounds == 4

    def test_step_preserves_history_order(self, memory, mock_llm):
        """User messages appear in history in the order they were sent."""
        agent = Agent(settings=settings, llm=mock_llm)

        for utterance in ("First", "Second", "Third"):
            agent.step(utterance)

        recent = get_memory().stm.get_recent_history(10)

        from_user = [entry["content"] for entry in recent if entry["role"] == "user"]
        assert from_user == ["First", "Second", "Third"]

    def test_step_with_pending_question(self, memory, mock_llm):
        """A pending question is surfaced in the system prompt."""
        choices = [{"index": 1, "label": "Option 1"}]
        memory.episodic.set_pending_question("Which one?", choices, {})
        agent = Agent(settings=settings, llm=mock_llm)

        agent.step("Hello")

        # First positional argument of the last ``complete`` call is the
        # message list; its first entry carries the system prompt.
        sent_messages = mock_llm.complete.call_args[0][0]
        prompt_text = sent_messages[0]["content"]
        assert "PENDING QUESTION" in prompt_text

    def test_step_with_active_downloads(self, memory, mock_llm):
        """An active download is surfaced in the system prompt."""
        download = {"task_id": "123", "name": "Movie.mkv", "progress": 50}
        memory.episodic.add_active_download(download)
        agent = Agent(settings=settings, llm=mock_llm)

        agent.step("Hello")

        sent_messages = mock_llm.complete.call_args[0][0]
        prompt_text = sent_messages[0]["content"]
        assert "ACTIVE DOWNLOADS" in prompt_text

    def test_step_clears_events_after_notification(self, memory, mock_llm):
        """No background event is left unread once a step has run."""
        memory.episodic.add_background_event("test_event", {"data": "test"})
        agent = Agent(settings=settings, llm=mock_llm)

        agent.step("Hello")

        still_unread = memory.episodic.get_unread_events()
        assert len(still_unread) == 0
|
|
|
|
|
|
class TestAgentConcurrencyEdgeCases:
    """Shared-memory scenarios: several agents, and tools mutating memory."""

    def test_multiple_agents_same_memory(self, memory, mock_llm):
        """Two agents stepping once each leave four history entries."""
        agent1 = Agent(settings=settings, llm=mock_llm)
        agent2 = Agent(settings=settings, llm=mock_llm)

        agent1.step("From agent 1")
        agent2.step("From agent 2")

        mem = get_memory()
        history = mem.stm.get_recent_history(10)

        assert len(history) == 4

    def test_tool_modifies_memory_during_step(self, memory, mock_llm, real_folder):
        """A tool invocation must persist its mutation into LTM."""
        import json

        memory.ltm.workspace.download = str(real_folder["downloads"])
        movies_path = str(real_folder["movies"])

        # Serialize with ``json.dumps`` rather than string interpolation so a
        # path containing backslashes or quotes still yields valid JSON.
        arguments = json.dumps({"folder_name": "movies", "path_value": movies_path})

        call_count = 0

        def mock_complete(messages, tools=None):
            nonlocal call_count
            call_count += 1
            if call_count == 1:
                # First LLM call asks for the tool; later calls wrap up.
                return {
                    "role": "assistant",
                    "content": None,
                    "tool_calls": [
                        {
                            "id": "call_1",
                            "function": {
                                "name": "set_path_for_folder",
                                "arguments": arguments,
                            },
                        }
                    ],
                }
            return {"role": "assistant", "content": "Path set successfully."}

        mock_llm.complete = Mock(side_effect=mock_complete)
        agent = Agent(settings=settings, llm=mock_llm)

        agent.step("Set movie folder")

        mem = get_memory()
        # ``movies`` is a library collection (not download/torrent) → stored in
        # library_paths, not as a flat attribute.
        assert mem.ltm.library_paths.get("movies") == movies_path
|
|
|
|
|
|
class TestAgentErrorRecovery:
    """Tests for agent behaviour when tool calls fail inside ``step``."""

    @staticmethod
    def _tool_calls_then_reply(tool_name, arguments, final_content, tool_rounds=1):
        """Build a scripted ``complete`` mock for the LLM.

        The first ``tool_rounds`` calls each return an assistant message
        requesting ``tool_name`` with the JSON string ``arguments`` (call ids
        are ``call_1``, ``call_2``, …). Every later call returns a plain
        assistant message whose content is ``final_content``.
        """
        calls = 0

        def complete(messages, tools=None):
            nonlocal calls
            calls += 1
            if calls > tool_rounds:
                return {"role": "assistant", "content": final_content}
            return {
                "role": "assistant",
                "content": None,
                "tool_calls": [
                    {
                        "id": f"call_{calls}",
                        "function": {"name": tool_name, "arguments": arguments},
                    }
                ],
            }

        return Mock(side_effect=complete)

    def test_recovers_from_tool_error(self, memory, mock_llm):
        """Should recover from tool error and continue."""
        # NOTE(review): presumably ``list_folder`` fails here because no
        # download folder is configured on ``memory`` — confirm against the
        # fixture.
        mock_llm.complete = self._tool_calls_then_reply(
            "list_folder",
            '{"folder_type": "download"}',
            "The folder is not configured.",
        )
        agent = Agent(settings=settings, llm=mock_llm)

        response = agent.step("List downloads")

        # The former ``or len(response) > 0`` clause made this check pass for
        # any non-empty response; assert the expected text only.
        assert "not configured" in response.lower()

    def test_error_tracked_in_memory(self, memory, mock_llm):
        """Should track errors in episodic memory."""
        mock_llm.complete = self._tool_calls_then_reply(
            "set_path_for_folder",
            "{}",  # Missing required args
            "Error occurred.",
        )
        agent = Agent(settings=settings, llm=mock_llm)

        agent.step("Set folder")

        mem = get_memory()
        assert len(mem.episodic.recent_errors) > 0

    def test_multiple_errors_in_sequence(self, memory, mock_llm):
        """Should track errors across several failing tool rounds."""
        mock_llm.complete = self._tool_calls_then_reply(
            "set_path_for_folder",
            "{}",  # Missing required args - will error
            "All attempts failed.",
            tool_rounds=3,
        )
        agent = Agent(settings=settings, llm=mock_llm, max_tool_iterations=3)

        agent.step("Try multiple times")

        mem = get_memory()
        assert len(mem.episodic.recent_errors) >= 1
|