334 lines
12 KiB
Python
334 lines
12 KiB
Python
"""Prompt builder for the agent system."""
|
|
|
|
import json
|
|
from typing import Any
|
|
|
|
from alfred.infrastructure.persistence import get_memory
|
|
from alfred.infrastructure.persistence.memory import MemoryRegistry
|
|
|
|
from .expressions import build_expressions_context
|
|
from .registry import Tool
|
|
from .workflows import WorkflowLoader
|
|
|
|
# Core tool set (the "noyau"): these tools are always available, regardless
# of workflow scope. The list is kept small on purpose — it is what the agent
# uses to either answer trivially or pivot into a workflow.
CORE_TOOLS: tuple[str, ...] = (
    "set_language",
    "set_path_for_folder",
    "list_folder",
    "read_release_metadata",
    "query_library",
    "start_workflow",
    "end_workflow",
)
|
|
|
|
|
|
class PromptBuilder:
|
|
"""Builds system prompts for the agent with memory context."""
|
|
|
|
def __init__(
    self,
    tools: dict[str, Tool],
    workflow_loader: WorkflowLoader | None = None,
):
    """
    Store the tool registry and prepare the collaborators used for prompts.

    Args:
        tools: Registered tools. The keys are treated as tool names when
            the visible tool list is computed.
        workflow_loader: Source of workflow definitions. When omitted (or
            falsy) a default ``WorkflowLoader()`` is created.
    """
    self.tools = tools
    if workflow_loader:
        self.workflow_loader = workflow_loader
    else:
        self.workflow_loader = WorkflowLoader()
    # Queried later for the memory-schema section of the prompt.
    self._memory_registry = MemoryRegistry()
|
|
|
|
def _active_workflow(self, memory) -> dict | None:
|
|
"""Return the YAML definition of the active workflow, or None."""
|
|
current = memory.stm.workflow.current
|
|
if current is None:
|
|
return None
|
|
return self.workflow_loader.get(current.get("name"))
|
|
|
|
def visible_tool_names(self) -> list[str]:
    """
    Return the names of the tools currently in scope.

    - Idle (no workflow): core tool set only. The LLM enters a workflow
      via start_workflow to access more tools.
    - Workflow active: core tool set + the workflow's declared tools.

    A workflow whose ``tools`` entry is missing or empty (``None``)
    contributes no extra tools.

    Returns:
        The keys of ``self.tools`` that are in scope, in registry order.
        Names that are in scope but not registered are left out.
    """
    in_scope = set(CORE_TOOLS)
    workflow = self._active_workflow(get_memory())
    if workflow is not None:
        # A ``tools`` key that is present but empty comes back as None, and
        # ``.get("tools", [])`` would hand that None straight to the loop.
        in_scope.update(workflow.get("tools") or [])
    # Only return tools that actually exist in the registry.
    return [name for name in self.tools if name in in_scope]
|
|
|
|
def _format_identity(self, memory) -> str:
    """
    Build Alfred's identity and personality section.

    The ``username`` entity is read from short-term memory and handed to
    ``build_expressions_context``; the returned ``nickname`` and
    ``samples`` are embedded in the persona text.

    Args:
        memory: Memory object; only ``memory.stm.get_entity`` is used.

    Returns:
        The persona prompt, ending with one line per situation that has a
        non-empty sample phrase.
    """
    context = build_expressions_context(memory.stm.get_entity("username"))
    nickname = context["nickname"]
    samples = context["samples"]

    # Situation key -> label shown to the LLM. Dict order is display order.
    situation_labels = {
        "greeting": "Salutation",
        "success": "Succès",
        "working": "En cours",
        "error": "Erreur",
        "unclear": "Demande floue",
        "warning": "Avertissement",
        "not_found": "Introuvable",
    }
    labelled = (
        (label, samples.get(situation, ""))
        for situation, label in situation_labels.items()
    )
    # Situations without a sample phrase are left out entirely.
    expressions_block = "\n".join(
        f' {label}: "{phrase}"' for label, phrase in labelled if phrase
    )

    return f"""Tu t'appelles Alfred. Tu es un assistant d'organisation de médiathèque — direct, opérationnel, légèrement impertinent.

PERSONNALITÉ:
- Tu parles franglais : français avec des touches d'anglais quand ça colle mieux
- Tu es sarcastique et n'as pas peur de te moquer si l'user fait une connerie
- Tu envoies chier poliment (mais clairement) quand la demande est trop floue
- Tu ne fais pas de blabla inutile. Si "ok c'est fait" suffit, c'est tout ce que tu dis
- Tu peux jurer (putain, merde, con, ...) — c'est naturel, pas du remplissage
- Jamais de "Great question!" ou de politesse creuse

USER COURANT: {nickname}

EXPRESSIONS À UTILISER (une par situation, naturellement intégrées dans ta réponse) :
{expressions_block}"""
|
|
|
|
def build_tools_spec(self) -> list[dict[str, Any]]:
    """
    Build the tool specification for the LLM API (scope-filtered).

    Returns:
        One ``{"type": "function", "function": {...}}`` entry per
        registered tool whose ``name`` is listed by
        ``visible_tool_names()``, in registry order. Each ``function``
        mapping carries the tool's name, description and parameters.
    """
    in_scope = frozenset(self.visible_tool_names())
    return [
        {
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.parameters,
            },
        }
        for tool in self.tools.values()
        if tool.name in in_scope
    ]
