#!/usr/bin/env python3 """ run_workflow.py — Simulate an Alfred workflow step by step (dry-run or live). Usage: uv run testing/workflows/run_workflow.py organize_media [options] Options: --dry-run Print what each step would do without executing tools (default). --live Actually execute the tools (uses real filesystem + memory). --source PATH Source video file (download folder). --dest PATH Destination video file (library path). --download-folder P Original download folder (for create_seed_links). --imdb-id ID IMDb ID for identify_media step (tt1234567). --seed Answer "yes" to the seeding question. --no-color Disable ANSI colours. Examples: uv run testing/workflows/run_workflow.py organize_media --dry-run \\ --source "/downloads/Breaking.Bad.S01E01.mkv" \\ --dest "/tv/Breaking Bad/Season 01/Breaking Bad.S01E01.mkv" uv run testing/workflows/run_workflow.py organize_media --live \\ --source "/downloads/BB/Breaking.Bad.S01E01.mkv" \\ --dest "/tv/Breaking Bad/Season 01/Breaking Bad.S01E01.mkv" \\ --download-folder "/downloads/BB" --seed """ import argparse import sys import textwrap from pathlib import Path from typing import Any # Project root on sys.path _PROJECT_ROOT = Path(__file__).resolve().parents[2] if str(_PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(_PROJECT_ROOT)) # --------------------------------------------------------------------------- # Colours # --------------------------------------------------------------------------- USE_COLOR = True RESET = "\033[0m" BOLD = "\033[1m" DIM = "\033[2m" GREEN = "\033[32m" YELLOW = "\033[33m" RED = "\033[31m" CYAN = "\033[36m" BLUE = "\033[34m" MAGENTA = "\033[35m" def c(text: str, *codes: str) -> str: if not USE_COLOR: return text return "".join(codes) + str(text) + RESET def section(title: str) -> None: print() print(c("─" * 70, DIM)) print(c(f" {title}", BOLD, CYAN)) print(c("─" * 70, DIM)) def ok(msg: str) -> None: print(c(" ✓ ", GREEN, BOLD) + msg) def warn(msg: str) -> None: print(c(" ⚠ ", YELLOW, BOLD) + 
msg) def err(msg: str) -> None: print(c(" ✗ ", RED, BOLD) + msg) def info(msg: str) -> None: print(f" {msg}") def kv(key: str, val: str) -> None: print(f" {c(key + ':', BOLD)} {val}") # --------------------------------------------------------------------------- # Dry-run tool stubs # --------------------------------------------------------------------------- def _dry_list_folder(folder_type: str, path: str = ".") -> dict[str, Any]: return { "status": "ok", "folder_type": folder_type, "path": path, "entries": ["[dry-run — no real listing]"], "count": 1, } def _dry_find_media_imdb_id(**kwargs) -> dict[str, Any]: return { "status": "ok", "imdb_id": kwargs.get("imdb_id") or "tt0000000", "title": "Dry Run Show", "type": "tv_show", "year": 2024, } def _dry_resolve_destination( release_name: str, source_file: str, tmdb_title: str, tmdb_year: int, tmdb_episode_title: str | None = None, confirmed_folder: str | None = None, ) -> dict[str, Any]: from alfred.domain.media.release_parser import parse_release parsed = parse_release(release_name) ext = Path(source_file).suffix if parsed.is_movie: folder = parsed.movie_folder_name(tmdb_title, tmdb_year) fname = parsed.movie_filename(tmdb_title, tmdb_year, ext) return { "status": "ok", "library_file": f"/movies/{folder}/{fname}", "series_folder": f"/movies/{folder}", "series_folder_name": folder, "season_folder": None, "season_folder_name": None, "filename": fname, "is_new_series_folder": True, } season_folder = parsed.season_folder_name() show_folder = confirmed_folder or parsed.show_folder_name(tmdb_title, tmdb_year) fname = parsed.episode_filename(tmdb_episode_title, ext) if not parsed.is_season_pack else season_folder + ext return { "status": "ok", "library_file": f"/tv/{show_folder}/{season_folder}/{fname}", "series_folder": f"/tv/{show_folder}", "season_folder": f"/tv/{show_folder}/{season_folder}", "series_folder_name": show_folder, "season_folder_name": season_folder, "filename": fname, "is_new_series_folder": 
confirmed_folder is None, } def _dry_move_media(source: str, destination: str) -> dict[str, Any]: return { "status": "ok", "source": source, "destination": destination, "filename": Path(destination).name, "size": 0, } def _dry_manage_subtitles(source_video: str, destination_video: str) -> dict[str, Any]: return { "status": "ok", "video_path": destination_video, "placed": [], "placed_count": 0, "skipped_count": 0, } def _dry_create_seed_links(library_file: str, original_download_folder: str) -> dict[str, Any]: return { "status": "ok", "torrent_subfolder": f"/torrents/{Path(original_download_folder).name}", "linked_file": f"/torrents/{Path(original_download_folder).name}/{Path(library_file).name}", "copied_files": ["[dry-run — no real copy]"], "copied_count": 1, "skipped": [], } DRY_RUN_TOOLS: dict[str, Any] = { "list_folder": _dry_list_folder, "find_media_imdb_id": _dry_find_media_imdb_id, "resolve_destination": _dry_resolve_destination, "move_media": _dry_move_media, "manage_subtitles": _dry_manage_subtitles, "create_seed_links": _dry_create_seed_links, } # --------------------------------------------------------------------------- # Live tools # --------------------------------------------------------------------------- def _load_live_tools() -> dict[str, Any]: from alfred.agent.tools.filesystem import ( create_seed_links, list_folder, manage_subtitles, move_media, ) # find_media_imdb_id lives in the api tools try: from alfred.agent.tools.api import find_media_imdb_id except ImportError: def find_media_imdb_id(**kwargs): # type: ignore[misc] return {"status": "error", "error": "not_available", "message": "api tools not loaded"} return { "list_folder": list_folder, "find_media_imdb_id": find_media_imdb_id, "move_media": move_media, "manage_subtitles": manage_subtitles, "create_seed_links": create_seed_links, } # --------------------------------------------------------------------------- # Workflow runner # 
# ---------------------------------------------------------------------------
# Workflow runner
# ---------------------------------------------------------------------------

