francwa e45465d52d feat: split resolve_destination, persona-driven prompts, qBittorrent relocation
Destination resolution
- Replace the single ResolveDestinationUseCase with four dedicated
  functions, one per release type:
    resolve_season_destination    (season pack, folder move)
    resolve_episode_destination   (single episode, file move)
    resolve_movie_destination     (movie, file move)
    resolve_series_destination    (multi-season pack, folder move)
- Each returns a dedicated DTO carrying only the fields relevant to
  that release type — no more polymorphic ResolvedDestination with
  half the fields unused depending on the case.
- Looser series folder matching: exact computed-name match is reused
  silently; any deviation (different group, multiple candidates) now
  prompts the user with all options including the computed name.
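
The folder-matching rule above can be sketched as follows. This is only an
illustration under stated assumptions: choose_series_folder, the shape of the
returned dict, and the "no candidate means create a new folder" branch are
hypothetical and not taken from the commit; only the needs_clarification
status name is borrowed from the workflow script.

```python
def choose_series_folder(computed_name: str, candidates: list[str]) -> dict:
    """Decide whether a series folder can be reused without asking the user.

    `candidates` is assumed to be the list of existing library folders that
    look like the same series (how they are discovered is out of scope here).
    """
    if not candidates:
        # Assumption: nothing on disk yet, so the computed name is used as-is.
        return {"status": "ok", "folder": computed_name, "is_new": True}
    if candidates == [computed_name]:
        # Single exact match on the computed name: reuse it silently.
        return {"status": "ok", "folder": computed_name, "is_new": False}
    # Any deviation: offer every candidate plus the computed name, deduplicated
    # while preserving order.
    options = list(dict.fromkeys([*candidates, computed_name]))
    return {"status": "needs_clarification", "options": options}
```

A caller would present options to the user only when the status is
needs_clarification, and otherwise proceed with folder directly.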

Agent tools
- Four new tools wrapping the use cases above; old resolve_destination
  removed from the registry.
- New move_to_destination tool: create_folder + move, chained — used
  after a resolve_* call to perform the actual relocation.
- Low-level filesystem_operations module (create_folder, move via mv)
  for instant same-FS renames (ZFS).

Prompt & persona
- New PromptBuilder (alfred/agent/prompt.py) replacing prompts.py:
  identity + personality block, situational expressions, memory
  schema, episodic/STM/config context, tool catalogue.
- Per-user expression system: knowledge/users/common.yaml +
  {username}.yaml are merged at runtime; one phrase per situation
  (greeting/success/error/...) is sampled into the system prompt.
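
A minimal sketch of the merge-and-sample step, assuming both YAML files have
already been loaded into plain dicts mapping a situation name to a list of
phrases. The function names and the "user phrases extend the common ones"
merge policy are assumptions for illustration; the commit does not say
whether user entries extend or override the common ones.

```python
import random


def merge_expressions(
    common: dict[str, list[str]], user: dict[str, list[str]]
) -> dict[str, list[str]]:
    """Combine common and per-user phrases without mutating either input."""
    merged = {situation: list(phrases) for situation, phrases in common.items()}
    for situation, phrases in user.items():
        # Assumed policy: user phrases are appended to the common ones.
        merged.setdefault(situation, []).extend(phrases)
    return merged


def sample_expressions(
    merged: dict[str, list[str]], rng: random.Random
) -> dict[str, str]:
    """Pick one phrase per situation; situations with no phrases are skipped."""
    return {
        situation: rng.choice(phrases)
        for situation, phrases in merged.items()
        if phrases
    }
```

Passing an explicit random.Random instance keeps the sampling reproducible
in tests while still varying between real sessions.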

qBittorrent integration
- Credentials now come from settings (qbittorrent_url/username/password)
  instead of hardcoded defaults.
- New client methods: find_by_name, set_location, recheck — the trio
  needed to update a torrent's save path and re-verify after a move.
- Host→container path translation settings (qbittorrent_host_path /
  qbittorrent_container_path) for docker-mounted setups.
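
The host-to-container translation can be sketched like this. The helper name
to_container_path and its behaviour for paths outside the mounted root
(returned unchanged) are assumptions; only the two settings names come from
the commit, and the example paths are made up.

```python
from pathlib import PurePosixPath


def to_container_path(host_path: str, host_root: str, container_root: str) -> str:
    """Map a path under host_root to the same location under container_root.

    host_root / container_root stand in for the qbittorrent_host_path and
    qbittorrent_container_path settings.
    """
    try:
        relative = PurePosixPath(host_path).relative_to(host_root)
    except ValueError:
        # Assumption: paths outside the mounted root are left untouched.
        return host_path
    return str(PurePosixPath(container_root) / relative)
```

Comparing path components rather than raw string prefixes avoids treating
/mnt/pool/downloads-old as if it lived under /mnt/pool/downloads.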

Subtitles
- Identifier: strip parenthesized qualifiers (simplified, brazil…) at
  tokenization; new _tokenize_suffix used for the episode_subfolder
  pattern so episode-stem tokens no longer pollute language detection.
- Placer: extract _build_dest_name so it can be reused by the new
  dry_run path in ManageSubtitlesUseCase.
- Knowledge: add yue, ell, ind, msa, rus, vie, heb, tam, tel, tha,
  hin, ukr; add 'fre' to fra; add 'simplified'/'traditional' to zho.
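
As an illustration of stripping parenthesized qualifiers at tokenization, a
hedged sketch follows. The tokenize function, the regular expressions, and
the separator set are assumptions and do not reproduce the project's
identifier code.

```python
import re

# Anything between a pair of parentheses, e.g. "(simplified)" or "(brazil)".
_PAREN_QUALIFIER = re.compile(r"\([^)]*\)")
# Assumed token separators: whitespace, dot, underscore, hyphen.
_SEPARATORS = re.compile(r"[\s._\-]+")


def tokenize(stem: str) -> list[str]:
    """Lower-case a subtitle stem, drop parenthesized qualifiers, split it."""
    cleaned = _PAREN_QUALIFIER.sub(" ", stem.lower())
    return [token for token in _SEPARATORS.split(cleaned) if token]
```

With this approach a stem such as "Chinese (Simplified)" yields only the
language token, so the qualifier cannot be mistaken for a language code.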

Misc
- LTM workspace: add 'trash' folder slot.
- Default LLM provider switched to deepseek.
- testing/debug_release.py: CLI to parse a release, hit TMDB, and
  dry-run the destination resolution end-to-end.
2026-05-14 05:01:59 +02:00

#!/usr/bin/env python3
"""
run_workflow.py - Simulate an Alfred workflow step by step (dry-run or live).

Usage:
    uv run testing/workflows/run_workflow.py organize_media [options]

Options:
    --dry-run            Print what each step would do without executing tools (default).
    --live               Actually execute the tools (uses real filesystem + memory).
    --source PATH        Source video file (download folder).
    --dest PATH          Destination video file (library path).
    --download-folder P  Original download folder (for create_seed_links).
    --imdb-id ID         IMDb ID for identify_media step (tt1234567).
    --seed               Answer "yes" to the seeding question.
    --no-color           Disable ANSI colours.

Examples:
    uv run testing/workflows/run_workflow.py organize_media --dry-run \\
        --source "/downloads/Breaking.Bad.S01E01.mkv" \\
        --dest "/tv/Breaking Bad/Season 01/Breaking Bad.S01E01.mkv"

    uv run testing/workflows/run_workflow.py organize_media --live \\
        --source "/downloads/BB/Breaking.Bad.S01E01.mkv" \\
        --dest "/tv/Breaking Bad/Season 01/Breaking Bad.S01E01.mkv" \\
        --download-folder "/downloads/BB" --seed
"""

import argparse
import sys
import textwrap
from pathlib import Path
from typing import Any

# Project root on sys.path
_PROJECT_ROOT = Path(__file__).resolve().parents[2]
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

# ---------------------------------------------------------------------------
# Colours
# ---------------------------------------------------------------------------
USE_COLOR = True
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"
CYAN = "\033[36m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"


def c(text: str, *codes: str) -> str:
    """Wrap text in the given ANSI codes unless colour output is disabled."""
    if not USE_COLOR:
        return text
    return "".join(codes) + str(text) + RESET


def section(title: str) -> None:
    print()
    print(c("─" * 70, DIM))
    print(c(f" {title}", BOLD, CYAN))
    print(c("─" * 70, DIM))


def ok(msg: str) -> None:
    print(c(" ✓ ", GREEN, BOLD) + msg)


def warn(msg: str) -> None:
    print(c(" ⚠ ", YELLOW, BOLD) + msg)


def err(msg: str) -> None:
    print(c(" ✗ ", RED, BOLD) + msg)


def info(msg: str) -> None:
    print(f" {msg}")


def kv(key: str, val: str) -> None:
    print(f" {c(key + ':', BOLD)} {val}")


# ---------------------------------------------------------------------------
# Dry-run tool stubs
# ---------------------------------------------------------------------------
def _real_list_folder(folder_type: str, path: str = ".") -> dict[str, Any]:
    """Call the real list_folder (read-only, safe in dry-run)."""
    # TODO: remove hardcoded fallback once download path is configured in LTM
    _HARDCODED_DOWNLOAD_ROOT = "/mnt/testipool/downloads"
    try:
        from alfred.infrastructure.persistence import get_memory, init_memory

        # Initialise memory lazily if it has not been set up yet
        try:
            get_memory()
        except Exception:
            init_memory()
        from alfred.agent.tools.filesystem import list_folder

        result = list_folder(folder_type=folder_type, path=path)
        if result.get("status") == "error" and folder_type == "download":
            raise RuntimeError(result.get("message", "not configured"))
        return result
    except Exception as e:
        if folder_type == "download":
            warn(
                f"list_folder: {e} - using hardcoded download root: {_HARDCODED_DOWNLOAD_ROOT}"
            )
            import os

            resolved = (
                os.path.join(_HARDCODED_DOWNLOAD_ROOT, path)
                if path != "."
                else _HARDCODED_DOWNLOAD_ROOT
            )
            try:
                entries = sorted(os.listdir(resolved))
            except OSError as oe:
                return {"status": "error", "error": "os_error", "message": str(oe)}
            return {
                "status": "ok",
                "folder_type": folder_type,
                "path": resolved,
                "entries": entries,
                "count": len(entries),
            }
        warn(f"list_folder: filesystem unavailable ({e}), falling back to stub")
        return {
            "status": "ok",
            "folder_type": folder_type,
            "path": path,
            "entries": ["[stub - filesystem unavailable]"],
            "count": 1,
        }


def _real_find_media_imdb_id(media_title: str, **kwargs) -> dict[str, Any]:
    """Call the real TMDB API even in dry-run (read-only, no filesystem side effects)."""
    try:
        from alfred.infrastructure.persistence import get_memory, init_memory

        # Initialise memory lazily if it has not been set up yet
        try:
            get_memory()
        except Exception:
            init_memory()
        from alfred.agent.tools.api import find_media_imdb_id

        return find_media_imdb_id(media_title=media_title)
    except Exception as e:
        warn(f"find_media_imdb_id: TMDB unavailable ({e}), falling back to stub")
        return {
            "status": "ok",
            "imdb_id": "tt0000000",
            "title": media_title,
            "media_type": "tv_show",
            "year": 2024,
        }


def _dry_resolve_destination(
    release_name: str,
    source_file: str,
    tmdb_title: str,
    tmdb_year: int,
    tmdb_episode_title: str | None = None,
    confirmed_folder: str | None = None,
) -> dict[str, Any]:
    """Compute library paths from the parsed release without touching the filesystem."""
    from alfred.domain.release import parse_release

    parsed = parse_release(release_name)
    ext = Path(source_file).suffix
    if parsed.is_movie:
        folder = parsed.movie_folder_name(tmdb_title, tmdb_year)
        fname = parsed.movie_filename(tmdb_title, tmdb_year, ext)
        return {
            "status": "ok",
            "library_file": f"/movies/{folder}/{fname}",
            "series_folder": f"/movies/{folder}",
            "series_folder_name": folder,
            "season_folder": None,
            "season_folder_name": None,
            "filename": fname,
            "is_new_series_folder": True,
        }
    season_folder = parsed.season_folder_name()
    show_folder = confirmed_folder or parsed.show_folder_name(tmdb_title, tmdb_year)
    fname = (
        parsed.episode_filename(tmdb_episode_title, ext)
        if not parsed.is_season_pack
        else season_folder + ext
    )
    return {
        "status": "ok",
        "library_file": f"/tv/{show_folder}/{season_folder}/{fname}",
        "series_folder": f"/tv/{show_folder}",
        "season_folder": f"/tv/{show_folder}/{season_folder}",
        "series_folder_name": show_folder,
        "season_folder_name": season_folder,
        "filename": fname,
        "is_new_series_folder": confirmed_folder is None,
    }


def _dry_move_media(source: str, destination: str) -> dict[str, Any]:
    return {
        "status": "ok",
        "source": source,
        "destination": destination,
        "filename": Path(destination).name,
        "size": 0,
    }


def _dry_manage_subtitles(source_video: str, destination_video: str) -> dict[str, Any]:
    return {
        "status": "ok",
        "video_path": destination_video,
        "placed": [],
        "placed_count": 0,
        "skipped_count": 0,
    }


def _dry_create_seed_links(
    library_file: str, original_download_folder: str
) -> dict[str, Any]:
    return {
        "status": "ok",
        "torrent_subfolder": f"/torrents/{Path(original_download_folder).name}",
        "linked_file": f"/torrents/{Path(original_download_folder).name}/{Path(library_file).name}",
        "copied_files": ["[dry-run - no real copy]"],
        "copied_count": 1,
        "skipped": [],
    }


# Tool name -> callable used in dry-run mode (read-only tools call the real thing)
DRY_RUN_TOOLS: dict[str, Any] = {
    "list_folder": _real_list_folder,
    "find_media_imdb_id": _real_find_media_imdb_id,
    "resolve_destination": _dry_resolve_destination,
    "move_media": _dry_move_media,
    "manage_subtitles": _dry_manage_subtitles,
    "create_seed_links": _dry_create_seed_links,
}


# ---------------------------------------------------------------------------
# Live tools
# ---------------------------------------------------------------------------
def _load_live_tools() -> dict[str, Any]:
    from alfred.agent.tools.filesystem import (
        create_seed_links,
        list_folder,
        manage_subtitles,
        move_media,
    )

    # find_media_imdb_id lives in the api tools
    try:
        from alfred.agent.tools.api import find_media_imdb_id
    except ImportError:

        def find_media_imdb_id(**kwargs):  # type: ignore[misc]
            return {
                "status": "error",
                "error": "not_available",
                "message": "api tools not loaded",
            }

    return {
        "list_folder": list_folder,
        "find_media_imdb_id": find_media_imdb_id,
        "move_media": move_media,
        "manage_subtitles": manage_subtitles,
        "create_seed_links": create_seed_links,
    }


# ---------------------------------------------------------------------------
# Workflow runner
# ---------------------------------------------------------------------------
class WorkflowRunner:
    def __init__(
        self,
        workflow: dict,
        tools: dict[str, Any],
        live: bool,
        args: argparse.Namespace,
    ):
        self.workflow = workflow
        self.tools = tools
        self.live = live
        self.args = args
        self.context: dict[str, Any] = {}  # step results accumulate here
        self.step_results: list[dict] = []

    def run(self) -> None:
        name = self.workflow.get("name", "?")
        desc = self.workflow.get("description", "").strip()
        mode = c("LIVE", RED, BOLD) if self.live else c("DRY-RUN", YELLOW, BOLD)
        print()
        print(c("═" * 70, BOLD))
        print(c(f" Alfred - Workflow Simulator [{mode}]", BOLD, MAGENTA))
        print(c("═" * 70, BOLD))
        kv("Workflow", c(name, CYAN, BOLD))
        kv("Description", desc)
        kv("Tools allowed", ", ".join(self.workflow.get("tools", [])))
        steps = self.workflow.get("steps", [])
        for step in steps:
            self._run_step(step)
        section("SIMULATION COMPLETE")
        ok(f"{len(self.step_results)} step(s) executed")
        errors = [
            r for r in self.step_results if r.get("result", {}).get("status") == "error"
        ]
        if errors:
            warn(f"{len(errors)} step(s) failed")
            for r in errors:
                err(
                    f" {r['id']}: {r['result'].get('error')}: {r['result'].get('message')}"
                )
        print()
        print(c("═" * 70, BOLD))
        print()

    def _run_step(self, step: dict) -> None:
        step_id = step.get("id", "?")
        # --- ask_user step ---
        if "ask_user" in step:
            section(f"STEP [{step_id}] - ask_user")
            q = step["ask_user"].get("question", "")
            answers = step["ask_user"].get("answers", {})
            info(c(f'Question: "{q}"', BOLD))
            info(f"Possible answers: {', '.join(str(k) for k in answers.keys())}")
            answer = "yes" if self.args.seed else "no"
            # PyYAML parses bare yes/no as booleans, so normalise keys to str
            answers_str = {str(k): v for k, v in answers.items()}
            next_step = answers_str.get(answer, {}).get("next_step", "update_library")
            ok(f"Simulated answer: {c(answer, CYAN)} → next: {c(next_step, CYAN)}")
            self.context["seeding"] = answer == "yes"
            self.context["ask_seeding_answer"] = answer
            self.context["next_after_ask"] = next_step
            # If "no", skip create_seed_links
            if answer == "no":
                self.context["skip_create_seed_links"] = True
            return
        # --- memory_write step ---
        if "memory_write" in step:
            section(f"STEP [{step_id}] - memory_write ({step['memory_write']})")
            if self.live:
                warn("memory_write: not yet implemented in the live simulator")
            else:
                ok("(dry-run) Library entry would be written to LTM")
            self.step_results.append({"id": step_id, "result": {"status": "ok"}})
            return
        # --- tool step ---
        tool_name = step.get("tool")
        if not tool_name:
            warn(f"Step '{step_id}' has no tool or ask_user, skipped")
            return
        # Skip create_seed_links if user said no to seeding
        if tool_name == "create_seed_links" and self.context.get(
            "skip_create_seed_links"
        ):
            section(f"STEP [{step_id}] - {tool_name}")
            warn("Skipped (user chose not to seed)")
            return
        section(f"STEP [{step_id}] - {c(tool_name, CYAN, BOLD)}")
        desc = step.get("description", "").strip()
        if desc:
            info(c(desc, DIM))
        kwargs = self._build_kwargs(tool_name, step)
        for k, v in kwargs.items():
            kv(k, str(v))
        if tool_name not in self.tools:
            err(f"Tool '{tool_name}' not found in tool registry")
            self.step_results.append(
                {"id": step_id, "result": {"status": "error", "error": "unknown_tool"}}
            )
            return
        try:
            result = self.tools[tool_name](**kwargs)
        except Exception as e:
            err(f"Tool raised an exception: {e}")
            self.step_results.append(
                {"id": step_id, "result": {"status": "error", "error": str(e)}}
            )
            return
        self._print_result(result, tool_name=tool_name)
        self.context[step_id] = result
        self.step_results.append({"id": step_id, "result": result})
        # After list_downloads: confirm the requested media folder exists in downloads
        if (
            tool_name == "list_folder"
            and result.get("status") == "ok"
            and self.args.source
        ):
            folder_path = result.get("path", "")
            entries = result.get("entries", [])
            if self.args.source in entries:
                media_folder = str(Path(folder_path) / self.args.source)
                self.context["media_folder"] = media_folder
                print()
                print(
                    f" {c('Media folder found:', BOLD, GREEN)} {c(media_folder, CYAN, BOLD)}"
                )
            else:
                warn(f"Folder '{self.args.source}' not found in {folder_path}")

    def _build_kwargs(self, tool_name: str, step: dict) -> dict[str, Any]:
        """Build tool kwargs from step params + CLI args + previous context."""
        # Start from step-level params (static defaults from YAML)
        kwargs: dict[str, Any] = dict(step.get("params") or {})
        a = self.args
        if tool_name == "list_folder":
            kwargs.setdefault("folder_type", "download")
        elif tool_name == "find_media_imdb_id":
            if a.imdb_id:
                kwargs["imdb_id"] = a.imdb_id
        elif tool_name == "resolve_destination":
            media_folder = self.context.get("media_folder")
            if a.release:
                kwargs["release_name"] = a.release
            elif a.source:
                kwargs.setdefault("release_name", a.source)
            if media_folder:
                kwargs["source_file"] = media_folder
            if a.tmdb_title:
                kwargs["tmdb_title"] = a.tmdb_title
            if a.tmdb_year:
                kwargs["tmdb_year"] = a.tmdb_year
            if a.episode_title:
                kwargs["tmdb_episode_title"] = a.episode_title
        elif tool_name == "move_media":
            # If resolve_destination ran, use its library_file as destination
            resolved = self.context.get("resolve_destination", {})
            media_folder = self.context.get("media_folder")
            if media_folder:
                kwargs["source"] = media_folder
            dest = a.dest or resolved.get("library_file")
            if dest:
                kwargs["destination"] = dest
        elif tool_name == "manage_subtitles":
            resolved = self.context.get("resolve_destination", {})
            media_folder = self.context.get("media_folder")
            if media_folder:
                kwargs["source_video"] = media_folder
            dest = a.dest or resolved.get("library_file")
            if dest:
                kwargs["destination_video"] = dest
        elif tool_name == "create_seed_links":
            resolved = self.context.get("resolve_destination", {})
            library_file = a.dest or resolved.get("library_file")
            if library_file:
                kwargs["library_file"] = library_file
            if a.download_folder:
                kwargs["original_download_folder"] = a.download_folder
            else:
                # Use the resolved folder path from list_downloads context
                list_result = self.context.get("list_downloads", {})
                folder_path = list_result.get("path")
                if folder_path:
                    kwargs.setdefault("original_download_folder", folder_path)
        return kwargs

    def _print_result(self, result: dict, tool_name: str = "") -> None:
        status = result.get("status", "?")
        if status == "ok":
            ok(f"status={c('ok', GREEN)}")
        elif status == "needs_clarification":
            warn(f"status={c('needs_clarification', YELLOW)}")
        else:
            err(
                f"status={c(status, RED)} error={result.get('error')} msg={result.get('message')}"
            )
            return
        # Highlight resolved folder path for list_folder
        if tool_name == "list_folder" and result.get("path"):
            print()
            print(
                f" {c('Resolved folder:', BOLD, GREEN)} {c(result['path'], CYAN, BOLD)}"
            )
        # Pretty-print notable fields
        skip = {"status", "error", "message"}
        for k, v in result.items():
            if k in skip:
                continue
            if isinstance(v, list):
                if v:
                    info(c(f"{k}:", BOLD))
                    # Show at most 10 items, then summarise the rest
                    for item in v[:10]:
                        info(f"{item}")
                    if len(v) > 10:
                        info(c(f" … and {len(v) - 10} more", DIM))
                else:
                    info(f"{c(k + ':', BOLD)} (empty)")
            else:
                kv(k, str(v))


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Alfred workflow simulator",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=textwrap.dedent(__doc__ or ""),
    )
    parser.add_argument("workflow", help="Workflow name (e.g. organize_media)")
    parser.add_argument(
        "--dry-run",
        dest="dry_run",
        action="store_true",
        default=True,
        help="Simulate steps without executing tools (default)",
    )
    parser.add_argument(
        "--live",
        action="store_true",
        help="Actually execute tools against the real filesystem",
    )
    parser.add_argument(
        "--source",
        metavar="FOLDER_NAME",
        help="Release folder name inside the download root (e.g. Oz.S03.1080p.WEBRip.x265-KONTRAST)",
    )
    parser.add_argument(
        "--dest",
        metavar="PATH",
        help="Destination video file (in library, overrides resolve_destination)",
    )
    parser.add_argument(
        "--download-folder",
        metavar="PATH",
        help="Original download folder (for create_seed_links)",
    )
    parser.add_argument(
        "--imdb-id", metavar="ID", help="IMDb ID for identify_media (tt1234567)"
    )
    parser.add_argument(
        "--release",
        metavar="NAME",
        help="Release name (e.g. Oz.S03.1080p.WEBRip.x265-KONTRAST)",
    )
    parser.add_argument(
        "--tmdb-title", metavar="TITLE", help="Canonical title from TMDB (e.g. 'Oz')"
    )
    parser.add_argument(
        "--tmdb-year",
        metavar="YEAR",
        type=int,
        help="Start/release year from TMDB (e.g. 1997)",
    )
    parser.add_argument(
        "--episode-title",
        metavar="TITLE",
        help="Episode title from TMDB for single-episode releases",
    )
    parser.add_argument(
        "--seed", action="store_true", help='Answer "yes" to the seeding question'
    )
    parser.add_argument("--no-color", action="store_true")
    return parser.parse_args()


def main() -> None:
    global USE_COLOR
    args = parse_args()
    if args.no_color or not sys.stdout.isatty():
        USE_COLOR = False
    if args.live:
        args.dry_run = False
    # Load workflow
    from alfred.agent.workflows.loader import WorkflowLoader

    loader = WorkflowLoader()
    workflow = loader.get(args.workflow)
    if not workflow:
        print(f"Error: workflow '{args.workflow}' not found.", file=sys.stderr)
        print(f"Available: {', '.join(loader.names())}", file=sys.stderr)
        sys.exit(1)
    # Load tools
    if args.live:
        try:
            tools = _load_live_tools()
        except Exception as e:
            print(f"Error loading live tools: {e}", file=sys.stderr)
            sys.exit(1)
    else:
        tools = DRY_RUN_TOOLS
    runner = WorkflowRunner(workflow, tools, live=args.live, args=args)
    runner.run()


if __name__ == "__main__":
    main()