Files
alfred/agent/tools/filesystem.py
T
2025-12-01 03:22:44 +01:00

449 lines
14 KiB
Python

"""Filesystem tools for managing folders and files with security."""
from typing import Dict, Any
from enum import Enum
from pathlib import Path
import logging
import os
from ..memory import Memory
logger = logging.getLogger(__name__)
class FolderName(Enum):
    """Closed set of folder kinds whose location can be configured.

    Each member's value is the string accepted as a folder name/type by the
    tools in this module and used as the prefix of the ``<value>_folder``
    configuration key.
    """

    DOWNLOAD = "download"
    TVSHOW = "tvshow"
    MOVIE = "movie"
    TORRENT = "torrent"
class FilesystemError(Exception):
    """Root exception type for errors raised by the filesystem tools."""
class PathTraversalError(FilesystemError):
    """Signals that a supplied path tried to escape its allowed directory."""
def _validate_folder_name(folder_name: str) -> bool:
    """
    Check that a folder name is one of the ``FolderName`` values.

    Args:
        folder_name: Candidate name to check

    Returns:
        True when the name is recognised (never returns False)

    Raises:
        ValueError: If the name is not a ``FolderName`` value; the message
            lists every accepted value
    """
    allowed = [member.value for member in FolderName]
    if folder_name in allowed:
        return True

    raise ValueError(
        f"Invalid folder_name '{folder_name}'. Must be one of: {', '.join(allowed)}"
    )
def _sanitize_path(path: str) -> str:
    """
    Sanitize a relative path to prevent path traversal attacks.

    The path is normalized with ``os.path.normpath`` and rejected if it
    contains a null byte, is absolute, or still contains a ``..`` component
    after normalization (i.e. it would climb above its starting directory).

    Parent references are detected per path *component*, so ordinary names
    that merely begin with dots (``"...And Justice for All (1979)"``,
    ``"..extras"``) are accepted. Both ``/`` and ``\\`` are treated as
    separators for this check regardless of platform.

    Args:
        path: Path to sanitize

    Returns:
        Normalized path

    Raises:
        PathTraversalError: If path contains dangerous patterns
    """
    # Inspect the raw input first so the null-byte check never depends on
    # how normalization treats embedded NULs.
    if "\x00" in path:
        raise PathTraversalError("Null bytes in path are not allowed")

    normalized = os.path.normpath(path)

    if os.path.isabs(normalized):
        raise PathTraversalError("Absolute paths are not allowed")

    # A substring test such as `"/.." in normalized` also matches harmless
    # names like "dir/..hidden"; compare whole components instead.
    components = normalized.replace("\\", "/").split("/")
    if ".." in components:
        raise PathTraversalError("Parent directory references are not allowed")

    return normalized
def _is_safe_path(base_path: Path, target_path: Path) -> bool:
    """
    Tell whether ``target_path`` lies inside ``base_path``.

    Both paths are resolved (made absolute) before the containment test, so
    ``..`` segments cannot be used to step outside the base.

    Args:
        base_path: Directory that must contain the target
        target_path: Path being checked

    Returns:
        True if the resolved target is the base or a descendant of it,
        False otherwise (including when resolution raises ``OSError``)
    """
    try:
        anchor = base_path.resolve()
        candidate = target_path.resolve()
        # relative_to raises ValueError when candidate is not under anchor.
        candidate.relative_to(anchor)
    except (ValueError, OSError):
        return False
    else:
        return True
def set_path_for_folder(memory: Memory, folder_name: str, path_value: str) -> Dict[str, Any]:
    """
    Validate a directory path and record it in the stored configuration.

    The path is resolved to an absolute path, must exist, must be a
    directory and must be readable. On success it is saved under the
    ``"<folder_name>_folder"`` key of the ``"config"`` entry in ``memory``.

    Args:
        memory: Memory instance to store the configuration
        folder_name: Name of folder to set (download, tvshow, movie, torrent)
        path_value: Path to the folder (resolved before being stored)

    Returns:
        ``{"status": "ok", "folder_name": ..., "path": ...}`` on success,
        otherwise ``{"error": <code>, "message": <text>}`` where the code is
        one of ``invalid_path``, ``permission_denied``,
        ``validation_failed`` or ``internal_error``
    """
    try:
        _validate_folder_name(folder_name)

        resolved = Path(path_value).resolve()

        # Evaluate the preconditions in order; the first one that fails
        # decides the error code and message.
        if not resolved.exists():
            failure = ("invalid_path", f"Path does not exist: {path_value}")
        elif not resolved.is_dir():
            failure = ("invalid_path", f"Path is not a directory: {path_value}")
        elif not os.access(resolved, os.R_OK):
            failure = ("permission_denied", f"Path is not readable: {path_value}")
        else:
            failure = None

        if failure is not None:
            error_code, detail = failure
            logger.warning(detail)
            return {"error": error_code, "message": detail}

        # Persist the resolved path in the shared config mapping.
        stored_path = str(resolved)
        config = memory.get("config", {})
        config[f"{folder_name}_folder"] = stored_path
        memory.set("config", config)

        logger.info(f"Set {folder_name}_folder to: {resolved}")
        return {"status": "ok", "folder_name": folder_name, "path": stored_path}

    except ValueError as exc:
        # Raised by _validate_folder_name for an unknown folder name.
        logger.error(f"Validation error: {exc}")
        return {"error": "validation_failed", "message": str(exc)}
    except Exception as exc:
        logger.error(f"Unexpected error setting path: {exc}", exc_info=True)
        return {"error": "internal_error", "message": "Failed to set path"}