class WorkflowRunner:
    """Walk through a workflow's steps, printing and recording each outcome.

    Steps are processed in the order they appear in the workflow. Three step
    shapes are recognised: ``ask_user`` (answered from the ``--seed`` flag),
    ``memory_write`` (only reported, never executed) and tool steps (looked up
    in the tool registry and called with kwargs built from step params, CLI
    arguments and earlier results).
    """

    # Fallback when the chosen answer does not name a next step.
    _DEFAULT_NEXT_STEP = "update_library"

    def __init__(self, workflow: dict, tools: dict[str, Any], live: bool, args: argparse.Namespace):
        """Store the workflow definition, tool registry, mode flag and CLI args."""
        self.workflow = workflow
        self.tools = tools
        self.live = live
        self.args = args
        self.context: dict[str, Any] = {}  # step results accumulate here
        self.step_results: list[dict] = []

    def run(self) -> None:
        """Print the header, run every step in order, then print a summary."""
        name = self.workflow.get("name", "?")
        # An empty YAML value loads as None; treat it like a missing key.
        desc = (self.workflow.get("description") or "").strip()
        mode = c("LIVE", RED, BOLD) if self.live else c("DRY-RUN", YELLOW, BOLD)

        print()
        print(c("━" * 70, BOLD))
        # `mode` ends with its own RESET, so the header is coloured in segments;
        # wrapping the whole line in one c() call would leave "]" unstyled.
        print(c(" Alfred — Workflow Simulator [", BOLD, MAGENTA) + mode + c("]", BOLD, MAGENTA))
        print(c("━" * 70, BOLD))
        kv("Workflow", c(name, CYAN, BOLD))
        kv("Description", desc)
        kv("Tools allowed", ", ".join(str(t) for t in self.workflow.get("tools") or []))

        for step in self.workflow.get("steps") or []:
            self._run_step(step)

        section("SIMULATION TERMINÉE")
        ok(f"{len(self.step_results)} step(s) exécuté(s)")
        errors = [r for r in self.step_results if r.get("result", {}).get("status") == "error"]
        if errors:
            warn(f"{len(errors)} step(s) en erreur")
            for r in errors:
                err(f" {r['id']}: {r['result'].get('error')} — {r['result'].get('message')}")
        print()
        print(c("━" * 70, BOLD))
        print()

    @staticmethod
    def _normalise_answers(answers: Any) -> dict[str, Any]:
        """Return *answers* with every key converted to an answer string.

        PyYAML parses bare ``yes``/``no`` keys as booleans. ``str(True)`` is
        ``"True"``, which would never match the simulated ``"yes"``/``"no"``
        answer, so booleans are mapped back explicitly. A non-dict value
        (e.g. ``None`` from an empty YAML key) yields an empty mapping.
        """
        if not isinstance(answers, dict):
            return {}
        normalised: dict[str, Any] = {}
        for key, value in answers.items():
            if isinstance(key, bool):
                key = "yes" if key else "no"
            normalised[str(key)] = value
        return normalised

    def _record_result(self, step_id: str, tool_name: str, result: dict) -> None:
        """Store a tool result in the context and in the ordered result list.

        The result is stored under the tool name as well as the step id:
        ``_build_kwargs`` looks up the ``resolve_destination`` result by tool
        name, which would miss it whenever the workflow gives that step a
        different id. The step id is written last so it wins on a key clash.
        """
        self.context[tool_name] = result
        self.context[step_id] = result
        self.step_results.append({"id": step_id, "result": result})

    def _run_step(self, step: dict) -> None:
        """Execute (or simulate) a single workflow step and print its outcome."""
        step_id = step.get("id", "?")

        # --- ask_user step ---
        if "ask_user" in step:
            section(f"STEP [{step_id}] — ask_user")
            ask = step["ask_user"] or {}
            q = ask.get("question", "")
            answers_str = self._normalise_answers(ask.get("answers"))
            info(c(f'Question: "{q}"', BOLD))
            info(f"Réponses possibles: {', '.join(answers_str)}")

            answer = "yes" if self.args.seed else "no"
            chosen = answers_str.get(answer)
            if isinstance(chosen, dict):
                next_step = chosen.get("next_step", self._DEFAULT_NEXT_STEP)
            else:
                next_step = self._DEFAULT_NEXT_STEP
            ok(f"Réponse simulée: {c(answer, CYAN)} → next: {c(next_step, CYAN)}")

            self.context["seeding"] = (answer == "yes")
            self.context["ask_seeding_answer"] = answer
            self.context["next_after_ask"] = next_step
            # If "no", skip create_seed_links
            if answer == "no":
                self.context["skip_create_seed_links"] = True
            return

        # --- memory_write step ---
        if "memory_write" in step:
            section(f"STEP [{step_id}] — memory_write ({step['memory_write']})")
            if self.live:
                warn("memory_write: pas encore implémenté dans le simulator live")
            else:
                ok("(dry-run) Library entry would be written to LTM")
            self.step_results.append({"id": step_id, "result": {"status": "ok"}})
            return

        # --- tool step ---
        tool_name = step.get("tool")
        if not tool_name:
            warn(f"Step '{step_id}' has no tool or ask_user — skipped")
            return

        # Skip create_seed_links if user said no to seeding
        if tool_name == "create_seed_links" and self.context.get("skip_create_seed_links"):
            section(f"STEP [{step_id}] — {tool_name}")
            warn("Skipped (user chose not to seed)")
            return

        section(f"STEP [{step_id}] — {c(tool_name, CYAN, BOLD)}")
        desc = (step.get("description") or "").strip()
        if desc:
            info(c(desc, DIM))

        kwargs = self._build_kwargs(tool_name, step)
        for k, v in kwargs.items():
            kv(k, str(v))

        if tool_name not in self.tools:
            err(f"Tool '{tool_name}' not found in tool registry")
            self.step_results.append(
                {"id": step_id, "result": {"status": "error", "error": "unknown_tool"}}
            )
            return

        # Boundary around arbitrary tool code: any failure becomes an error
        # result so the remaining steps still run.
        try:
            result = self.tools[tool_name](**kwargs)
        except Exception as e:
            err(f"Tool raised an exception: {e}")
            self.step_results.append(
                {"id": step_id, "result": {"status": "error", "error": str(e)}}
            )
            return

        if not isinstance(result, dict):
            # The printing and summary code expects dict results; report
            # anything else as an error instead of crashing on `.get`.
            err(f"Tool returned a non-dict result: {result!r}")
            self.step_results.append(
                {
                    "id": step_id,
                    "result": {
                        "status": "error",
                        "error": "invalid_result",
                        "message": f"expected dict, got {type(result).__name__}",
                    },
                }
            )
            return

        self._print_result(result)
        self._record_result(step_id, tool_name, result)

    def _build_kwargs(self, tool_name: str, step: dict) -> dict[str, Any]:
        """Build tool kwargs from step params + CLI args + previous context."""
        # Start from step-level params (static defaults from YAML)
        kwargs: dict[str, Any] = dict(step.get("params") or {})
        a = self.args

        # Result of an earlier resolve_destination step, if one was recorded.
        resolved = self.context.get("resolve_destination") or {}
        # An explicit --dest always overrides the resolved library path.
        library_file = a.dest or resolved.get("library_file")

        if tool_name == "list_folder":
            kwargs.setdefault("folder_type", "download")

        elif tool_name == "find_media_imdb_id":
            if a.imdb_id:
                kwargs["imdb_id"] = a.imdb_id

        elif tool_name == "resolve_destination":
            if a.release:
                kwargs["release_name"] = a.release
            elif a.source:
                # Without --release, use the source file's parent folder name.
                kwargs.setdefault("release_name", Path(a.source).parent.name)
            if a.source:
                kwargs["source_file"] = a.source
            if a.tmdb_title:
                kwargs["tmdb_title"] = a.tmdb_title
            if a.tmdb_year:
                kwargs["tmdb_year"] = a.tmdb_year
            if a.episode_title:
                kwargs["tmdb_episode_title"] = a.episode_title

        elif tool_name == "move_media":
            if a.source:
                kwargs["source"] = a.source
            if library_file:
                kwargs["destination"] = library_file

        elif tool_name == "manage_subtitles":
            if a.source:
                kwargs["source_video"] = a.source
            if library_file:
                kwargs["destination_video"] = library_file

        elif tool_name == "create_seed_links":
            if library_file:
                kwargs["library_file"] = library_file
            if a.download_folder:
                kwargs["original_download_folder"] = a.download_folder
            elif a.source:
                kwargs.setdefault("original_download_folder", str(Path(a.source).parent))

        return kwargs

    def _print_result(self, result: dict) -> None:
        """Print a result's status line, then its remaining fields.

        Results whose status is neither ``ok`` nor ``needs_clarification``
        print only the error line. Lists are shown as bullets, truncated after
        ten items.
        """
        status = result.get("status", "?")
        if status == "ok":
            ok(f"status={c('ok', GREEN)}")
        elif status == "needs_clarification":
            warn(f"status={c('needs_clarification', YELLOW)}")
        else:
            err(f"status={c(status, RED)} error={result.get('error')} msg={result.get('message')}")
            return

        # Pretty-print notable fields
        skip = {"status", "error", "message"}
        for k, v in result.items():
            if k in skip:
                continue
            if not isinstance(v, list):
                kv(k, str(v))
            elif v:
                info(c(f"{k}:", BOLD))
                for item in v[:10]:
                    info(f" • {item}")
                if len(v) > 10:
                    info(c(f" … and {len(v) - 10} more", DIM))
            else:
                info(f"{c(k + ':', BOLD)} (empty)")
