"""Filesystem tools for managing folders and files with security."""
from typing import Dict, Any
from enum import Enum
from pathlib import Path
import logging
import os

from ..memory import Memory

logger = logging.getLogger(__name__)


class FolderName(Enum):
    """Types of folders that can be managed."""

    DOWNLOAD = "download"
    TVSHOW = "tvshow"
    MOVIE = "movie"
    TORRENT = "torrent"


class FilesystemError(Exception):
    """Base exception for filesystem operations."""


class PathTraversalError(FilesystemError):
    """Raised when path traversal attack is detected."""


def _validate_folder_name(folder_name: str) -> bool:
    """
    Validate folder name against allowed values.

    Args:
        folder_name: Name to validate

    Returns:
        True if valid

    Raises:
        ValueError: If folder name is not one of the FolderName values
    """
    valid_names = [fn.value for fn in FolderName]
    if folder_name not in valid_names:
        raise ValueError(
            f"Invalid folder_name '{folder_name}'. Must be one of: {', '.join(valid_names)}"
        )
    return True


def _sanitize_path(path: str) -> str:
    """
    Sanitize path to prevent path traversal attacks.

    Args:
        path: Path to sanitize

    Returns:
        The path normalized with os.path.normpath

    Raises:
        PathTraversalError: If the path contains a null byte, is absolute,
            or contains a parent directory reference
    """
    raw = os.fspath(path)

    # Null bytes are rejected on the raw input, before normalization, so the
    # check cannot be defeated by how os.path.normpath treats embedded nulls
    # (CPython 3.11.0-3.11.4 truncated the path there, CVE-2023-41105).
    if "\x00" in raw:
        raise PathTraversalError("Null bytes in path are not allowed")

    # Normalize path (collapses "a/../b", "./a", duplicate separators)
    normalized = os.path.normpath(raw)

    # Check for absolute paths
    if os.path.isabs(normalized):
        raise PathTraversalError("Absolute paths are not allowed")

    # Check for parent directory references. Deliberately conservative: this
    # also rejects names that merely begin with "..".
    if normalized.startswith("..") or "/.." in normalized or "\\.." in normalized:
        raise PathTraversalError("Parent directory references are not allowed")

    return normalized


def _is_safe_path(base_path: Path, target_path: Path) -> bool:
    """
    Check if target path is within base path (prevents path traversal).

    Args:
        base_path: Base directory path
        target_path: Target path to check

    Returns:
        True if safe, False otherwise (including when resolution fails)
    """
    try:
        # Resolve both paths to absolute paths
        base_resolved = base_path.resolve()
        target_resolved = target_path.resolve()

        # relative_to raises ValueError when target is not under base
        target_resolved.relative_to(base_resolved)
        return True
    except (ValueError, OSError):
        return False


def set_path_for_folder(memory: Memory, folder_name: str, path_value: str) -> Dict[str, Any]:
    """
    Set a path in the config with validation.

    Args:
        memory: Memory instance to store the configuration
        folder_name: Name of folder to set (download, tvshow, movie, torrent)
        path_value: Absolute path to the folder

    Returns:
        Dict with status or error information. On success the dict holds
        "status", "folder_name" and the resolved "path"; on failure it holds
        "error" and "message".
    """
    try:
        # Validate folder name
        _validate_folder_name(folder_name)

        # An empty path would resolve to the current working directory and be
        # stored silently; reject it instead.
        if isinstance(path_value, str) and not path_value.strip():
            logger.warning("Path is empty")
            return {
                "error": "invalid_path",
                "message": "Path must not be empty"
            }

        # Convert to Path object for better handling
        path_obj = Path(path_value).resolve()

        # Validate path exists and is a directory
        if not path_obj.exists():
            logger.warning(f"Path does not exist: {path_value}")
            return {
                "error": "invalid_path",
                "message": f"Path does not exist: {path_value}"
            }

        if not path_obj.is_dir():
            logger.warning(f"Path is not a directory: {path_value}")
            return {
                "error": "invalid_path",
                "message": f"Path is not a directory: {path_value}"
            }

        # Check if path is readable
        if not os.access(path_obj, os.R_OK):
            logger.warning(f"Path is not readable: {path_value}")
            return {
                "error": "permission_denied",
                "message": f"Path is not readable: {path_value}"
            }

        # Store in memory under "<folder_name>_folder"
        config = memory.get("config", {})
        config[f"{folder_name}_folder"] = str(path_obj)
        memory.set("config", config)

        logger.info(f"Set {folder_name}_folder to: {path_obj}")

        return {
            "status": "ok",
            "folder_name": folder_name,
            "path": str(path_obj)
        }

    except ValueError as e:
        logger.error(f"Validation error: {e}")
        return {"error": "validation_failed", "message": str(e)}
    except Exception as e:
        # Boundary handler: log the details, return a generic message.
        logger.error(f"Unexpected error setting path: {e}", exc_info=True)
        return {"error": "internal_error", "message": "Failed to set path"}