def list_folder(memory: Memory, folder_type: str, path: str = ".") -> Dict[str, Any]:
    """
    Return the sorted entry names of a directory inside a configured folder.

    ``path`` is sanitized, joined onto the root configured for
    ``folder_type`` and checked to still lie inside that root before the
    directory is read.

    Args:
        memory: Memory instance to retrieve the configuration
        folder_type: Type of folder to list (download, tvshow, movie, torrent)
        path: Relative path within the folder (default: ".")

    Returns:
        ``{"status": "ok", "folder_type", "path", "entries", "count"}`` on
        success, otherwise ``{"error": <code>, "message": <text>}`` where the
        code is one of ``folder_not_set``, ``forbidden``, ``not_found``,
        ``not_a_directory``, ``permission_denied``, ``validation_failed`` or
        ``internal_error``
    """
    try:
        _validate_folder_name(folder_type)
        safe_path = _sanitize_path(path)

        # Look up the root directory configured for this folder type.
        folder_key = f"{folder_type}_folder"
        config = memory.get("config", {})
        configured_root = config[folder_key] if folder_key in config else None
        if not configured_root:
            logger.warning(f"Folder not configured: {folder_type}")
            return {
                "error": "folder_not_set",
                "message": f"{folder_type.capitalize()} folder not set in config."
            }

        root = Path(configured_root)
        target = root / safe_path

        # Second line of defence: the joined path must stay under the root.
        if not _is_safe_path(root, target):
            logger.warning(f"Path traversal attempt detected: {path}")
            return {
                "error": "forbidden",
                "message": "Access denied: path outside allowed directory"
            }

        if not target.exists():
            logger.warning(f"Path does not exist: {target}")
            return {
                "error": "not_found",
                "message": f"Path does not exist: {safe_path}"
            }

        if not target.is_dir():
            logger.warning(f"Path is not a directory: {target}")
            return {
                "error": "not_a_directory",
                "message": f"Path is not a directory: {safe_path}"
            }

        # Reading the directory can still fail on permissions even though
        # the path exists.
        try:
            entries = sorted(child.name for child in target.iterdir())
        except PermissionError:
            logger.warning(f"Permission denied accessing: {target}")
            return {
                "error": "permission_denied",
                "message": f"Permission denied accessing: {safe_path}"
            }

        logger.debug(f"Listed {len(entries)} entries in {target}")
        return {
            "status": "ok",
            "folder_type": folder_type,
            "path": safe_path,
            "entries": entries,
            "count": len(entries)
        }

    except PathTraversalError as exc:
        # Raised by _sanitize_path for absolute paths, "..", or null bytes.
        logger.warning(f"Path traversal attempt: {exc}")
        return {"error": "forbidden", "message": str(exc)}
    except ValueError as exc:
        # Raised by _validate_folder_name for an unknown folder type.
        logger.error(f"Validation error: {exc}")
        return {"error": "validation_failed", "message": str(exc)}
    except Exception as exc:
        logger.error(f"Unexpected error listing folder: {exc}", exc_info=True)
        return {"error": "internal_error", "message": "Failed to list folder"}