--------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Alfred workflow simulator", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=textwrap.dedent(__doc__ or ""), ) parser.add_argument("workflow", help="Workflow name (e.g. organize_media)") parser.add_argument("--dry-run", dest="dry_run", action="store_true", default=True, help="Simulate steps without executing tools (default)") parser.add_argument("--live", action="store_true", help="Actually execute tools against the real filesystem") parser.add_argument("--source", metavar="PATH", help="Source video file (in download folder)") parser.add_argument("--dest", metavar="PATH", help="Destination video file (in library, overrides resolve_destination)") parser.add_argument("--download-folder", metavar="PATH", help="Original download folder (for create_seed_links)") parser.add_argument("--imdb-id", metavar="ID", help="IMDb ID for identify_media (tt1234567)") parser.add_argument("--release", metavar="NAME", help="Release name (e.g. Oz.S03.1080p.WEBRip.x265-KONTRAST)") parser.add_argument("--tmdb-title", metavar="TITLE", help="Canonical title from TMDB (e.g. 'Oz')") parser.add_argument("--tmdb-year", metavar="YEAR", type=int, help="Start/release year from TMDB (e.g. 
1997)") parser.add_argument("--episode-title", metavar="TITLE", help="Episode title from TMDB for single-episode releases") parser.add_argument("--seed", action="store_true", help='Answer "yes" to the seeding question') parser.add_argument("--no-color", action="store_true") return parser.parse_args() def main() -> None: global USE_COLOR args = parse_args() if args.no_color or not sys.stdout.isatty(): USE_COLOR = False if args.live: args.dry_run = False # Load workflow from alfred.agent.workflows.loader import WorkflowLoader loader = WorkflowLoader() workflow = loader.get(args.workflow) if not workflow: print(f"Erreur: workflow '{args.workflow}' introuvable.", file=sys.stderr) print(f"Disponibles: {', '.join(loader.names())}", file=sys.stderr) sys.exit(1) # Load tools if args.live: try: tools = _load_live_tools() except Exception as e: print(f"Erreur chargement des tools live: {e}", file=sys.stderr) sys.exit(1) else: tools = DRY_RUN_TOOLS runner = WorkflowRunner(workflow, tools, live=args.live, args=args) runner.run() if __name__ == "__main__": main()