def list_folder(memory: Memory, folder_type: str, path: str = ".") -> Dict[str, Any]:
    """
    List contents of a folder with security checks.

    Args:
        memory: Memory instance to retrieve the configuration
        folder_type: Type of folder to list (download, tvshow, movie, torrent)
        path: Relative path within the folder (default: ".")

    Returns:
        Dict with folder contents or error information. On success the dict
        holds "status", "folder_type", "path", the sorted "entries" names and
        their "count"; on failure it holds "error" and "message".
    """
    try:
        # Validate folder type
        _validate_folder_name(folder_type)

        # Sanitize the path
        safe_path = _sanitize_path(path)

        # Get root folder from config
        folder_key = f"{folder_type}_folder"
        config = memory.get("config", {})

        if folder_key not in config or not config[folder_key]:
            logger.warning(f"Folder not configured: {folder_type}")
            return {
                "error": "folder_not_set",
                "message": f"{folder_type.capitalize()} folder not set in config."
            }

        root = Path(config[folder_key])
        target = root / safe_path

        # Security check: ensure target is within root
        if not _is_safe_path(root, target):
            logger.warning(f"Path traversal attempt detected: {path}")
            return {
                "error": "forbidden",
                "message": "Access denied: path outside allowed directory"
            }

        # All filesystem probes share one PermissionError handler, so a
        # permission failure while checking existence or type is reported as
        # "permission_denied" rather than falling through to "internal_error".
        # NOTE(review): whether exists()/is_dir() raise PermissionError depends
        # on the Python version - confirm against the supported runtime.
        try:
            # Check if target exists
            if not target.exists():
                logger.warning(f"Path does not exist: {target}")
                return {
                    "error": "not_found",
                    "message": f"Path does not exist: {safe_path}"
                }

            # Check if target is a directory
            if not target.is_dir():
                logger.warning(f"Path is not a directory: {target}")
                return {
                    "error": "not_a_directory",
                    "message": f"Path is not a directory: {safe_path}"
                }

            # List directory contents (names only)
            entries = [entry.name for entry in target.iterdir()]
        except PermissionError:
            logger.warning(f"Permission denied accessing: {target}")
            return {
                "error": "permission_denied",
                "message": f"Permission denied accessing: {safe_path}"
            }

        logger.debug(f"Listed {len(entries)} entries in {target}")

        return {
            "status": "ok",
            "folder_type": folder_type,
            "path": safe_path,
            "entries": sorted(entries),
            "count": len(entries)
        }

    except PathTraversalError as e:
        logger.warning(f"Path traversal attempt: {e}")
        return {
            "error": "forbidden",
            "message": str(e)
        }
    except ValueError as e:
        logger.error(f"Validation error: {e}")
        return {"error": "validation_failed", "message": str(e)}
    except Exception as e:
        # Boundary handler: log the details, return a generic message.
        logger.error(f"Unexpected error listing folder: {e}", exc_info=True)
        return {"error": "internal_error", "message": "Failed to list folder"}