def move_file(path: str, destination: str) -> Dict[str, Any]:
    """
    Move a file from one location to another with safety checks.

    This function is designed to safely move files from downloads to
    movies/series folders with validation and error handling to prevent
    data loss.

    Args:
        path: Source file path (absolute or relative)
        destination: Destination file path (absolute or relative)

    Returns:
        Dict with status or error information:
        - Success: {"status": "ok", "source": str, "destination": str,
          "filename": str, "size": int}
        - Error: {"error": str, "message": str}

    Safety features:
        - Validates source file exists, is a regular file and is readable
        - Validates destination directory exists and is writable
        - Refuses to move a file onto itself ("same_location"); this is
          checked before the overwrite test, because a file's own path
          always exists and would otherwise be reported as
          "destination_exists"
        - Prevents overwriting existing files
        - Verifies the move afterwards (destination exists, size matches)
        - Uses shutil.move, which renames in place when it can and falls
          back to copy-then-delete otherwise; the fallback is NOT atomic
        - Comprehensive logging

    Note:
        Only ``OSError`` (including ``PermissionError``) is converted into an
        error dict by the outer handler; other exception types raised outside
        the move step itself propagate to the caller.

    Example:
        >>> result = move_file(
        ...     "/downloads/movie.mkv",
        ...     "/movies/Inception (2010)/movie.mkv"
        ... )
        >>> print(result)
        {'status': 'ok', 'source': '...', 'destination': '...', 'filename': '...', 'size': 1234567890}
    """
    import shutil

    try:
        # Convert to absolute Path objects
        source_path = Path(path).resolve()
        dest_path = Path(destination).resolve()

        logger.info(f"Moving file from {source_path} to {dest_path}")

        # === VALIDATION: Source file ===
        if not source_path.exists():
            logger.error(f"Source file does not exist: {source_path}")
            return {
                "error": "source_not_found",
                "message": f"Source file does not exist: {path}"
            }

        # Source must be a file (not a directory)
        if not source_path.is_file():
            logger.error(f"Source is not a file: {source_path}")
            return {
                "error": "source_not_file",
                "message": f"Source is not a file: {path}"
            }

        if not os.access(source_path, os.R_OK):
            logger.error(f"Source file is not readable: {source_path}")
            return {
                "error": "permission_denied",
                "message": f"Source file is not readable: {path}"
            }

        # Remember the size so the result can be verified after the move
        source_size = source_path.stat().st_size
        logger.debug(f"Source file size: {source_size} bytes")

        # === VALIDATION: Destination ===
        dest_parent = dest_path.parent
        if not dest_parent.exists():
            logger.error(f"Destination directory does not exist: {dest_parent}")
            return {
                "error": "destination_dir_not_found",
                "message": f"Destination directory does not exist: {dest_parent}"
            }

        if not dest_parent.is_dir():
            logger.error(f"Destination parent is not a directory: {dest_parent}")
            return {
                "error": "destination_not_dir",
                "message": f"Destination parent is not a directory: {dest_parent}"
            }

        if not os.access(dest_parent, os.W_OK):
            logger.error(f"Destination directory is not writable: {dest_parent}")
            return {
                "error": "permission_denied",
                "message": f"Destination directory is not writable: {dest_parent}"
            }

        # === SAFETY CHECK: Prevent moving to same location ===
        # Must run before the "already exists" test below: when both paths
        # are identical the destination trivially exists, which would hide
        # the real reason behind a misleading "destination_exists" error.
        if source_path == dest_path:
            logger.warning("Source and destination are the same")
            return {
                "error": "same_location",
                "message": "Source and destination are the same"
            }

        # Never overwrite an existing destination
        if dest_path.exists():
            logger.warning(f"Destination file already exists: {dest_path}")
            return {
                "error": "destination_exists",
                "message": f"Destination file already exists: {destination}"
            }

        # === PERFORM MOVE ===
        logger.info(f"Moving file: {source_path.name} ({source_size} bytes)")
        try:
            # shutil.move handles cross-filesystem moves automatically by
            # copying and then removing the source (not atomic in that case).
            shutil.move(str(source_path), str(dest_path))
            logger.info(f"File moved successfully to {dest_path}")
        except Exception as e:
            logger.error(f"Failed to move file: {e}", exc_info=True)
            return {
                "error": "move_failed",
                "message": f"Failed to move file: {str(e)}"
            }

        # === VERIFICATION: Ensure file was moved correctly ===
        if not dest_path.exists():
            logger.error("Destination file does not exist after move!")
            # Distinguish "nothing happened" from "the file is gone"
            if source_path.exists():
                logger.info("Source file still exists, move may have failed")
                return {
                    "error": "move_verification_failed",
                    "message": "File was not moved successfully (destination not found)"
                }
            else:
                logger.critical("Both source and destination missing after move!")
                return {
                    "error": "file_lost",
                    "message": "CRITICAL: File missing after move operation"
                }

        # Destination size must match the size recorded before the move
        dest_size = dest_path.stat().st_size
        if dest_size != source_size:
            logger.error(f"File size mismatch! Source: {source_size}, Dest: {dest_size}")
            return {
                "error": "size_mismatch",
                "message": f"File size mismatch after move (expected {source_size}, got {dest_size})"
            }

        # A lingering source is logged but deliberately not treated as an
        # error.
        if source_path.exists():
            logger.warning("Source file still exists after move (copy instead of move?)")

        # === SUCCESS ===
        logger.info(f"File successfully moved and verified: {dest_path.name}")
        return {
            "status": "ok",
            "source": str(source_path),
            "destination": str(dest_path),
            "filename": dest_path.name,
            "size": dest_size
        }

    except PermissionError as e:
        logger.error(f"Permission denied: {e}")
        return {
            "error": "permission_denied",
            "message": f"Permission denied: {str(e)}"
        }
    except OSError as e:
        logger.error(f"OS error during move: {e}", exc_info=True)
        return {
            "error": "os_error",
            "message": f"OS error: {str(e)}"
        }