|
|
|
|
def _format_tools_description(self) -> str:
|
|
"""Format the currently-visible tools with description + params."""
|
|
visible = set(self.visible_tool_names())
|
|
visible_tools = [t for t in self.tools.values() if t.name in visible]
|
|
if not visible_tools:
|
|
return ""
|
|
return "\n".join(
|
|
f"- {tool.name}: {tool.description}\n"
|
|
f" Parameters: {json.dumps(tool.parameters, ensure_ascii=False)}"
|
|
for tool in visible_tools
|
|
)
|
|
|
|
def _format_workflow_scope(self, memory) -> str:
|
|
"""Describe the current workflow scope so the LLM has a plan."""
|
|
workflow = self._active_workflow(memory)
|
|
if workflow is None:
|
|
available = self.workflow_loader.names()
|
|
if not available:
|
|
return ""
|
|
lines = ["WORKFLOW SCOPE: idle (broad catalog narrowed to core noyau)."]
|
|
lines.append(
|
|
" Call start_workflow(workflow_name, params) to enter a scope."
|
|
)
|
|
lines.append(" Available workflows:")
|
|
for name in available:
|
|
wf = self.workflow_loader.get(name) or {}
|
|
desc = (wf.get("description") or "").strip().splitlines()
|
|
summary = desc[0] if desc else ""
|
|
lines.append(f" - {name}: {summary}")
|
|
return "\n".join(lines)
|
|
|
|
current = memory.stm.workflow.current or {}
|
|
lines = [
|
|
f"WORKFLOW SCOPE: active — {current.get('name')} "
|
|
f"(stage: {current.get('stage')})",
|
|
]
|
|
params = current.get("params")
|
|
if params:
|
|
lines.append(f" Params: {params}")
|
|
wf_desc = (workflow.get("description") or "").strip()
|
|
if wf_desc:
|
|
lines.append(f" Goal: {wf_desc}")
|
|
steps = workflow.get("steps", [])
|
|
if steps:
|
|
lines.append(" Steps:")
|
|
for step in steps:
|
|
step_id = step.get("id", "?")
|
|
step_tool = step.get("tool") or (
|
|
"ask_user" if step.get("ask_user") else "—"
|
|
)
|
|
lines.append(f" - {step_id} ({step_tool})")
|
|
lines.append(" Call end_workflow(reason) when done, cancelled, or off-topic.")
|
|
return "\n".join(lines)
|
|
|
|
def _format_episodic_context(self, memory) -> str:
|
|
"""Format episodic memory context for the prompt."""
|
|
lines = []
|
|
|
|
if memory.episodic.last_search_results:
|
|
results = memory.episodic.last_search_results
|
|
result_list = results.get("results", [])
|
|
lines.append(
|
|
f"\nLAST SEARCH: '{results.get('query')}' ({len(result_list)} results)"
|
|
)
|
|
# Show first 5 results
|
|
for i, result in enumerate(result_list[:5]):
|
|
name = result.get("name", "Unknown")
|
|
lines.append(f" {i + 1}. {name}")
|
|
if len(result_list) > 5:
|
|
lines.append(f" ... and {len(result_list) - 5} more")
|
|
|
|
if memory.episodic.pending_question:
|
|
question = memory.episodic.pending_question
|
|
lines.append(f"\nPENDING QUESTION: {question.get('question')}")
|
|
lines.append(f" Type: {question.get('type')}")
|
|
if question.get("options"):
|
|
lines.append(f" Options: {len(question.get('options'))}")
|
|
|
|
if memory.episodic.active_downloads:
|
|
lines.append(f"\nACTIVE DOWNLOADS: {len(memory.episodic.active_downloads)}")
|
|
for dl in memory.episodic.active_downloads[:3]:
|
|
lines.append(f" - {dl.get('name')}: {dl.get('progress', 0)}%")
|
|
|
|
if memory.episodic.recent_errors:
|
|
lines.append("\nRECENT ERRORS (up to 3):")
|
|
for error in memory.episodic.recent_errors[-3:]:
|
|
lines.append(
|
|
f" - Action '{error.get('action')}' failed: {error.get('error')}"
|
|
)
|
|
|
|
# Unread events
|
|
unread = [e for e in memory.episodic.background_events if not e.get("read")]
|
|
if unread:
|
|
lines.append(f"\nUNREAD EVENTS: {len(unread)}")
|
|
for event in unread[:3]:
|
|
lines.append(f" - {event.get('type')}: {event.get('data')}")
|
|
|
|
return "\n".join(lines)
|
|
|
|
def _format_stm_context(self, memory) -> str:
|
|
"""Format short-term memory context for the prompt."""
|
|
lines = []
|
|
|
|
if memory.stm.current_workflow:
|
|
workflow = memory.stm.current_workflow
|
|
lines.append(
|
|
f"CURRENT WORKFLOW: {workflow.get('name')} (stage: {workflow.get('stage')})"
|
|
)
|
|
if workflow.get("params"):
|
|
lines.append(f" Params: {workflow.get('params')}")
|
|
|
|
if memory.stm.current_topic:
|
|
lines.append(f"CURRENT TOPIC: {memory.stm.current_topic}")
|
|
|
|
if memory.stm.extracted_entities:
|
|
lines.append("EXTRACTED ENTITIES:")
|
|
for key, value in memory.stm.extracted_entities.items():
|
|
lines.append(f" - {key}: {value}")
|
|
|
|
if memory.stm.language:
|
|
lines.append(f"CONVERSATION LANGUAGE: {memory.stm.language}")
|
|
|
|
return "\n".join(lines)
|
|
|
|
def _format_memory_schema(self) -> str:
|
|
"""Describe available memory components so the agent knows what to read/write and when."""
|
|
schema = self._memory_registry.schema()
|
|
tier_labels = {
|
|
"ltm": "LONG-TERM (persisted)",
|
|
"stm": "SHORT-TERM (session)",
|
|
"episodic": "EPISODIC (volatile)",
|
|
}
|
|
lines = ["MEMORY COMPONENTS:"]
|
|
|
|
for tier, components in schema.items():
|
|
if not components:
|
|
continue
|
|
lines.append(f"\n [{tier_labels.get(tier, tier.upper())}]")
|
|
for c in components:
|
|
access = c.get("access", "read")
|
|
lines.append(f" {c['name']} ({access}): {c['description']}")
|
|
for field_name, field_desc in c.get("fields", {}).items():
|
|
lines.append(f" · {field_name}: {field_desc}")
|
|
|
|
return "\n".join(lines)
|
|
|
|
def _format_config_context(self, memory) -> str:
|
|
"""Format configuration context."""
|
|
lines = ["CURRENT CONFIGURATION:"]
|
|
folders = {
|
|
**memory.ltm.workspace.as_dict(),
|
|
**memory.ltm.library_paths.to_dict(),
|
|
}
|
|
if folders:
|
|
for key, value in folders.items():
|
|
lines.append(f" - {key}: {value}")
|
|
else:
|
|
lines.append(" (no configuration set)")
|
|
return "\n".join(lines)
|
|
|
|
def build_system_prompt(self) -> str:
    """
    Build the complete system prompt.

    Sections are assembled in this order: identity, language instruction,
    configuration, short-term context, episodic context, memory schema,
    workflow scope, visible tools, rules. Empty or whitespace-only
    sections are dropped and the rest are joined with blank lines.

    Returns:
        The full system prompt text.
    """
    memory = get_memory()

    # Language instruction
    language_instruction = (
        "Si la langue de l'user est différente de la langue courante en STM, "
        "appelle `set_language` en premier avant de répondre."
    )

    rules = """
RÈGLES:
- Utilise les outils pour accomplir les tâches, pas pour décorer
- Si des résultats de recherche sont dispo en mémoire épisodique, référence-les par index
- Confirme toujours avant une opération destructive (move, delete, overwrite)
- Réponses courtes — si c'est fait, dis-le en une ligne
- Si la demande est floue, demande un éclaircissement AVANT de lancer quoi que ce soit
"""

    sections = [
        # Identity + personality
        self._format_identity(memory),
        language_instruction,
        # Configuration
        self._format_config_context(memory),
        # STM context
        self._format_stm_context(memory),
        # Episodic context
        self._format_episodic_context(memory),
        # Memory schema
        self._format_memory_schema(),
        # Workflow scope (active workflow plan or list of options)
        self._format_workflow_scope(memory),
    ]

    # Available tools (already filtered by scope); omitted when none are visible.
    tools_desc = self._format_tools_description()
    if tools_desc:
        sections.append(f"\nOUTILS DISPONIBLES:\n{tools_desc}")

    sections.append(rules)

    return "\n\n".join(
        section for section in sections if section and section.strip()
    )
|