"""API tools for interacting with external services."""

import logging
import re
from typing import Any

from application.movies import SearchMovieUseCase
from application.torrents import AddTorrentUseCase, SearchTorrentsUseCase
from infrastructure.api.knaben import knaben_client
from infrastructure.api.qbittorrent import qbittorrent_client
from infrastructure.api.tmdb import tmdb_client
from infrastructure.persistence import get_memory

logger = logging.getLogger(__name__)

# Matches the BitTorrent info hash carried in a magnet link's ``xt`` parameter
# (``urn:btih:<hash>`` for v1, ``urn:btmh:<hash>`` for v2 multihash).
_MAGNET_HASH_PATTERN = re.compile(r"urn:bt[im]h:([0-9A-Za-z]+)", re.IGNORECASE)


def _task_id_from_magnet(magnet_link: str) -> str:
    """
    Derive a per-torrent task ID from a magnet link.

    The info hash is used because it is the part of the link that differs
    between torrents. Slicing the first 20 characters (the previous approach)
    yields the constant prefix ``magnet:?xt=urn:btih:`` for every standard
    magnet link, so all downloads ended up with the same task ID.

    Args:
        magnet_link: Magnet link of the torrent.

    Returns:
        The lower-cased info hash when one is present, otherwise the first
        20 characters of the link (the legacy value) as a fallback.
    """
    match = _MAGNET_HASH_PATTERN.search(magnet_link)
    if match:
        # Info hashes are case-insensitive; normalise so the same torrent
        # always maps to the same ID.
        return match.group(1).lower()
    return magnet_link[:20]


def find_media_imdb_id(media_title: str) -> dict[str, Any]:
    """
    Find the IMDb ID for a given media title using TMDB API.

    On success, the title, IMDb ID, media type and TMDB ID are stored in
    short-term memory under the "last_media_search" entity and the current
    topic is set to "searching_media".

    Args:
        media_title: Title of the media to search for.

    Returns:
        Dict with IMDb ID and media info, or error details.
    """
    use_case = SearchMovieUseCase(tmdb_client)
    response = use_case.execute(media_title)
    result = response.to_dict()

    if result.get("status") == "ok":
        memory = get_memory()
        memory.stm.set_entity(
            "last_media_search",
            {
                "title": result.get("title"),
                "imdb_id": result.get("imdb_id"),
                "media_type": result.get("media_type"),
                "tmdb_id": result.get("tmdb_id"),
            },
        )
        memory.stm.set_topic("searching_media")
        logger.debug(f"Stored media search result in STM: {result.get('title')}")

    return result


def find_torrent(media_title: str) -> dict[str, Any]:
    """
    Find torrents for a given media title using Knaben API.

    Results are stored in episodic memory so the user can reference them
    by index (e.g., "download the 3rd one"). The search is limited to
    10 results, and on success the current topic is set to
    "selecting_torrent".

    Args:
        media_title: Title of the media to search for.

    Returns:
        Dict with torrent list or error details.
    """
    logger.info(f"Searching torrents for: {media_title}")

    use_case = SearchTorrentsUseCase(knaben_client)
    response = use_case.execute(media_title, limit=10)
    result = response.to_dict()

    if result.get("status") == "ok":
        memory = get_memory()
        torrents = result.get("torrents", [])
        memory.episodic.store_search_results(
            query=media_title, results=torrents, search_type="torrent"
        )
        memory.stm.set_topic("selecting_torrent")
        logger.info(f"Stored {len(torrents)} torrent results in episodic memory")

    return result


def get_torrent_by_index(index: int) -> dict[str, Any]:
    """
    Get a torrent from the last search results by its index.

    Allows the user to reference results by number after a search.

    Args:
        index: 1-based index of the torrent in the search results.

    Returns:
        Dict with "status": "ok" and the torrent under "torrent", or an
        error dict ("error": "not_found") if nothing is stored at that index.
    """
    logger.info(f"Getting torrent at index: {index}")

    memory = get_memory()

    # Diagnostic logging only: the lookup below is what decides the outcome.
    if memory.episodic.last_search_results:
        results_count = len(memory.episodic.last_search_results.get("results", []))
        query = memory.episodic.last_search_results.get("query", "unknown")
        logger.debug(f"Episodic memory has {results_count} results from: {query}")
    else:
        logger.warning("No search results in episodic memory")

    result = memory.episodic.get_result_by_index(index)

    if result:
        logger.info(f"Found torrent at index {index}: {result.get('name', 'unknown')}")
        return {"status": "ok", "torrent": result}

    logger.warning(f"No torrent found at index {index}")
    return {
        "status": "error",
        "error": "not_found",
        "message": f"No torrent found at index {index}. Search for torrents first.",
    }


def add_torrent_to_qbittorrent(magnet_link: str) -> dict[str, Any]:
    """
    Add a torrent to qBittorrent using a magnet link.

    On success, the download is recorded in episodic memory as an active
    download (status "queued", progress 0), the current topic is set to
    "downloading" and the current workflow is ended. The torrent name is
    taken from the last search results when one of them has the same magnet
    link; otherwise "Unknown" is used.

    Args:
        magnet_link: Magnet link of the torrent to add.

    Returns:
        Dict with success status or error details.
    """
    logger.info("Adding torrent to qBittorrent")

    use_case = AddTorrentUseCase(qbittorrent_client)
    response = use_case.execute(magnet_link)
    result = response.to_dict()

    if result.get("status") == "ok":
        memory = get_memory()
        last_search = memory.episodic.get_search_results()

        # Recover a human-readable name by matching the magnet link against
        # the most recent search results, if there are any.
        candidates = last_search.get("results", []) if last_search else []
        torrent_name = next(
            (
                t.get("name", "Unknown")
                for t in candidates
                if t.get("magnet") == magnet_link
            ),
            "Unknown",
        )

        memory.episodic.add_active_download(
            {
                # Derived from the info hash so that different torrents get
                # different IDs (a plain 20-character prefix is identical
                # for every standard magnet link).
                "task_id": _task_id_from_magnet(magnet_link),
                "name": torrent_name,
                "magnet": magnet_link,
                "progress": 0,
                "status": "queued",
            }
        )

        memory.stm.set_topic("downloading")
        memory.stm.end_workflow()

        logger.info(f"Added download to episodic memory: {torrent_name}")

    return result


def add_torrent_by_index(index: int) -> dict[str, Any]:
    """
    Add a torrent from the last search results by its index.

    Combines get_torrent_by_index and add_torrent_to_qbittorrent.

    Args:
        index: 1-based index of the torrent in the search results.

    Returns:
        Dict with success status or error details. On success the dict also
        carries the torrent's name under "torrent_name". If the lookup fails,
        the lookup's error dict is returned unchanged; if the selected torrent
        has no magnet link, an error dict with "error": "no_magnet" is
        returned.
    """
    logger.info(f"Adding torrent by index: {index}")

    torrent_result = get_torrent_by_index(index)

    if torrent_result.get("status") != "ok":
        return torrent_result

    torrent = torrent_result.get("torrent", {})
    magnet = torrent.get("magnet")

    if not magnet:
        logger.error("Torrent has no magnet link")
        return {
            "status": "error",
            "error": "no_magnet",
            "message": "The selected torrent has no magnet link",
        }

    logger.info(f"Adding torrent: {torrent.get('name', 'unknown')}")

    result = add_torrent_to_qbittorrent(magnet)

    if result.get("status") == "ok":
        result["torrent_name"] = torrent.get("name", "Unknown")

    return result