1 Commits

Author SHA1 Message Date
francwa afc0024b37 infra!: added CI/CD pipeline and made various improvements
CI/CD Awesome Pipeline / Test (push) Successful in 1m37s
CI/CD Awesome Pipeline / Build & Push to Registry (push) Has been skipped
2025-12-21 08:51:31 +01:00
434 changed files with 10656 additions and 43229 deletions
-80
View File
@@ -1,80 +0,0 @@
# --- IMPORTANT ---
# Settings are split across multiple files for clarity.
# Files (loaded in this order, last wins):
# .env.alfred — app config and service addresses (safe to commit)
# .env.secrets — generated secrets, passwords, URIs and API keys (DO NOT COMMIT)
# .env.make — build metadata synced from pyproject.toml (safe to commit)
#
# To customize: edit .env.alfred for config, .env.secrets for secrets.
# --- Alfred ---
MAX_HISTORY_MESSAGES=10
MAX_TOOL_ITERATIONS=10
REQUEST_TIMEOUT=30
# LLM Settings
LLM_TEMPERATURE=0.2
# Persistence
DATA_STORAGE_DIR=data
# Network
HOST=0.0.0.0
PORT=3080
# --- DATABASES ---
# Passwords and connection URIs are auto-generated in .env.secrets.
# Edit host/port/user/dbname here if needed.
# MongoDB (Application Data)
MONGO_HOST=mongodb
MONGO_PORT=27017
MONGO_USER=alfred
MONGO_DB_NAME=alfred
# PostgreSQL (Vector Database / RAG)
POSTGRES_HOST=vectordb
POSTGRES_PORT=5432
POSTGRES_USER=alfred
POSTGRES_DB_NAME=alfred
# --- EXTERNAL SERVICES ---
# TMDB — Media metadata (required). Get your key at https://www.themoviedb.org/
# → TMDB_API_KEY goes in .env.secrets
TMDB_BASE_URL=https://api.themoviedb.org/3
# qBittorrent
# → QBITTORRENT_PASSWORD goes in .env.secrets
QBITTORRENT_URL=https://qb.lan.anustart.top
QBITTORRENT_USERNAME=letmein
QBITTORRENT_PORT=16140
# Path translation: host-side prefix → container-side prefix
QBITTORRENT_HOST_PATH=/mnt/testipool
QBITTORRENT_CONTAINER_PATH=/mnt/data
# Meilisearch
# → MEILI_MASTER_KEY goes in .env.secrets
# MEILI_ENABLED=false # KEY DOESN'T EXISTS => SEARCH IS THE PROPER KEY
SEARCH=false
MEILI_NO_ANALYTICS=true
MEILI_HOST=http://meilisearch:7700
# --- LLM CONFIGURATION ---
# Providers: local, openai, anthropic, deepseek, google, kimi
# → API keys go in .env.secrets
DEFAULT_LLM_PROVIDER=deepseek
# Local LLM (Ollama)
#OLLAMA_BASE_URL=http://ollama:11434
#OLLAMA_MODEL=llama3.3:latest
OLLAMA_BASE_URL=http://10.0.0.11:11434
OLLAMA_MODEL=glm-4.7-flash:latest
# --- RAG ENGINE ---
RAG_ENABLED=TRUE
RAG_API_URL=http://rag_api:8000
RAG_API_PORT=8000
EMBEDDINGS_PROVIDER=ollama
EMBEDDINGS_MODEL=nomic-embed-text
+57 -61
View File
@@ -1,73 +1,69 @@
# --- IMPORTANT --- # Agent Media - Environment Variables
# Settings are split across multiple files for clarity.
# Files (loaded in this order, last wins):
# .env.alfred — app config and service addresses (safe to commit)
# .env.secrets — generated secrets, passwords, URIs and API keys (DO NOT COMMIT)
# .env.make — build metadata synced from pyproject.toml (safe to commit)
#
# To customize: edit .env.alfred for config, .env.secrets for secrets.
# --- Alfred --- # LibreChat Security Keys
MAX_HISTORY_MESSAGES=10 # Generate secure keys with: openssl rand -base64 32
MAX_TOOL_ITERATIONS=10 JWT_SECRET=your-super-secret-jwt-key-change-this-in-production
REQUEST_TIMEOUT=30 JWT_REFRESH_SECRET=your-super-secret-refresh-key-change-this-too
# LLM Settings # Generate with: openssl rand -hex 16 (for CREDS_KEY)
LLM_TEMPERATURE=0.2 CREDS_KEY=your-32-character-secret-key-here
# Persistence # Generate with: openssl rand -hex 8 (for CREDS_IV)
DATA_STORAGE_DIR=data CREDS_IV=your-16-character-iv-here
# Network # LibreChat Configuration
HOST=0.0.0.0 DOMAIN_CLIENT=http://localhost:3080
PORT=3080 DOMAIN_SERVER=http://localhost:3080
# --- DATABASES --- # Session expiry (in milliseconds)
# Passwords and connection URIs are auto-generated in .env.secrets. # Default: 15 minutes
# Edit host/port/user/dbname here if needed. SESSION_EXPIRY=900000
# MongoDB (Application Data) # Refresh token expiry (in milliseconds)
MONGO_HOST=mongodb # Default: 7 days
MONGO_PORT=27017 REFRESH_TOKEN_EXPIRY=604800000
MONGO_USER=alfred
MONGO_DB_NAME=LibreChat
# PostgreSQL (Vector Database / RAG) # Meilisearch Configuration
POSTGRES_HOST=vectordb # Master key for Meilisearch (generate with: openssl rand -base64 32)
POSTGRES_PORT=5432 MEILI_MASTER_KEY=DrhYf7zENyR6AlUCKmnz0eYASOQdl6zxH7s7MKFSfFU
POSTGRES_USER=alfred
POSTGRES_DB_NAME=alfred
# --- EXTERNAL SERVICES --- # PostgreSQL Configuration (for RAG API)
POSTGRES_DB=librechat_rag
POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres
# TMDB — Media metadata (required). Get your key at https://www.themoviedb.org/ # RAG API Configuration (Vector Database)
# → TMDB_API_KEY goes in .env.secrets RAG_COLLECTION_NAME=testcollection
TMDB_BASE_URL=https://api.themoviedb.org/3 RAG_EMBEDDINGS_PROVIDER=openai
RAG_EMBEDDINGS_MODEL=text-embedding-3-small
# qBittorrent # API Keys
# → QBITTORRENT_PASSWORD goes in .env.secrets # OpenAI API Key (required for RAG embeddings)
QBITTORRENT_URL=http://qbittorrent:16140 OPENAI_API_KEY=your-openai-api-key-here
# Deepseek API Key (for LLM in agent-brain)
DEEPSEEK_API_KEY=your-deepseek-api-key-here
# Agent Brain Configuration
# LLM Provider (deepseek or ollama)
LLM_PROVIDER=deepseek
# Memory storage directory (inside container)
MEMORY_STORAGE_DIR=/data/memory
# API Key for agent-brain (used by LibreChat custom endpoint)
AGENT_BRAIN_API_KEY=agent-brain-secret-key
# External Services (Optional)
# TMDB API Key (for movie metadata)
TMDB_API_KEY=your-tmdb-key
# qBittorrent Configuration
QBITTORRENT_URL=http://localhost:8080
QBITTORRENT_USERNAME=admin QBITTORRENT_USERNAME=admin
QBITTORRENT_PORT=16140 QBITTORRENT_PASSWORD=adminpass
# Meilisearch # Debug Options
# → MEILI_MASTER_KEY goes in .env.secrets DEBUG_LOGGING=false
MEILI_ENABLED=FALSE DEBUG_CONSOLE=false
MEILI_NO_ANALYTICS=TRUE
MEILI_HOST=http://meilisearch:7700
# --- LLM CONFIGURATION ---
# Providers: local, openai, anthropic, deepseek, google, kimi
# → API keys go in .env.secrets
DEFAULT_LLM_PROVIDER=local
# Local LLM (Ollama)
OLLAMA_BASE_URL=http://ollama:11434
OLLAMA_MODEL=llama3.3:latest
# --- RAG ENGINE ---
RAG_ENABLED=TRUE
RAG_API_URL=http://rag_api:8000
RAG_API_PORT=8000
EMBEDDINGS_PROVIDER=ollama
EMBEDDINGS_MODEL=nomic-embed-text
-878
View File
@@ -1,878 +0,0 @@
#=====================================================================#
# LibreChat Configuration #
#=====================================================================#
# Please refer to the reference documentation for assistance #
# with configuring your LibreChat environment. #
# #
# https://www.librechat.ai/docs/configuration/dotenv #
#=====================================================================#
#==================================================#
# Server Configuration #
#==================================================#
HOST=localhost
PORT=3080
MONGO_URI=mongodb://127.0.0.1:27017/LibreChat
#The maximum number of connections in the connection pool. */
MONGO_MAX_POOL_SIZE=
#The minimum number of connections in the connection pool. */
MONGO_MIN_POOL_SIZE=
#The maximum number of connections that may be in the process of being established concurrently by the connection pool. */
MONGO_MAX_CONNECTING=
#The maximum number of milliseconds that a connection can remain idle in the pool before being removed and closed. */
MONGO_MAX_IDLE_TIME_MS=
#The maximum time in milliseconds that a thread can wait for a connection to become available. */
MONGO_WAIT_QUEUE_TIMEOUT_MS=
# Set to false to disable automatic index creation for all models associated with this connection. */
MONGO_AUTO_INDEX=
# Set to `false` to disable Mongoose automatically calling `createCollection()` on every model created on this connection. */
MONGO_AUTO_CREATE=
DOMAIN_CLIENT=http://localhost:3080
DOMAIN_SERVER=http://localhost:3080
NO_INDEX=true
# Use the address that is at most n number of hops away from the Express application.
# req.socket.remoteAddress is the first hop, and the rest are looked for in the X-Forwarded-For header from right to left.
# A value of 0 means that the first untrusted address would be req.socket.remoteAddress, i.e. there is no reverse proxy.
# Defaulted to 1.
TRUST_PROXY=1
# Minimum password length for user authentication
# Default: 8
# Note: When using LDAP authentication, you may want to set this to 1
# to bypass local password validation, as LDAP servers handle their own
# password policies.
# MIN_PASSWORD_LENGTH=8
# When enabled, the app will continue running after encountering uncaught exceptions
# instead of exiting the process. Not recommended for production unless necessary.
# CONTINUE_ON_UNCAUGHT_EXCEPTION=false
#===============#
# JSON Logging #
#===============#
# Use when process console logs in cloud deployment like GCP/AWS
CONSOLE_JSON=false
#===============#
# Debug Logging #
#===============#
DEBUG_LOGGING=true
DEBUG_CONSOLE=false
# Set to true to enable agent debug logging
AGENT_DEBUG_LOGGING=false
# Enable memory diagnostics (logs heap/RSS snapshots every 60s, auto-enabled with --inspect)
# MEM_DIAG=true
#=============#
# Permissions #
#=============#
# UID=1000
# GID=1000
#==============#
# Node Options #
#==============#
# NOTE: NODE_MAX_OLD_SPACE_SIZE is NOT recognized by Node.js directly.
# This variable is used as a build argument for Docker or CI/CD workflows,
# and is NOT used by Node.js to set the heap size at runtime.
# To configure Node.js memory, use NODE_OPTIONS, e.g.:
# NODE_OPTIONS="--max-old-space-size=6144"
# See: https://nodejs.org/api/cli.html#--max-old-space-sizesize-in-mib
NODE_MAX_OLD_SPACE_SIZE=6144
#===============#
# Configuration #
#===============#
# Use an absolute path, a relative path, or a URL
# CONFIG_PATH="/alternative/path/to/librechat.yaml"
#==================#
# Langfuse Tracing #
#==================#
# Get Langfuse API keys for your project from the project settings page: https://cloud.langfuse.com
# LANGFUSE_PUBLIC_KEY=
# LANGFUSE_SECRET_KEY=
# LANGFUSE_BASE_URL=
#===================================================#
# Endpoints #
#===================================================#
# ENDPOINTS=openAI,assistants,azureOpenAI,google,anthropic
PROXY=
#===================================#
# Known Endpoints - librechat.yaml #
#===================================#
# https://www.librechat.ai/docs/configuration/librechat_yaml/ai_endpoints
# ANYSCALE_API_KEY=
# APIPIE_API_KEY=
# COHERE_API_KEY=
# DEEPSEEK_API_KEY=
# DATABRICKS_API_KEY=
# FIREWORKS_API_KEY=
# GROQ_API_KEY=
# HUGGINGFACE_TOKEN=
# MISTRAL_API_KEY=
# OPENROUTER_KEY=
# PERPLEXITY_API_KEY=
# SHUTTLEAI_API_KEY=
# TOGETHERAI_API_KEY=
# UNIFY_API_KEY=
# XAI_API_KEY=
#============#
# Anthropic #
#============#
ANTHROPIC_API_KEY=user_provided
# ANTHROPIC_MODELS=claude-sonnet-4-6,claude-opus-4-6,claude-opus-4-20250514,claude-sonnet-4-20250514,claude-3-7-sonnet-20250219,claude-3-5-sonnet-20241022,claude-3-5-haiku-20241022,claude-3-opus-20240229,claude-3-sonnet-20240229,claude-3-haiku-20240307
# ANTHROPIC_REVERSE_PROXY=
# Set to true to use Anthropic models through Google Vertex AI instead of direct API
# ANTHROPIC_USE_VERTEX=
# ANTHROPIC_VERTEX_REGION=us-east5
#============#
# Azure #
#============#
# Note: these variables are DEPRECATED
# Use the `librechat.yaml` configuration for `azureOpenAI` instead
# You may also continue to use them if you opt out of using the `librechat.yaml` configuration
# AZURE_OPENAI_DEFAULT_MODEL=gpt-3.5-turbo # Deprecated
# AZURE_OPENAI_MODELS=gpt-3.5-turbo,gpt-4 # Deprecated
# AZURE_USE_MODEL_AS_DEPLOYMENT_NAME=TRUE # Deprecated
# AZURE_API_KEY= # Deprecated
# AZURE_OPENAI_API_INSTANCE_NAME= # Deprecated
# AZURE_OPENAI_API_DEPLOYMENT_NAME= # Deprecated
# AZURE_OPENAI_API_VERSION= # Deprecated
# AZURE_OPENAI_API_COMPLETIONS_DEPLOYMENT_NAME= # Deprecated
# AZURE_OPENAI_API_EMBEDDINGS_DEPLOYMENT_NAME= # Deprecated
#=================#
# AWS Bedrock #
#=================#
# BEDROCK_AWS_DEFAULT_REGION=us-east-1 # A default region must be provided
# BEDROCK_AWS_ACCESS_KEY_ID=someAccessKey
# BEDROCK_AWS_SECRET_ACCESS_KEY=someSecretAccessKey
# BEDROCK_AWS_SESSION_TOKEN=someSessionToken
# Note: This example list is not meant to be exhaustive. If omitted, all known, supported model IDs will be included for you.
# BEDROCK_AWS_MODELS=anthropic.claude-sonnet-4-6,anthropic.claude-opus-4-6-v1,anthropic.claude-3-5-sonnet-20240620-v1:0,meta.llama3-1-8b-instruct-v1:0
# Cross-region inference model IDs: us.anthropic.claude-sonnet-4-6,us.anthropic.claude-opus-4-6-v1,global.anthropic.claude-opus-4-6-v1
# See all Bedrock model IDs here: https://docs.aws.amazon.com/bedrock/latest/userguide/model-ids.html#model-ids-arns
# Notes on specific models:
# The following models are not support due to not supporting streaming:
# ai21.j2-mid-v1
# The following models are not support due to not supporting conversation history:
# ai21.j2-ultra-v1, cohere.command-text-v14, cohere.command-light-text-v14
#============#
# Google #
#============#
GOOGLE_KEY=user_provided
# GOOGLE_REVERSE_PROXY=
# Some reverse proxies do not support the X-goog-api-key header, uncomment to pass the API key in Authorization header instead.
# GOOGLE_AUTH_HEADER=true
# Gemini API (AI Studio)
# GOOGLE_MODELS=gemini-3.1-pro-preview,gemini-3.1-pro-preview-customtools,gemini-3.1-flash-lite-preview,gemini-2.5-pro,gemini-2.5-flash,gemini-2.5-flash-lite,gemini-2.0-flash,gemini-2.0-flash-lite
# Vertex AI
# GOOGLE_MODELS=gemini-3.1-pro-preview,gemini-3.1-pro-preview-customtools,gemini-3.1-flash-lite-preview,gemini-2.5-pro,gemini-2.5-flash,gemini-2.5-flash-lite,gemini-2.0-flash-001,gemini-2.0-flash-lite-001
# GOOGLE_TITLE_MODEL=gemini-2.0-flash-lite-001
# Google Cloud region for Vertex AI (used by both chat and image generation)
# GOOGLE_LOC=us-central1
# Alternative region env var for Gemini Image Generation
# GOOGLE_CLOUD_LOCATION=global
# Vertex AI Service Account Configuration
# Path to your Google Cloud service account JSON file
# GOOGLE_SERVICE_KEY_FILE=/path/to/service-account.json
# Google Safety Settings
# NOTE: These settings apply to both Vertex AI and Gemini API (AI Studio)
#
# For Vertex AI:
# To use the BLOCK_NONE setting, you need either:
# (a) Access through an allowlist via your Google account team, or
# (b) Switch to monthly invoiced billing: https://cloud.google.com/billing/docs/how-to/invoiced-billing
#
# For Gemini API (AI Studio):
# BLOCK_NONE is available by default, no special account requirements.
#
# Available options: BLOCK_NONE, BLOCK_ONLY_HIGH, BLOCK_MEDIUM_AND_ABOVE, BLOCK_LOW_AND_ABOVE
#
# GOOGLE_SAFETY_SEXUALLY_EXPLICIT=BLOCK_ONLY_HIGH
# GOOGLE_SAFETY_HATE_SPEECH=BLOCK_ONLY_HIGH
# GOOGLE_SAFETY_HARASSMENT=BLOCK_ONLY_HIGH
# GOOGLE_SAFETY_DANGEROUS_CONTENT=BLOCK_ONLY_HIGH
# GOOGLE_SAFETY_CIVIC_INTEGRITY=BLOCK_ONLY_HIGH
#========================#
# Gemini Image Generation #
#========================#
# Gemini Image Generation Tool (for Agents)
# Supports multiple authentication methods in priority order:
# 1. User-provided API key (via GUI)
# 2. GEMINI_API_KEY env var (admin-configured)
# 3. GOOGLE_KEY env var (shared with Google chat endpoint)
# 4. Vertex AI service account (via GOOGLE_SERVICE_KEY_FILE)
# Option A: Use dedicated Gemini API key for image generation
# GEMINI_API_KEY=your-gemini-api-key
# Vertex AI model for image generation (defaults to gemini-2.5-flash-image)
# GEMINI_IMAGE_MODEL=gemini-2.5-flash-image
#============#
# OpenAI #
#============#
OPENAI_API_KEY=user_provided
# OPENAI_MODELS=gpt-5,gpt-5-codex,gpt-5-mini,gpt-5-nano,o3-pro,o3,o4-mini,gpt-4.1,gpt-4.1-mini,gpt-4.1-nano,o3-mini,o1-pro,o1,gpt-4o,gpt-4o-mini
DEBUG_OPENAI=false
# TITLE_CONVO=false
# OPENAI_TITLE_MODEL=gpt-4o-mini
# OPENAI_SUMMARIZE=true
# OPENAI_SUMMARY_MODEL=gpt-4o-mini
# OPENAI_FORCE_PROMPT=true
# OPENAI_REVERSE_PROXY=
# OPENAI_ORGANIZATION=
#====================#
# Assistants API #
#====================#
ASSISTANTS_API_KEY=user_provided
# ASSISTANTS_BASE_URL=
# ASSISTANTS_MODELS=gpt-4o,gpt-4o-mini,gpt-3.5-turbo-0125,gpt-3.5-turbo-16k-0613,gpt-3.5-turbo-16k,gpt-3.5-turbo,gpt-4,gpt-4-0314,gpt-4-32k-0314,gpt-4-0613,gpt-3.5-turbo-0613,gpt-3.5-turbo-1106,gpt-4-0125-preview,gpt-4-turbo-preview,gpt-4-1106-preview
#==========================#
# Azure Assistants API #
#==========================#
# Note: You should map your credentials with custom variables according to your Azure OpenAI Configuration
# The models for Azure Assistants are also determined by your Azure OpenAI configuration.
# More info, including how to enable use of Assistants with Azure here:
# https://www.librechat.ai/docs/configuration/librechat_yaml/ai_endpoints/azure#using-assistants-with-azure
CREDS_KEY=f34be427ebb29de8d88c107a71546019685ed8b241d8f2ed00c3df97ad2566f0
CREDS_IV=e2341419ec3dd3d19b13a1a87fafcbfb
# Azure AI Search
#-----------------
AZURE_AI_SEARCH_SERVICE_ENDPOINT=
AZURE_AI_SEARCH_INDEX_NAME=
AZURE_AI_SEARCH_API_KEY=
AZURE_AI_SEARCH_API_VERSION=
AZURE_AI_SEARCH_SEARCH_OPTION_QUERY_TYPE=
AZURE_AI_SEARCH_SEARCH_OPTION_TOP=
AZURE_AI_SEARCH_SEARCH_OPTION_SELECT=
# OpenAI Image Tools Customization
#----------------
# IMAGE_GEN_OAI_API_KEY= # Create or reuse OpenAI API key for image generation tool
# IMAGE_GEN_OAI_BASEURL= # Custom OpenAI base URL for image generation tool
# IMAGE_GEN_OAI_AZURE_API_VERSION= # Custom Azure OpenAI deployments
# IMAGE_GEN_OAI_MODEL=gpt-image-1 # OpenAI image model (e.g., gpt-image-1, gpt-image-1.5)
# IMAGE_GEN_OAI_DESCRIPTION=
# IMAGE_GEN_OAI_DESCRIPTION_WITH_FILES=Custom description for image generation tool when files are present
# IMAGE_GEN_OAI_DESCRIPTION_NO_FILES=Custom description for image generation tool when no files are present
# IMAGE_EDIT_OAI_DESCRIPTION=Custom description for image editing tool
# IMAGE_GEN_OAI_PROMPT_DESCRIPTION=Custom prompt description for image generation tool
# IMAGE_EDIT_OAI_PROMPT_DESCRIPTION=Custom prompt description for image editing tool
# DALL·E
#----------------
# DALLE_API_KEY=
# DALLE3_API_KEY=
# DALLE2_API_KEY=
# DALLE3_SYSTEM_PROMPT=
# DALLE2_SYSTEM_PROMPT=
# DALLE_REVERSE_PROXY=
# DALLE3_BASEURL=
# DALLE2_BASEURL=
# DALL·E (via Azure OpenAI)
# Note: requires some of the variables above to be set
#----------------
# DALLE3_AZURE_API_VERSION=
# DALLE2_AZURE_API_VERSION=
# Flux
#-----------------
FLUX_API_BASE_URL=https://api.us1.bfl.ai
# FLUX_API_BASE_URL = 'https://api.bfl.ml';
# Get your API key at https://api.us1.bfl.ai/auth/profile
# FLUX_API_KEY=
# Google
#-----------------
GOOGLE_SEARCH_API_KEY=
GOOGLE_CSE_ID=
# Stable Diffusion
#-----------------
SD_WEBUI_URL=http://host.docker.internal:7860
# Tavily
#-----------------
TAVILY_API_KEY=
# Traversaal
#-----------------
TRAVERSAAL_API_KEY=
# WolframAlpha
#-----------------
WOLFRAM_APP_ID=
# Zapier
#-----------------
ZAPIER_NLA_API_KEY=
#==================================================#
# Search #
#==================================================#
SEARCH=true
MEILI_NO_ANALYTICS=true
MEILI_HOST=http://0.0.0.0:7700
MEILI_MASTER_KEY=DrhYf7zENyR6AlUCKmnz0eYASOQdl6zxH7s7MKFSfFCt
# Optional: Disable indexing, useful in a multi-node setup
# where only one instance should perform an index sync.
# MEILI_NO_SYNC=true
#==================================================#
# Speech to Text & Text to Speech #
#==================================================#
STT_API_KEY=
TTS_API_KEY=
#==================================================#
# RAG #
#==================================================#
# More info: https://www.librechat.ai/docs/configuration/rag_api
# RAG_OPENAI_BASEURL=
# RAG_OPENAI_API_KEY=
# RAG_USE_FULL_CONTEXT=
# EMBEDDINGS_PROVIDER=openai
# EMBEDDINGS_MODEL=text-embedding-3-small
#===================================================#
# User System #
#===================================================#
#========================#
# Moderation #
#========================#
OPENAI_MODERATION=false
OPENAI_MODERATION_API_KEY=
# OPENAI_MODERATION_REVERSE_PROXY=
BAN_VIOLATIONS=true
BAN_DURATION=1000 * 60 * 60 * 2
BAN_INTERVAL=20
LOGIN_VIOLATION_SCORE=1
REGISTRATION_VIOLATION_SCORE=1
CONCURRENT_VIOLATION_SCORE=1
MESSAGE_VIOLATION_SCORE=1
NON_BROWSER_VIOLATION_SCORE=20
TTS_VIOLATION_SCORE=0
STT_VIOLATION_SCORE=0
FORK_VIOLATION_SCORE=0
IMPORT_VIOLATION_SCORE=0
FILE_UPLOAD_VIOLATION_SCORE=0
LOGIN_MAX=7
LOGIN_WINDOW=5
REGISTER_MAX=5
REGISTER_WINDOW=60
LIMIT_CONCURRENT_MESSAGES=true
CONCURRENT_MESSAGE_MAX=2
LIMIT_MESSAGE_IP=true
MESSAGE_IP_MAX=40
MESSAGE_IP_WINDOW=1
LIMIT_MESSAGE_USER=false
MESSAGE_USER_MAX=40
MESSAGE_USER_WINDOW=1
ILLEGAL_MODEL_REQ_SCORE=5
#========================#
# Balance #
#========================#
# CHECK_BALANCE=false
# START_BALANCE=20000 # note: the number of tokens that will be credited after registration.
#========================#
# Registration and Login #
#========================#
ALLOW_EMAIL_LOGIN=true
ALLOW_REGISTRATION=true
ALLOW_SOCIAL_LOGIN=false
ALLOW_SOCIAL_REGISTRATION=false
ALLOW_PASSWORD_RESET=false
# ALLOW_ACCOUNT_DELETION=true # note: enabled by default if omitted/commented out
ALLOW_UNVERIFIED_EMAIL_LOGIN=true
SESSION_EXPIRY=1000 * 60 * 15
REFRESH_TOKEN_EXPIRY=(1000 * 60 * 60 * 24) * 7
JWT_SECRET=16f8c0ef4a5d391b26034086c628469d3f9f497f08163ab9b40137092f2909ef
JWT_REFRESH_SECRET=eaa5191f2914e30b9387fd84e254e4ba6fc51b4654968a9b0803b456a54b8418
# Discord
DISCORD_CLIENT_ID=
DISCORD_CLIENT_SECRET=
DISCORD_CALLBACK_URL=/oauth/discord/callback
# Facebook
FACEBOOK_CLIENT_ID=
FACEBOOK_CLIENT_SECRET=
FACEBOOK_CALLBACK_URL=/oauth/facebook/callback
# GitHub
GITHUB_CLIENT_ID=
GITHUB_CLIENT_SECRET=
GITHUB_CALLBACK_URL=/oauth/github/callback
# GitHub Enterprise
# GITHUB_ENTERPRISE_BASE_URL=
# GITHUB_ENTERPRISE_USER_AGENT=
# Google
GOOGLE_CLIENT_ID=
GOOGLE_CLIENT_SECRET=
GOOGLE_CALLBACK_URL=/oauth/google/callback
# Apple
APPLE_CLIENT_ID=
APPLE_TEAM_ID=
APPLE_KEY_ID=
APPLE_PRIVATE_KEY_PATH=
APPLE_CALLBACK_URL=/oauth/apple/callback
# OpenID
OPENID_CLIENT_ID=
OPENID_CLIENT_SECRET=
OPENID_ISSUER=
OPENID_SESSION_SECRET=
OPENID_SCOPE="openid profile email"
OPENID_CALLBACK_URL=/oauth/openid/callback
OPENID_REQUIRED_ROLE=
OPENID_REQUIRED_ROLE_TOKEN_KIND=
OPENID_REQUIRED_ROLE_PARAMETER_PATH=
OPENID_ADMIN_ROLE=
OPENID_ADMIN_ROLE_PARAMETER_PATH=
OPENID_ADMIN_ROLE_TOKEN_KIND=
# Set to determine which user info property returned from OpenID Provider to store as the User's username
OPENID_USERNAME_CLAIM=
# Set to determine which user info property returned from OpenID Provider to store as the User's name
OPENID_NAME_CLAIM=
# Set to determine which user info claim to use as the email/identifier for user matching (e.g., "upn" for Entra ID)
# When not set, defaults to: email -> preferred_username -> upn
OPENID_EMAIL_CLAIM=
# Optional audience parameter for OpenID authorization requests
OPENID_AUDIENCE=
OPENID_BUTTON_LABEL=
OPENID_IMAGE_URL=
# Set to true to automatically redirect to the OpenID provider when a user visits the login page
# This will bypass the login form completely for users, only use this if OpenID is your only authentication method
OPENID_AUTO_REDIRECT=false
# Set to true to use PKCE (Proof Key for Code Exchange) for OpenID authentication
OPENID_USE_PKCE=false
#Set to true to reuse openid tokens for authentication management instead of using the mongodb session and the custom refresh token.
OPENID_REUSE_TOKENS=
#By default, signing key verification results are cached in order to prevent excessive HTTP requests to the JWKS endpoint.
#If a signing key matching the kid is found, this will be cached and the next time this kid is requested the signing key will be served from the cache.
#Default is true.
OPENID_JWKS_URL_CACHE_ENABLED=
OPENID_JWKS_URL_CACHE_TIME= # 600000 ms eq to 10 minutes leave empty to disable caching
#Set to true to trigger token exchange flow to acquire access token for the userinfo endpoint.
OPENID_ON_BEHALF_FLOW_FOR_USERINFO_REQUIRED=
OPENID_ON_BEHALF_FLOW_USERINFO_SCOPE="user.read" # example for Scope Needed for Microsoft Graph API
# Set to true to use the OpenID Connect end session endpoint for logout
OPENID_USE_END_SESSION_ENDPOINT=
# URL to redirect to after OpenID logout (defaults to ${DOMAIN_CLIENT}/login)
OPENID_POST_LOGOUT_REDIRECT_URI=
# Maximum logout URL length before using logout_hint instead of id_token_hint (default: 2000)
OPENID_MAX_LOGOUT_URL_LENGTH=
#========================#
# SharePoint Integration #
#========================#
# Requires Entra ID (OpenID) authentication to be configured
# Enable SharePoint file picker in chat and agent panels
# ENABLE_SHAREPOINT_FILEPICKER=true
# SharePoint tenant base URL (e.g., https://yourtenant.sharepoint.com)
# SHAREPOINT_BASE_URL=https://yourtenant.sharepoint.com
# Microsoft Graph API And SharePoint scopes for file picker
# SHAREPOINT_PICKER_SHAREPOINT_SCOPE==https://yourtenant.sharepoint.com/AllSites.Read
# SHAREPOINT_PICKER_GRAPH_SCOPE=Files.Read.All
#========================#
# SAML
# Note: If OpenID is enabled, SAML authentication will be automatically disabled.
SAML_ENTRY_POINT=
SAML_ISSUER=
SAML_CERT=
SAML_CALLBACK_URL=/oauth/saml/callback
SAML_SESSION_SECRET=
# Attribute mappings (optional)
SAML_EMAIL_CLAIM=
SAML_USERNAME_CLAIM=
SAML_GIVEN_NAME_CLAIM=
SAML_FAMILY_NAME_CLAIM=
SAML_PICTURE_CLAIM=
SAML_NAME_CLAIM=
# Logint buttion settings (optional)
SAML_BUTTON_LABEL=
SAML_IMAGE_URL=
# Whether the SAML Response should be signed.
# - If "true", the entire `SAML Response` will be signed.
# - If "false" or unset, only the `SAML Assertion` will be signed (default behavior).
# SAML_USE_AUTHN_RESPONSE_SIGNED=
#===============================================#
# Microsoft Graph API / Entra ID Integration #
#===============================================#
# Enable Entra ID people search integration in permissions/sharing system
# When enabled, the people picker will search both local database and Entra ID
USE_ENTRA_ID_FOR_PEOPLE_SEARCH=false
# When enabled, entra id groups owners will be considered as members of the group
ENTRA_ID_INCLUDE_OWNERS_AS_MEMBERS=false
# Microsoft Graph API scopes needed for people/group search
# Default scopes provide access to user profiles and group memberships
OPENID_GRAPH_SCOPES=User.Read,People.Read,GroupMember.Read.All
# LDAP
LDAP_URL=
LDAP_BIND_DN=
LDAP_BIND_CREDENTIALS=
LDAP_USER_SEARCH_BASE=
#LDAP_SEARCH_FILTER="mail="
LDAP_CA_CERT_PATH=
# LDAP_TLS_REJECT_UNAUTHORIZED=
# LDAP_STARTTLS=
# LDAP_LOGIN_USES_USERNAME=true
# LDAP_ID=
# LDAP_USERNAME=
# LDAP_EMAIL=
# LDAP_FULL_NAME=
#========================#
# Email Password Reset #
#========================#
EMAIL_SERVICE=
EMAIL_HOST=
EMAIL_PORT=25
EMAIL_ENCRYPTION=
EMAIL_ENCRYPTION_HOSTNAME=
EMAIL_ALLOW_SELFSIGNED=
# Leave both empty for SMTP servers that do not require authentication
EMAIL_USERNAME=
EMAIL_PASSWORD=
EMAIL_FROM_NAME=
EMAIL_FROM=noreply@librechat.ai
#========================#
# Mailgun API #
#========================#
# MAILGUN_API_KEY=your-mailgun-api-key
# MAILGUN_DOMAIN=mg.yourdomain.com
# EMAIL_FROM=noreply@yourdomain.com
# EMAIL_FROM_NAME="LibreChat"
# # Optional: For EU region
# MAILGUN_HOST=https://api.eu.mailgun.net
#========================#
# Firebase CDN #
#========================#
FIREBASE_API_KEY=
FIREBASE_AUTH_DOMAIN=
FIREBASE_PROJECT_ID=
FIREBASE_STORAGE_BUCKET=
FIREBASE_MESSAGING_SENDER_ID=
FIREBASE_APP_ID=
#========================#
# S3 AWS Bucket #
#========================#
AWS_ENDPOINT_URL=
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_REGION=
AWS_BUCKET_NAME=
# Required for path-style S3-compatible providers (MinIO, Hetzner, Backblaze B2, etc.)
# that don't support virtual-hosted-style URLs (bucket.endpoint). Not needed for AWS S3.
# AWS_FORCE_PATH_STYLE=false
#========================#
# Azure Blob Storage #
#========================#
AZURE_STORAGE_CONNECTION_STRING=
AZURE_STORAGE_PUBLIC_ACCESS=false
AZURE_CONTAINER_NAME=files
#========================#
# Shared Links #
#========================#
ALLOW_SHARED_LINKS=true
# Allows unauthenticated access to shared links. Defaults to false (auth required) if not set.
ALLOW_SHARED_LINKS_PUBLIC=false
#==============================#
# Static File Cache Control #
#==============================#
# Leave commented out to use defaults: 1 day (86400 seconds) for s-maxage and 2 days (172800 seconds) for max-age
# NODE_ENV must be set to production for these to take effect
# STATIC_CACHE_MAX_AGE=172800
# STATIC_CACHE_S_MAX_AGE=86400
# If you have another service in front of your LibreChat doing compression, disable express based compression here
# DISABLE_COMPRESSION=true
# If you have gzipped version of uploaded image images in the same folder, this will enable gzip scan and serving of these images
# Note: The images folder will be scanned on startup and a ma kept in memory. Be careful for large number of images.
# ENABLE_IMAGE_OUTPUT_GZIP_SCAN=true
#===================================================#
# UI #
#===================================================#
APP_TITLE=LibreChat
# CUSTOM_FOOTER="My custom footer"
HELP_AND_FAQ_URL=https://librechat.ai
# SHOW_BIRTHDAY_ICON=true
# Google tag manager id
#ANALYTICS_GTM_ID=user provided google tag manager id
# limit conversation file imports to a certain number of bytes in size to avoid the container
# maxing out memory limitations by unremarking this line and supplying a file size in bytes
# such as the below example of 250 mib
# CONVERSATION_IMPORT_MAX_FILE_SIZE_BYTES=262144000
#===============#
# REDIS Options #
#===============#
# Enable Redis for caching and session storage
# USE_REDIS=true
# Enable Redis for resumable LLM streams (defaults to USE_REDIS value if not set)
# Set to false to use in-memory storage for streams while keeping Redis for other caches
# USE_REDIS_STREAMS=true
# Single Redis instance
# REDIS_URI=redis://127.0.0.1:6379
# Redis cluster (multiple nodes)
# REDIS_URI=redis://127.0.0.1:7001,redis://127.0.0.1:7002,redis://127.0.0.1:7003
# Redis with TLS/SSL encryption and CA certificate
# REDIS_URI=rediss://127.0.0.1:6380
# REDIS_CA=/path/to/ca-cert.pem
# Elasticache may need to use an alternate dnsLookup for TLS connections. see "Special Note: Aws Elasticache Clusters with TLS" on this webpage: https://www.npmjs.com/package/ioredis
# Enable alternative dnsLookup for redis
# REDIS_USE_ALTERNATIVE_DNS_LOOKUP=true
# Redis authentication (if required)
# REDIS_USERNAME=your_redis_username
# REDIS_PASSWORD=your_redis_password
# Redis key prefix configuration
# Use environment variable name for dynamic prefix (recommended for cloud deployments)
# REDIS_KEY_PREFIX_VAR=K_REVISION
# Or use static prefix directly
# REDIS_KEY_PREFIX=librechat
# Redis connection limits
# REDIS_MAX_LISTENERS=40
# Redis ping interval in seconds (0 = disabled, >0 = enabled)
# When set to a positive integer, Redis clients will ping the server at this interval to keep connections alive
# When unset or 0, no pinging is performed (recommended for most use cases)
# REDIS_PING_INTERVAL=300
# Force specific cache namespaces to use in-memory storage even when Redis is enabled
# Comma-separated list of CacheKeys
# Defaults to CONFIG_STORE,APP_CONFIG so YAML-derived config stays per-container (safe for blue/green deployments)
# Set to empty string to force all namespaces through Redis: FORCED_IN_MEMORY_CACHE_NAMESPACES=
# FORCED_IN_MEMORY_CACHE_NAMESPACES=CONFIG_STORE,APP_CONFIG
# Leader Election Configuration (for multi-instance deployments with Redis)
# Duration in seconds that the leader lease is valid before it expires (default: 25)
# LEADER_LEASE_DURATION=25
# Interval in seconds at which the leader renews its lease (default: 10)
# LEADER_RENEW_INTERVAL=10
# Maximum number of retry attempts when renewing the lease fails (default: 3)
# LEADER_RENEW_ATTEMPTS=3
# Delay in seconds between retry attempts when renewing the lease (default: 0.5)
# LEADER_RENEW_RETRY_DELAY=0.5
#==================================================#
# Others #
#==================================================#
# You should leave the following commented out #
# NODE_ENV=
# E2E_USER_EMAIL=
# E2E_USER_PASSWORD=
#=====================================================#
# Cache Headers #
#=====================================================#
# Headers that control caching of the index.html #
# Default configuration prevents caching to ensure #
# users always get the latest version. Customize #
# only if you understand caching implications. #
# INDEX_CACHE_CONTROL=no-cache, no-store, must-revalidate
# INDEX_PRAGMA=no-cache
# INDEX_EXPIRES=0
# no-cache: Forces validation with server before using cached version
# no-store: Prevents storing the response entirely
# must-revalidate: Prevents using stale content when offline
#=====================================================#
# OpenWeather #
#=====================================================#
OPENWEATHER_API_KEY=
#====================================#
# LibreChat Code Interpreter API #
#====================================#
# https://code.librechat.ai
# LIBRECHAT_CODE_API_KEY=your-key
#======================#
# Web Search #
#======================#
# Note: All of the following variable names can be customized.
# Omit values to allow user to provide them.
# For more information on configuration values, see:
# https://librechat.ai/docs/features/web_search
# Search Provider (Required)
# SERPER_API_KEY=your_serper_api_key
# Scraper (Required)
# FIRECRAWL_API_KEY=your_firecrawl_api_key
# Optional: Custom Firecrawl API URL
# FIRECRAWL_API_URL=your_firecrawl_api_url
# Reranker (Required)
# JINA_API_KEY=your_jina_api_key
# or
# COHERE_API_KEY=your_cohere_api_key
#======================#
# MCP Configuration #
#======================#
# Treat 401/403 responses as OAuth requirement when no oauth metadata found
# MCP_OAUTH_ON_AUTH_ERROR=true
# Timeout for OAuth detection requests in milliseconds
# MCP_OAUTH_DETECTION_TIMEOUT=5000
# Cache connection status checks for this many milliseconds to avoid expensive verification
# MCP_CONNECTION_CHECK_TTL=60000
# Skip code challenge method validation (e.g., for AWS Cognito that supports S256 but doesn't advertise it)
# When set to true, forces S256 code challenge even if not advertised in .well-known/openid-configuration
# MCP_SKIP_CODE_CHALLENGE_CHECK=false
# Circuit breaker: max connect/disconnect cycles before tripping (per server)
# MCP_CB_MAX_CYCLES=7
# Circuit breaker: sliding window (ms) for counting cycles
# MCP_CB_CYCLE_WINDOW_MS=45000
# Circuit breaker: cooldown (ms) after the cycle breaker trips
# MCP_CB_CYCLE_COOLDOWN_MS=15000
# Circuit breaker: max consecutive failed connection rounds before backoff
# MCP_CB_MAX_FAILED_ROUNDS=3
# Circuit breaker: sliding window (ms) for counting failed rounds
# MCP_CB_FAILED_WINDOW_MS=120000
# Circuit breaker: base backoff (ms) after failed round threshold is reached
# MCP_CB_BASE_BACKOFF_MS=30000
# Circuit breaker: max backoff cap (ms) for exponential backoff
# MCP_CB_MAX_BACKOFF_MS=300000
-8
View File
@@ -1,8 +0,0 @@
# Auto-generated from pyproject.toml — do not edit manually
ALFRED_VERSION=0.1.7
PYTHON_VERSION=3.14.3
IMAGE_NAME=alfred_media_organizer
SERVICE_NAME=alfred
LIBRECHAT_VERSION=v0.8.4
RAG_VERSION=v0.7.3
UV_VERSION=0.11.6
+19 -57
View File
@@ -2,10 +2,11 @@ name: CI/CD Awesome Pipeline
on: on:
push: push:
branches: [main]
tags: tags:
- 'v*.*.*' - 'v*.*.*'
pull_request:
workflow_dispatch: branches: [main]
env: env:
REGISTRY_URL: ${{ vars.REGISTRY_URL || 'gitea.iswearihadsomethingforthis.net' }} REGISTRY_URL: ${{ vars.REGISTRY_URL || 'gitea.iswearihadsomethingforthis.net' }}
@@ -20,73 +21,34 @@ jobs:
uses: actions/checkout@v4 uses: actions/checkout@v4
- name: Build and run tests - name: Build and run tests
env:
DEEPSEEK_API_KEY: ${{ secrets.DEEPSEEK_API_KEY }}
TMDB_API_KEY: ${{ secrets.TMDB_API_KEY }}
run: make _ci-run-tests run: make _ci-run-tests
build-and-push: build-and-push:
name: Build & Push to Registry name: Build & Push to Registry
runs-on: ubuntu-latest runs-on: ubuntu-latest
needs: test needs: test
if: startsWith(github.ref, 'refs/tags/v')
steps: steps:
- name: Checkout code - name: Checkout code
uses: actions/checkout@v4 uses: actions/checkout@v4
- name: Generate build variables
run: python scripts/generate_build_vars.py
- name: Load config from Makefile - name: Load config from Makefile
id: config id: config
run: make -s _ci-dump-config >> $GITHUB_OUTPUT run: |
eval "$(make _ci-image-name)"
echo "image_name=${IMAGE_NAME}" >> $GITHUB_OUTPUT
- name: 🏷️ Docker Metadata (Tags & Labels) - name: Extract version from tag
id: meta id: version
uses: docker/metadata-action@v5 run: echo "version=${GITHUB_REF#refs/tags/v}" >> $GITHUB_OUTPUT
with:
images: gitea.iswearihadsomethingforthis.net/francwa/${{ steps.config.outputs.image_name }}
tags: |
# Tagged (v1.2.3)
type=semver,pattern={{ version }}
# Latest (main)
type=raw,value=latest,enable={{ is_default_branch }}
# Feature branches
type=ref,event=branch
- name: Login to Gitea Registry - name: Build production image
uses: docker/login-action@v3 run: make build
with:
registry: gitea.iswearihadsomethingforthis.net
username: ${{ gitea.actor }}
password: ${{ secrets.G1T34_TOKEN }}
- name: Build and push - name: Tag and push to registry
id: docker_build run: |
uses: docker/build-push-action@v5 docker tag ${{ steps.config.outputs.image_name }}:latest ${{ env.REGISTRY_URL }}/${{ env.REGISTRY_USER }}/${{ steps.config.outputs.image_name }}:${{ steps.version.outputs.version }}
with: docker tag ${{ steps.config.outputs.image_name }}:latest ${{ env.REGISTRY_URL }}/${{ env.REGISTRY_USER }}/${{ steps.config.outputs.image_name }}:latest
context: . echo "${{ secrets.GITEA_TOKEN }}" | docker login ${{ env.REGISTRY_URL }} -u ${{ env.REGISTRY_USER }} --password-stdin
push: true docker push ${{ env.REGISTRY_URL }}/${{ env.REGISTRY_USER }}/${{ steps.config.outputs.image_name }}:${{ steps.version.outputs.version }}
tags: ${{ steps.meta.outputs.tags }} docker push ${{ env.REGISTRY_URL }}/${{ env.REGISTRY_USER }}/${{ steps.config.outputs.image_name }}:latest
labels: ${{ steps.meta.outputs.labels }}
build-args: |
PYTHON_VERSION=${{ steps.config.outputs.python_version }}
PYTHON_VERSION_SHORT=${{ steps.config.outputs.python_version_short }}
RUNNER=${{ steps.config.outputs.runner }}
- name: 🛡️ Run Trivy Vulnerability Scanner
uses: docker://aquasec/trivy:latest
env:
TRIVY_USERNAME: ${{ gitea.actor }}
TRIVY_PASSWORD: ${{ secrets.G1T34_TOKEN }}
# Unset the fake GITHUB_TOKEN injected by Gitea
GITHUB_TOKEN: ""
with:
args: image --format table --output trivy-report.txt --exit-code 0 --ignore-unfixed --severity CRITICAL,HIGH gitea.iswearihadsomethingforthis.net/francwa/${{ steps.config.outputs.image_name }}:latest
- name: 📤 Upload Security Report
uses: actions/upload-artifact@v3
with:
name: security-report
path: trivy-report.txt
retention-days: 7
-22
View File
@@ -1,22 +0,0 @@
name: Renovate Bot
on:
schedule:
# Every Monday 4AM
- cron: '0 4 * * 1'
workflow_dispatch:
jobs:
renovate:
runs-on: ubuntu-latest
steps:
- name: Run Renovate
uses: docker://renovate/renovate:latest
env:
RENOVATE_PLATFORM: "gitea"
RENOVATE_ENDPOINT: "https://gitea.iswearihadsomethingforthis.net/api/v1"
RENOVATE_TOKEN: "${{ secrets.RENOVATE_TOKEN }}"
RENOVATE_REPOSITORIES: '["${{ gitea.repository }}"]'
RENOVATE_GIT_AUTHOR: "Renovate Bot <renovate@bot.local>"
# Might need a free github token if lots of depencies
# RENOVATE_GITHUB_TOKEN: "${{ secrets.GITHUB_COM_TOKEN }}"
+1 -24
View File
@@ -55,30 +55,7 @@ coverage.xml
Thumbs.db Thumbs.db
# Secrets # Secrets
.env.secrets .env
# Backup files # Backup files
*.backup *.backup
*.bak
env_backup/
# Application data dir
data/*
# Application logs
logs/*
# Documentation folder
docs/
# .md files (project-level Markdown is brol-y; allow-list the ones we track)
*.md
!CHANGELOG.md
!/README.md
!specs/
!specs/**/*.md
# Private dev docs (separate git repo inside; see .claude/CLAUDE.md)
/.claude/
#
-1224
View File
File diff suppressed because it is too large Load Diff
-91
View File
@@ -1,91 +0,0 @@
# syntax=docker/dockerfile:1
# check=skip=InvalidDefaultArgInFrom
ARG PYTHON_VERSION
ARG UV_VERSION
# Stage 0: uv binary (workaround — --from doesn't support ARG expansion)
FROM ghcr.io/astral-sh/uv:${UV_VERSION} AS uv-bin
# ===========================================
# Stage 1: Builder
# ===========================================
FROM python:${PYTHON_VERSION}-slim-bookworm AS builder
ENV DEBIAN_FRONTEND=noninteractive \
PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
UV_PROJECT_ENVIRONMENT=/venv
# Install build dependencies
RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
apt-get update \
&& apt-get install -y --no-install-recommends build-essential
# Install uv globally
COPY --from=uv-bin /uv /usr/local/bin/uv
WORKDIR /tmp
COPY pyproject.toml uv.lock Makefile ./
# Install dependencies into /venv
RUN --mount=type=cache,target=/root/.cache/uv uv sync
COPY scripts/ ./scripts/
COPY .env.example ./
# ===========================================
# Stage 2: Testing
# ===========================================
FROM builder AS test
RUN --mount=type=cache,target=/root/.cache/uv uv sync --group dev
COPY alfred/ ./alfred
COPY scripts ./scripts
COPY tests/ ./tests
# ===========================================
# Stage 3: Runtime
# ===========================================
FROM python:${PYTHON_VERSION}-slim-bookworm AS runtime
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PYTHONPATH=/home/appuser \
PATH="/venv/bin:$PATH"
# Install runtime dependencies
RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
apt-get update \
&& apt-get install -y --no-install-recommends ca-certificates
# Create non-root user
RUN useradd -m -u 1000 -s /bin/bash appuser
# Create data directories
RUN mkdir -p /data /logs \
&& chown -R appuser:appuser /data /logs
USER appuser
WORKDIR /home/appuser
# Copy venv from builder stage
COPY --from=builder /venv /venv
# Copy application code
COPY --chown=appuser:appuser alfred/ ./alfred
COPY --chown=appuser:appuser scripts/ ./scripts
COPY --chown=appuser:appuser .env.example ./
COPY --chown=appuser:appuser pyproject.toml ./
VOLUME ["/data", "/logs"]
EXPOSE 8000
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "import requests; requests.get('http://localhost:8000/health', timeout=5).raise_for_status()" || exit 1
CMD ["python", "-m", "uvicorn", "alfred.app:app", "--host", "0.0.0.0", "--port", "8000"]
+191 -163
View File
@@ -1,187 +1,215 @@
.POSIX:
.SUFFIXES:
.DEFAULT_GOAL := help .DEFAULT_GOAL := help
# --- Load Config from pyproject.toml --- # --- SETTINGS ---
export # Change to 'uv' when ready.
-include .env.make RUNNER ?= poetry
export RUNNER
# --- Profiles management --- # --- VARIABLES ---
# Usage: make up p=rag,meili CORE_DIR = brain
p ?= full SERVICE_NAME = agent_media
PROFILES_PARAM := COMPOSE_PROFILES=$(p) IMAGE_NAME = agent_media
# --- Commands --- # --- ADAPTERS ---
DOCKER_COMPOSE := docker compose \ # UV uses "sync", Poetry uses "install". Both install DEV deps by default.
--env-file .env.alfred \ INSTALL_CMD = $(if $(filter uv,$(RUNNER)),sync,install)
--env-file .env.secrets \
--env-file .env.make
DOCKER_BUILD := DOCKER_BUILDKIT=1 docker build \
--build-arg PYTHON_VERSION=$(PYTHON_VERSION) \
--build-arg UV_VERSION=$(UV_VERSION)
# --- Phony --- # --- MACROS ---
.PHONY: bootstrap up down restart logs ps shell build build-test install \ ARGS = $(filter-out $@,$(MAKECMDGOALS))
update install-hooks test coverage lint format clean major minor patch help BUMP_CMD = cd $(CORE_DIR) && $(RUNNER) run bump-my-version bump
COMPOSE_CMD = docker-compose
DOCKER_CMD = cd $(CORE_DIR) && docker build --build-arg RUNNER=$(RUNNER) -t $(IMAGE_NAME):latest .
# --- Setup --- RUNNER_ADD = cd $(CORE_DIR) && $(RUNNER) add
.env.alfred .env.librechat .env.secrets .env.make: RUNNER_HOOKS = cd $(CORE_DIR) && $(RUNNER) run pre-commit install -c ../.pre-commit-config.yaml
@echo "Initializing environment..." RUNNER_INSTALL = cd $(CORE_DIR) && $(RUNNER) $(INSTALL_CMD)
@uv run python scripts/bootstrap.py \ RUNNER_RUN = cd $(CORE_DIR) && $(RUNNER) run
&& echo "✓ Environment ready" \ RUNNER_UPDATE = cd $(CORE_DIR) && $(RUNNER) update
|| (echo "✗ Environment setup failed" && exit 1)
bootstrap: .env.alfred .env.librechat .env.secrets .env.make # --- STYLES ---
B = \033[1m
G = \033[32m
T = \033[36m
R = \033[0m
# --- Docker --- # --- TARGETS ---
up: .env.alfred .env.secrets .PHONY: add build check-docker check-runner clean coverage down format help init-env install install-hooks lint logs major minor patch prune ps restart run shell test up update _check_branch
@echo "Starting containers with profiles: [full]..."
@$(PROFILES_PARAM) $(DOCKER_COMPOSE) up -d --remove-orphans \
&& echo "✓ Containers started" \
|| (echo "✗ Failed to start containers" && exit 1)
down: # Catch-all for args
@echo "Stopping containers..." %:
@$(PROFILES_PARAM) $(DOCKER_COMPOSE) down \ @:
&& echo "✓ Containers stopped" \
|| (echo "✗ Failed to stop containers" && exit 1)
restart: add: check-runner
@echo "Restarting containers..." @echo "$(T) Adding dependency ($(RUNNER)): $(ARGS)$(R)"
@$(PROFILES_PARAM) $(DOCKER_COMPOSE) restart \ $(RUNNER_ADD) $(ARGS)
&& echo "✓ Containers restarted" \
|| (echo "✗ Failed to restart containers" && exit 1)
logs: build: check-docker
@echo "Following logs (Ctrl+C to exit)..." @echo "$(T)🐳 Building Docker image...$(R)"
@$(PROFILES_PARAM) $(DOCKER_COMPOSE) logs -f $(DOCKER_CMD)
@echo "✅ Image $(IMAGE_NAME):latest ready."
ps: check-docker:
@echo "Container status:" @command -v docker >/dev/null 2>&1 || { echo "$(R)❌ Docker not installed$(R)"; exit 1; }
@$(PROFILES_PARAM) $(DOCKER_COMPOSE) ps @docker info >/dev/null 2>&1 || { echo "$(R)❌ Docker daemon not running$(R)"; exit 1; }
shell: check-runner:
@echo "Opening shell in $(SERVICE_NAME)..." @command -v $(RUNNER) >/dev/null 2>&1 || { echo "$(R)$(RUNNER) not installed$(R)"; exit 1; }
@$(DOCKER_COMPOSE) exec $(SERVICE_NAME) /bin/bash
# --- Build ---
build: .env.make
@echo "Building image $(IMAGE_NAME):latest ..."
@$(DOCKER_BUILD) -t $(IMAGE_NAME):latest . \
&& echo "✓ Build complete" \
|| (echo "✗ Build failed" && exit 1)
build-test: .env.make
@echo "Building test image $(IMAGE_NAME):test..."
@$(DOCKER_BUILD) --target test -t $(IMAGE_NAME):test . \
&& echo "✓ Test image built" \
|| (echo "✗ Build failed" && exit 1)
# --- Dependencies ---
install:
@echo "Installing dependencies with uv..."
@uv install \
&& echo "✓ Dependencies installed" \
|| (echo "✗ Installation failed" && exit 1)
install-hooks:
@echo "Installing pre-commit hooks..."
@uv run pre-commit install \
&& echo "✓ Hooks installed" \
|| (echo "✗ Hook installation failed" && exit 1)
update:
@echo "Updating dependencies with uv..."
@uv update \
&& echo "✓ Dependencies updated" \
|| (echo "✗ Update failed" && exit 1)
# --- Quality ---
test:
@echo "Running tests..."
@uv run pytest \
&& echo "✓ Tests passed" \
|| (echo "✗ Tests failed" && exit 1)
coverage:
@echo "Running tests with coverage..."
@uv run pytest --cov=. --cov-report=html --cov-report=term \
&& echo "✓ Coverage report generated" \
|| (echo "✗ Coverage failed" && exit 1)
lint:
@echo "Linting code..."
@uv run ruff check --fix . \
&& echo "✓ Linting complete" \
|| (echo "✗ Linting failed" && exit 1)
format:
@echo "Formatting code..."
@uv run ruff format . && uv run ruff check --fix . \
&& echo "✓ Code formatted" \
|| (echo "✗ Formatting failed" && exit 1)
clean: clean:
@echo "Cleaning build artifacts..." @echo "$(T)🧹 Cleaning caches...$(R)"
@rm -rf .ruff_cache __pycache__ .pytest_cache htmlcov .coverage cd $(CORE_DIR) && rm -rf .ruff_cache __pycache__ .pytest_cache
@find . -type d -name "__pycache__" -exec rm -rf {} + 2>/dev/null || true find $(CORE_DIR) -type d -name "__pycache__" -exec rm -rf {} + 2>/dev/null || true
@echo "✓ Cleanup complete" find $(CORE_DIR) -type d -name ".pytest_cache" -exec rm -rf {} + 2>/dev/null || true
find $(CORE_DIR) -type f -name "*.pyc" -delete 2>/dev/null || true
@echo "✅ Caches cleaned."
# --- Versioning --- coverage: check-runner
major minor patch: _check-main @echo "$(T)📊 Running tests with coverage...$(R)"
@echo "Bumping $@ version..." $(RUNNER_RUN) pytest --cov=. --cov-report=html --cov-report=term $(ARGS)
@uv run bump-my-version bump $@ \ @echo "✅ Report generated in htmlcov/"
&& echo "✓ Version bumped" \
|| (echo "✗ Version bump failed" && exit 1)
@echo "Pushing tags..." down: check-docker
@git push --tags \ @echo "$(T)🛑 Stopping containers...$(R)"
&& echo "✓ Tags pushed" \ $(COMPOSE_CMD) down
|| (echo "✗ Push failed" && exit 1) @echo "✅ System stopped."
# CI/CD helpers format: check-runner
_ci-dump-config: @echo "$(T)✨ Formatting with Ruff...$(R)"
@echo "image_name=$(IMAGE_NAME)" $(RUNNER_RUN) ruff format .
@echo "python_version=$(PYTHON_VERSION)" $(RUNNER_RUN) ruff check --fix .
@echo "uv_version=$(UV_VERSION)" @echo "✅ Code cleaned."
@echo "service_name=$(SERVICE_NAME)"
_ci-run-tests:build-test
@echo "Running tests in Docker..."
docker run --rm \
-e DEEPSEEK_API_KEY \
-e TMDB_API_KEY \
-e QBITTORRENT_URL \
$(IMAGE_NAME):test pytest
@echo "✓ Tests passed."
_check-main:
@test "$$(git rev-parse --abbrev-ref HEAD)" = "main" \
|| (echo "✗ ERROR: Not on main branch" && exit 1)
# --- Help ---
help: help:
@echo "Cleverly Crafted Unawareness - Management Commands" @echo "$(B)Available commands:$(R)"
@echo "" @echo ""
@echo "Usage: make [target] [p=profile1,profile2]" @echo "$(G)Setup:$(R)"
@echo " $(T)check-docker $(R) Verify Docker is installed and running."
@echo " $(T)check-runner $(R) Verify package manager ($(RUNNER))."
@echo " $(T)init-env $(R) Create .env from .env.example with generated secrets."
@echo " $(T)install $(R) Install ALL dependencies (Prod + Dev)."
@echo " $(T)install-hooks $(R) Install git pre-commit hooks."
@echo "" @echo ""
@echo "Setup:" @echo "$(G)Docker:$(R)"
@echo " bootstrap Generate .env.alfred, .env.librechat, .env.secrets and .env.make" @echo " $(T)build $(R) Build the docker image."
@echo " $(T)down $(R) Stop and remove containers."
@echo " $(T)logs $(R) Follow logs."
@echo " $(T)prune $(R) Clean Docker system."
@echo " $(T)ps $(R) Show container status."
@echo " $(T)restart $(R) Restart all containers."
@echo " $(T)shell $(R) Open shell in container."
@echo " $(T)up $(R) Start the agent."
@echo "" @echo ""
@echo "Docker:" @echo "$(G)Development:$(R)"
@echo " up Start containers (default profile: core)" @echo " $(T)add ... $(R) Add dependency (use --group dev or --dev if needed)."
@echo " Example: make up p=rag,meili" @echo " $(T)clean $(R) Clean caches."
@echo " down Stop all containers" @echo " $(T)coverage $(R) Run tests with coverage."
@echo " restart Restart containers (supports p=...)" @echo " $(T)format $(R) Format code (Ruff)."
@echo " logs Follow logs (supports p=...)" @echo " $(T)lint $(R) Lint code without fixing."
@echo " ps Status of containers" @echo " $(T)test ... $(R) Run tests."
@echo " shell Open bash in the core container" @echo " $(T)update $(R) Update dependencies."
@echo " build Build the production Docker image"
@echo "" @echo ""
@echo "Dev & Quality:" @echo "$(G)Versioning:$(R)"
@echo " setup Bootstrap .env and security keys" @echo " $(T)major/minor/patch $(R) Bump version."
@echo " install Install dependencies via uv"
@echo " test Run pytest suite" init-env:
@echo " coverage Run tests and generate HTML report" @echo "$(T)🔑 Initializing .env file...$(R)"
@echo " lint/format Quality and style checks" @if [ -f .env ]; then \
@echo "" echo "$(R)⚠️ .env already exists. Skipping.$(R)"; \
@echo "Release:" exit 0; \
@echo " major|minor|patch Bump version and push tags (main branch only)" fi
@if [ ! -f .env.example ]; then \
echo "$(R)❌ .env.example not found$(R)"; \
exit 1; \
fi
@if ! command -v openssl >/dev/null 2>&1; then \
echo "$(R)❌ openssl not found. Please install it first.$(R)"; \
exit 1; \
fi
@echo "$(T) → Copying .env.example...$(R)"
@cp .env.example .env
@echo "$(T) → Generating secrets...$(R)"
@sed -i.bak "s|JWT_SECRET=.*|JWT_SECRET=$$(openssl rand -base64 32)|" .env
@sed -i.bak "s|JWT_REFRESH_SECRET=.*|JWT_REFRESH_SECRET=$$(openssl rand -base64 32)|" .env
@sed -i.bak "s|CREDS_KEY=.*|CREDS_KEY=$$(openssl rand -hex 16)|" .env
@sed -i.bak "s|CREDS_IV=.*|CREDS_IV=$$(openssl rand -hex 8)|" .env
@sed -i.bak "s|MEILI_MASTER_KEY=.*|MEILI_MASTER_KEY=$$(openssl rand -base64 32)|" .env
@sed -i.bak "s|AGENT_BRAIN_API_KEY=.*|AGENT_BRAIN_API_KEY=$$(openssl rand -base64 24)|" .env
@rm -f .env.bak
@echo "$(G)✅ .env created with generated secrets!$(R)"
@echo "$(T)⚠️ Don't forget to add your API keys:$(R)"
@echo " - OPENAI_API_KEY"
@echo " - DEEPSEEK_API_KEY"
@echo " - TMDB_API_KEY (optional)"
install: check-runner
@echo "$(T)📦 Installing FULL environment ($(RUNNER))...$(R)"
$(RUNNER_INSTALL)
@echo "✅ Environment ready (Prod + Dev)."
install-hooks: check-runner
@echo "$(T)🔧 Installing hooks...$(R)"
$(RUNNER_HOOKS)
@echo "✅ Hooks ready."
lint: check-runner
@echo "$(T)🔍 Linting code...$(R)"
$(RUNNER_RUN) ruff check .
logs: check-docker
@echo "$(T)📋 Following logs...$(R)"
$(COMPOSE_CMD) logs -f
major: _check_branch
@echo "$(T)💥 Bumping major...$(R)"
SKIP=all $(BUMP_CMD) major
minor: _check_branch
@echo "$(T)✨ Bumping minor...$(R)"
SKIP=all $(BUMP_CMD) minor
patch: _check_branch
@echo "$(T)🚀 Bumping patch...$(R)"
SKIP=all $(BUMP_CMD) patch
prune: check-docker
@echo "$(T)🗑️ Pruning Docker resources...$(R)"
docker system prune -af --volumes
@echo "✅ Docker cleaned."
ps: check-docker
@echo "$(T)📋 Container status:$(R)"
@$(COMPOSE_CMD) ps
restart: check-docker
@echo "$(T)🔄 Restarting containers...$(R)"
$(COMPOSE_CMD) restart
@echo "✅ Containers restarted."
run: check-runner
$(RUNNER_RUN) $(ARGS)
shell: check-docker
@echo "$(T)🐚 Opening shell in $(SERVICE_NAME)...$(R)"
$(COMPOSE_CMD) exec $(SERVICE_NAME) /bin/sh
test: check-runner
@echo "$(T)🧪 Running tests...$(R)"
$(RUNNER_RUN) pytest -n auto --dist=loadscope $(ARGS)
up: check-docker
@echo "$(T)🚀 Starting Agent Media...$(R)"
$(COMPOSE_CMD) up -d
@echo "✅ System is up."
update: check-runner
@echo "$(T)🔄 Updating dependencies...$(R)"
$(RUNNER_UPDATE)
@echo "✅ All packages up to date."
_check_branch:
@curr=$$(git rev-parse --abbrev-ref HEAD); \
if [ "$$curr" != "main" ]; then \
echo "❌ Error: not on the main branch"; exit 1; \
fi
-433
View File
@@ -1,433 +0,0 @@
# Alfred Media Organizer 🎬
An AI-powered agent for managing your local media library with natural language. Search, download, and organize movies and TV shows effortlessly through a conversational interface.
[![Python 3.14](https://img.shields.io/badge/python-3.14-blue.svg)](https://www.python.org/downloads/)
[![uv](https://img.shields.io/badge/dependency%20manager-uv-purple)](https://github.com/astral-sh/uv)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![Code style: ruff](https://img.shields.io/badge/code%20style-ruff-000000.svg)](https://github.com/astral-sh/ruff)
## ✨ Features
- 🤖 **Natural Language Interface** — Talk to your media library in plain language
- 🔍 **Smart Search** — Find movies and TV shows via TMDB with rich metadata
- 📥 **Torrent Integration** — Search and download via qBittorrent
- 🧠 **Contextual Memory** — Remembers your preferences and conversation history
- 📁 **Auto-Organization** — Moves and renames media files, resolves destinations, handles subtitles
- 🎞️ **Subtitle Pipeline** — Identifies, matches, and places subtitle tracks automatically
- 🔄 **Workflow Engine** — YAML-defined multi-step workflows (e.g. `organize_media`)
- 🌐 **OpenAI-Compatible API** — Works with any OpenAI-compatible client (LibreChat, OpenWebUI, etc.)
- 🔒 **Secure by Default** — Auto-generated secrets and encrypted credentials
## 🏗️ Architecture
Built with **Domain-Driven Design (DDD)** principles for clean separation of concerns:
```
alfred/
├── agent/ # AI agent orchestration
│ ├── llm/ # LLM clients (Ollama, DeepSeek)
│ ├── tools/ # Tool implementations (api, filesystem, language)
│ └── workflows/ # YAML-defined multi-step workflows
├── application/ # Use cases & DTOs
│ ├── movies/ # Movie search
│ ├── torrents/ # Torrent management
│ └── filesystem/ # File operations (move, list, subtitles, seed links)
├── domain/ # Business logic & entities
│ ├── media/ # Release parsing
│ ├── movies/ # Movie entities
│ ├── tv_shows/ # TV show entities & value objects
│ ├── subtitles/ # Subtitle scanner, services, knowledge base
│ └── shared/ # Common value objects (ImdbId, FilePath, FileSize)
└── infrastructure/ # External services & persistence
├── api/ # External API clients (TMDB, qBittorrent, Knaben)
├── filesystem/ # File manager (hard-link based, path-traversal safe)
├── persistence/ # Three-tier memory (LTM/STM/Episodic) + JSON repositories
└── subtitle/ # Subtitle infrastructure
```
### Key flows
**Agent execution:** `agent.step(user_input)` → LLM call → if tool_calls, execute each via registry → loop until no tool calls or `max_tool_iterations` → return final response.
**Media organization workflow:**
1. `resolve_destination` — Determines target folder/filename from release name
2. `move_media` — Hard-links file to library, deletes source
3. `manage_subtitles` — Scans, classifies, and places subtitle tracks
4. `create_seed_links` — Hard-links library file back to torrents/ for continued seeding
**Memory tiers:**
- **LTM** (`data/memory/ltm.json`) — Persisted config, media library, watchlist
- **STM** — Conversation history (capped at `MAX_HISTORY_MESSAGES`)
- **Episodic** — Transient search results, active downloads, recent errors
## 🚀 Quick Start
### Prerequisites
- **Python 3.14+**
- **uv** (dependency manager)
- **Docker & Docker Compose** (recommended for full stack)
- **API Keys:**
- TMDB API key ([get one here](https://www.themoviedb.org/settings/api))
- Optional: DeepSeek or other LLM provider keys
### Installation
```bash
# Clone the repository
git clone https://github.com/francwa/alfred_media_organizer.git
cd alfred_media_organizer
# Install dependencies
make install
# Install pre-commit hooks
make install-hooks
# Bootstrap environment (generates .env with secure secrets)
make bootstrap
# Validate your .env against the schema
make validate
# Edit .env with your API keys
nano .env
```
### Running with Docker (Recommended)
```bash
# Start all services (LibreChat + Alfred + MongoDB + Ollama)
make up
# Or start with specific profiles
make up p=rag,meili # Include RAG and Meilisearch
make up p=qbittorrent # Include qBittorrent
make up p=full # Everything
# View logs
make logs
# Stop all services
make down
```
The web interface will be available at **http://localhost:3080**
### Running Locally (Development)
```bash
uv run uvicorn alfred.app:app --reload --port 8000
```
## ⚙️ Configuration
### Settings system
`settings.toml` is the single source of truth. The schema flows:
```
settings.toml → settings_schema.py → settings_bootstrap.py → .env + .env.make → settings.py
```
To add a setting: define it in `settings.toml`, run `make bootstrap`, then access via `settings.my_new_setting`.
```bash
# First time setup
make bootstrap
# Validate existing .env against schema
make validate
# Re-run after settings.toml changes (existing secrets preserved)
make bootstrap
```
**Never commit `.env` or `.env.make`** — both are gitignored and auto-generated.
### Key settings (.env)
```bash
# --- CORE ---
MAX_HISTORY_MESSAGES=10
MAX_TOOL_ITERATIONS=10
# --- LLM ---
DEFAULT_LLM_PROVIDER=local # local (Ollama) | deepseek
OLLAMA_BASE_URL=http://ollama:11434
OLLAMA_MODEL=llama3.3:latest
LLM_TEMPERATURE=0.2
# --- API KEYS ---
TMDB_API_KEY=your-tmdb-key # Required for movie/show search
DEEPSEEK_API_KEY= # Optional
# --- SECURITY (auto-generated) ---
JWT_SECRET=<auto>
CREDS_KEY=<auto>
MONGO_PASSWORD=<auto>
```
## 🐳 Docker Services
### Docker Profiles
| Profile | Extra services | Use case |
|---------|---------------|----------|
| (default) | — | LibreChat + Alfred + MongoDB + Ollama |
| `meili` | Meilisearch | Fast full-text search |
| `rag` | RAG API + VectorDB (PostgreSQL) | Document retrieval |
| `qbittorrent` | qBittorrent | Torrent downloads |
| `full` | All of the above | Complete setup |
```bash
make up # Start (default profile)
make up p=full # Start with all services
make down # Stop
make restart # Restart
make logs # Follow logs
make ps # Container status
```
## 🛠️ Available Tools
| Tool | Description |
|------|-------------|
| `find_media_imdb_id` | Search for movies/TV shows on TMDB by title |
| `find_torrent` | Search for torrents across multiple indexers |
| `get_torrent_by_index` | Get detailed info about a specific result |
| `add_torrent_by_index` | Download a torrent from search results |
| `add_torrent_to_qbittorrent` | Add a torrent via magnet link directly |
| `resolve_destination` | Compute the target library path for a release |
| `move_media` | Hard-link a file to its library destination |
| `manage_subtitles` | Scan, classify, and place subtitle tracks |
| `create_seed_links` | Prepare torrent folder so qBittorrent keeps seeding |
| `learn` | Teach Alfred a new pattern (release group, naming convention) |
| `set_path_for_folder` | Configure folder paths |
| `list_folder` | List contents of a configured folder |
| `set_language` | Set preferred language for the session |
## 💬 Usage Examples
### Via Web Interface (LibreChat)
Navigate to **http://localhost:3080** and start chatting:
```
You: Find Inception in 1080p
Alfred: I found 3 torrents for Inception (2010):
1. Inception.2010.1080p.BluRay.x264 (150 seeders) - 2.1 GB
2. Inception.2010.1080p.WEB-DL.x265 (80 seeders) - 1.8 GB
3. Inception.2010.1080p.REMUX (45 seeders) - 25 GB
You: Download the first one
Alfred: ✓ Added to qBittorrent! Download started.
You: Organize the Breaking Bad S01 download
Alfred: ✓ Resolved destination: /tv_shows/Breaking.Bad/Season 01/
✓ Moved 6 episode files
✓ Placed 6 subtitle tracks (fr, en)
✓ Seed links created in /torrents/
```
### Via API
```bash
# Health check
curl http://localhost:8000/health
# Chat (OpenAI-compatible)
curl -X POST http://localhost:8000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "alfred",
"messages": [{"role": "user", "content": "Find The Matrix 4K"}]
}'
# List models
curl http://localhost:8000/v1/models
# View memory state
curl http://localhost:8000/memory/state
```
Alfred is compatible with any OpenAI-compatible client. Point it at `http://localhost:8000/v1`, model `alfred`.
## 🧠 Memory System
Alfred uses a three-tier memory system:
| Tier | Storage | Contents | Lifetime |
|------|---------|----------|----------|
| **LTM** | JSON file (`data/memory/ltm.json`) | Config, library, watchlist, learned patterns | Permanent |
| **STM** | RAM | Conversation history (capped) | Session |
| **Episodic** | RAM | Search results, active downloads, errors | Short-lived |
## 🧪 Development
### Running Tests
```bash
# Run full suite (parallel)
make test
# Run with coverage report
make coverage
# Run a single file
uv run pytest tests/test_agent.py -v
# Run a single class
uv run pytest tests/test_agent.py::TestAgentInit -v
# Skip slow tests
uv run pytest -m "not slow"
```
### Test coverage
The suite covers:
- **Agent loop** — tool execution, history, max iterations, error handling
- **Tool registry** — OpenAI schema format, parameter extraction
- **Prompts** — system prompt building, tool inclusion
- **Memory** — LTM/STM/Episodic operations, persistence
- **Filesystem tools** — path traversal security, folder listing
- **File manager** — hard-link, move, seed links (real filesystem, no mocks)
- **Application use cases** — `resolve_destination`, `create_seed_links`, `list_folder`, `move_media`
- **Domain** — TV show/movie entities, shared value objects (`ImdbId`, `FilePath`, `FileSize`), subtitle scanner
- **Repositories** — JSON-backed movie, TV show, subtitle repos
- **Bootstrap** — secret generation, idempotency, URI construction
- **Workflows** — YAML loading, structure validation
- **Configuration** — boundary validation for all settings
### Code Quality
```bash
make lint # Ruff check --fix
make format # Ruff format + check --fix
```
### Adding a New Tool
1. Implement the function in `alfred/agent/tools/`:
```python
# alfred/agent/tools/api.py
def my_new_tool(param: str) -> dict[str, Any]:
"""Short description shown to the LLM to decide when to call this tool."""
memory = get_memory()
# ...
return {"status": "ok", "data": result}
```
2. Register it in `alfred/agent/registry.py`:
```python
tool_functions = [
# ... existing tools ...
api_tools.my_new_tool,
]
```
The registry auto-generates the JSON schema from the function signature and docstring.
### Adding a Workflow
Create a YAML file in `alfred/agent/workflows/`:
```yaml
name: my_workflow
description: What this workflow does
steps:
- tool: resolve_destination
description: Find where the file should go
- tool: move_media
description: Move the file
```
Workflows are loaded automatically at startup.
### Version Management
```bash
# Must be on main branch
make patch # 0.1.7 → 0.1.8
make minor # 0.1.7 → 0.2.0
make major # 0.1.7 → 1.0.0
```
## 📚 API Reference
### Endpoints
| Method | Path | Description |
|--------|------|-------------|
| `GET` | `/health` | Health check |
| `GET` | `/v1/models` | List models (OpenAI-compatible) |
| `POST` | `/v1/chat/completions` | Chat (OpenAI-compatible, streaming supported) |
| `GET` | `/memory/state` | Full memory dump (debug) |
| `POST` | `/memory/clear-session` | Clear STM + Episodic |
| `GET` | `/memory/episodic/search-results` | Current search results |
## 🔧 Troubleshooting
### Agent doesn't respond
1. Check API keys in `.env`
2. Verify the LLM is running:
```bash
docker logs alfred-ollama
docker exec alfred-ollama ollama list
```
3. Check Alfred logs: `docker logs alfred-core`
### qBittorrent connection failed
1. Verify qBittorrent is running: `docker ps | grep qbittorrent`
2. Check credentials in `.env` (`QBITTORRENT_URL`, `QBITTORRENT_USERNAME`, `QBITTORRENT_PASSWORD`)
### Memory not persisting
1. Check `data/` directory is writable
2. Verify volume mounts in `docker-compose.yaml`
### Bootstrap fails
```bash
make validate # Check what's wrong with .env
make bootstrap # Regenerate (preserves existing secrets)
```
### Tests failing
```bash
uv run pytest tests/test_failing.py -v --tb=long
```
## 🤝 Contributing
1. Fork the repository
2. Create a feature branch: `git checkout -b feat/my-feature`
3. Make your changes + add tests
4. Run `make test && make lint && make format`
5. Commit with [Conventional Commits](https://www.conventionalcommits.org/): `feat:`, `fix:`, `docs:`, `refactor:`, `test:`, `chore:`, `infra:`
6. Open a Pull Request
## 📄 License
MIT License — see [LICENSE](LICENSE) file for details.
## 🙏 Acknowledgments
- [LibreChat](https://github.com/danny-avila/LibreChat) — Chat interface
- [Ollama](https://ollama.ai/) — Local LLM runtime
- [DeepSeek](https://www.deepseek.com/) — LLM provider
- [TMDB](https://www.themoviedb.org/) — Movie & TV database
- [qBittorrent](https://www.qbittorrent.org/) — Torrent client
- [FastAPI](https://fastapi.tiangolo.com/) — Web framework
- [uv](https://github.com/astral-sh/uv) — Fast Python package manager
---
<p align="center">Made with ❤️ by <a href="https://github.com/francwa">Francwa</a></p>
View File
-79
View File
@@ -1,79 +0,0 @@
"""Expression loader — charge et merge les fichiers YAML d'expressions par user."""
import random
from pathlib import Path
import yaml
_USERS_DIR = Path(__file__).parent.parent / "knowledge" / "users"
def _load_yaml(path: Path) -> dict:
if not path.exists():
return {}
return yaml.safe_load(path.read_text(encoding="utf-8")) or {}
def load_expressions(username: str | None) -> dict:
"""
Charge common.yaml et le merge avec {username}.yaml.
Retourne un dict avec :
- nickname: str (surnom de l'user, ou username en fallback)
- expressions: dict[situation -> list[str]]
"""
common = _load_yaml(_USERS_DIR / "common.yaml")
user_data = _load_yaml(_USERS_DIR / f"{username}.yaml") if username else {}
# Merge expressions : common + user (les phrases user s'ajoutent)
common_exprs: dict[str, list] = common.get("expressions", {})
user_exprs: dict[str, list] = user_data.get("expressions", {})
merged: dict[str, list] = {}
all_situations = set(common_exprs) | set(user_exprs)
for situation in all_situations:
base = list(common_exprs.get(situation, []))
extra = list(user_exprs.get(situation, []))
merged[situation] = base + extra
nickname = user_data.get("user", {}).get("nickname") or username or "mec"
return {
"nickname": nickname,
"expressions": merged,
}
def pick(expressions: dict, situation: str, nickname: str | None = None) -> str:
"""
Pioche une expression aléatoire pour une situation donnée.
Résout {user} avec le nickname si fourni.
Retourne une string vide si la situation n'existe pas.
"""
options = expressions.get("expressions", {}).get(situation, [])
if not options:
return ""
chosen = random.choice(options)
if nickname:
chosen = chosen.replace("{user}", nickname)
return chosen
def build_expressions_context(username: str | None) -> dict:
"""
Point d'entrée principal.
Retourne :
- nickname: str
- samples: dict[situation -> une phrase résolue] — une seule par situation
"""
data = load_expressions(username)
nickname = data["nickname"]
samples = {
situation: pick(data, situation, nickname) for situation in data["expressions"]
}
return {
"nickname": nickname,
"samples": samples,
}
-333
View File
@@ -1,333 +0,0 @@
"""Prompt builder for the agent system."""
import json
from typing import Any
from alfred.infrastructure.persistence_TO_CHECK import get_memory
from alfred.infrastructure.persistence_TO_CHECK.memory import MemoryRegistry
from .expressions import build_expressions_context
from .registry import Tool
from .workflows_TO_CHECK import WorkflowLoader
# Tools that are always available, regardless of workflow scope.
# Kept small on purpose — the noyau is what the agent uses to either
# answer trivially or pivot into a workflow.
CORE_TOOLS: tuple[str, ...] = (
"set_language",
"set_path_for_folder",
"list_folder",
"read_release_metadata",
"query_library",
"start_workflow",
"end_workflow",
)
class PromptBuilder:
"""Builds system prompts for the agent with memory context."""
def __init__(
self,
tools: dict[str, Tool],
workflow_loader: WorkflowLoader | None = None,
):
self.tools = tools
self.workflow_loader = workflow_loader or WorkflowLoader()
self._memory_registry = MemoryRegistry()
def _active_workflow(self, memory) -> dict | None:
"""Return the YAML definition of the active workflow, or None."""
current = memory.stm.workflow.current
if current is None:
return None
return self.workflow_loader.get(current.get("name"))
def visible_tool_names(self) -> list[str]:
"""
Return the names of the tools currently in scope.
- Idle (no workflow): core noyau only. The LLM enters a workflow
via start_workflow to access more tools.
- Workflow active: core noyau + the workflow's declared tools.
"""
memory = get_memory()
visible = set(CORE_TOOLS)
workflow = self._active_workflow(memory)
if workflow is not None:
for name in workflow.get("tools", []):
visible.add(name)
# Only return tools that actually exist in the registry.
return [name for name in self.tools if name in visible]
def _format_identity(self, memory) -> str:
"""Build Alfred's identity and personality section."""
username = memory.stm.get_entity("username")
expr = build_expressions_context(username)
nickname = expr["nickname"]
samples = expr["samples"]
# Format expressions as situational guidance for the LLM
expr_lines = []
situation_labels = {
"greeting": "Salutation",
"success": "Succès",
"working": "En cours",
"error": "Erreur",
"unclear": "Demande floue",
"warning": "Avertissement",
"not_found": "Introuvable",
}
for situation, label in situation_labels.items():
phrase = samples.get(situation, "")
if phrase:
expr_lines.append(f' {label}: "{phrase}"')
expressions_block = "\n".join(expr_lines)
return f"""Tu t'appelles Alfred. Tu es un assistant d'organisation de médiathèque — direct, opérationnel, légèrement impertinent.
PERSONNALITÉ:
- Tu parles franglais : français avec des touches d'anglais quand ça colle mieux
- Tu es sarcastique et n'as pas peur de te moquer si l'user fait une connerie
- Tu envoies chier poliment (mais clairement) quand la demande est trop floue
- Tu ne fais pas de blabla inutile. Si "ok c'est fait" suffit, c'est tout ce que tu dis
- Tu peux jurer (putain, merde, con, ...) — c'est naturel, pas du remplissage
- Jamais de "Great question!" ou de politesse creuse
USER COURANT: {nickname}
EXPRESSIONS À UTILISER (une par situation, naturellement intégrées dans ta réponse) :
{expressions_block}"""
def build_tools_spec(self) -> list[dict[str, Any]]:
"""Build the tool specification for the LLM API (scope-filtered)."""
visible = set(self.visible_tool_names())
tool_specs = []
for tool in self.tools.values():
if tool.name not in visible:
continue
spec = {
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.parameters,
},
}
tool_specs.append(spec)
return tool_specs
def _format_tools_description(self) -> str:
"""Format the currently-visible tools with description + params."""
visible = set(self.visible_tool_names())
visible_tools = [t for t in self.tools.values() if t.name in visible]
if not visible_tools:
return ""
return "\n".join(
f"- {tool.name}: {tool.description}\n"
f" Parameters: {json.dumps(tool.parameters, ensure_ascii=False)}"
for tool in visible_tools
)
def _format_workflow_scope(self, memory) -> str:
"""Describe the current workflow scope so the LLM has a plan."""
workflow = self._active_workflow(memory)
if workflow is None:
available = self.workflow_loader.names()
if not available:
return ""
lines = ["WORKFLOW SCOPE: idle (broad catalog narrowed to core noyau)."]
lines.append(
" Call start_workflow(workflow_name, params) to enter a scope."
)
lines.append(" Available workflows:")
for name in available:
wf = self.workflow_loader.get(name) or {}
desc = (wf.get("description") or "").strip().splitlines()
summary = desc[0] if desc else ""
lines.append(f" - {name}: {summary}")
return "\n".join(lines)
current = memory.stm.workflow.current or {}
lines = [
f"WORKFLOW SCOPE: active — {current.get('name')} "
f"(stage: {current.get('stage')})",
]
params = current.get("params")
if params:
lines.append(f" Params: {params}")
wf_desc = (workflow.get("description") or "").strip()
if wf_desc:
lines.append(f" Goal: {wf_desc}")
steps = workflow.get("steps", [])
if steps:
lines.append(" Steps:")
for step in steps:
step_id = step.get("id", "?")
step_tool = step.get("tool") or (
"ask_user" if step.get("ask_user") else ""
)
lines.append(f" - {step_id} ({step_tool})")
lines.append(" Call end_workflow(reason) when done, cancelled, or off-topic.")
return "\n".join(lines)
def _format_episodic_context(self, memory) -> str:
"""Format episodic memory context for the prompt."""
lines = []
if memory.episodic.last_search_results:
results = memory.episodic.last_search_results
result_list = results.get("results", [])
lines.append(
f"\nLAST SEARCH: '{results.get('query')}' ({len(result_list)} results)"
)
# Show first 5 results
for i, result in enumerate(result_list[:5]):
name = result.get("name", "Unknown")
lines.append(f" {i + 1}. {name}")
if len(result_list) > 5:
lines.append(f" ... and {len(result_list) - 5} more")
if memory.episodic.pending_question:
question = memory.episodic.pending_question
lines.append(f"\nPENDING QUESTION: {question.get('question')}")
lines.append(f" Type: {question.get('type')}")
if question.get("options"):
lines.append(f" Options: {len(question.get('options'))}")
if memory.episodic.active_downloads:
lines.append(f"\nACTIVE DOWNLOADS: {len(memory.episodic.active_downloads)}")
for dl in memory.episodic.active_downloads[:3]:
lines.append(f" - {dl.get('name')}: {dl.get('progress', 0)}%")
if memory.episodic.recent_errors:
lines.append("\nRECENT ERRORS (up to 3):")
for error in memory.episodic.recent_errors[-3:]:
lines.append(
f" - Action '{error.get('action')}' failed: {error.get('error')}"
)
# Unread events
unread = [e for e in memory.episodic.background_events if not e.get("read")]
if unread:
lines.append(f"\nUNREAD EVENTS: {len(unread)}")
for event in unread[:3]:
lines.append(f" - {event.get('type')}: {event.get('data')}")
return "\n".join(lines)
def _format_stm_context(self, memory) -> str:
"""Format short-term memory context for the prompt."""
lines = []
if memory.stm.current_workflow:
workflow = memory.stm.current_workflow
lines.append(
f"CURRENT WORKFLOW: {workflow.get('name')} (stage: {workflow.get('stage')})"
)
if workflow.get("params"):
lines.append(f" Params: {workflow.get('params')}")
if memory.stm.current_topic:
lines.append(f"CURRENT TOPIC: {memory.stm.current_topic}")
if memory.stm.extracted_entities:
lines.append("EXTRACTED ENTITIES:")
for key, value in memory.stm.extracted_entities.items():
lines.append(f" - {key}: {value}")
if memory.stm.language:
lines.append(f"CONVERSATION LANGUAGE: {memory.stm.language}")
return "\n".join(lines)
def _format_memory_schema(self) -> str:
"""Describe available memory components so the agent knows what to read/write and when."""
schema = self._memory_registry.schema()
tier_labels = {
"ltm": "LONG-TERM (persisted)",
"stm": "SHORT-TERM (session)",
"episodic": "EPISODIC (volatile)",
}
lines = ["MEMORY COMPONENTS:"]
for tier, components in schema.items():
if not components:
continue
lines.append(f"\n [{tier_labels.get(tier, tier.upper())}]")
for c in components:
access = c.get("access", "read")
lines.append(f" {c['name']} ({access}): {c['description']}")
for field_name, field_desc in c.get("fields", {}).items():
lines.append(f" · {field_name}: {field_desc}")
return "\n".join(lines)
def _format_config_context(self, memory) -> str:
"""Format configuration context."""
lines = ["CURRENT CONFIGURATION:"]
folders = {
**memory.ltm.workspace.as_dict(),
**memory.ltm.library_paths.to_dict(),
}
if folders:
for key, value in folders.items():
lines.append(f" - {key}: {value}")
else:
lines.append(" (no configuration set)")
return "\n".join(lines)
def build_system_prompt(self) -> str:
"""Build the complete system prompt."""
memory = get_memory()
# Identity + personality
identity = self._format_identity(memory)
# Language instruction
language_instruction = (
"Si la langue de l'user est différente de la langue courante en STM, "
"appelle `set_language` en premier avant de répondre."
)
# Configuration
config_section = self._format_config_context(memory)
# STM context
stm_context = self._format_stm_context(memory)
# Episodic context
episodic_context = self._format_episodic_context(memory)
# Memory schema
memory_schema = self._format_memory_schema()
# Workflow scope (active workflow plan or list of options)
workflow_section = self._format_workflow_scope(memory)
# Available tools (already filtered by scope)
tools_desc = self._format_tools_description()
tools_section = f"\nOUTILS DISPONIBLES:\n{tools_desc}" if tools_desc else ""
rules = """
RÈGLES:
- Utilise les outils pour accomplir les tâches, pas pour décorer
- Si des résultats de recherche sont dispo en mémoire épisodique, référence-les par index
- Confirme toujours avant une opération destructive (move, delete, overwrite)
- Réponses courtes — si c'est fait, dis-le en une ligne
- Si la demande est floue, demande un éclaircissement AVANT de lancer quoi que ce soit
"""
sections = [
identity,
language_instruction,
config_section,
stm_context,
episodic_context,
memory_schema,
workflow_section,
tools_section,
rules,
]
return "\n\n".join(s for s in sections if s and s.strip())
-178
View File
@@ -1,178 +0,0 @@
"""Tool registry — defines and registers all available tools for the agent."""
import inspect
import logging
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any
from .tools_TO_CHECK.spec import ToolSpec, ToolSpecError
from .tools_TO_CHECK.spec_loader import load_tool_specs
logger = logging.getLogger(__name__)
@dataclass
class Tool:
"""Represents a tool that can be used by the agent."""
name: str
description: str
func: Callable[..., dict[str, Any]]
parameters: dict[str, Any]
cache_key: str | None = None # Parameter name to use as STM cache key.
_PY_TYPE_TO_JSON = {
str: "string",
int: "integer",
float: "number",
bool: "boolean",
list: "array",
dict: "object",
}
def _json_type_for(annotation) -> str:
"""Map a Python type annotation to a JSON Schema 'type' string."""
if annotation is inspect.Parameter.empty:
return "string"
# Strip Optional[X] / X | None to X.
args = getattr(annotation, "__args__", None)
if args:
non_none = [a for a in args if a is not type(None)]
if len(non_none) == 1:
annotation = non_none[0]
return _PY_TYPE_TO_JSON.get(annotation, "string")
def _create_tool_from_function(func: Callable, spec: ToolSpec | None = None) -> Tool:
"""
Create a Tool object from a function, optionally enriched with a spec.
Types and required-ness always come from the Python signature (source of
truth for the API contract). When a spec is provided, the description
and per-parameter docs come from the YAML spec instead of the docstring.
"""
sig = inspect.signature(func)
sig_params = {name: p for name, p in sig.parameters.items() if name != "self"}
if spec is not None:
_validate_spec_matches_signature(func.__name__, sig_params, spec)
description = spec.compile_description()
param_descriptions = {
name: spec.compile_parameter_description(name) for name in sig_params
}
else:
doc = inspect.getdoc(func)
description = doc.strip().split("\n")[0] if doc else func.__name__
param_descriptions = {name: f"Parameter {name}" for name in sig_params}
properties: dict[str, dict[str, Any]] = {}
required: list[str] = []
for param_name, param in sig_params.items():
properties[param_name] = {
"type": _json_type_for(param.annotation),
"description": param_descriptions[param_name],
}
if param.default is inspect.Parameter.empty:
required.append(param_name)
parameters = {
"type": "object",
"properties": properties,
"required": required,
}
cache_key = spec.cache.key if spec is not None and spec.cache is not None else None
return Tool(
name=func.__name__,
description=description,
func=func,
parameters=parameters,
cache_key=cache_key,
)
def _validate_spec_matches_signature(
func_name: str,
sig_params: dict[str, inspect.Parameter],
spec: ToolSpec,
) -> None:
"""Ensure every signature param has a spec entry and vice versa."""
sig_names = set(sig_params.keys())
spec_names = set(spec.parameters.keys())
missing_in_spec = sig_names - spec_names
if missing_in_spec:
raise ToolSpecError(
f"tool '{func_name}': spec is missing entries for parameter(s) "
f"{sorted(missing_in_spec)}"
)
extra_in_spec = spec_names - sig_names
if extra_in_spec:
raise ToolSpecError(
f"tool '{func_name}': spec has entries for unknown parameter(s) "
f"{sorted(extra_in_spec)} (not in function signature)"
)
def make_tools(settings) -> dict[str, Tool]:
"""
Create and register all available tools.
Args:
settings: Application settings instance.
Returns:
Dictionary mapping tool names to Tool objects.
"""
from .tools_TO_CHECK import api as api_tools # noqa: PLC0415
from .tools_TO_CHECK import filesystem as fs_tools # noqa: PLC0415
from .tools_TO_CHECK import language as lang_tools # noqa: PLC0415
from .tools_TO_CHECK import workflow as wf_tools # noqa: PLC0415
tool_functions = [
fs_tools.set_path_for_folder,
fs_tools.list_folder,
fs_tools.read_release_metadata,
fs_tools.query_library,
fs_tools.analyze_release,
fs_tools.probe_media,
fs_tools.resolve_season_destination,
fs_tools.resolve_episode_destination,
fs_tools.resolve_movie_destination,
fs_tools.resolve_series_destination,
fs_tools.move_media,
fs_tools.move_to_destination,
fs_tools.manage_subtitles,
fs_tools.create_seed_links,
fs_tools.learn,
api_tools.find_media_imdb_id,
api_tools.find_torrent,
api_tools.add_torrent_by_index,
api_tools.add_torrent_to_qbittorrent,
api_tools.get_torrent_by_index,
lang_tools.set_language,
wf_tools.start_workflow,
wf_tools.end_workflow,
]
specs = load_tool_specs()
tools: dict[str, Tool] = {}
for func in tool_functions:
spec = specs.get(func.__name__)
tool = _create_tool_from_function(func, spec=spec)
tools[tool.name] = tool
with_spec = sum(1 for fn in tool_functions if fn.__name__ in specs)
logger.info(
f"Registered {len(tools)} tools "
f"({with_spec} with YAML spec, {len(tools) - with_spec} doc-only): "
f"{list(tools.keys())}"
)
return tools
-23
View File
@@ -1,23 +0,0 @@
"""Tools module — agent-exposed wrappers.
Re-exports are intentionally minimal during the ``unfuck`` refactor.
Tool wiring (registry / specs / LLM-facing surface) is the last
chunk of work on this branch; until then, importers should reach
into the submodules directly (``alfred.agent.tools.filesystem``, …).
"""
from .api import (
add_torrent_by_index,
add_torrent_to_qbittorrent,
find_torrent,
get_torrent_by_index,
)
from .language import set_language
__all__ = [
"find_torrent",
"get_torrent_by_index",
"add_torrent_to_qbittorrent",
"add_torrent_by_index",
"set_language",
]
-373
View File
@@ -1,373 +0,0 @@
"""Filesystem tools for folder management.
Thin wrappers around the 5 atomic filesystem use cases
(``alfred.application.filesystem``) plus a few self-contained tools
(``analyze_release``, ``probe_media``, ``learn``, …).
Tools removed during the ``unfuck`` filesystem refactor — to be
rewired in a later step:
- ``manage_subtitles`` (depends on the rewritten subtitle services)
- ``set_path_for_folder`` (no replacement use case yet)
- ``create_seed_links`` (flow has changed: hard-link straight to
library, no copy back; will be re-introduced per-file when the
organize-release workflow lands)
- ``resolve_season_destination`` / ``resolve_episode_destination``
/ ``resolve_movie_destination`` / ``resolve_series_destination``
(their use cases moved to ``_OLD`` files pending a rewrite)
"""
from pathlib import Path
from typing import Any
import yaml
import alfred as _alfred_pkg
from alfred.application.filesystem import (
DirectoryRoots,
create_dir_use_case,
list_dir_use_case,
move_file_use_case,
)
from alfred.infrastructure.knowledge_TO_CHECK.release_kb import YamlReleaseKnowledge
from alfred.infrastructure.metadata_TO_CHECK import MetadataStore
from alfred.infrastructure.persistence_TO_CHECK import get_memory
from alfred.infrastructure.probe_TO_CHECK import FfprobeMediaProber
# Agent-tools frontier: this is the legitimate home for the singletons that
# back every LLM-exposed wrapper. The use cases below take ``kb`` / ``prober``
# as required params; tests inject their own stubs.
_KB = YamlReleaseKnowledge()
_PROBER = FfprobeMediaProber()
_LEARNED_ROOT = Path(_alfred_pkg.__file__).parent.parent / "data" / "knowledge"
class _RootsNotConfigured(Exception):
"""Raised when one of the 4 expected roots is missing from memory."""
def __init__(self, missing: list[str]):
super().__init__(f"Roots not configured: {missing}")
self.missing = missing
def _load_directory_roots() -> DirectoryRoots:
"""Build :class:`DirectoryRoots` from the persisted memory.
Reads:
- ``ltm.workspace.download`` → ``downloads``
- ``ltm.workspace.torrent`` → ``torrents``
- ``ltm.library_paths['movies']`` → ``movies``
- ``ltm.library_paths['tv_shows']`` → ``tv_shows``
Raises:
_RootsNotConfigured: if any of the four paths is unset.
"""
memory = get_memory()
downloads = memory.ltm.workspace.download
torrents = memory.ltm.workspace.torrent
movies = memory.ltm.library_paths.get("movies")
tv_shows = memory.ltm.library_paths.get("tv_shows")
missing: list[str] = []
if not downloads:
missing.append("downloads")
if not torrents:
missing.append("torrents")
if not movies:
missing.append("movies")
if not tv_shows:
missing.append("tv_shows")
if missing:
raise _RootsNotConfigured(missing)
return DirectoryRoots(
downloads=Path(downloads),
torrents=Path(torrents),
movies=Path(movies),
tv_shows=Path(tv_shows),
)
def _roots_error(exc: _RootsNotConfigured) -> dict[str, Any]:
return {
"status": "error",
"error": "roots_not_configured",
"message": (
f"Missing roots: {exc.missing}. "
"Configure them via /set_path before using filesystem tools."
),
}
# ---------------------------------------------------------------------------
# 5 atomic filesystem tools — thin wrappers over the use cases.
# ---------------------------------------------------------------------------
def list_folder(path: str) -> dict[str, Any]:
"""Thin tool wrapper — semantics live in alfred/agent/tools/specs/list_folder.yaml."""
try:
roots = _load_directory_roots()
except _RootsNotConfigured as e:
return _roots_error(e)
return list_dir_use_case(Path(path), roots).to_dict()
def create_directory(path: str) -> dict[str, Any]:
"""Thin tool wrapper — semantics live in alfred/agent/tools/specs/create_directory.yaml."""
try:
roots = _load_directory_roots()
except _RootsNotConfigured as e:
return _roots_error(e)
return create_dir_use_case(Path(path), roots).to_dict()
def move_media(source: str, destination: str) -> dict[str, Any]:
"""Thin tool wrapper — semantics live in alfred/agent/tools/specs/move_media.yaml."""
try:
roots = _load_directory_roots()
except _RootsNotConfigured as e:
return _roots_error(e)
return move_file_use_case(Path(source), Path(destination), roots).to_dict()
def move_to_destination(source: str, destination: str) -> dict[str, Any]:
"""Thin tool wrapper — semantics live in alfred/agent/tools/specs/move_to_destination.yaml.
Convenience tool that creates the destination's parent directory
if missing, then moves the file. Saves the LLM from having to
chain ``create_directory`` + ``move_media`` explicitly.
"""
try:
roots = _load_directory_roots()
except _RootsNotConfigured as e:
return _roots_error(e)
dst = Path(destination)
mkdir_resp = create_dir_use_case(dst.parent, roots)
if mkdir_resp.status != "ok":
return mkdir_resp.to_dict()
return move_file_use_case(Path(source), dst, roots).to_dict()
# ---------------------------------------------------------------------------
# Self-contained tools — not impacted by the filesystem refactor.
# ---------------------------------------------------------------------------
def learn(pack: str, category: str, key: str, values: list[str]) -> dict[str, Any]:
"""Thin tool wrapper — semantics live in alfred/agent/tools/specs/learn.yaml."""
_VALID_PACKS = {"subtitles"}
_VALID_CATEGORIES = {"languages", "types", "formats"}
if pack not in _VALID_PACKS:
return {
"status": "error",
"error": "unknown_pack",
"message": f"Unknown pack '{pack}'. Valid: {sorted(_VALID_PACKS)}",
}
if category not in _VALID_CATEGORIES:
return {
"status": "error",
"error": "unknown_category",
"message": f"Unknown category '{category}'. Valid: {sorted(_VALID_CATEGORIES)}",
}
learned_path = _LEARNED_ROOT / "subtitles_learned.yaml"
_LEARNED_ROOT.mkdir(parents=True, exist_ok=True)
data: dict = {}
if learned_path.exists():
try:
with open(learned_path, encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
except Exception as e:
return {"status": "error", "error": "read_failed", "message": str(e)}
cat_data = data.setdefault(category, {})
entry = cat_data.setdefault(key, {"tokens": []})
existing = entry.get("tokens", [])
new_tokens = [v for v in values if v not in existing]
entry["tokens"] = existing + new_tokens
tmp = learned_path.with_suffix(".yaml.tmp")
try:
with open(tmp, "w", encoding="utf-8") as f:
yaml.safe_dump(
data, f, allow_unicode=True, default_flow_style=False, sort_keys=False
)
tmp.rename(learned_path)
except Exception as e:
tmp.unlink(missing_ok=True)
return {"status": "error", "error": "write_failed", "message": str(e)}
return {
"status": "ok",
"pack": pack,
"category": category,
"key": key,
"added_count": len(new_tokens),
"tokens": entry["tokens"],
}
def analyze_release(release_name: str, source_path: str) -> dict[str, Any]:
"""Thin tool wrapper — semantics live in alfred/agent/tools/specs/analyze_release.yaml."""
from alfred.application.release_TO_CHECK import inspect_release # noqa: PLC0415
result = inspect_release(release_name, Path(source_path), _KB, _PROBER)
parsed = result.parsed
return {
"status": "ok",
"media_type": parsed.media_type,
"parse_path": parsed.parse_path,
"title": parsed.title,
"year": parsed.year,
"season": parsed.season,
"episode": parsed.episode,
"episode_end": parsed.episode_end,
"quality": parsed.quality,
"source": parsed.source,
"codec": parsed.codec,
"group": parsed.group,
"languages": parsed.languages,
"audio_codec": parsed.audio_codec,
"audio_channels": parsed.audio_channels,
"bit_depth": parsed.bit_depth,
"hdr_format": parsed.hdr_format,
"edition": parsed.edition,
"site_tag": parsed.site_tag,
"is_season_pack": parsed.is_season_pack,
"probe_used": result.probe_used,
"confidence": result.report.confidence,
"road": result.report.road,
"recommended_action": result.recommended_action,
}
def probe_media(source_path: str) -> dict[str, Any]:
"""Thin tool wrapper — semantics live in alfred/agent/tools/specs/probe_media.yaml."""
path = Path(source_path)
if not path.exists():
return {
"status": "error",
"error": "not_found",
"message": f"{source_path} does not exist",
}
media_info = _PROBER.probe(path)
if media_info is None:
return {
"status": "error",
"error": "probe_failed",
"message": "ffprobe failed to read the file",
}
return {
"status": "ok",
"video": {
"codec": media_info.video_codec,
"resolution": media_info.resolution,
"width": media_info.width,
"height": media_info.height,
"duration_seconds": media_info.duration_seconds,
"bitrate_kbps": media_info.bitrate_kbps,
},
"audio_tracks": [
{
"index": t.index,
"codec": t.codec,
"channels": t.channels,
"channel_layout": t.channel_layout,
"language": t.language,
"is_default": t.is_default,
}
for t in media_info.audio_tracks
],
"subtitle_tracks": [
{
"index": t.index,
"codec": t.codec,
"language": t.language,
"is_default": t.is_default,
"is_forced": t.is_forced,
}
for t in media_info.subtitle_tracks
],
"audio_languages": media_info.audio_languages,
"is_multi_audio": media_info.is_multi_audio,
}
def read_release_metadata(release_path: str) -> dict[str, Any]:
"""Thin tool wrapper — semantics live in alfred/agent/tools/specs/read_release_metadata.yaml."""
path = Path(release_path)
if not path.exists():
return {
"status": "error",
"error": "not_found",
"message": f"{release_path} does not exist",
}
root = path if path.is_dir() else path.parent
store = MetadataStore(root)
if not store.exists():
return {
"status": "ok",
"release_path": str(root),
"has_metadata": False,
"metadata": {},
}
return {
"status": "ok",
"release_path": str(root),
"has_metadata": True,
"metadata": store.load(),
}
def query_library(name: str) -> dict[str, Any]:
"""Thin tool wrapper — semantics live in alfred/agent/tools/specs/query_library.yaml."""
needle = name.strip().lower()
if not needle:
return {
"status": "error",
"error": "empty_name",
"message": "name must be a non-empty string",
}
memory = get_memory()
roots = memory.ltm.library_paths.to_dict() or {}
if not roots:
return {
"status": "error",
"error": "no_libraries",
"message": "No library paths configured — call set_path_for_folder first.",
}
matches: list[dict[str, Any]] = []
for collection, root in roots.items():
root_path = Path(root)
if not root_path.is_dir():
continue
for entry in root_path.iterdir():
if not entry.is_dir():
continue
if needle not in entry.name.lower():
continue
store = MetadataStore(entry)
matches.append(
{
"collection": collection,
"name": entry.name,
"path": str(entry),
"has_metadata": store.exists(),
}
)
return {
"status": "ok",
"query": name,
"match_count": len(matches),
"matches": matches,
}
-221
View File
@@ -1,221 +0,0 @@
"""
ToolSpec — semantic description of a tool, loaded from YAML.
Each tool exposed to the agent has a matching YAML spec under
alfred/agent/tools/specs/{tool_name}.yaml. The spec carries everything the
LLM needs to decide *when* and *why* to call the tool — separated from the
Python signature, which remains the source of truth for *how* (types,
required-ness).
The YAML structure is documented in the dataclasses below. Loading a spec
validates its shape; missing or unexpected fields raise ToolSpecError.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
import yaml
class ToolSpecError(ValueError):
"""Raised when a YAML tool spec is malformed or inconsistent."""
@dataclass(frozen=True)
class ParameterSpec:
"""Semantic description of a single tool parameter."""
description: str # Short: what the value represents.
why_needed: str # Why the tool needs this — drives LLM reasoning.
example: str | None = None # Concrete example value, shown to the LLM.
@classmethod
def from_dict(cls, name: str, data: dict) -> ParameterSpec:
_require(data, "description", f"parameter '{name}'")
_require(data, "why_needed", f"parameter '{name}'")
return cls(
description=str(data["description"]).strip(),
why_needed=str(data["why_needed"]).strip(),
example=str(data["example"]).strip()
if data.get("example") is not None
else None,
)
@dataclass(frozen=True)
class ReturnsSpec:
"""Description of one possible return shape (ok / needs_clarification / error / ...)."""
description: str
fields: dict[str, str] = field(default_factory=dict)
@classmethod
def from_dict(cls, key: str, data: dict) -> ReturnsSpec:
_require(data, "description", f"returns.{key}")
fields = data.get("fields") or {}
if not isinstance(fields, dict):
raise ToolSpecError(
f"returns.{key}.fields must be a dict, got {type(fields).__name__}"
)
return cls(
description=str(data["description"]).strip(),
fields={str(k): str(v).strip() for k, v in fields.items()},
)
@dataclass(frozen=True)
class CacheSpec:
"""Marks a tool as cacheable in STM.tool_results, keyed by one of its parameters."""
key: str # Name of the parameter whose value is the cache key.
@classmethod
def from_dict(cls, data: dict) -> CacheSpec:
_require(data, "key", "cache")
return cls(key=str(data["key"]).strip())
@dataclass(frozen=True)
class ToolSpec:
"""Full semantic spec for one tool."""
name: str
summary: str # One-liner — becomes Tool.description.
description: str # Longer paragraph.
when_to_use: str
when_not_to_use: str | None
next_steps: str | None
parameters: dict[str, ParameterSpec] # name -> ParameterSpec
returns: dict[str, ReturnsSpec] # status_key -> ReturnsSpec
cache: CacheSpec | None = None # If present, tool is cached.
@classmethod
def from_yaml_path(cls, path: Path) -> ToolSpec:
with open(path, encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
if not isinstance(data, dict):
raise ToolSpecError(f"{path}: top-level must be a mapping")
try:
return cls.from_dict(data)
except ToolSpecError as e:
raise ToolSpecError(f"{path}: {e}") from e
@classmethod
def from_dict(cls, data: dict) -> ToolSpec:
_require(data, "name", "spec")
_require(data, "summary", "spec")
_require(data, "description", "spec")
_require(data, "when_to_use", "spec")
params_raw = data.get("parameters") or {}
if not isinstance(params_raw, dict):
raise ToolSpecError("parameters must be a mapping")
parameters = {
pname: ParameterSpec.from_dict(pname, pdata or {})
for pname, pdata in params_raw.items()
}
returns_raw = data.get("returns") or {}
if not isinstance(returns_raw, dict):
raise ToolSpecError("returns must be a mapping")
returns = {
rkey: ReturnsSpec.from_dict(rkey, rdata or {})
for rkey, rdata in returns_raw.items()
}
cache_raw = data.get("cache")
if cache_raw is not None and not isinstance(cache_raw, dict):
raise ToolSpecError("cache must be a mapping")
cache = CacheSpec.from_dict(cache_raw) if cache_raw else None
spec = cls(
name=str(data["name"]).strip(),
summary=str(data["summary"]).strip(),
description=str(data["description"]).strip(),
when_to_use=str(data["when_to_use"]).strip(),
when_not_to_use=_strip_or_none(data.get("when_not_to_use")),
next_steps=_strip_or_none(data.get("next_steps")),
parameters=parameters,
returns=returns,
cache=cache,
)
if cache is not None and cache.key not in parameters:
raise ToolSpecError(
f"cache.key '{cache.key}' is not a declared parameter "
f"(declared: {sorted(parameters)})"
)
return spec
def compile_description(self) -> str:
"""
Build the long description text passed to the LLM as Tool.description.
Layout:
<summary>
<description>
When to use:
<when_to_use>
When NOT to use: (if present)
<when_not_to_use>
Next steps: (if present)
<next_steps>
Returns:
<status>: <description>
· <field>: <desc>
"""
parts = [self.summary, "", self.description]
parts += ["", "When to use:", _indent(self.when_to_use)]
if self.when_not_to_use:
parts += ["", "When NOT to use:", _indent(self.when_not_to_use)]
if self.next_steps:
parts += ["", "Next steps:", _indent(self.next_steps)]
if self.returns:
parts += ["", "Returns:"]
for status, ret in self.returns.items():
parts.append(f" {status}: {ret.description}")
for fname, fdesc in ret.fields.items():
parts.append(f" · {fname}: {fdesc}")
return "\n".join(parts)
def compile_parameter_description(self, name: str) -> str:
"""Build the JSON Schema 'description' field for one parameter."""
p = self.parameters.get(name)
if p is None:
raise ToolSpecError(f"tool '{self.name}': no spec for parameter '{name}'")
text = f"{p.description} (Why: {p.why_needed})"
if p.example:
text += f" Example: {p.example}"
return text
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _require(data: dict, key: str, where: str) -> None:
if data.get(key) is None or (isinstance(data[key], str) and not data[key].strip()):
raise ToolSpecError(f"{where}: missing required field '{key}'")
def _strip_or_none(value) -> str | None:
if value is None:
return None
s = str(value).strip()
return s or None
def _indent(text: str, prefix: str = " ") -> str:
return "\n".join(prefix + line for line in text.splitlines())
@@ -1,53 +0,0 @@
"""
ToolSpecLoader — discover and load all YAML tool specs from a directory.
Convention: one YAML file per tool, named exactly like the Python function
that implements it (e.g. resolve_season_destination.yaml).
"""
from __future__ import annotations
import logging
from pathlib import Path
from .spec import ToolSpec, ToolSpecError
logger = logging.getLogger(__name__)
_DEFAULT_SPECS_DIR = Path(__file__).parent / "specs"
def load_tool_specs(specs_dir: Path | None = None) -> dict[str, ToolSpec]:
"""
Load every {tool}.yaml under specs_dir into a {name -> ToolSpec} mapping.
Args:
specs_dir: Directory to scan. Defaults to alfred/agent/tools/specs/.
Returns:
Mapping from tool name to its parsed ToolSpec.
Raises:
ToolSpecError: if a spec is malformed, or if the filename doesn't
match the 'name' field inside the YAML.
"""
root = specs_dir or _DEFAULT_SPECS_DIR
if not root.exists():
logger.warning(f"Tool specs directory not found: {root}")
return {}
specs: dict[str, ToolSpec] = {}
for path in sorted(root.glob("*.yaml")):
spec = ToolSpec.from_yaml_path(path)
expected_name = path.stem
if spec.name != expected_name:
raise ToolSpecError(
f"{path}: filename stem '{expected_name}' "
f"does not match spec.name '{spec.name}'"
)
if spec.name in specs:
raise ToolSpecError(f"duplicate tool spec name: '{spec.name}'")
specs[spec.name] = spec
logger.info(f"Loaded {len(specs)} tool spec(s) from {root}")
return specs
@@ -1,53 +0,0 @@
name: add_torrent_by_index
summary: >
Pick a torrent from the last find_torrent results by index and add
it to qBittorrent in one call.
description: |
Convenience wrapper that combines get_torrent_by_index +
add_torrent_to_qbittorrent. Looks up the torrent at the given
1-based index, extracts its magnet link, and sends it to
qBittorrent. The result mirrors add_torrent_to_qbittorrent's, with
the chosen torrent's name appended on success.
when_to_use: |
The default action after find_torrent when the user picks a hit by
number ("download the second one"). One call, two side effects:
episodic memory updated + download started.
when_not_to_use: |
- When the user only wants to inspect, not download — use
get_torrent_by_index.
- When the magnet comes from outside the search results — use
add_torrent_to_qbittorrent directly.
next_steps: |
- On status=ok: confirm the download started and end the workflow
if not already ended.
- On status=error (not_found): the index is out of range; show the
available count from episodic memory.
- On status=error (no_magnet): the search result was malformed —
suggest re-running find_torrent.
parameters:
index:
description: 1-based position of the torrent in the last find_torrent results.
why_needed: |
Identifies which torrent to add. Out-of-range indices return
not_found.
example: 3
returns:
ok:
description: Torrent was added to qBittorrent.
fields:
status: "'ok'"
message: Confirmation message.
torrent_name: Name of the torrent that was added.
error:
description: Failed to add.
fields:
error: Short error code (not_found, no_magnet, ...).
message: Human-readable explanation.
@@ -1,48 +0,0 @@
name: add_torrent_to_qbittorrent
summary: >
Send a magnet link to qBittorrent and start the download.
description: |
Adds a torrent to qBittorrent using its WebUI API. On success, the
download is also recorded in episodic memory as an active_download
so the agent can track its progress later, the STM topic is set to
"downloading", and the current workflow is ended (the user typically
leaves the find-and-download scope at this point).
when_to_use: |
When the user provides a raw magnet link, or when chaining manually
after get_torrent_by_index. For the common "user picked search hit
N" case, prefer add_torrent_by_index — one call instead of two.
when_not_to_use: |
- For .torrent files (not supported by this tool — magnet only).
- When qBittorrent is not configured / reachable — the call will
fail and the user has to fix the config first.
next_steps: |
- On status=ok: the workflow is already ended; confirm to the user
that the download has started.
- On status=error: surface the message; common causes are auth
failure or qBittorrent being unreachable.
parameters:
magnet_link:
description: Magnet URI of the torrent to add (magnet:?xt=urn:btih:...).
why_needed: |
The actual payload sent to qBittorrent. Must be a full magnet
URI, not a hash alone.
example: "magnet:?xt=urn:btih:abc123..."
returns:
ok:
description: Torrent accepted by qBittorrent.
fields:
status: "'ok'"
message: Confirmation message.
error:
description: qBittorrent rejected the request or is unreachable.
fields:
error: Short error code.
message: Human-readable explanation.
@@ -1,85 +0,0 @@
name: analyze_release
summary: >
One-shot analyzer that parses a release name, detects its media type
from the folder layout, and enriches the result with ffprobe data.
description: |
Combines three steps in a single call so the agent gets a complete
picture before routing:
1. parse_release(release_name) — extracts title, year, season,
episode, quality, source, codec, group, languages, audio info,
HDR, edition, site tag.
2. detect_media_type(parsed, path) — uses the on-disk layout
(single file vs. folder, presence of S01 dirs, episode count)
to choose: movie / tv_episode / tv_season / tv_complete /
other / unknown.
3. ffprobe enrichment — when the media type is recognised, runs
ffprobe on the first video file found and fills in audio
codec/channels, bit depth, HDR format. Sets probe_used=true.
when_to_use: |
As the very first step of any organize workflow, right after
list_folder, on each release the user wants to handle. The output
drives which resolve_*_destination to call next.
when_not_to_use: |
- When you only need codec/audio info on a specific video file:
use probe_media (no parsing, no media-type detection).
- For releases the user has already analyzed earlier in the same
workflow — the parse is deterministic, no need to re-run.
next_steps: |
- media_type == movie → resolve_movie_destination
- media_type == tv_season → resolve_season_destination
- media_type == tv_episode → resolve_episode_destination
- media_type == tv_complete → resolve_series_destination
- media_type in (other, unknown) → ask the user what to do; do not
auto-route.
cache:
key: source_path
parameters:
release_name:
description: Raw release folder or file name as it appears on disk.
why_needed: |
Source of all the parsed tokens (quality, codec, group, ...).
Don't sanitise it — the parser relies on the exact spelling.
example: Breaking.Bad.S01.1080p.BluRay.x265-GROUP
source_path:
description: Absolute path to the release folder or file on disk.
why_needed: |
Required for layout-based media-type detection and for ffprobe
to find a video file inside the release.
example: /downloads/Breaking.Bad.S01.1080p.BluRay.x265-GROUP
returns:
ok:
description: Release analyzed.
fields:
status: "'ok'"
media_type: "One of: movie, tv_episode, tv_season, tv_complete, other, unknown."
parse_path: "Which parser branch was taken (debug)."
title: Parsed title.
year: Parsed year (int) or null.
season: Season number (int) or null.
episode: Episode number (int) or null.
episode_end: Range end episode (multi-episode releases) or null.
quality: Resolution token (e.g. 1080p, 2160p).
source: Source token (BluRay, WEB-DL, ...).
codec: Video codec token (x264, x265, ...).
group: Release group name or null.
languages: List of detected language tokens.
audio_codec: Audio codec from ffprobe (when probe_used=true).
audio_channels: Audio channel count from ffprobe.
bit_depth: Bit depth from ffprobe.
hdr_format: HDR format from ffprobe (HDR10, DV, ...) or null.
edition: Edition tag (Extended, Director's Cut, ...) or null.
site_tag: Source-site tag if present.
is_season_pack: True when the folder contains a full season.
probe_used: True when ffprobe successfully enriched the result.
confidence: Parser confidence score, 0100 (higher = more reliable).
road: "Parser road: 'easy' (group schema matched), 'shitty' (heuristic but acceptable), or 'path_of_pain' (low confidence — ask the user before auto-routing)."
recommended_action: "Orchestrator hint: 'process' (go straight to resolve_*_destination), 'ask_user' (media_type unknown or road=path_of_pain — confirm with the user first), or 'skip' (no main video, or media_type=other — nothing to organize)."
@@ -1,59 +0,0 @@
name: create_seed_links
summary: >
Recreate the original torrent folder structure with hard-links so
qBittorrent can keep seeding after the library move.
description: |
Hard-links the library video file back into torrents/<original_folder_name>/
and copies all remaining files from the original download folder
(subtitles, .nfo, .jpg, .txt, …) so the torrent data is complete on
disk. qBittorrent then sees the same content at the location it
expects and can keep seeding without rehashing the whole torrent.
when_to_use: |
Only when the user has confirmed they want to keep seeding after a
move. Call right after manage_subtitles (or after move_media if there
are no subs).
when_not_to_use: |
- When the user explicitly answered "no" to "keep seeding?".
- When the download was not from a torrent (e.g. direct download).
- Before the library file is in place — this tool reads it.
next_steps: |
- After success: optionally call qBittorrent to update the torrent's
save path / force a recheck (not yet covered by a tool).
- End the workflow.
parameters:
library_file:
description: Absolute path to the video file now in the library.
why_needed: |
The source for the hard-link — same inode means qBittorrent sees
identical bytes at the seeding path.
example: /tv_shows/Oz.1997.1080p.WEBRip.x265-KONTRAST/Season 03/Oz.S03E01.mkv
original_download_folder:
description: Absolute path to the original download folder.
why_needed: |
Provides the folder name to recreate under torrents/ and the
auxiliary files (subs, nfo, ...) to copy over.
example: /downloads/Oz.S03.1080p.WEBRip.x265-KONTRAST
returns:
ok:
description: Seeding folder rebuilt.
fields:
status: "'ok'"
torrent_subfolder: Absolute path of the recreated folder under torrents/.
linked_file: Absolute path of the hard-linked video.
copied_files: List of auxiliary files that were copied.
copied_count: Number of auxiliary files copied.
skipped: List of files skipped (already present, unreadable, ...).
error:
description: Failed to rebuild the seeding folder.
fields:
error: Short error code.
message: Human-readable explanation.
@@ -1,48 +0,0 @@
name: end_workflow
summary: >
Leave the current workflow scope and return to the broad-catalog mode.
description: |
Clears the active workflow from STM. After this call the visible tool
catalog returns to the core noyau plus start_workflow, so the agent is
ready to handle a different request.
when_to_use: |
- When all the workflow's steps have completed successfully.
- When the user explicitly cancels the current task.
- When the user changes subject mid-conversation and the active
workflow is no longer relevant.
- When an unrecoverable error makes continuing pointless — explain
in 'reason'.
when_not_to_use: |
- Do not call when there is no active workflow — it will return an
error. Just call start_workflow for the new request instead.
- Do not call mid-step just to "free up tools"; finish the step
or fail it explicitly first.
next_steps: |
- After ending, you can either call start_workflow for a new task or
answer the user directly from the broad catalog.
parameters:
reason:
description: Short reason for ending — completed, cancelled, changed_subject, error, ...
why_needed: |
Recorded in episodic memory for debugging and future audits. A
structured short string is more useful than a long sentence.
example: completed
returns:
ok:
description: Workflow ended; catalog is back to the broad noyau.
fields:
workflow: Name of the workflow that just ended.
reason: The reason that was passed in.
error:
description: Could not end — typically because nothing was active.
fields:
error: Short error code (no_active_workflow).
message: Human-readable explanation.
@@ -1,56 +0,0 @@
name: find_media_imdb_id
summary: >
Search TMDB for a media title and return its canonical title, year,
IMDb id, and TMDB id.
description: |
Looks up a title on TMDB and returns the canonical metadata needed by
the resolve_*_destination tools. On success, the result is also
stashed in short-term memory under "last_media_search" so later steps
in the workflow can read it without re-calling TMDB. The STM topic
is set to "searching_media".
when_to_use: |
Right after analyze_release, before calling resolve_*_destination —
the resolvers need the canonical title + year and refuse to guess
them from the raw release name.
when_not_to_use: |
- When you already have the IMDb id in STM from an earlier step in
the same workflow.
- For torrent search — use find_torrent instead.
next_steps: |
- On status=ok: call the appropriate resolve_*_destination with
tmdb_title and tmdb_year from the result.
- On status=error (not_found): show the error and ask the user for
a more precise title.
cache:
key: media_title
parameters:
media_title:
description: Title to search for. Free-form — TMDB does the matching.
why_needed: |
Drives the TMDB query. Pass a sanitized version (no resolution
tokens, no group name) for best results.
example: Breaking Bad
returns:
ok:
description: Match found.
fields:
status: "'ok'"
title: Canonical title as returned by TMDB.
year: Release year (movies) or first-air year (series).
media_type: "'movie' or 'tv'."
imdb_id: IMDb identifier (ttXXXXXXX) or null.
tmdb_id: TMDB numeric id.
error:
description: No match or API failure.
fields:
error: Short error code (not_found, api_error, ...).
message: Human-readable explanation.
@@ -1,52 +0,0 @@
name: find_torrent
summary: >
Search Knaben for torrents matching a media title; cache results in
episodic memory.
description: |
Queries the Knaben aggregator for up to 10 torrents matching the
given title, then stores the result list in episodic memory under
"last_search_results". The user can then refer to a torrent by
1-based index ("download the 3rd one") via get_torrent_by_index or
add_torrent_by_index. The STM topic is set to "selecting_torrent".
when_to_use: |
When the user wants to download something new — typically the first
step of a "find + download" sub-task. The agent should usually
pre-filter the title (canonical name + year) before searching for
cleaner results.
when_not_to_use: |
- For TMDB metadata lookup — use find_media_imdb_id.
- When a search was already performed in the same session and the
user is just picking from the existing list.
next_steps: |
- Present the indexed results to the user.
- Once chosen: call add_torrent_by_index(N) — that wraps
get_torrent_by_index + add_torrent_to_qbittorrent.
cache:
key: media_title
parameters:
media_title:
description: Title to search for on Knaben. Free-form.
why_needed: |
Drives the search query. Use the canonical title (from
find_media_imdb_id) plus quality preferences for better hits.
example: Inception 2010 1080p
returns:
ok:
description: Search returned a list of torrents.
fields:
status: "'ok'"
torrents: "List of {name, size, seeders, leechers, magnet, ...}, up to 10."
error:
description: Search failed.
fields:
error: Short error code.
message: Human-readable explanation.
@@ -1,48 +0,0 @@
name: get_torrent_by_index
summary: >
Retrieve a torrent from the last find_torrent search by its 1-based
index.
description: |
Reads episodic memory's last_search_results and returns the entry at
the given 1-based position. Pure lookup — does not start a download.
Fails when the search results are missing or the index is out of
range.
when_to_use: |
When the user references a search hit by number ("show me the second
one") but doesn't yet want to download — e.g. inspection, sharing
the magnet, ...
when_not_to_use: |
- When the user wants to start downloading: use add_torrent_by_index
instead (one call instead of two).
- When no search has been performed yet — the result will be
not_found.
next_steps: |
- Display the torrent to the user.
- If they then say "add it", call add_torrent_to_qbittorrent with the
magnet, or add_torrent_by_index with the same index.
parameters:
index:
description: 1-based position in the last find_torrent result list.
why_needed: |
Maps to a specific torrent entry. Out-of-range values return an
error, not a wraparound.
example: 3
returns:
ok:
description: Torrent found at that index.
fields:
status: "'ok'"
torrent: "Full torrent dict (name, size, seeders, leechers, magnet, ...)."
error:
description: No torrent at that index.
fields:
error: Short error code (not_found).
message: Human-readable explanation, e.g. "Search for torrents first."
@@ -1,76 +0,0 @@
name: learn
summary: >
Teach Alfred a new token mapping and persist it to the learned
knowledge pack so future scans recognise it.
description: |
Appends a new token (or list of tokens) to a key inside a knowledge
pack and writes the result to `data/knowledge/<pack>_learned.yaml`.
The change is persisted atomically (write-tmp + rename) so a crash
cannot corrupt the file. Currently only the `subtitles` pack is
supported.
when_to_use: |
When manage_subtitles returns needs_clarification with unresolved
tokens, after confirming with the user what the tokens mean. Call
once per (category, key) — multiple values can be added in a single
call.
when_not_to_use: |
- Without explicit user confirmation of what the token means.
- For knowledge that belongs in the static pack
(alfred/knowledge/<pack>.yaml) — that's editor territory, not
runtime learning.
next_steps: |
- After success: re-run the workflow step that triggered the
clarification (typically manage_subtitles) so the new mapping is
applied.
parameters:
pack:
description: Knowledge pack name. Currently only "subtitles" is supported.
why_needed: |
Decides which `*_learned.yaml` file under data/knowledge/ gets
written. The pack name is namespaced to avoid collisions across
domains.
example: subtitles
category:
description: Category within the pack — "languages", "types", or "formats".
why_needed: |
Different categories use different lookup tables at scan time.
A wrong category silently has no effect.
example: languages
key:
description: Canonical entry id — ISO 639-1 code, type name, format name.
why_needed: |
The destination bucket for the new tokens. Existing tokens under
this key are kept; only new values are appended.
example: es
values:
description: List of token spellings to add.
why_needed: |
Release groups use many spellings for the same language/type;
pass them all in one call instead of multiple round-trips.
example: '["spanish", "espanol", "spa"]'
returns:
ok:
description: Mapping saved.
fields:
status: "'ok'"
pack: Name of the pack that was written to.
category: Category that was updated.
key: Key that was updated.
added_count: Number of values that were actually new (deduplicated).
tokens: Full updated token list for that key.
error:
description: Save failed.
fields:
error: Short error code (unknown_pack, unknown_category, read_failed, write_failed).
message: Human-readable explanation.
@@ -1,63 +0,0 @@
name: list_folder
summary: >
List the contents of a configured folder, optionally below a
relative subpath.
description: |
Reads a folder previously configured via set_path_for_folder and
returns its entries (files + directories). A relative `path` lets you
drill down without re-specifying the absolute root each time. Path
traversal is rejected (no `..`, no absolute paths) so the agent
cannot escape the configured root.
when_to_use: |
- At the start of an organize workflow to discover what's available
in the download folder.
- To browse a library collection ("what tv shows do I have?").
- As a sanity check before any move to confirm the target exists.
when_not_to_use: |
- For folders that are not configured — call set_path_for_folder
first.
- To list arbitrary system paths — this tool is intentionally scoped
to the known roots.
next_steps: |
- After listing the download folder: typically call analyze_release
on a specific entry.
- After listing a library folder: use the result to disambiguate a
destination during resolve_*_destination.
cache:
key: path
parameters:
folder_type:
description: Logical folder key (download, torrent, movie, tv_show, ...).
why_needed: |
Resolves to an absolute root through LTM. Must have been set via
set_path_for_folder beforehand.
example: download
path:
description: Relative subpath inside the root (default ".").
why_needed: |
Lets you drill into a subfolder without expanding the root. No
".." or absolute path is allowed.
example: Breaking.Bad.S01.1080p.BluRay.x265-GROUP
returns:
ok:
description: Listing returned.
fields:
status: "'ok'"
folder_type: The key that was listed.
path: The relative path that was listed.
entries: List of {name, type, size?} for each entry.
error:
description: Could not list the folder.
fields:
error: Short error code (folder_not_configured, path_not_found, path_traversal, ...).
message: Human-readable explanation.
@@ -1,67 +0,0 @@
name: manage_subtitles
summary: >
Detect, filter, and place subtitle tracks next to a video that has just
been organised into the library.
description: |
Scans the source video's surroundings for subtitle files
(.srt, .ass, .ssa, .vtt, .sub), classifies them by language and type
(standard / SDH / forced), filters by the user's SubtitlePreferences
(languages, min size, keep_sdh, keep_forced), and hard-links the
passing files next to the destination video using the convention
`<lang>.<ext>`, `<lang>.sdh.<ext>`, `<lang>.forced.<ext>`.
If no subtitles are found, returns status=ok with placed_count=0 — not
an error.
when_to_use: |
Always after a successful move_media / move_to_destination, before
closing the workflow. Pass the original source path (where subs live)
and the new library path (where they should land).
when_not_to_use: |
- Do not call before the video itself has been moved — the destination
must exist for hard-links to make sense.
- Skip when the user explicitly asks not to handle subtitles.
next_steps: |
- On status=ok: continue with create_seed_links (if seeding) or end
the workflow.
- On status=needs_clarification: ask the user about the unresolved
tokens, then optionally call learn() to teach the new mapping.
parameters:
source_video:
description: Absolute path to the original video file (in the download folder).
why_needed: |
Subtitles typically live next to the source, either as siblings or
in a Subs/ subfolder. The scanner walks from this path.
example: /downloads/Oz.S03.1080p.WEBRip.x265-KONTRAST/Oz.S03E01.mkv
destination_video:
description: Absolute path to the video file in its library location.
why_needed: |
Subtitles are hard-linked next to this file so media players pick
them up automatically.
example: /tv_shows/Oz.1997.1080p.WEBRip.x265-KONTRAST/Season 03/Oz.S03E01.mkv
returns:
ok:
description: Subtitles scanned (and possibly placed).
fields:
status: "'ok'"
placed: List of {source, destination, filename} for each linked file.
placed_count: Number of subtitle files placed.
skipped_count: Number of subtitle files filtered out.
needs_clarification:
description: One or more tokens could not be classified.
fields:
unresolved: List of unrecognised tokens with their context.
question: Human-readable question to relay to the user.
error:
description: Scan or placement failed.
fields:
error: Short error code.
message: Human-readable explanation.
@@ -1,58 +0,0 @@
name: move_media
summary: >
Safely move a media file with copy + integrity check + delete source.
description: |
Copies the source file to the destination with an integrity check,
then deletes the source. Slower than move_to_destination (which is a
plain rename) but safer across filesystems where rename is not atomic
or when you want a checksum verification.
when_to_use: |
Use to move a single file across filesystems or when paranoia about
data integrity is justified — e.g. moving a finished download from a
scratch disk to the main library array.
when_not_to_use: |
- For same-filesystem moves where speed matters: use move_to_destination
(instant rename on ZFS/ext4 within the same dataset).
- For folder-level moves of complete packs: use move_to_destination —
move_media is a single-file operation.
next_steps: |
- After a successful move: call manage_subtitles to place any subtitle
tracks, then create_seed_links if the user wants to keep seeding.
- On error: surface the error code (file_not_found, destination_exists,
integrity_check_failed) and ask the user how to proceed.
parameters:
source:
description: Absolute path to the source video file.
why_needed: |
The file being moved. Typically lives under the downloads folder
after a torrent completes.
example: /downloads/Inception.2010.1080p.BluRay.x265-GROUP/movie.mkv
destination:
description: Absolute path of the destination file — must not already exist.
why_needed: |
Where the file lands in the library. Comes from a resolve_*_destination
call so the naming convention is respected.
example: /movies/Inception.2010.1080p.BluRay.x265-GROUP/Inception.2010.1080p.BluRay.x265-GROUP.mkv
returns:
ok:
description: Move succeeded.
fields:
status: "'ok'"
source: Absolute path of the source (now gone).
destination: Absolute path of the destination (now in place).
filename: Basename of the destination file.
size: Size in bytes.
error:
description: Move failed.
fields:
error: Short error code (file_not_found, destination_exists, integrity_check_failed, ...).
message: Human-readable explanation.
@@ -1,55 +0,0 @@
name: move_to_destination
summary: >
Move a file or folder to a destination, creating parent directories as needed.
description: |
Performs an actual move on disk. Uses the system 'mv' command, so on the
same filesystem (e.g. ZFS) this is an instant rename. Creates the parent
directory of the destination if it doesn't exist yet, then moves. Returns
before/after paths on success, or an error if the destination already
exists or the source can't be moved.
when_to_use: |
Use after one of the resolve_*_destination tools returned status=ok, to
perform the move it described. The 'source' and 'destination' arguments
come directly from the resolved paths.
when_not_to_use: |
- Never move when status was not 'ok' (clarification still pending or
error happened) — that would leave the library in a half-broken state.
- Don't use this for the seed-link step; use create_seed_links for that.
next_steps: |
- After a successful move: call manage_subtitles to place any subtitle
tracks, then create_seed_links to keep qBittorrent seeding.
- On error: surface the message; do not retry blindly — check whether
the destination already exists or the source path is correct.
parameters:
source:
description: Absolute path to the source file or folder to move.
why_needed: |
The thing being moved. Comes from the user's download folder or from
a previous tool's output.
example: /downloads/Oz.S03.1080p.WEBRip.x265-KONTRAST
destination:
description: Absolute path of the destination — must not already exist.
why_needed: |
Where to put the source. Comes from a resolve_*_destination call so
that the path matches the library's naming convention.
example: /tv_shows/Oz.1997.1080p.WEBRip.x265-KONTRAST/Oz.S03.1080p.WEBRip.x265-KONTRAST
returns:
ok:
description: Move succeeded.
fields:
source: Absolute path of the source (now gone).
destination: Absolute path of the destination (now in place).
error:
description: Move failed.
fields:
error: Short error code (source_not_found, destination_exists, mkdir_failed, move_failed).
message: Human-readable explanation of what went wrong.
@@ -1,56 +0,0 @@
name: probe_media
summary: >
Run ffprobe on a single video file and return its technical details.
description: |
Inspects a specific video file with ffprobe and returns codec,
resolution, duration, bitrate, the list of audio tracks (with
language and channel layout), and the list of embedded subtitle
tracks. Independent of any release-name parsing — works on any file
you can point at.
when_to_use: |
- To inspect a file's audio/subtitle tracks before deciding what to
do (e.g. choose a default audio language).
- To verify a video's resolution / codec when the release name is
unreliable.
- As a building block when analyze_release is overkill.
when_not_to_use: |
- For full release routing — analyze_release does parsing + media
type detection + probe in one call.
- On non-video files — ffprobe will return probe_failed.
next_steps: |
- The returned info typically feeds a user-facing decision (e.g.
"this is 7.1 DTS, want to keep it?"); rarely chained directly to
another tool.
cache:
key: source_path
parameters:
source_path:
description: Absolute path to the video file to probe.
why_needed: |
ffprobe needs the exact file (not a folder). For releases use
analyze_release; for a known file path, pass it here.
example: /downloads/Inception.2010.1080p.BluRay.x265-GROUP/movie.mkv
returns:
ok:
description: Probe succeeded.
fields:
status: "'ok'"
video: "Dict with codec, resolution, width, height, duration_seconds, bitrate_kbps."
audio_tracks: "List of {index, codec, channels, channel_layout, language, is_default}."
subtitle_tracks: "List of {index, codec, language, is_default, is_forced}."
audio_languages: List of language codes present in audio tracks.
is_multi_audio: True when more than one audio language is present.
error:
description: Probe failed.
fields:
error: Short error code (not_found, probe_failed).
message: Human-readable explanation.
@@ -1,54 +0,0 @@
name: query_library
summary: >
Find release folders across all configured library roots whose name
contains a substring (case-insensitive).
description: |
Scans every configured library root (movies, tv_shows, …) at depth 1
and returns folders whose name contains the query. For each match,
reports whether a `.alfred/metadata.yaml` exists — handy to spot
releases that have not been inspected yet. Does not recurse into
seasons / episodes; one entry per release folder.
when_to_use: |
- To answer "do I already have X?" without listing whole library
roots one by one.
- To pick the release_path to feed read_release_metadata or any
inspector tool.
when_not_to_use: |
- To list the *whole* library — that scan should live behind a
dedicated tool (not implemented yet).
- To browse a single root — use list_folder instead, it's cheaper
and doesn't open every library.
next_steps: |
- When one match is found: feed its path to read_release_metadata or
analyze_release.
- When several match: surface the indexed list to the user and ask
which one they mean.
parameters:
name:
description: Case-insensitive substring of the release name to look for.
why_needed: |
Library folders are named after the release (Title.Year.... or
Title (Year)). A substring is enough to catch typical user
phrasings ("foundation", "inception 2010").
example: foundation
returns:
ok:
description: Scan completed (possibly zero matches).
fields:
status: "'ok'"
query: The query string as received.
match_count: Number of matching folders.
matches: "List of {collection, name, path, has_metadata}."
error:
description: Scan could not run.
fields:
error: Short error code (no_libraries, empty_name).
message: Human-readable explanation.
@@ -1,55 +0,0 @@
name: read_release_metadata
summary: >
Read the `.alfred/metadata.yaml` file for a release folder.
description: |
Returns whatever has been previously persisted by inspector tools
(analyze_release, probe_media, find_media_imdb_id) and by the subtitle
pipeline. Works for any folder — download or library — as long as the
release has been touched at least once. Missing metadata is not an
error: the tool returns `has_metadata=false` with an empty dict.
when_to_use: |
- Before re-running analyze_release / probe_media on a release you
might have already seen — saves a full re-inspection.
- To answer "what do we know about X?" without scanning.
- To list which releases in a library have no `.alfred` yet (loop +
`has_metadata`).
when_not_to_use: |
- To search a library by name — use query_library.
- When you need a fresh probe/parse — call the inspector directly,
the result will be persisted automatically.
next_steps: |
- If `has_metadata=false`, decide whether to inspect now
(analyze_release / probe_media).
- If `has_metadata=true`, read `metadata.parse`, `metadata.probe`,
`metadata.tmdb` blocks before deciding next actions.
cache:
key: release_path
parameters:
release_path:
description: Absolute path to the release folder (or any file inside it).
why_needed: |
The store lives at `<release_root>/.alfred/metadata.yaml`. A file
path is auto-resolved to its parent folder.
example: /mnt/library/tv_shows/Foundation.2021.1080p.WEBRip.x265-RARBG
returns:
ok:
description: Release inspected (file may or may not exist).
fields:
status: "'ok'"
release_path: Absolute path of the release folder.
has_metadata: True if `.alfred/metadata.yaml` exists.
metadata: Full content of the file, or empty dict.
error:
description: Path does not exist on disk.
fields:
error: Short error code (not_found).
message: Human-readable explanation.
@@ -1,93 +0,0 @@
name: resolve_episode_destination
summary: >
Compute destination paths for a single TV episode file (file move).
description: |
Resolves the target series folder, season subfolder, and full destination
filename for a single-episode release. Returns paths only — does not move
anything. If a series folder with a different name already exists, returns
needs_clarification.
when_to_use: |
Use after analyze_release has identified the release as a single episode
(media_type=tv_show, season AND episode both set). TMDB must already be
queried for the canonical title/year, and optionally the episode title.
when_not_to_use: |
- Season packs (folder containing many episodes): use resolve_season_destination.
- Multi-season packs: use resolve_series_destination.
- Movies: use resolve_movie_destination.
next_steps: |
- On status=ok: call move_to_destination with the source video file and
destination=library_file.
- On status=needs_clarification: present question/options to the user,
then re-call with confirmed_folder set.
- On status=error: surface the message; do not move.
parameters:
release_name:
description: Raw release file name (with extension).
why_needed: |
Drives extraction of quality/source/codec/group, which become part of
the destination filename so each file is self-describing.
example: Oz.S03E01.1080p.WEBRip.x265-KONTRAST.mkv
source_file:
description: Absolute path to the source video file on disk.
why_needed: |
Used to read the source file extension (.mkv, .mp4, .avi…) for the
destination filename — release names don't always carry the extension.
example: /downloads/Oz.S03E01.1080p.WEBRip.x265-KONTRAST/file.mkv
tmdb_title:
description: Canonical show title from TMDB.
why_needed: |
Title prefix for both the series folder and the destination filename;
ensures consistent naming across all episodes of the show.
example: Oz
tmdb_year:
description: Show start year from TMDB.
why_needed: |
Disambiguates remakes/reboots sharing a title; year is part of the
series folder identity.
example: "1997"
tmdb_episode_title:
description: Episode title from TMDB. Optional.
why_needed: |
When present, the destination filename embeds the episode title for
human-readability (e.g. Oz.S01E01.The.Routine...).
example: The Routine
confirmed_folder:
description: Folder name the user picked after needs_clarification.
why_needed: |
Forces the use case to skip detection and use this exact folder name.
example: Oz.1997.1080p.WEBRip.x265-KONTRAST
returns:
ok:
description: Paths resolved; ready to move the episode file.
fields:
series_folder: Absolute path to the series root folder.
season_folder: Absolute path to the season subfolder.
library_file: Absolute path to the destination .mkv file (move target).
series_folder_name: Series folder name for display.
season_folder_name: Season folder name for display.
filename: Destination filename for display.
is_new_series_folder: True if the series folder doesn't exist yet.
needs_clarification:
description: A folder exists with a different name; user must choose.
fields:
question: Human-readable question.
options: List of folder names to pick from.
error:
description: Resolution failed.
fields:
error: Short error code.
message: Human-readable explanation.
@@ -1,72 +0,0 @@
name: resolve_movie_destination
summary: >
Compute destination paths for a movie file (file move).
description: |
Resolves the target movie folder and full destination filename for a movie
release. Returns paths only — does not move anything. Movies do not have
the existing-folder disambiguation problem that TV shows have (each
release lands in its own folder named after the canonical title + year +
tech).
when_to_use: |
Use after analyze_release has identified the release as a movie
(media_type=movie). TMDB must already be queried for the canonical title
and release year.
when_not_to_use: |
- TV shows in any form: use resolve_season_destination /
resolve_episode_destination / resolve_series_destination.
- Documentaries when they're treated as series rather than standalone
films: route them through the TV-show resolvers.
next_steps: |
- On status=ok: call move_to_destination with the source video file and
destination=library_file.
- On status=error: surface the message; do not move.
parameters:
release_name:
description: Raw release folder or file name.
why_needed: |
Drives extraction of quality/source/codec/group/edition tokens, which
become part of both the movie folder and filename so each release is
self-describing on disk.
example: Inception.2010.1080p.BluRay.x265-GROUP
source_file:
description: Absolute path to the source video file on disk.
why_needed: |
Used to read the file extension for the destination filename.
example: /downloads/Inception.2010.1080p.BluRay.x265-GROUP/movie.mkv
tmdb_title:
description: Canonical movie title from TMDB.
why_needed: |
Title prefix for the destination folder/file; ensures the library
uses the canonical title and not a sanitized release-name title.
example: Inception
tmdb_year:
description: Movie release year from TMDB.
why_needed: |
Disambiguates remakes that share a title (Dune 1984 vs Dune 2021)
and locks the folder identity in time.
example: "2010"
returns:
ok:
description: Paths resolved; ready to move.
fields:
movie_folder: Absolute path to the movie folder.
library_file: Absolute path to the destination .mkv file (move target).
movie_folder_name: Folder name for display.
filename: Destination filename for display.
is_new_folder: True if the movie folder doesn't exist yet.
error:
description: Resolution failed.
fields:
error: Short error code (e.g. library_not_set).
message: Human-readable explanation.
@@ -1,95 +0,0 @@
name: resolve_season_destination
summary: >
Compute destination paths for a season pack (folder move) in the TV library.
description: |
Resolves the target series folder and season subfolder for a complete-season
download. Returns the paths only — does not perform any move. If a series
folder for this show already exists in the library with a different name
(different group/quality/source), returns needs_clarification so the user
can decide whether to merge into the existing folder or create a new one.
when_to_use: |
Use after analyze_release has identified the release as a season pack
(media_type=tv_show, season set, episode unset). TMDB must already be
queried so tmdb_title and tmdb_year are canonical values, not raw tokens
from the release name.
when_not_to_use: |
- Single-episode files: use resolve_episode_destination instead.
- Multi-season packs (S01-S05 etc.): use resolve_series_destination.
- Movies: use resolve_movie_destination.
next_steps: |
- On status=ok: call move_to_destination with source=<download folder> and
destination=season_folder.
- On status=needs_clarification: present the question and options to the
user, then re-call this tool with confirmed_folder set to the user's pick.
- On status=error: surface the message to the user; do not move anything.
parameters:
release_name:
description: Raw release folder name as it appears on disk.
why_needed: |
Drives extraction of quality/source/codec/group tokens — these are
embedded in the target folder name (Title.Year.Quality.Source.Codec-GROUP)
to make releases self-describing on the filesystem.
example: Oz.S03.1080p.WEBRip.x265-KONTRAST
tmdb_title:
description: Canonical show title from TMDB.
why_needed: |
Builds the title prefix of the folder name. Must come from TMDB to
avoid typos and variant spellings present in the raw release name.
example: Oz
tmdb_year:
description: Show start year from TMDB.
why_needed: |
Disambiguates shows that share a title across decades (e.g. multiple
remakes of "The Office") and locks the folder identity.
example: "1997"
confirmed_folder:
description: |
Folder name chosen by the user after a previous needs_clarification
response.
why_needed: |
Short-circuits the existing-folder detection and forces the use case
to use this exact folder name, even if it doesn't match the computed
one.
example: Oz.1997.1080p.WEBRip.x265-KONTRAST
source_path:
description: |
Absolute path to the release folder on disk. Optional.
why_needed: |
When provided, the tool runs ffprobe on the main video inside the
folder and uses the probe data to fill quality/codec tokens that
may be missing from the release name. The enriched tech tokens
end up in the destination folder name, so providing source_path
gives more accurate names for releases with sparse metadata.
example: /downloads/Oz.S03.1080p.WEBRip.x265-KONTRAST
returns:
ok:
description: Paths resolved unambiguously; ready to move.
fields:
series_folder: Absolute path to the series root folder.
season_folder: Absolute path to the season subfolder (move target).
series_folder_name: Just the series folder name, for display.
season_folder_name: Just the season folder name, for display.
is_new_series_folder: True if the series folder doesn't exist yet.
needs_clarification:
description: A folder already exists with a different name; ask the user.
fields:
question: Human-readable question for the user.
options: List of folder names the user can pick from.
error:
description: Resolution failed (config missing, invalid release name, etc.).
fields:
error: Short error code (e.g. library_not_set).
message: Human-readable explanation.
@@ -1,87 +0,0 @@
name: resolve_series_destination
summary: >
Compute the destination path for a complete multi-season series pack (folder move).
description: |
Resolves the target series folder for a pack that contains multiple seasons
(e.g. S01-S05 in a single release). Returns only the series folder — the
whole source folder is moved as-is into the library, no per-season
restructuring. If a folder with a different name already exists for this
show, returns needs_clarification.
when_to_use: |
Use after analyze_release has identified the release as a complete-series
pack (media_type=tv_complete, or multi-season indicators). TMDB must
already be queried for canonical title/year.
when_not_to_use: |
- Single-season packs: use resolve_season_destination.
- Single episodes: use resolve_episode_destination.
- Movies: use resolve_movie_destination.
next_steps: |
- On status=ok: call move_to_destination with source=<download folder> and
destination=series_folder.
- On status=needs_clarification: ask the user, re-call with
confirmed_folder set.
- On status=error: surface the message; do not move.
parameters:
release_name:
description: Raw release folder name as it appears on disk.
why_needed: |
Drives extraction of quality/source/codec/group tokens for the target
folder name, even though the multi-season structure inside is kept
as-is.
example: The.Wire.S01-S05.1080p.BluRay.x265-GROUP
tmdb_title:
description: Canonical show title from TMDB.
why_needed: |
Title prefix of the series folder; comes from TMDB to avoid raw
release-name spellings.
example: The Wire
tmdb_year:
description: Show start year from TMDB.
why_needed: |
Disambiguates shows that share a title across eras and locks the
folder identity.
example: "2002"
confirmed_folder:
description: Folder name chosen by the user after needs_clarification.
why_needed: |
Forces the use case to use this exact folder name and skip detection.
example: The.Wire.2002.1080p.BluRay.x265-GROUP
source_path:
description: |
Absolute path to the release folder on disk. Optional.
why_needed: |
When provided, the tool runs ffprobe on the main video inside the
folder and uses probe data to fill quality/codec tokens that may
be missing from the release name, producing a more accurate
destination folder name.
example: /downloads/The.Wire.S01-S05.1080p.BluRay.x265-GROUP
returns:
ok:
description: Path resolved; ready to move the pack.
fields:
series_folder: Absolute path to the destination series folder.
series_folder_name: Folder name for display.
is_new_series_folder: True if the folder doesn't exist yet.
needs_clarification:
description: A folder exists with a different name; ask the user.
fields:
question: Human-readable question.
options: List of folder names to pick from.
error:
description: Resolution failed.
fields:
error: Short error code.
message: Human-readable explanation.
@@ -1,47 +0,0 @@
name: set_language
summary: >
Set the conversation language so all subsequent assistant messages
match it.
description: |
Persists an ISO 639-1 language code in short-term memory under
conversation.language. Read by the prompt builder and any tool that
needs to localise output. Does not validate the code against an ISO
list — the LLM is trusted to pass a sensible value.
when_to_use: |
As the very first call when the user writes in a language different
from the current STM language. Doing it before answering avoids a
mid-reply switch.
when_not_to_use: |
- On every turn — only when the language actually changes.
- To pick a subtitle language — that lives in SubtitlePreferences,
not the conversation language.
next_steps: |
- After success: continue the user's request in the newly set
language.
parameters:
language:
description: ISO 639-1 language code (en, fr, es, de, ...).
why_needed: |
Identifies the target language unambiguously across the UI and
any localisation logic.
example: fr
returns:
ok:
description: Language saved.
fields:
status: "'ok'"
message: Confirmation message.
language: The language code that was saved.
error:
description: Could not save the language.
fields:
status: "'error'"
error: Short error code or exception message.
@@ -1,58 +0,0 @@
name: set_path_for_folder
summary: >
Configure where a known folder lives on disk (download, torrent, or
any library collection).
description: |
Stores an absolute path in long-term memory under a folder key. Two
classes of folders exist:
- Workspace paths: "download", "torrent" — single-valued each, used
by the organize workflows.
- Library paths: any other key (e.g. "movie", "tv_show",
"documentary") — these are the collections you organise into.
The path must exist and be a directory; otherwise the call fails
without changing memory.
when_to_use: |
On first run, or when the user moves a folder, or when introducing a
new library collection (e.g. "set the documentaries folder to ...").
when_not_to_use: |
- For one-off listings — list_folder works without configuration only
if the folder is already set.
- To rename or delete an existing folder — this only sets paths.
next_steps: |
- After success: typical follow-ups are list_folder on the same key,
or starting a workflow that needs the path.
parameters:
folder_name:
description: Logical name of the folder (download, torrent, movie, tv_show, ...).
why_needed: |
The key the agent uses everywhere afterwards. "download" and
"torrent" are reserved for workspace; anything else becomes a
library collection.
example: tv_show
path_value:
description: Absolute path to the folder on disk.
why_needed: |
Must exist and be readable. Stored verbatim in LTM — relative
paths are rejected.
example: /tank/library/tv_shows
returns:
ok:
description: Path saved to long-term memory.
fields:
status: "'ok'"
folder_name: The logical name that was set.
path_value: The absolute path that was saved.
error:
description: Could not set the path.
fields:
error: Short error code (path_not_found, not_a_directory, invalid_path, ...).
message: Human-readable explanation.
@@ -1,64 +0,0 @@
name: start_workflow
summary: >
Enter a workflow scope — narrows the visible tool catalog and gives the
agent a clear multi-step plan to follow.
description: |
Activates a named workflow defined in YAML under agent/workflows/.
Once active, only the workflow's declared tools (plus the core noyau)
are exposed to the LLM, which keeps the decision space small and
focused. The returned plan (description + steps) is the script the
agent should execute until end_workflow is called.
when_to_use: |
Use as the very first action whenever the user request maps to a
known workflow (e.g. "organize Breaking Bad" → media.organize_media).
Pass any parameters you already know (release name, target media,
flags) in 'params' so later steps can read them from STM.
when_not_to_use: |
- Do not start a workflow for purely conversational replies or
one-shot lookups that need a single tool call.
- Do not start a new workflow while one is already active — call
end_workflow first.
next_steps: |
- On status=ok: follow the returned 'steps' list, calling the tools
in order. The visible tool catalog has already been narrowed.
- On status=error (unknown_workflow): surface the available list to
the user and ask which one they meant.
- On status=error (workflow_already_active): either continue the
active workflow or call end_workflow first.
parameters:
workflow_name:
description: Fully-qualified name of the workflow to start (e.g. media.organize_media).
why_needed: |
Identifies which YAML definition to load. Names use the
'domain.action' convention (media.*, mail.*, ...).
example: media.organize_media
params:
description: Initial parameters to seed the workflow with (release name, target, flags).
why_needed: |
Later steps read these from STM instead of asking the user again.
Pass whatever you already extracted from the user's message.
example: '{"release_name": "Breaking.Bad.S01.1080p.BluRay.x265-GROUP", "keep_seeding": true}'
returns:
ok:
description: Workflow activated; catalog has been narrowed.
fields:
workflow: Name of the activated workflow.
description: Human-readable description of what the workflow does.
steps: Ordered list of steps to execute.
tools: Tools that are now visible (in addition to the core noyau).
error:
description: Could not activate the workflow.
fields:
error: Short error code (unknown_workflow, workflow_already_active).
message: Human-readable explanation.
available_workflows: List of valid workflow names (only on unknown_workflow).
active_workflow: Name of the currently active workflow (only on workflow_already_active).
-86
View File
@@ -1,86 +0,0 @@
"""Workflow scoping tools — start_workflow / end_workflow meta-tools.
These tools let the agent enter and leave a workflow scope. While a
workflow is active, the PromptBuilder narrows the visible tool catalog
to the noyau + the workflow's declared tools, so the LLM doesn't have
to reason over the full set.
"""
import logging
from typing import Any
from alfred.infrastructure.persistence_TO_CHECK import get_memory
from ..workflows_TO_CHECK import WorkflowLoader
logger = logging.getLogger(__name__)
_loader_cache: list[WorkflowLoader] = []
def _get_loader() -> WorkflowLoader:
"""Lazily build the module-level WorkflowLoader."""
if not _loader_cache:
_loader_cache.append(WorkflowLoader())
return _loader_cache[0]
def start_workflow(workflow_name: str, params: dict) -> dict[str, Any]:
"""See specs/start_workflow.yaml for full description."""
loader = _get_loader()
workflow = loader.get(workflow_name)
if workflow is None:
return {
"status": "error",
"error": "unknown_workflow",
"message": f"Workflow '{workflow_name}' not found",
"available_workflows": loader.names(),
}
memory = get_memory()
current = memory.stm.workflow.current
if current is not None:
return {
"status": "error",
"error": "workflow_already_active",
"message": (
f"Workflow '{current.get('name')}' is already active. "
"Call end_workflow before starting a new one."
),
"active_workflow": current.get("name"),
}
memory.stm.start_workflow(workflow_name, params or {})
memory.save()
logger.info(f"start_workflow: '{workflow_name}' with params={params}")
return {
"status": "ok",
"workflow": workflow_name,
"description": workflow.get("description", ""),
"steps": workflow.get("steps", []),
"tools": workflow.get("tools", []),
}
def end_workflow(reason: str) -> dict[str, Any]:
"""See specs/end_workflow.yaml for full description."""
memory = get_memory()
current = memory.stm.workflow.current
if current is None:
return {
"status": "error",
"error": "no_active_workflow",
"message": "No workflow is currently active.",
}
workflow_name = current.get("name")
memory.stm.end_workflow()
memory.save()
logger.info(f"end_workflow: '{workflow_name}' reason={reason!r}")
return {
"status": "ok",
"workflow": workflow_name,
"reason": reason,
}
@@ -1,3 +0,0 @@
from .loader import WorkflowLoader
__all__ = ["WorkflowLoader"]
-52
View File
@@ -1,52 +0,0 @@
"""WorkflowLoader — autodiscovers and loads workflow YAML files.
Scans the workflows/ directory for all .yaml files and exposes them
as dicts. No manual registration needed — drop a new .yaml file and
it will be picked up automatically.
"""
import logging
from pathlib import Path
import yaml
logger = logging.getLogger(__name__)
_WORKFLOWS_DIR = Path(__file__).parent
class WorkflowLoader:
"""
Loads all workflow definitions from the workflows/ directory.
Usage:
loader = WorkflowLoader()
all_workflows = loader.all()
workflow = loader.get("media.organize_media")
"""
def __init__(self):
self._workflows: dict[str, dict] = {}
self._load()
def _load(self) -> None:
for path in sorted(_WORKFLOWS_DIR.glob("*.yaml")):
try:
data = yaml.safe_load(path.read_text(encoding="utf-8"))
name = data.get("name") or path.stem
self._workflows[name] = data
logger.info(f"WorkflowLoader: Loaded '{name}' from {path.name}")
except Exception as e:
logger.warning(f"WorkflowLoader: Could not load {path.name}: {e}")
def all(self) -> dict[str, dict]:
"""Return all loaded workflows keyed by name."""
return self._workflows
def get(self, name: str) -> dict | None:
"""Return a specific workflow by name, or None if not found."""
return self._workflows.get(name)
def names(self) -> list[str]:
"""Return all available workflow names."""
return list(self._workflows.keys())
@@ -1,69 +0,0 @@
name: media.manage_subtitles
description: >
Place subtitle files alongside a video that has just been organised into the library.
Detects the release pattern automatically, identifies and classifies all tracks,
filters by user rules, and hard-links matching files to the destination.
If any tracks are unrecognised, asks the user and optionally teaches Alfred.
trigger:
examples:
- "handle subtitles for The X-Files S01E01"
- "place the subs next to the file"
- "subtitles are in the Subs/ folder"
- "add subtitles"
tools:
- manage_subtitles
- learn
memory:
SubtitlePreferences: read
Workflow: read-write
steps:
- id: place_subtitles
tool: manage_subtitles
description: >
Detect release pattern, identify and classify all subtitle tracks,
filter by rules, hard-link matching files next to the destination video.
Reads SubtitlePreferences from LTM for language/type/format filtering.
params:
source_video: "{source_video}"
destination_video: "{destination_video}"
imdb_id: "{imdb_id}"
media_type: "{media_type}"
release_group: "{release_group}"
season: "{season}"
episode: "{episode}"
on_result:
ok_placed_zero: skip # no subtitles found — not an error
needs_clarification: ask_user # unrecognised tokens found
- id: ask_user
description: >
Some tracks could not be classified. Show the user the unresolved tokens
and ask if they want to teach Alfred what they mean.
If yes → go to learn_tokens. If no → end workflow.
ask_user:
question: >
I could not identify some tokens in the subtitle files: {unresolved}.
Do you want to teach me what they mean?
answers:
yes: { next_step: learn_tokens }
no: { next_step: end }
- id: learn_tokens
tool: learn
description: >
Persist a new token mapping to the learned knowledge pack so Alfred
recognises it in future scans without asking again.
params:
pack: "subtitles"
category: "{token_category}" # "languages" or "types"
key: "{token_key}" # e.g. "es", "de"
values: "{token_values}" # e.g. ["spanish", "espanol"]
subtitle_naming:
standard: "{lang}.{ext}"
sdh: "{lang}.sdh.{ext}"
forced: "{lang}.forced.{ext}"
@@ -1,92 +0,0 @@
name: media.organize_media
description: >
Organise a downloaded series or movie into the media library.
Triggered when the user asks to move/organize a specific title.
Always moves the video file. Optionally creates seed links in the
torrents folder so qBittorrent can keep seeding.
trigger:
examples:
- "organize Breaking Bad"
- "organise Severance season 2"
- "move Inception to my library"
- "organize Breaking Bad season 1, keep seeding"
tools:
- list_folder
- analyze_release
- probe_media
- find_media_imdb_id
- resolve_season_destination
- resolve_episode_destination
- resolve_movie_destination
- resolve_series_destination
- move_to_destination
- manage_subtitles
- create_seed_links
memory:
WorkspacePaths: read
LibraryPaths: read
Library: read-write
Workflow: read-write
Entities: read-write
steps:
- id: list_downloads
tool: list_folder
description: List the download folder to find the target files.
params:
folder_type: download
- id: analyze
tool: analyze_release
description: >
Parse the release name to detect media_type (movie / tv_season /
tv_episode / tv_complete) and extract season/episode info.
- id: identify_media
tool: find_media_imdb_id
description: Confirm canonical title and year via TMDB.
- id: resolve_destination
description: >
Call the resolver that matches media_type from analyze_release:
movie → resolve_movie_destination
tv_season → resolve_season_destination
tv_episode → resolve_episode_destination
tv_complete → resolve_series_destination
If the resolver returns needs_clarification, ask the user and
re-call with confirmed_folder.
- id: move_file
tool: move_to_destination
description: >
Move the video file/folder to the destination returned by the
resolver above.
- id: handle_subtitles
tool: manage_subtitles
description: >
Place subtitle files alongside the video in the library.
Pass the original source path and the new library destination path.
on_missing: skip
- id: ask_seeding
ask_user:
question: "Do you want to keep seeding this torrent?"
answers:
"yes": { next_step: create_seed_links }
"no": { next_step: end }
- id: create_seed_links
tool: create_seed_links
description: >
Hard-link the library video file back into torrents/<original_folder>/
and copy all remaining files from the original download folder
(subs, nfo, jpg, …) so the torrent stays complete for seeding.
naming_convention:
# Resolved by domain entities (Movie, Episode) — not hardcoded here
tv_show: "{title}/Season {season:02d}/{title}.S{season:02d}E{episode:02d}.{ext}"
movie: "{title} ({year})/{title}.{year}.{ext}"
-26
View File
@@ -1,26 +0,0 @@
"""Application-layer exceptions shared across orchestrators.
Kept in a dedicated module (rather than inside each orchestrator's
file) because the sync flows for TV shows and movies raise structurally
identical "not found in library" errors — pulling them out makes the
shared semantics explicit and avoids cross-imports between the
``tv_shows`` and ``movies`` packages.
"""
from __future__ import annotations
class ShowNotFoundInLibrary(LookupError):
"""Raised when no on-disk TV show carries the requested ``tmdb_id``.
The sync orchestrator raises this when both the library index and
the per-show release repository return ``None`` for a lookup —
there is nothing on disk to refresh TMDB facts against.
"""
class MovieNotFoundInLibrary(LookupError):
"""Raised when no on-disk movie carries the requested ``tmdb_id``.
Symmetric to :class:`ShowNotFoundInLibrary` for the movies library.
"""
-42
View File
@@ -1,42 +0,0 @@
"""Filesystem application layer — 5 atomic use cases as free functions.
Each use case:
- accepts :class:`pathlib.Path` inputs plus a :class:`DirectoryRoots` VO,
- guards inputs against escaping configured roots,
- calls the matching infra op,
- catches :class:`~alfred.infrastructure.filesystem.FilesystemError` and
returns a frozen DTO with a normalized error code.
No global state, no ``get_memory()``. Roots are injected.
"""
from .create_dir import create_dir_use_case
from .directory_roots import DirectoryRoots
from .dto import (
CreateDirResponse,
LinkFileResponse,
ListDirResponse,
MoveDirResponse,
MoveFileResponse,
)
from .link_file import link_file_use_case
from .list_dir import list_dir_use_case
from .move_dir import move_dir_use_case
from .move_file import move_file_use_case
__all__ = [
# use cases
"list_dir_use_case",
"create_dir_use_case",
"link_file_use_case",
"move_file_use_case",
"move_dir_use_case",
# VO
"DirectoryRoots",
# DTOs
"ListDirResponse",
"CreateDirResponse",
"LinkFileResponse",
"MoveFileResponse",
"MoveDirResponse",
]
-41
View File
@@ -1,41 +0,0 @@
"""Internal helpers: mapping infra exceptions → error codes.
Kept private (``_errors``) — only the 5 use cases in this package use
it. Centralizes the exception → code translation so every use case
returns consistent error payloads.
"""
from __future__ import annotations
from alfred.infrastructure.filesystem import (
CrossDevice,
DestinationExists,
FilesystemError,
FilesystemOSError,
NotADirectory,
NotAFile,
PermissionDenied,
SourceNotFound,
)
# Application-layer error codes (guard violations, not infra).
PATH_NOT_ALLOWED = "path_not_allowed"
def code_for(exc: FilesystemError) -> str:
"""Return the snake-case error code for an infra exception."""
if isinstance(exc, SourceNotFound):
return "source_not_found"
if isinstance(exc, DestinationExists):
return "destination_exists"
if isinstance(exc, NotADirectory):
return "not_a_directory"
if isinstance(exc, NotAFile):
return "not_a_file"
if isinstance(exc, PermissionDenied):
return "permission_denied"
if isinstance(exc, CrossDevice):
return "cross_device"
if isinstance(exc, FilesystemOSError):
return "filesystem_os_error"
return "filesystem_error"
@@ -1,33 +0,0 @@
"""create_dir use case — create a directory under one of the configured roots."""
from __future__ import annotations
from pathlib import Path
from alfred.infrastructure.filesystem import FilesystemError, create_dir
from ._errors import PATH_NOT_ALLOWED, code_for
from .directory_roots import DirectoryRoots
from .dto import CreateDirResponse
def create_dir_use_case(path: Path, roots: DirectoryRoots) -> CreateDirResponse:
"""Create directory ``path`` (and any missing parents) provided it
lives under one of the configured roots.
Idempotent on the infra side: re-running on an existing directory
returns ``status="ok"``.
"""
if not roots.contains(path):
return CreateDirResponse(
status="error",
error=PATH_NOT_ALLOWED,
message=f"Path is outside configured roots: {path}",
)
try:
create_dir(path)
except FilesystemError as e:
return CreateDirResponse(status="error", error=code_for(e), message=str(e))
return CreateDirResponse(status="ok", path=path)
@@ -1,54 +0,0 @@
"""CreateSeedLinksUseCase — prepares a torrent folder for continued seeding."""
import logging
from alfred.infrastructure.filesystem import FileManager
from alfred.infrastructure.persistence_TO_CHECK import get_memory
from .dto import CreateSeedLinksResponse
logger = logging.getLogger(__name__)
class CreateSeedLinksUseCase:
"""
Prepares a torrent subfolder so qBittorrent can keep seeding after a move.
Hard-links the video file from the library back into torrents/<original_folder>/,
then copies all remaining files from the original download folder (subs, nfo, …).
"""
def __init__(self, file_manager: FileManager):
self.file_manager = file_manager
def execute(
self, library_file: str, original_download_folder: str
) -> CreateSeedLinksResponse:
memory = get_memory()
torrent_folder = memory.ltm.workspace.torrent
if not torrent_folder:
return CreateSeedLinksResponse(
status="error",
error="torrent_folder_not_set",
message="Torrent folder is not configured. Use set_path_for_folder to set it.",
)
result = self.file_manager.create_seed_links(
library_file, original_download_folder, torrent_folder
)
if result.get("status") == "ok":
return CreateSeedLinksResponse(
status="ok",
torrent_subfolder=result.get("torrent_subfolder"),
linked_file=result.get("linked_file"),
copied_files=result.get("copied_files"),
copied_count=result.get("copied_count", 0),
skipped=result.get("skipped"),
)
return CreateSeedLinksResponse(
status="error",
error=result.get("error"),
message=result.get("message"),
)
@@ -1,56 +0,0 @@
"""DirectoryRoots — VO carrying the configured filesystem roots.
Replaces the ad-hoc ``get_memory().ltm.workspace.<x>`` lookups that were
sprinkled across the filesystem use cases. By making roots an explicit
input, use cases become pure (no global state read) and easy to test.
The roots are read once at the tool wrapper boundary (where the agent
config lives) and threaded through the use cases.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
@dataclass(frozen=True)
class DirectoryRoots:
"""Configured roots of Alfred's filesystem.
All paths must be absolute and existing directories — validation is
expected at the boundary that builds this VO.
Attributes:
downloads: where qBittorrent drops finished torrents.
torrents: where seeding hard-links live (mirrors downloads/).
movies: library root for movies.
tv_shows: library root for TV shows.
"""
downloads: Path
torrents: Path
movies: Path
tv_shows: Path
def all(self) -> tuple[Path, ...]:
"""Return every configured root, in declaration order."""
return (self.downloads, self.torrents, self.movies, self.tv_shows)
def contains(self, path: Path) -> bool:
"""Return True if ``path`` is inside one of the configured roots.
Uses ``Path.resolve()`` to handle symlinks and ``..`` segments,
then ``relative_to`` for an exact within-root check.
"""
try:
resolved = path.resolve()
except OSError:
return False
for root in self.all():
try:
resolved.relative_to(root.resolve())
return True
except (ValueError, OSError):
continue
return False
-111
View File
@@ -1,111 +0,0 @@
"""DTOs for the 5 atomic filesystem use cases.
Each use case returns a small frozen dataclass tagged with a ``status``
field. On error, ``error`` (machine-readable code) and ``message``
(human-readable) are populated; on success, the relevant payload
fields are.
Error codes mirror the infrastructure exception types (lowercased,
snake-cased) — e.g. ``SourceNotFound`` → ``"source_not_found"`` — plus
the application-layer ``"path_not_allowed"`` for guard violations.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
@dataclass(frozen=True)
class ListDirResponse:
"""Response from ``list_dir_use_case``."""
status: str # "ok" | "error"
path: Path | None = None
entries: tuple[Path, ...] = ()
error: str | None = None
message: str | None = None
def to_dict(self) -> dict:
if self.error:
return {"status": self.status, "error": self.error, "message": self.message}
return {
"status": self.status,
"path": str(self.path) if self.path else None,
"entries": [str(p) for p in self.entries],
}
@dataclass(frozen=True)
class CreateDirResponse:
"""Response from ``create_dir_use_case``."""
status: str
path: Path | None = None
error: str | None = None
message: str | None = None
def to_dict(self) -> dict:
if self.error:
return {"status": self.status, "error": self.error, "message": self.message}
return {"status": self.status, "path": str(self.path) if self.path else None}
@dataclass(frozen=True)
class LinkFileResponse:
"""Response from ``link_file_use_case``."""
status: str
source: Path | None = None
destination: Path | None = None
error: str | None = None
message: str | None = None
def to_dict(self) -> dict:
if self.error:
return {"status": self.status, "error": self.error, "message": self.message}
return {
"status": self.status,
"source": str(self.source) if self.source else None,
"destination": str(self.destination) if self.destination else None,
}
@dataclass(frozen=True)
class MoveFileResponse:
"""Response from ``move_file_use_case``."""
status: str
source: Path | None = None
destination: Path | None = None
error: str | None = None
message: str | None = None
def to_dict(self) -> dict:
if self.error:
return {"status": self.status, "error": self.error, "message": self.message}
return {
"status": self.status,
"source": str(self.source) if self.source else None,
"destination": str(self.destination) if self.destination else None,
}
@dataclass(frozen=True)
class MoveDirResponse:
"""Response from ``move_dir_use_case``."""
status: str
source: Path | None = None
destination: Path | None = None
error: str | None = None
message: str | None = None
def to_dict(self) -> dict:
if self.error:
return {"status": self.status, "error": self.error, "message": self.message}
return {
"status": self.status,
"source": str(self.source) if self.source else None,
"destination": str(self.destination) if self.destination else None,
}
-188
View File
@@ -1,188 +0,0 @@
"""Filesystem application DTOs."""
from __future__ import annotations
from dataclasses import dataclass
@dataclass
class CopyMediaResponse:
"""Response from copying a media file."""
status: str
source: str | None = None
destination: str | None = None
filename: str | None = None
size: int | None = None
error: str | None = None
message: str | None = None
def to_dict(self) -> dict:
if self.error:
return {"status": self.status, "error": self.error, "message": self.message}
return {
"status": self.status,
"source": self.source,
"destination": self.destination,
"filename": self.filename,
"size": self.size,
}
@dataclass
class MoveMediaResponse:
"""Response from moving a media file."""
status: str
source: str | None = None
destination: str | None = None
filename: str | None = None
size: int | None = None
error: str | None = None
message: str | None = None
def to_dict(self) -> dict:
if self.error:
return {"status": self.status, "error": self.error, "message": self.message}
return {
"status": self.status,
"source": self.source,
"destination": self.destination,
"filename": self.filename,
"size": self.size,
}
@dataclass
class PlacedSubtitle:
"""One subtitle file successfully placed."""
source: str
destination: str
filename: str
def to_dict(self) -> dict:
return {
"source": self.source,
"destination": self.destination,
"filename": self.filename,
}
@dataclass
class UnresolvedTrack:
"""A subtitle track that needs agent clarification before placement."""
raw_tokens: list[str]
file_path: str | None = None
file_size_kb: float | None = None
reason: str = "" # "unknown_language" | "low_confidence"
def to_dict(self) -> dict:
return {
"raw_tokens": self.raw_tokens,
"file_path": self.file_path,
"file_size_kb": self.file_size_kb,
"reason": self.reason,
}
@dataclass
class AvailableSubtitle:
"""One subtitle track available on an embedded media item."""
language: str # ISO 639-2 code
subtitle_type: str # "standard" | "sdh" | "forced" | "unknown"
def to_dict(self) -> dict:
return {"language": self.language, "type": self.subtitle_type}
@dataclass
class ManageSubtitlesResponse:
"""Response from the manage_subtitles use case."""
status: str # "ok" | "needs_clarification" | "error"
video_path: str | None = None
placed: list[PlacedSubtitle] | None = None
skipped_count: int = 0
unresolved: list[UnresolvedTrack] | None = None
available: list[AvailableSubtitle] | None = None # embedded tracks summary
error: str | None = None
message: str | None = None
def to_dict(self) -> dict:
if self.error:
return {"status": self.status, "error": self.error, "message": self.message}
result = {
"status": self.status,
"video_path": self.video_path,
"placed": [p.to_dict() for p in (self.placed or [])],
"placed_count": len(self.placed or []),
"skipped_count": self.skipped_count,
}
if self.unresolved:
result["unresolved"] = [u.to_dict() for u in self.unresolved]
result["unresolved_count"] = len(self.unresolved)
if self.available:
result["available"] = [a.to_dict() for a in self.available]
return result
@dataclass
class CreateSeedLinksResponse:
"""Response from creating seed links for a torrent."""
status: str
torrent_subfolder: str | None = None
linked_file: str | None = None
copied_files: list[str] | None = None
copied_count: int = 0
skipped: list[str] | None = None
error: str | None = None
message: str | None = None
def to_dict(self) -> dict:
if self.error:
return {"status": self.status, "error": self.error, "message": self.message}
return {
"status": self.status,
"torrent_subfolder": self.torrent_subfolder,
"linked_file": self.linked_file,
"copied_files": self.copied_files or [],
"copied_count": self.copied_count,
"skipped": self.skipped or [],
}
@dataclass
class ListFolderResponse:
"""Response from listing a folder."""
status: str
folder_type: str | None = None # SHOULD BE A PROPERTY
path: str | None = None # NOT NONE - Should be path
entries: list[str] | None = None # NOT NONE - Empty list of path
count: int | None = None # USELESS
error: str | None = None
message: str | None = None
def to_dict(self):
"""Convert to dict for agent compatibility."""
result = {"status": self.status}
if self.error:
result["error"] = self.error
result["message"] = self.message
else:
if self.folder_type:
result["folder_type"] = self.folder_type
if self.path:
result["path"] = self.path
if self.entries is not None:
result["entries"] = self.entries
if self.count is not None:
result["count"] = self.count
return result
@@ -1,40 +0,0 @@
"""link_file use case — hard-link a file from one root to another."""
from __future__ import annotations
from pathlib import Path
from alfred.infrastructure.filesystem import FilesystemError, link_file
from ._errors import PATH_NOT_ALLOWED, code_for
from .directory_roots import DirectoryRoots
from .dto import LinkFileResponse
def link_file_use_case(
src: Path, dst: Path, roots: DirectoryRoots
) -> LinkFileResponse:
"""Hard-link ``src`` to ``dst``. Both must be under configured roots.
The destination parent must already exist — the caller is expected
to have created it via ``create_dir_use_case`` if needed.
"""
if not roots.contains(src):
return LinkFileResponse(
status="error",
error=PATH_NOT_ALLOWED,
message=f"Source is outside configured roots: {src}",
)
if not roots.contains(dst):
return LinkFileResponse(
status="error",
error=PATH_NOT_ALLOWED,
message=f"Destination is outside configured roots: {dst}",
)
try:
link_file(src, dst)
except FilesystemError as e:
return LinkFileResponse(status="error", error=code_for(e), message=str(e))
return LinkFileResponse(status="ok", source=src, destination=dst)
-34
View File
@@ -1,34 +0,0 @@
"""list_dir use case — list a directory after guarding it within roots."""
from __future__ import annotations
from pathlib import Path
from alfred.infrastructure.filesystem import FilesystemError, list_dir
from ._errors import PATH_NOT_ALLOWED, code_for
from .directory_roots import DirectoryRoots
from .dto import ListDirResponse
def list_dir_use_case(path: Path, roots: DirectoryRoots) -> ListDirResponse:
"""List the immediate children of ``path`` if it lives under one of
the configured roots.
Returns a :class:`ListDirResponse`. On guard failure, status is
``"error"`` with ``error="path_not_allowed"``. On infra failure,
status is ``"error"`` with a code mapped from the raised exception.
"""
if not roots.contains(path):
return ListDirResponse(
status="error",
error=PATH_NOT_ALLOWED,
message=f"Path is outside configured roots: {path}",
)
try:
entries = list_dir(path)
except FilesystemError as e:
return ListDirResponse(status="error", error=code_for(e), message=str(e))
return ListDirResponse(status="ok", path=path, entries=tuple(entries))
@@ -1,308 +0,0 @@
"""ManageSubtitlesUseCase — orchestrates the full subtitle pipeline for a video file."""
import logging
from pathlib import Path
from alfred.application.subtitles_TO_CHECK.placer import (
PlacedTrack,
SubtitlePlacer,
_build_dest_name,
)
from alfred.domain.shared_TO_CHECK.value_objects import ImdbId
from alfred.domain.subtitles_TO_CHECK.entities import SubtitleScanResult
from alfred.domain.subtitles_TO_CHECK.services.identifier import SubtitleIdentifier
from alfred.domain.subtitles_TO_CHECK.services.matcher import SubtitleMatcher
from alfred.domain.subtitles_TO_CHECK.services.pattern_detector import PatternDetector
from alfred.domain.subtitles_TO_CHECK.services.utils import available_subtitles
from alfred.domain.subtitles_TO_CHECK.value_objects import ScanStrategy
from alfred.infrastructure.filesystem.scanner import PathlibFilesystemScanner
from alfred.infrastructure.knowledge_TO_CHECK.subtitles.base import SubtitleKnowledgeBase
from alfred.infrastructure.knowledge_TO_CHECK.subtitles.loader import KnowledgeLoader
from alfred.infrastructure.persistence_TO_CHECK.context import get_memory
from alfred.infrastructure.probe_TO_CHECK.ffprobe_prober import FfprobeMediaProber
from alfred.infrastructure.subtitle_TO_CHECK.metadata_store import SubtitleMetadataStore
from alfred.infrastructure.subtitle_TO_CHECK.rule_repository import RuleSetRepository
from .dto import (
AvailableSubtitle,
ManageSubtitlesResponse,
PlacedSubtitle,
UnresolvedTrack,
)
logger = logging.getLogger(__name__)
def _infer_library_root(dest_video: Path, media_type: str) -> Path:
"""
Infer the media library root folder from the destination video path.
TV show: video → Season 01 → The X-Files (3 levels up)
Movie: video → Inception (2010) (1 level up)
"""
if media_type == "tv_show":
return dest_video.parent.parent
return dest_video.parent
def _to_imdb_id(raw: str | None) -> ImdbId | None:
if not raw:
return None
try:
return ImdbId(raw)
except Exception:
return None
class ManageSubtitlesUseCase:
"""
Full subtitle pipeline:
1. Load knowledge base
2. Detect (or confirm) the release pattern
3. Identify all tracks (ffprobe + filesystem scan)
4. Load + resolve rules for this media
5. Match tracks against rules
6. If any tracks are unresolved → return needs_clarification (don't place yet)
7. Place matched tracks via hard-link
8. Persist to .alfred/metadata.yaml
The use case is stateless — all dependencies are instantiated inline.
"""
def execute(
self,
source_video: str,
destination_video: str,
imdb_id: str | None = None,
media_type: str = "tv_show",
release_group: str | None = None,
season: int | None = None,
episode: int | None = None,
confirmed_pattern_id: str | None = None,
dry_run: bool = False,
) -> ManageSubtitlesResponse:
source_path = Path(source_video)
dest_path = Path(destination_video)
if not source_path.exists() and not source_path.parent.exists():
return ManageSubtitlesResponse(
status="error",
error="source_not_found",
message=f"Source video not found: {source_video}",
)
kb = SubtitleKnowledgeBase(KnowledgeLoader())
prober = FfprobeMediaProber()
scanner = PathlibFilesystemScanner()
library_root = _infer_library_root(dest_path, media_type)
store = SubtitleMetadataStore(library_root)
repo = RuleSetRepository(library_root)
# --- Pattern resolution ---
pattern = self._resolve_pattern(
kb,
prober,
scanner,
store,
source_path,
confirmed_pattern_id,
release_group,
)
if pattern is None:
return ManageSubtitlesResponse(
status="error",
error="pattern_not_found",
message="Could not determine subtitle pattern for this release.",
)
# --- Identify ---
media_id = _to_imdb_id(imdb_id)
identifier = SubtitleIdentifier(kb, prober, scanner)
metadata = identifier.identify(
video_path=source_path,
pattern=pattern,
media_id=media_id,
media_type=media_type,
release_group=release_group,
)
if metadata.total_count == 0:
logger.info(
f"ManageSubtitles: no subtitle tracks found for {source_path.name}"
)
return ManageSubtitlesResponse(
status="ok",
video_path=destination_video,
placed=[],
skipped_count=0,
)
# --- Embedded short-circuit ---
if pattern.scan_strategy == ScanStrategy.EMBEDDED:
logger.info("ManageSubtitles: embedded pattern — skipping matcher")
available = [
AvailableSubtitle(
language=t.language.code if t.language else "?",
subtitle_type=t.subtitle_type.value,
)
for t in available_subtitles(metadata.embedded_tracks)
]
return ManageSubtitlesResponse(
status="ok",
video_path=destination_video,
placed=[],
skipped_count=0,
available=available,
)
# --- Match (external only) ---
subtitle_prefs = None
try:
memory = get_memory()
subtitle_prefs = memory.ltm.subtitle_preferences
except Exception:
pass
rules = repo.load(release_group, subtitle_prefs).resolve(kb.default_rules())
matcher = SubtitleMatcher()
matched, unresolved = matcher.match(metadata.external_tracks, rules)
if unresolved:
logger.info(
f"ManageSubtitles: {len(unresolved)} unresolved track(s) — needs clarification"
)
return ManageSubtitlesResponse(
status="needs_clarification",
video_path=destination_video,
placed=[],
unresolved=[_to_unresolved_dto(t) for t in unresolved],
)
if not matched:
return ManageSubtitlesResponse(
status="ok",
video_path=destination_video,
placed=[],
skipped_count=metadata.total_count,
)
# --- Dry run: skip placement ---
if dry_run:
placed_dtos = []
for t in matched:
if not t.file_path:
continue
try:
filename = _build_dest_name(t, dest_path.stem)
except ValueError:
continue
placed_dtos.append(
PlacedSubtitle(
source=str(t.file_path),
destination=str(dest_path.parent / filename),
filename=filename,
)
)
return ManageSubtitlesResponse(
status="ok",
video_path=destination_video,
placed=placed_dtos,
skipped_count=0,
)
# --- Place ---
placer = SubtitlePlacer()
place_result = placer.place(matched, dest_path)
# --- Persist ---
if place_result.placed:
pairs = _pair_placed_with_tracks(place_result.placed, matched)
store.append_history(pairs, season, episode, release_group)
placed_dtos = [
PlacedSubtitle(
source=str(p.source),
destination=str(p.destination),
filename=p.filename,
)
for p in place_result.placed
]
return ManageSubtitlesResponse(
status="ok",
video_path=destination_video,
placed=placed_dtos,
skipped_count=place_result.skipped_count,
)
def _resolve_pattern(
self,
kb: SubtitleKnowledgeBase,
prober: FfprobeMediaProber,
scanner: PathlibFilesystemScanner,
store: SubtitleMetadataStore,
source_path: Path,
confirmed_pattern_id: str | None,
release_group: str | None,
):
# 1. Explicit override from caller
if confirmed_pattern_id:
p = kb.pattern(confirmed_pattern_id)
if p:
return p
logger.warning(f"ManageSubtitles: unknown pattern '{confirmed_pattern_id}'")
# 2. Previously confirmed in metadata store
stored_id = store.confirmed_pattern()
if stored_id:
p = kb.pattern(stored_id)
if p:
logger.debug(f"ManageSubtitles: using confirmed pattern '{stored_id}'")
return p
# 3. Auto-detect
release_root = source_path.parent
detector = PatternDetector(kb, prober, scanner)
result = detector.detect(release_root, source_path)
if result["detected"] and result["confidence"] >= 0.6:
logger.info(
f"ManageSubtitles: auto-detected pattern '{result['detected'].id}' "
f"(confidence={result['confidence']:.2f})"
)
return result["detected"]
# 4. Fallback — adjacent (safest default)
logger.info("ManageSubtitles: falling back to 'adjacent' pattern")
return kb.pattern("adjacent")
def _to_unresolved_dto(
track: SubtitleScanResult, min_confidence: float = 0.7
) -> UnresolvedTrack:
reason = "unknown_language" if track.language is None else "low_confidence"
return UnresolvedTrack(
raw_tokens=track.raw_tokens,
file_path=str(track.file_path) if track.file_path else None,
file_size_kb=track.file_size_kb,
reason=reason,
)
def _pair_placed_with_tracks(
placed: list[PlacedTrack],
tracks: list[SubtitleScanResult],
) -> list[tuple[PlacedTrack, SubtitleScanResult]]:
"""
Pair each PlacedTrack with its originating SubtitleScanResult by source path.
Falls back to positional matching if paths don't align.
"""
track_by_path = {t.file_path: t for t in tracks if t.file_path}
pairs = []
for p in placed:
track = track_by_path.get(p.source)
if track is None and tracks:
track = tracks[0] # positional fallback
if track:
pairs.append((p, track))
return pairs
-36
View File
@@ -1,36 +0,0 @@
"""move_dir use case — move a directory tree between configured roots."""
from __future__ import annotations
from pathlib import Path
from alfred.infrastructure.filesystem import FilesystemError, move_dir
from ._errors import PATH_NOT_ALLOWED, code_for
from .directory_roots import DirectoryRoots
from .dto import MoveDirResponse
def move_dir_use_case(
src: Path, dst: Path, roots: DirectoryRoots
) -> MoveDirResponse:
"""Move directory ``src`` to ``dst``. Both must be under configured roots."""
if not roots.contains(src):
return MoveDirResponse(
status="error",
error=PATH_NOT_ALLOWED,
message=f"Source is outside configured roots: {src}",
)
if not roots.contains(dst):
return MoveDirResponse(
status="error",
error=PATH_NOT_ALLOWED,
message=f"Destination is outside configured roots: {dst}",
)
try:
move_dir(src, dst)
except FilesystemError as e:
return MoveDirResponse(status="error", error=code_for(e), message=str(e))
return MoveDirResponse(status="ok", source=src, destination=dst)
@@ -1,36 +0,0 @@
"""move_file use case — move a file between configured roots."""
from __future__ import annotations
from pathlib import Path
from alfred.infrastructure.filesystem import FilesystemError, move_file
from ._errors import PATH_NOT_ALLOWED, code_for
from .directory_roots import DirectoryRoots
from .dto import MoveFileResponse
def move_file_use_case(
src: Path, dst: Path, roots: DirectoryRoots
) -> MoveFileResponse:
"""Move file ``src`` to ``dst``. Both must be under configured roots."""
if not roots.contains(src):
return MoveFileResponse(
status="error",
error=PATH_NOT_ALLOWED,
message=f"Source is outside configured roots: {src}",
)
if not roots.contains(dst):
return MoveFileResponse(
status="error",
error=PATH_NOT_ALLOWED,
message=f"Destination is outside configured roots: {dst}",
)
try:
move_file(src, dst)
except FilesystemError as e:
return MoveFileResponse(status="error", error=code_for(e), message=str(e))
return MoveFileResponse(status="ok", source=src, destination=dst)
@@ -1,43 +0,0 @@
"""Move media use case."""
import logging
from alfred.infrastructure.filesystem import FileManager
from .dto import MoveMediaResponse
logger = logging.getLogger(__name__)
class MoveMediaUseCase:
"""Use case for moving a media file to a destination (copy + delete source)."""
def __init__(self, file_manager: FileManager):
self.file_manager = file_manager
def execute(self, source: str, destination: str) -> MoveMediaResponse:
"""
Move a media file from source to destination.
Args:
source: Absolute path to the source file.
destination: Absolute path to the destination file.
Returns:
MoveMediaResponse with success or error information.
"""
result = self.file_manager.move_file(source, destination)
if result.get("status") == "ok":
return MoveMediaResponse(
status="ok",
source=result.get("source"),
destination=result.get("destination"),
filename=result.get("filename"),
size=result.get("size"),
)
return MoveMediaResponse(
status="error",
error=result.get("error"),
message=result.get("message"),
)
@@ -1,464 +0,0 @@
"""
Destination resolution — compute library paths for releases.
Four distinct use cases, one per release type:
- resolve_season_destination : season pack (folder move)
- resolve_episode_destination : single episode (file move)
- resolve_movie_destination : movie (file move)
- resolve_series_destination : complete series multi-season pack (folder move)
Each returns a dedicated DTO with only the fields that make sense for that type.
These use cases follow Option B of the snapshot-VO design: ``ParsedRelease``
arrives with ``title_sanitized`` already computed, and TMDB-supplied strings
are sanitized **at the use-case boundary** (here) before being passed into
``ParsedRelease`` builder methods. The builders themselves perform no I/O and
no sanitization.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from pathlib import Path
from alfred.application.release_TO_CHECK import inspect_release
from alfred.domain.release import parse_release
from alfred.domain.releases_TO_CHECK.ports import ReleaseKnowledge
from alfred.domain.release.value_objects import ParsedRelease
from alfred.domain.shared_TO_CHECK.ports import MediaProber
from alfred.infrastructure.persistence_TO_CHECK import get_memory
logger = logging.getLogger(__name__)
def _resolve_parsed(
release_name: str,
source_path: str | None,
kb: ReleaseKnowledge,
prober: MediaProber,
) -> ParsedRelease:
"""Pick the right entry point depending on whether we have a path.
When ``source_path`` is provided and points to something that exists,
we run the full inspection pipeline so probe data can refresh tech
fields (which feed every filename builder). Otherwise we fall back
to a parse-only path — same behavior as before.
"""
if source_path:
path = Path(source_path)
if path.exists():
return inspect_release(release_name, path, kb, prober).parsed
parsed, _ = parse_release(release_name, kb)
return parsed
def _find_existing_tvshow_folders(
tv_root: Path, tmdb_title_safe: str, tmdb_year: int
) -> list[str]:
"""Return folder names in tv_root that match title + year prefix."""
if not tv_root.exists():
return []
clean_title = tmdb_title_safe.replace(" ", ".")
prefix = f"{clean_title}.{tmdb_year}".lower()
return sorted(
entry.name
for entry in tv_root.iterdir()
if entry.is_dir() and entry.name.lower().startswith(prefix)
)
def _get_tv_root() -> Path | None:
memory = get_memory()
tv_root = memory.ltm.library_paths.get("tv_show")
return Path(tv_root) if tv_root else None
# ---------------------------------------------------------------------------
# Internal sentinel + series-folder resolver (shared by the 3 TV use cases)
# ---------------------------------------------------------------------------
@dataclass
class _Clarification:
"""Module-private sentinel signalling that user input is needed."""
question: str
options: list[str]
def _resolve_series_folder(
tv_root: Path,
tmdb_title: str,
tmdb_title_safe: str,
tmdb_year: int,
computed_name: str,
confirmed_folder: str | None,
) -> tuple[str, bool] | _Clarification:
"""
Resolve which series folder to use.
Returns:
(folder_name, is_new) if resolved unambiguously,
_Clarification(question, options) if the caller must ask the user.
"""
if confirmed_folder:
return confirmed_folder, not (tv_root / confirmed_folder).exists()
existing = _find_existing_tvshow_folders(tv_root, tmdb_title_safe, tmdb_year)
if not existing:
return computed_name, True
if len(existing) == 1 and existing[0] == computed_name:
return existing[0], False
options = existing + ([computed_name] if computed_name not in existing else [])
return _Clarification(
question=(
f"Un dossier série existe déjà pour '{tmdb_title}' "
f"mais son nom diffère du nom calculé ({computed_name}). "
f"Lequel utiliser ?"
),
options=options,
)
# ---------------------------------------------------------------------------
# DTOs
# ---------------------------------------------------------------------------
@dataclass
class _ResolvedDestinationBase:
"""
Shared shape across all resolution DTOs.
Holds the status flag and the fields used in non-ok states
(error / needs_clarification). Subclasses add their own ok-state fields
and a to_dict() that delegates the non-ok cases via _base_dict().
"""
status: str # "ok" | "needs_clarification" | "error"
# needs_clarification
question: str | None = None
options: list[str] | None = None
# error
error: str | None = None
message: str | None = None
def _base_dict(self) -> dict | None:
"""Return the dict for error/needs_clarification, or None for ok."""
if self.status == "error":
return {"status": self.status, "error": self.error, "message": self.message}
if self.status == "needs_clarification":
return {
"status": self.status,
"question": self.question,
"options": self.options or [],
}
return None
@dataclass
class ResolvedSeasonDestination(_ResolvedDestinationBase):
"""Paths for a season pack — folder move, no individual file paths."""
series_folder: str | None = None
season_folder: str | None = None
series_folder_name: str | None = None
season_folder_name: str | None = None
is_new_series_folder: bool = False
def to_dict(self) -> dict:
return self._base_dict() or {
"status": self.status,
"series_folder": self.series_folder,
"season_folder": self.season_folder,
"series_folder_name": self.series_folder_name,
"season_folder_name": self.season_folder_name,
"is_new_series_folder": self.is_new_series_folder,
}
@dataclass
class ResolvedEpisodeDestination(_ResolvedDestinationBase):
"""Paths for a single episode — file move."""
series_folder: str | None = None
season_folder: str | None = None
library_file: str | None = None # full path to destination .mkv
series_folder_name: str | None = None
season_folder_name: str | None = None
filename: str | None = None
is_new_series_folder: bool = False
def to_dict(self) -> dict:
return self._base_dict() or {
"status": self.status,
"series_folder": self.series_folder,
"season_folder": self.season_folder,
"library_file": self.library_file,
"series_folder_name": self.series_folder_name,
"season_folder_name": self.season_folder_name,
"filename": self.filename,
"is_new_series_folder": self.is_new_series_folder,
}
@dataclass
class ResolvedMovieDestination(_ResolvedDestinationBase):
"""Paths for a movie — file move."""
movie_folder: str | None = None
library_file: str | None = None
movie_folder_name: str | None = None
filename: str | None = None
is_new_folder: bool = False
def to_dict(self) -> dict:
return self._base_dict() or {
"status": self.status,
"movie_folder": self.movie_folder,
"library_file": self.library_file,
"movie_folder_name": self.movie_folder_name,
"filename": self.filename,
"is_new_folder": self.is_new_folder,
}
@dataclass
class ResolvedSeriesDestination(_ResolvedDestinationBase):
"""Paths for a complete multi-season series pack — folder move."""
series_folder: str | None = None
series_folder_name: str | None = None
is_new_series_folder: bool = False
def to_dict(self) -> dict:
return self._base_dict() or {
"status": self.status,
"series_folder": self.series_folder,
"series_folder_name": self.series_folder_name,
"is_new_series_folder": self.is_new_series_folder,
}
# ---------------------------------------------------------------------------
# Use cases
# ---------------------------------------------------------------------------
def resolve_season_destination(
release_name: str,
tmdb_title: str,
tmdb_year: int,
kb: ReleaseKnowledge,
prober: MediaProber,
confirmed_folder: str | None = None,
source_path: str | None = None,
) -> ResolvedSeasonDestination:
"""
Compute destination paths for a season pack.
Returns series_folder + season_folder. No file paths — the whole
source folder is moved as-is into season_folder.
When ``source_path`` points to the release on disk, the parser is
augmented with ffprobe data so tech tokens missing from the release
name (quality / codec) end up in the folder names.
"""
tv_root = _get_tv_root()
if not tv_root:
return ResolvedSeasonDestination(
status="error",
error="library_not_set",
message="TV show library path is not configured.",
)
parsed = _resolve_parsed(release_name, source_path, kb, prober)
tmdb_title_safe = kb.sanitize_for_fs(tmdb_title)
computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year)
resolved = _resolve_series_folder(
tv_root, tmdb_title, tmdb_title_safe, tmdb_year, computed_name, confirmed_folder
)
if isinstance(resolved, _Clarification):
return ResolvedSeasonDestination(
status="needs_clarification",
question=resolved.question,
options=resolved.options,
)
series_folder_name, is_new = resolved
season_folder_name = parsed.season_folder_name()
series_path = tv_root / series_folder_name
season_path = series_path / season_folder_name
return ResolvedSeasonDestination(
status="ok",
series_folder=str(series_path),
season_folder=str(season_path),
series_folder_name=series_folder_name,
season_folder_name=season_folder_name,
is_new_series_folder=is_new,
)
def resolve_episode_destination(
release_name: str,
source_file: str,
tmdb_title: str,
tmdb_year: int,
kb: ReleaseKnowledge,
prober: MediaProber,
tmdb_episode_title: str | None = None,
confirmed_folder: str | None = None,
) -> ResolvedEpisodeDestination:
"""
Compute destination paths for a single episode file.
Returns series_folder + season_folder + library_file (full path to .mkv).
``source_file`` doubles as the inspection target — when it exists,
ffprobe enrichment refreshes tech tokens missing from the release name.
"""
tv_root = _get_tv_root()
if not tv_root:
return ResolvedEpisodeDestination(
status="error",
error="library_not_set",
message="TV show library path is not configured.",
)
parsed = _resolve_parsed(release_name, source_file, kb, prober)
ext = Path(source_file).suffix
tmdb_title_safe = kb.sanitize_for_fs(tmdb_title)
tmdb_episode_title_safe = (
kb.sanitize_for_fs(tmdb_episode_title) if tmdb_episode_title else None
)
computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year)
resolved = _resolve_series_folder(
tv_root, tmdb_title, tmdb_title_safe, tmdb_year, computed_name, confirmed_folder
)
if isinstance(resolved, _Clarification):
return ResolvedEpisodeDestination(
status="needs_clarification",
question=resolved.question,
options=resolved.options,
)
series_folder_name, is_new = resolved
season_folder_name = parsed.season_folder_name()
filename = parsed.episode_filename(tmdb_episode_title_safe, ext)
series_path = tv_root / series_folder_name
season_path = series_path / season_folder_name
file_path = season_path / filename
return ResolvedEpisodeDestination(
status="ok",
series_folder=str(series_path),
season_folder=str(season_path),
library_file=str(file_path),
series_folder_name=series_folder_name,
season_folder_name=season_folder_name,
filename=filename,
is_new_series_folder=is_new,
)
def resolve_movie_destination(
release_name: str,
source_file: str,
tmdb_title: str,
tmdb_year: int,
kb: ReleaseKnowledge,
prober: MediaProber,
) -> ResolvedMovieDestination:
"""
Compute destination paths for a movie file.
Returns movie_folder + library_file (full path to .mkv).
``source_file`` doubles as the inspection target — when it exists,
ffprobe enrichment refreshes tech tokens missing from the release name.
"""
memory = get_memory()
movies_root = memory.ltm.library_paths.get("movie")
if not movies_root:
return ResolvedMovieDestination(
status="error",
error="library_not_set",
message="Movie library path is not configured.",
)
parsed = _resolve_parsed(release_name, source_file, kb, prober)
ext = Path(source_file).suffix
tmdb_title_safe = kb.sanitize_for_fs(tmdb_title)
folder_name = parsed.movie_folder_name(tmdb_title_safe, tmdb_year)
filename = parsed.movie_filename(tmdb_title_safe, tmdb_year, ext)
folder_path = Path(movies_root) / folder_name
file_path = folder_path / filename
return ResolvedMovieDestination(
status="ok",
movie_folder=str(folder_path),
library_file=str(file_path),
movie_folder_name=folder_name,
filename=filename,
is_new_folder=not folder_path.exists(),
)
def resolve_series_destination(
release_name: str,
tmdb_title: str,
tmdb_year: int,
kb: ReleaseKnowledge,
prober: MediaProber,
confirmed_folder: str | None = None,
source_path: str | None = None,
) -> ResolvedSeriesDestination:
"""
Compute destination path for a complete multi-season series pack.
Returns only series_folder — the whole pack lands directly inside it.
When ``source_path`` points to the release on disk, ffprobe
enrichment refreshes tech tokens missing from the release name.
"""
tv_root = _get_tv_root()
if not tv_root:
return ResolvedSeriesDestination(
status="error",
error="library_not_set",
message="TV show library path is not configured.",
)
parsed = _resolve_parsed(release_name, source_path, kb, prober)
tmdb_title_safe = kb.sanitize_for_fs(tmdb_title)
computed_name = parsed.show_folder_name(tmdb_title_safe, tmdb_year)
resolved = _resolve_series_folder(
tv_root, tmdb_title, tmdb_title_safe, tmdb_year, computed_name, confirmed_folder
)
if isinstance(resolved, _Clarification):
return ResolvedSeriesDestination(
status="needs_clarification",
question=resolved.question,
options=resolved.options,
)
series_folder_name, is_new = resolved
series_path = tv_root / series_folder_name
return ResolvedSeriesDestination(
status="ok",
series_folder=str(series_path),
series_folder_name=series_folder_name,
is_new_series_folder=is_new,
)
-40
View File
@@ -1,40 +0,0 @@
"""Movie application DTOs."""
from dataclasses import dataclass, field
@dataclass(frozen=True)
class MovieHit:
"""One movie hit, flattened for transport to the agent."""
tmdb_id: int
title: str
release_year: int | None = None
def to_dict(self) -> dict:
out: dict = {"tmdb_id": self.tmdb_id, "title": self.title}
if self.release_year is not None:
out["release_year"] = self.release_year
return out
@dataclass
class SearchMovieResponse:
"""Response from searching for a movie."""
status: str
hits: list[MovieHit] = field(default_factory=list)
error: str | None = None
message: str | None = None
def to_dict(self):
"""Convert to dict for agent compatibility."""
result: dict = {"status": self.status}
if self.error:
result["error"] = self.error
result["message"] = self.message
else:
result["hits"] = [h.to_dict() for h in self.hits]
return result
@@ -1,60 +0,0 @@
"""Search movie use case."""
import logging
from alfred.infrastructure.api_TO_CHECK.tmdb import (
TMDBAPIError,
TMDBClient,
TMDBConfigurationError,
)
from .dto import MovieHit, SearchMovieResponse
logger = logging.getLogger(__name__)
class SearchMovieUseCase:
"""List movies matching a free-text query via TMDB ``/search/movie``.
The use case is a thin orchestrator: it asks the client for hits,
flattens domain VOs into agent-friendly primitives, and wraps
errors. It deliberately does **not** look up ``imdb_id`` —
enrichment is the caller's job (via :meth:`TMDBClient.get_movie_info`
on a chosen ``tmdb_id``).
"""
def __init__(self, tmdb_client: TMDBClient):
self.tmdb_client = tmdb_client
def execute(self, media_title: str) -> SearchMovieResponse:
try:
results = self.tmdb_client.search_movies(media_title)
hits = [
MovieHit(
tmdb_id=r.tmdb_id.value,
title=str(r.title),
release_year=r.release_year.value if r.release_year else None,
)
for r in results
]
logger.info(f"search_movies({media_title!r}) → {len(hits)} hits")
return SearchMovieResponse(status="ok", hits=hits)
except TMDBConfigurationError as e:
logger.error(f"TMDB configuration error: {e}")
return SearchMovieResponse(
status="error", error="configuration_error", message=str(e)
)
except TMDBAPIError as e:
logger.error(f"TMDB API error: {e}")
return SearchMovieResponse(
status="error", error="api_error", message=str(e)
)
except ValueError as e:
logger.error(f"Validation error: {e}")
return SearchMovieResponse(
status="error", error="validation_failed", message=str(e)
)
@@ -1,20 +0,0 @@
"""Release application layer — orchestrators sitting between domain
parsing and infrastructure I/O.
Public surface:
- :func:`is_supported_video` / :func:`find_main_video` — pre-pipeline
filesystem helpers (extension-only filtering, top-level video pick).
- :func:`inspect_release` / :class:`InspectedResult` — full inspection
pipeline combining parse + filesystem refinement + probe enrichment.
"""
from .inspect import InspectedResult, inspect_release
from .supported_media import find_main_video, is_supported_video
__all__ = [
"InspectedResult",
"find_main_video",
"inspect_release",
"is_supported_video",
]
@@ -1,67 +0,0 @@
"""
detect_media_type — filesystem-based media type refinement.
Enriches a ParsedRelease.media_type with evidence from the actual source path
(file or folder). Called after parse_release() to produce a final classification.
Classification logic:
1. If source_path is a file — check its extension directly.
2. If source_path is a folder — collect all extensions inside (non-recursive
for the first level, then recursive if nothing conclusive found).
3. Decision:
- Any non_video extension AND no video extension → "other"
- Any video extension → keep parsed media_type ("movie" | "tv_show" | "unknown")
- No conclusive extension found → keep parsed media_type as-is
- Mixed (video + non_video) → "unknown"
"""
from __future__ import annotations
from pathlib import Path
from alfred.domain.releases_TO_CHECK.ports import ReleaseKnowledge
from alfred.domain.release.value_objects import ParsedRelease
def detect_media_type(
parsed: ParsedRelease, source_path: Path, kb: ReleaseKnowledge
) -> str:
"""
Return a refined media_type string for the given source_path.
Does not mutate parsed — returns the new media_type value only.
The caller is responsible for updating the ParsedRelease if needed.
"""
extensions = _collect_extensions(source_path)
# Metadata extensions (.nfo, .srt, …) are always present alongside releases
# and must not influence the type decision.
conclusive = extensions - kb.metadata_extensions
has_video = bool(conclusive & kb.video_extensions)
has_non_video = bool(conclusive & kb.non_video_extensions)
if has_video and has_non_video:
return "unknown"
if has_non_video and not has_video:
return "other"
if has_video:
return parsed.media_type # trust token-level inference
# No conclusive extension — trust token-level inference
return parsed.media_type
def _collect_extensions(path: Path) -> set[str]:
"""Return the set of lowercase extensions found at path (file or folder)."""
if not path.exists():
return set()
if path.is_file():
return {path.suffix.lower()}
# Folder — scan first level only
exts: set[str] = set()
for child in path.iterdir():
if child.is_file():
exts.add(child.suffix.lower())
return exts
@@ -1,74 +0,0 @@
"""enrich_from_probe — fill missing ParsedRelease fields from MediaInfo."""
from __future__ import annotations
from dataclasses import replace
from alfred.domain.releases_TO_CHECK.ports import ReleaseKnowledge
from alfred.domain.release.value_objects import ParsedRelease
from alfred.domain.shared_TO_CHECK.media import MediaInfo
def enrich_from_probe(
parsed: ParsedRelease, info: MediaInfo, kb: ReleaseKnowledge
) -> ParsedRelease:
"""
Return a new ParsedRelease with None fields filled from ffprobe MediaInfo.
Only overwrites fields that are currently None — token-level values
from the release name always take priority. ``ParsedRelease`` is
frozen; this returns a new instance via :func:`dataclasses.replace`.
Translation tables (ffprobe codec name → scene token, channel count
→ layout) live in ``kb.probe_mappings`` (loaded from
``alfred/knowledge/release/probe_mappings.yaml``). When ffprobe
reports a value with no mapping entry, the fallback is the uppercase
raw value so unknown codecs still surface in a predictable form.
"""
mappings = kb.probe_mappings
video_codec_map: dict[str, str] = mappings.get("video_codec", {})
audio_codec_map: dict[str, str] = mappings.get("audio_codec", {})
channel_map: dict[int, str] = mappings.get("audio_channels", {})
updates: dict[str, object] = {}
if parsed.quality is None and info.resolution:
updates["quality"] = info.resolution
if parsed.codec is None and info.video_codec:
updates["codec"] = video_codec_map.get(
info.video_codec.lower(), info.video_codec.upper()
)
# bit_depth: ffprobe exposes it via pix_fmt — not in MediaInfo yet, skip.
# Audio — use the default track, fallback to first
default_track = next((t for t in info.audio_tracks if t.is_default), None)
track = default_track or (info.audio_tracks[0] if info.audio_tracks else None)
if track:
if parsed.audio_codec is None and track.codec:
updates["audio_codec"] = audio_codec_map.get(
track.codec.lower(), track.codec.upper()
)
if parsed.audio_channels is None and track.channels:
updates["audio_channels"] = channel_map.get(
track.channels, f"{track.channels}ch"
)
# Languages — merge ffprobe languages with token-level ones
# "und" = undetermined, not useful
if info.audio_languages:
existing_upper = {lang.upper() for lang in parsed.languages}
new_languages = list(parsed.languages)
for lang in info.audio_languages:
if lang.lower() != "und" and lang.upper() not in existing_upper:
new_languages.append(lang)
existing_upper.add(lang.upper())
if len(new_languages) != len(parsed.languages):
updates["languages"] = tuple(new_languages)
if not updates:
return parsed
return replace(parsed, **updates)
@@ -1,192 +0,0 @@
"""Release inspection orchestrator — the canonical "look at this thing"
entry point.
``inspect_release`` is the single composition of the four layers we
care about for a freshly-arrived release:
1. **Parse the name** — :func:`alfred.domain.release.services.parse_release`
gives a ``ParsedRelease`` plus a ``ParseReport`` (confidence + road).
2. **Pick the main video** — :func:`find_main_video` runs a top-level
scan over the source path. If nothing qualifies the result still
completes; downstream callers decide what to do with a videoless
release.
3. **Refine the media type** — :func:`detect_media_type` uses the
on-disk extension mix to override any token-level guess (e.g. a
bare ``.iso`` folder becomes ``"other"``). The refined value is
patched onto ``parsed`` in place — same convention as
``analyze_release`` had before.
4. **Probe the video** — the injected :class:`MediaProber` fills in
missing technical fields via :func:`enrich_from_probe`. Skipped
when there is no main video or when ``media_type`` ended up in
``{"unknown", "other"}`` (the probe would tell us nothing useful).
The return type is :class:`InspectedResult`, a frozen VO that bundles
everything downstream callers need (``analyze_release`` tool,
``resolve_destination``, future workflow stages) without forcing them
to redo the same four calls.
Design notes:
- **Application layer.** This module touches both domain
(``parse_release``) and infrastructure (``MediaProber`` port). That
is exactly application's job — orchestrate.
- **Knowledge base is injected.** ``inspect_release`` takes ``kb`` and
``prober`` as parameters; no module-level singletons here. Callers
(the tool wrapper, tests) decide what to plug in.
- **Mutation is contained.** We still mutate ``parsed.media_type`` and
let ``enrich_from_probe`` fill its ``None`` fields, because
``ParsedRelease`` is intentionally a mutable dataclass. The outer
``InspectedResult`` is frozen so the *bundle* is immutable from the
caller's perspective.
- **Never raises.** Filesystem / probe errors surface as ``None``
fields on the result, never as exceptions — same contract as the
underlying adapters.
"""
from __future__ import annotations
from dataclasses import dataclass, replace
from pathlib import Path
from alfred.application.release_TO_CHECK.detect_media_type import detect_media_type
from alfred.application.release_TO_CHECK.enrich_from_probe import enrich_from_probe
from alfred.application.release_TO_CHECK.supported_media import find_main_video
from alfred.domain.releases_TO_CHECK.ports import ReleaseKnowledge
from alfred.domain.releases_TO_CHECK.parser.services import parse_release
from alfred.domain.release.value_objects import (
MediaTypeToken,
ParsedRelease,
ParseReport,
)
from alfred.domain.shared_TO_CHECK.media import MediaInfo
from alfred.domain.shared_TO_CHECK.ports import MediaProber
# Media types for which a probe carries no useful information.
_NON_PROBABLE_MEDIA_TYPES = frozenset({"unknown", "other"})
# Media types for which there's nothing for the organizer to do.
# ``other`` covers things like games / ISOs / archives sitting on the
# downloads folder. ``unknown`` does NOT belong here — those need a
# user decision, not a skip.
_SKIPPABLE_MEDIA_TYPES = frozenset({"other"})
# Roads that signal the parser couldn't reach a confident answer on its
# own. ``Road`` values are kept as strings on the report to avoid a
# cross-package import here.
_ASK_USER_ROADS = frozenset({"path_of_pain"})
@dataclass(frozen=True)
class InspectedResult:
"""The full picture of a release: parsed name + filesystem reality.
Bundles everything the downstream pipeline needs after a single
inspection pass:
- ``parsed`` — :class:`ParsedRelease`, with ``media_type`` already
refined by :func:`detect_media_type` and ``None`` tech fields
filled in by :func:`enrich_from_probe` when a probe ran.
- ``report`` — :class:`ParseReport` from the parser (confidence +
road, untouched by inspection).
- ``source_path`` — the path the inspector was pointed at (file or
folder), as supplied by the caller.
- ``main_video`` — the canonical video file inside ``source_path``,
or ``None`` if no eligible file was found.
- ``media_info`` — the :class:`MediaInfo` snapshot when a probe
succeeded; ``None`` when no video was probed (no main video, or
``media_type`` in ``{"unknown", "other"}``) or when ffprobe
failed.
- ``probe_used`` — ``True`` iff ``media_info`` is non-``None`` and
``enrich_from_probe`` actually ran. Explicit flag so callers
don't have to re-derive the condition.
- ``recommended_action`` — derived hint for the orchestrator (see
property docstring). Encodes the exclusion / clarification /
go-ahead decision in one place so downstream callers don't
re-implement the same checks.
"""
parsed: ParsedRelease
report: ParseReport
source_path: Path
main_video: Path | None
media_info: MediaInfo | None
probe_used: bool
@property
def recommended_action(self) -> str:
"""Return one of ``"skip"`` / ``"ask_user"`` / ``"process"``.
- ``"skip"`` — nothing to organize:
* the source has no main video file, **or**
* ``media_type`` is ``"other"`` (games / ISOs / archives).
- ``"ask_user"`` — a decision is required before any action:
* ``media_type`` is ``"unknown"`` (parser couldn't classify), **or**
* the parse landed on ``Road.PATH_OF_PAIN``
(low-confidence, malformed name, etc.).
- ``"process"`` — everything else: a confident parse with a
usable media type and a main video on disk. The orchestrator
can move straight to the planning step.
The check ordering matters: ``"skip"`` wins over ``"ask_user"``
because if there's no video to organize, no question to the
user can change that. ``"ask_user"`` then wins over
``"process"`` because a confident parse alone isn't enough if
the type or road still flag uncertainty.
"""
if self.main_video is None:
return "skip"
if self.parsed.media_type.value in _SKIPPABLE_MEDIA_TYPES:
return "skip"
if self.parsed.media_type.value == "unknown":
return "ask_user"
if self.report.road in _ASK_USER_ROADS:
return "ask_user"
return "process"
def inspect_release(
release_name: str,
source_path: Path,
kb: ReleaseKnowledge,
prober: MediaProber,
) -> InspectedResult:
"""Run the full inspection pipeline on ``release_name`` /
``source_path``.
See module docstring for the four-step flow. ``kb`` and ``prober``
are injected so the caller controls the knowledge base layering
and the probe adapter (real ffprobe in production, stubs in tests).
Never raises. A missing or unreadable ``source_path`` simply
results in ``main_video=None`` and ``media_info=None``.
"""
parsed, report = parse_release(release_name, kb)
# Step 2: refine media_type from the on-disk extension mix.
# detect_media_type tolerates non-existent paths (returns parsed.media_type
# untouched), so no need to guard here. ParsedRelease is frozen — use
# dataclasses.replace to rebind with the refined value.
refined_media_type = MediaTypeToken(detect_media_type(parsed, source_path, kb))
if refined_media_type != parsed.media_type:
parsed = replace(parsed, media_type=refined_media_type)
# Step 3: pick the canonical main video (top-level scan only).
main_video = find_main_video(source_path, kb)
# Step 4: probe + enrich, when it makes sense.
media_info: MediaInfo | None = None
probe_used = False
if main_video is not None and parsed.media_type not in _NON_PROBABLE_MEDIA_TYPES:
media_info = prober.probe(main_video)
if media_info is not None:
parsed = enrich_from_probe(parsed, media_info, kb)
probe_used = True
return InspectedResult(
parsed=parsed,
report=report,
source_path=source_path,
main_video=main_video,
media_info=media_info,
probe_used=probe_used,
)
@@ -1,74 +0,0 @@
"""Pre-pipeline exclusion — decide which files are worth parsing.
These helpers live one notch above the domain: they touch the
filesystem (``Path.iterdir``, ``Path.suffix``) but carry no parsing
logic of their own. The goal is to filter out non-video files and pick
the canonical "main video" from a release folder *before* anything
hits :func:`~alfred.domain.release.parse_release`.
Design notes (Phase A bis, 2026-05-20):
- **Extension is the sole eligibility criterion.** A file is supported
iff its suffix is in ``kb.video_extensions``. No size threshold, no
filename heuristics ("sample", "trailer", …). If a release packs a
bloated featurette or names its sample alphabetically before the
main feature, that's PATH_OF_PAIN territory — not this layer's job.
- **Top-level scan only.** ``find_main_video`` does not descend into
subdirectories. Releases that wrap the main video in ``Sample/`` or
similar are non-scene-standard and handled by the orchestrator
upstream.
- **Lexicographic tie-break.** When several candidates qualify
(legitimate for season packs), we return the first by alphabetical
order. Deterministic, no size-based ranking.
- **Direct ``Path`` I/O.** No ``FilesystemScanner`` port — this layer
is application, not domain. If isolation becomes necessary for
testing scale, we'll introduce a port then.
"""
from __future__ import annotations
from pathlib import Path
from alfred.domain.releases_TO_CHECK.ports.knowledge import ReleaseKnowledge
def is_supported_video(path: Path, kb: ReleaseKnowledge) -> bool:
"""Return True when ``path`` is a video file the parser should
consider.
The check is purely extension-based: ``path.suffix.lower()`` must
belong to ``kb.video_extensions``. ``path`` must also be a regular
file — directories and broken symlinks return False.
"""
if not path.is_file():
return False
return path.suffix.lower() in kb.video_extensions
def find_main_video(folder: Path, kb: ReleaseKnowledge) -> Path | None:
"""Return the canonical main video file inside ``folder``, or
``None`` if there isn't one.
Behavior:
- Top-level scan only — subdirectories are ignored.
- Eligibility is :func:`is_supported_video`.
- When several files qualify, the lexicographically first one wins.
- When ``folder`` itself is a video file, it is returned as-is
(single-file releases are valid).
- When ``folder`` doesn't exist or isn't a directory (and isn't a
video file either), returns ``None``.
"""
if folder.is_file():
return folder if is_supported_video(folder, kb) else None
if not folder.is_dir():
return None
candidates = sorted(
child for child in folder.iterdir() if is_supported_video(child, kb)
)
return candidates[0] if candidates else None
@@ -1,116 +0,0 @@
"""SubtitlePlacer — hard-links matched subtitle tracks next to the destination video."""
import logging
import os
from dataclasses import dataclass
from pathlib import Path
from alfred.domain.subtitles_TO_CHECK.entities import SubtitleScanResult
from alfred.domain.subtitles_TO_CHECK.value_objects import SubtitleType
logger = logging.getLogger(__name__)
def _build_dest_name(track: SubtitleScanResult, video_stem: str) -> str:
"""
Build the destination filename for a subtitle track.
Format: {video_stem}.{lang}.{ext}
{video_stem}.{lang}.sdh.{ext}
{video_stem}.{lang}.forced.{ext}
"""
if not track.language or not track.format:
raise ValueError("Cannot compute destination name: language or format missing")
ext = track.format.extensions[0].lstrip(".")
parts = [video_stem, track.language.code]
if track.subtitle_type == SubtitleType.SDH:
parts.append("sdh")
elif track.subtitle_type == SubtitleType.FORCED:
parts.append("forced")
return ".".join(parts) + "." + ext
@dataclass
class PlacedTrack:
source: Path
destination: Path
filename: str
@dataclass
class PlaceResult:
placed: list[PlacedTrack]
skipped: list[tuple[SubtitleScanResult, str]] # (track, reason)
@property
def placed_count(self) -> int:
return len(self.placed)
@property
def skipped_count(self) -> int:
return len(self.skipped)
class SubtitlePlacer:
"""
Hard-links matched SubtitleScanResult files next to a destination video.
Uses the same hard-link strategy as FileManager.copy_file:
instant, no data duplication, qBittorrent keeps seeding.
Embedded tracks are skipped — nothing to place on disk.
"""
def place(
self,
tracks: list[SubtitleScanResult],
destination_video: Path,
) -> PlaceResult:
placed: list[PlacedTrack] = []
skipped: list[tuple[SubtitleScanResult, str]] = []
dest_dir = destination_video.parent
for track in tracks:
if track.is_embedded:
logger.debug(f"SubtitlePlacer: skip embedded track ({track.language})")
skipped.append((track, "embedded — no file to place"))
continue
if not track.file_path:
skipped.append((track, "source file not set"))
continue
try:
dest_name = _build_dest_name(track, destination_video.stem)
except ValueError as e:
skipped.append((track, str(e)))
continue
dest_path = dest_dir / dest_name
try:
os.link(track.file_path, dest_path)
placed.append(
PlacedTrack(
source=track.file_path,
destination=dest_path,
filename=dest_name,
)
)
logger.info(f"SubtitlePlacer: placed {dest_name}")
except FileNotFoundError:
skipped.append((track, "source file not found"))
except FileExistsError:
logger.debug(f"SubtitlePlacer: skip {dest_name} — already exists")
skipped.append((track, "destination already exists"))
except OSError as e:
logger.warning(f"SubtitlePlacer: failed to place {dest_name}: {e}")
skipped.append((track, str(e)))
logger.info(
f"SubtitlePlacer: {len(placed)} placed, {len(skipped)} skipped "
f"for {destination_video.name}"
)
return PlaceResult(placed=placed, skipped=skipped)
@@ -1,21 +0,0 @@
"""TV-show orchestrators — operate on the Alfred-managed TV library tree.
The TV library is a directory of show folders (one per TV show), each
holding season folders containing video files. Modules here walk this
tree and reconstruct on-disk :class:`SeriesRelease` aggregates by
reusing the existing release pipeline (``inspect_release``) rather
than duplicating its parse/probe logic.
"""
from .dto import SearchShowResponse, ShowHit
from .search_show import SearchShowUseCase
from .walker import SeasonFolder, ShowTree, walk_show
__all__ = [
"SearchShowResponse",
"SearchShowUseCase",
"SeasonFolder",
"ShowHit",
"ShowTree",
"walk_show",
]
@@ -1,39 +0,0 @@
"""TV show application DTOs."""
from dataclasses import dataclass, field
@dataclass(frozen=True)
class ShowHit:
"""One TV-show hit, flattened for transport to the agent."""
tmdb_id: int
name: str
first_air_year: int | None = None
def to_dict(self) -> dict:
out: dict = {"tmdb_id": self.tmdb_id, "name": self.name}
if self.first_air_year is not None:
out["first_air_year"] = self.first_air_year
return out
@dataclass
class SearchShowResponse:
"""Response from searching for a TV show."""
status: str
hits: list[ShowHit] = field(default_factory=list)
error: str | None = None
message: str | None = None
def to_dict(self):
result: dict = {"status": self.status}
if self.error:
result["error"] = self.error
result["message"] = self.message
else:
result["hits"] = [h.to_dict() for h in self.hits]
return result
@@ -1,59 +0,0 @@
"""Search TV show use case."""
import logging
from alfred.infrastructure.api_TO_CHECK.tmdb import (
TMDBAPIError,
TMDBClient,
TMDBConfigurationError,
)
from .dto import SearchShowResponse, ShowHit
logger = logging.getLogger(__name__)
class SearchShowUseCase:
"""List TV shows matching a free-text query via TMDB ``/search/tv``.
Symmetric to :class:`alfred.application.movies.SearchMovieUseCase`:
thin orchestrator, flattens domain VOs into agent-friendly
primitives, no ``imdb_id`` enrichment (caller follows up with
:meth:`TMDBClient.get_tv_show_info` on a chosen ``tmdb_id``).
"""
def __init__(self, tmdb_client: TMDBClient):
self.tmdb_client = tmdb_client
def execute(self, show_title: str) -> SearchShowResponse:
try:
results = self.tmdb_client.search_shows(show_title)
hits = [
ShowHit(
tmdb_id=r.tmdb_id.value,
name=r.name,
first_air_year=r.first_air_year,
)
for r in results
]
logger.info(f"search_shows({show_title!r}) → {len(hits)} hits")
return SearchShowResponse(status="ok", hits=hits)
except TMDBConfigurationError as e:
logger.error(f"TMDB configuration error: {e}")
return SearchShowResponse(
status="error", error="configuration_error", message=str(e)
)
except TMDBAPIError as e:
logger.error(f"TMDB API error: {e}")
return SearchShowResponse(
status="error", error="api_error", message=str(e)
)
except ValueError as e:
logger.error(f"Validation error: {e}")
return SearchShowResponse(
status="error", error="validation_failed", message=str(e)
)
@@ -1,208 +0,0 @@
"""Show tree walker — minimal filesystem traversal of a TV show folder.
The walker is intentionally dumb: it lists season folders, classifies
each one as PACK or EPISODIC by **inspecting its filesystem
structure**, and hands the orchestrator a flat list of video files
per season. It does not parse release names, run ffprobe, or
classify subtitle files. All of that intelligence lives in the
existing release pipeline (``inspect_release`` + downstream
services); the walker just hands the orchestrator the paths to feed
into that pipeline.
Folder convention
-----------------
Inside an Alfred-managed library, a show root looks like::
Foundation/
Foundation.S01.1080p.WEB-DL.x265-GROUP/ ← PACK season
Foundation.S01E01.1080p.WEB-DL.x265.mkv ← flat video
Foundation.S01E02.1080p.WEB-DL.x265.mkv
...
Foundation.S02/ ← EPISODIC season
Foundation.S02E01.1080p.WEB-DL.x265-GROUP/ ← episode subfolder
Foundation.S02E01.1080p.WEB-DL.x265-GROUP.mkv
Foundation.S02E02.1080p.WEB-DL.x265-OTHER/
Foundation.S02E02.1080p.WEB-DL.x265-OTHER.mkv
The walker recognizes a season folder by a ``Sxx`` token anywhere in
its name (case-insensitive). It does **not** care about Plex-style
names (``Season 01``, ``Specials``) — the Alfred library uses
release-style folder names only.
PACK vs EPISODIC is a **structural distinction**, not a naming one:
* **PACK** — season folder contains N flat video files. No
subfolders.
* **EPISODIC** — season folder contains N subfolders, each holding
exactly one video.
A season folder that mixes the two layouts (some flat videos AND
some subfolders) is malformed: the walker reports
``mode=None`` and an empty ``video_files`` tuple so the
orchestrator can warn and skip it.
"""
from __future__ import annotations
import logging
import re
from dataclasses import dataclass
from pathlib import Path
from alfred.domain.releases_TO_CHECK.ports import ReleaseKnowledge
from alfred.domain.releases_TO_CHECK.value_objects import ReleaseMode
from alfred.domain.shared_TO_CHECK.ports import FilesystemScanner
_LOG = logging.getLogger(__name__)
# Matches any ``Sxx`` token (1-2 digits) bounded by non-alphanumerics.
# Examples that match: ``Foundation.S01.1080p`` , ``S2.Pack`` , ``BBC.s10.bluray``.
# Examples that don't: ``Sample`` , ``Soundtrack`` , ``2024.S0E1`` (no S+digits boundary).
_SEASON_TOKEN_RE = re.compile(r"(?<![A-Za-z0-9])s(\d{1,2})(?![A-Za-z0-9])", re.IGNORECASE)
@dataclass(frozen=True)
class SeasonFolder:
"""One season folder discovered inside a show root.
``mode`` is set by the walker from the FS structure:
* :attr:`ReleaseMode.PACK` — ``video_files`` lists the season
folder's flat videos.
* :attr:`ReleaseMode.EPISODIC` — ``video_files`` lists each
episode subfolder's single video.
* ``None`` — the folder is empty, malformed (mixed layout), or
otherwise unclassifiable. ``video_files`` is empty. The
orchestrator decides whether to warn/skip.
"""
season_dir: Path
mode: ReleaseMode | None
video_files: tuple[Path, ...]
@dataclass(frozen=True)
class ShowTree:
"""The full structural snapshot of a show on disk."""
show_root: Path
season_folders: tuple[SeasonFolder, ...]
def walk_show(
show_root: Path,
*,
scanner: FilesystemScanner,
kb: ReleaseKnowledge,
) -> ShowTree:
"""Walk ``show_root`` and return its structural tree.
The walker:
* lists direct children of ``show_root``,
* keeps the directories whose name contains a ``Sxx`` token,
* classifies each season folder as PACK / EPISODIC / unknown by
inspecting its direct children (videos vs subfolders),
* for EPISODIC, descends one extra level into each episode
subfolder to collect its single video,
* sorts season folders by name and video files by name within
each folder.
The walker never raises — empty / unreadable / malformed
directories surface as a ``SeasonFolder`` with ``mode=None`` and
an empty ``video_files`` tuple.
"""
video_exts = {ext.lower() for ext in kb.video_extensions}
season_folders: list[SeasonFolder] = []
for entry in scanner.scan_dir(show_root):
if not entry.is_dir or not _SEASON_TOKEN_RE.search(entry.name):
continue
season_folders.append(
_classify_season(entry.path, scanner=scanner, video_exts=video_exts)
)
return ShowTree(
show_root=show_root, season_folders=tuple(season_folders)
)
# --------------------------------------------------------------------------- #
# Season-folder classification #
# --------------------------------------------------------------------------- #
def _classify_season(
season_dir: Path,
*,
scanner: FilesystemScanner,
video_exts: set[str],
) -> SeasonFolder:
"""Inspect one season folder and decide PACK / EPISODIC / unknown.
Looks only at direct children. For EPISODIC, descends one extra
level into each subfolder to collect its single video. Mixed
layouts (flat videos + subfolders) are reported as ``mode=None``
so the orchestrator can skip them with a warning.
"""
flat_videos: list[Path] = []
subdirs: list[Path] = []
for child in scanner.scan_dir(season_dir):
if child.is_file and child.suffix.lower() in video_exts:
flat_videos.append(child.path)
elif child.is_dir:
subdirs.append(child.path)
# Anything else (non-video files like .nfo, .srt at the season
# root) is ignored — it doesn't affect classification.
has_flat = bool(flat_videos)
has_subdirs = bool(subdirs)
if has_flat and has_subdirs:
_LOG.warning(
"walker: season folder %s mixes flat videos and subfolders — "
"malformed layout, skipping",
season_dir,
)
return SeasonFolder(season_dir=season_dir, mode=None, video_files=())
if has_flat:
return SeasonFolder(
season_dir=season_dir,
mode=ReleaseMode.PACK,
video_files=tuple(sorted(flat_videos)),
)
if has_subdirs:
episode_videos: list[Path] = []
for sub in sorted(subdirs):
videos_in_sub = [
child.path
for child in scanner.scan_dir(sub)
if child.is_file and child.suffix.lower() in video_exts
]
if len(videos_in_sub) == 0:
_LOG.warning(
"walker: episode subfolder %s contains no video — skipping",
sub,
)
continue
if len(videos_in_sub) > 1:
_LOG.warning(
"walker: episode subfolder %s contains %d videos — "
"malformed, skipping season %s",
sub,
len(videos_in_sub),
season_dir,
)
return SeasonFolder(
season_dir=season_dir, mode=None, video_files=()
)
episode_videos.append(videos_in_sub[0])
return SeasonFolder(
season_dir=season_dir,
mode=ReleaseMode.EPISODIC,
video_files=tuple(episode_videos),
)
# No flat videos, no subdirs → empty season folder.
return SeasonFolder(season_dir=season_dir, mode=None, video_files=())
-91
View File
@@ -1,91 +0,0 @@
"""Movie domain entities."""
from dataclasses import dataclass
from ..shared_TO_CHECK.value_objects import ImdbId, TmdbId
from .value_objects import MovieTitle, ReleaseYear
@dataclass(frozen=True, eq=False)
class Movie:
"""
Movie aggregate root for the movies domain.
TMDB-only aggregate: carries identity (``tmdb_id`` + optional
``imdb_id``) plus the catalog facts that come from TMDB (``title``,
``release_year``). Filesystem-side concerns (file path, quality,
tracks, ``added_at``) live on :class:`alfred.domain.releases.entities.
MovieRelease`, the per-movie release aggregate persisted alongside.
Frozen: rebuild via ``dataclasses.replace`` to project metadata
updates (e.g. a TMDB refresh) onto a new instance.
Equality is identity-based on ``tmdb_id``: two ``Movie`` instances
are equal iff they share the same primary key. ``imdb_id`` is a
secondary anchor and not part of the identity.
"""
tmdb_id: TmdbId
title: MovieTitle
imdb_id: ImdbId | None = None
release_year: ReleaseYear | None = None
def __post_init__(self) -> None:
if not isinstance(self.tmdb_id, TmdbId):
raise ValueError(
f"tmdb_id must be TmdbId, got {type(self.tmdb_id)}"
)
if not isinstance(self.title, MovieTitle):
if isinstance(self.title, str):
object.__setattr__(self, "title", MovieTitle(self.title))
else:
raise ValueError(
f"title must be MovieTitle or str, got {type(self.title)}"
)
if self.imdb_id is not None and not isinstance(self.imdb_id, ImdbId):
raise ValueError(
f"imdb_id must be ImdbId or None, got {type(self.imdb_id)}"
)
def __eq__(self, other: object) -> bool:
if not isinstance(other, Movie):
return NotImplemented
return self.tmdb_id == other.tmdb_id
def __hash__(self) -> int:
return hash(self.tmdb_id)
# WRONG
def get_folder_name(self) -> str:
"""
Get the folder name for this movie.
Format: "Title (Year)"
Example: "Inception (2010)"
"""
if self.release_year:
return f"{self.title.value} ({self.release_year.value})"
return self.title.value
# WRONG
def get_filename(self) -> str:
"""
Get the suggested base filename (without extension) for this movie.
Format: ``Title.Year`` (quality lives on
:class:`alfred.domain.releases.entities.MovieRelease` now and is
appended by the release-aware caller — typically the rescan /
organize flow, after Phase 4).
Example: ``Inception.2010``.
"""
parts = [self.title.normalized()]
if self.release_year:
parts.append(str(self.release_year.value))
return ".".join(parts)
def __str__(self) -> str:
return f"{self.title.value} ({self.release_year.value if self.release_year else 'Unknown'})"
def __repr__(self) -> str:
return f"Movie(tmdb_id={self.tmdb_id}, title='{self.title.value}')"
@@ -1,38 +0,0 @@
"""Filesystem release aggregates — what the user owns on disk.
This bounded context is intentionally separated from
``alfred.domain.tv_shows`` / ``alfred.domain.movies`` (TMDB identity).
A :class:`SeriesRelease` describes the physical files on disk for one
show; a :class:`TVShow` describes the work as catalogued by TMDB. The
two are linked by :class:`~alfred.domain.shared.value_objects.TmdbId`
in the persistence layer, never by direct reference.
Not to be confused with ``alfred.domain.release`` (singular) which
parses release **names** (strings → tokens). The two packages may be
merged later; for now they coexist as separate concerns.
"""
from .builders import SeasonReleaseBuilder, SeriesReleaseBuilder
from .entities import (
EpisodeRelease,
MovieRelease,
SeasonRelease,
SeriesRelease,
TrackProfile,
)
from .repositories import MovieReleaseRepository, SeriesReleaseRepository
from .value_objects import EpisodeRange, ReleaseMode
__all__ = [
"EpisodeRange",
"EpisodeRelease",
"MovieRelease",
"MovieReleaseRepository",
"ReleaseMode",
"SeasonRelease",
"SeasonReleaseBuilder",
"SeriesRelease",
"SeriesReleaseBuilder",
"SeriesReleaseRepository",
"TrackProfile",
]
-243
View File
@@ -1,243 +0,0 @@
"""Builders for the filesystem release aggregates.
The aggregates are frozen — :class:`SeriesRelease`, :class:`SeasonRelease`,
and :class:`EpisodeRelease` are ``@dataclass(frozen=True)`` and offer no
mutation methods. All construction goes through these builders, which
assemble the aggregate piece by piece and emit a frozen instance via
``build()``.
Typical usage during a filesystem walk::
builder = SeriesReleaseBuilder(tmdb_id=TmdbId(84958), imdb_id=ImdbId("tt0804484"))
sb = builder.season_builder(SeasonNumber(1), folder="Show.S01", mode=ReleaseMode.PACK)
sb.add_episode(EpisodeRelease(
episodes=EpisodeRange(EpisodeNumber(1), EpisodeNumber(1)),
file_path=FilePath("Show.S01/Show.S01E01.mkv"),
tracks=TrackProfile(),
))
release = builder.build()
Builders are **single-use scratchpads**: they hold mutable state during
construction, then produce an immutable aggregate.
Invariants enforced at ``build()`` time:
* Seasons are emitted sorted by ``season_number``.
* Episodes within each season are emitted sorted by their
``EpisodeRange.start`` (so a season with ``E01-E03`` + ``E04`` is
emitted in that order).
* No two ``EpisodeRelease`` within a season may overlap (same TMDB
episode covered by two distinct files) — raises ``ValidationError``.
"""
from __future__ import annotations
from ..shared_TO_CHECK.exceptions import ValidationError
from ..shared_TO_CHECK.value_objects import ImdbId, TmdbId
from ..tv_shows.value_objects import SeasonNumber
from .entities import (
EpisodeRelease,
SeasonRelease,
SeriesRelease,
)
from .value_objects import ReleaseMode
# ════════════════════════════════════════════════════════════════════════════
# MovieReleaseBuilder
# ════════════════════════════════════════════════════════════════════════════
# ...
# ════════════════════════════════════════════════════════════════════════════
# SeasonReleaseBuilder
# ════════════════════════════════════════════════════════════════════════════
class SeasonReleaseBuilder:
"""
Mutable scratchpad for a :class:`SeasonRelease`.
Episodes are appended in arbitrary order; ``build()`` sorts them by
their range start before emitting the frozen aggregate and verifies
there are no overlapping ranges.
"""
def __init__(
self,
season_number: SeasonNumber | int,
*,
folder: str,
mode: ReleaseMode,
) -> None:
if isinstance(season_number, int):
season_number = SeasonNumber(season_number)
self._season_number: SeasonNumber = season_number
self._folder: str = folder
self._mode: ReleaseMode = mode
self._episodes: list[EpisodeRelease] = []
@classmethod
def from_existing(cls, season: SeasonRelease) -> SeasonReleaseBuilder:
"""Seed a builder from an existing frozen :class:`SeasonRelease`."""
builder = cls(
season.season_number,
folder=season.folder,
mode=season.mode,
)
builder._episodes = list(season.episodes)
return builder
@property
def season_number(self) -> SeasonNumber:
return self._season_number
@property
def mode(self) -> ReleaseMode:
return self._mode
def set_folder(self, folder: str) -> SeasonReleaseBuilder:
self._folder = folder
return self
def set_mode(self, mode: ReleaseMode) -> SeasonReleaseBuilder:
self._mode = mode
return self
def add_episode(self, episode: EpisodeRelease) -> SeasonReleaseBuilder:
"""Append a physical-file :class:`EpisodeRelease` to this season."""
self._episodes.append(episode)
return self
def build(self) -> SeasonRelease:
"""Emit a frozen :class:`SeasonRelease` with episodes sorted.
Raises :class:`ValidationError` if any two episode ranges overlap
(same TMDB slot claimed by two distinct files).
"""
ordered = tuple(
sorted(self._episodes, key=lambda ep: ep.episodes.start.value)
)
# Overlap check — ranges are inclusive on both ends, sorted by start.
for prev, curr in zip(ordered, ordered[1:], strict=False):
if curr.episodes.start.value <= prev.episodes.end.value:
raise ValidationError(
f"SeasonRelease season {self._season_number}: overlapping "
f"episode ranges {prev.episodes} and {curr.episodes}"
)
return SeasonRelease(
season_number=self._season_number,
folder=self._folder,
mode=self._mode,
episodes=ordered,
)
# ════════════════════════════════════════════════════════════════════════════
# SeriesReleaseBuilder
# ════════════════════════════════════════════════════════════════════════════
class SeriesReleaseBuilder:
"""
Mutable scratchpad for the :class:`SeriesRelease` aggregate root.
Seasons are tracked via internal :class:`SeasonReleaseBuilder`
instances keyed by :class:`SeasonNumber`.
"""
def __init__(
self,
*,
tmdb_id: TmdbId | int,
imdb_id: ImdbId | str | None = None,
) -> None:
if isinstance(tmdb_id, int):
tmdb_id = TmdbId(tmdb_id)
if isinstance(imdb_id, str):
imdb_id = ImdbId(imdb_id)
self._tmdb_id: TmdbId = tmdb_id
self._imdb_id: ImdbId | None = imdb_id
self._season_builders: dict[SeasonNumber, SeasonReleaseBuilder] = {}
@classmethod
def from_existing(cls, release: SeriesRelease) -> SeriesReleaseBuilder:
"""Seed a builder from an existing frozen :class:`SeriesRelease`."""
builder = cls(
tmdb_id=release.tmdb_id,
imdb_id=release.imdb_id,
)
for season in release.seasons:
builder._season_builders[season.season_number] = (
SeasonReleaseBuilder.from_existing(season)
)
return builder
# ── Top-level mutators ─────────────────────────────────────────────────
def set_imdb_id(self, imdb_id: ImdbId | str | None) -> SeriesReleaseBuilder:
if isinstance(imdb_id, str):
imdb_id = ImdbId(imdb_id)
self._imdb_id = imdb_id
return self
# ── Content ────────────────────────────────────────────────────────────
def season_builder(
self,
season_number: SeasonNumber | int,
*,
folder: str | None = None,
mode: ReleaseMode | None = None,
) -> SeasonReleaseBuilder:
"""
Return (creating if needed) the :class:`SeasonReleaseBuilder` for a
season.
``folder`` and ``mode`` are required when the builder does not yet
exist for this season; subsequent calls may pass them to override.
"""
if isinstance(season_number, int):
season_number = SeasonNumber(season_number)
sb = self._season_builders.get(season_number)
if sb is None:
if folder is None or mode is None:
raise ValidationError(
f"season_builder({season_number}): folder and mode "
f"are required to create a new season builder"
)
sb = SeasonReleaseBuilder(season_number, folder=folder, mode=mode)
self._season_builders[season_number] = sb
else:
if folder is not None:
sb.set_folder(folder)
if mode is not None:
sb.set_mode(mode)
return sb
def add_season(self, season: SeasonRelease) -> SeriesReleaseBuilder:
"""
Attach (or replace) a fully-built :class:`SeasonRelease`.
Replaces any existing season with the same number.
"""
self._season_builders[season.season_number] = (
SeasonReleaseBuilder.from_existing(season)
)
return self
# ── Emit ───────────────────────────────────────────────────────────────
def build(self) -> SeriesRelease:
"""Emit a frozen :class:`SeriesRelease` with seasons sorted by number."""
ordered_seasons = tuple(
self._season_builders[n].build()
for n in sorted(self._season_builders, key=lambda x: x.value)
)
return SeriesRelease(
tmdb_id=self._tmdb_id,
imdb_id=self._imdb_id,
seasons=ordered_seasons,
)
-217
View File
@@ -1,217 +0,0 @@
"""Filesystem release aggregates.
The release domain models what the user owns on disk — one
:class:`SeriesRelease` per show, one :class:`MovieRelease` per movie.
TMDB identity (title, status, episode_count, …) lives in the
``tv_shows`` / ``movies`` domains and is linked via the
:class:`~alfred.domain.shared.value_objects.TmdbId` natural key.
All entities are frozen. Mutation goes through the builders in
:mod:`alfred.domain.releases.builders`.
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from ..shared_TO_CHECK.exceptions import ValidationError
from ..shared_TO_CHECK.media import AudioTrack, SubtitleTrack
from ..shared_TO_CHECK.value_objects import FilePath, ImdbId, TmdbId
from ..tv_shows.value_objects import SeasonNumber
from .value_objects import EpisodeRange, ReleaseMode
__all__ = [
"EpisodeRelease",
"MovieRelease",
"SeasonRelease",
"SeriesRelease",
"TrackProfile",
]
@dataclass(frozen=True)
class TrackProfile:
"""
Audio + subtitle tracks of one physical file.
Tracks live per-file (not per-season): every ``EpisodeRelease`` and
``MovieRelease`` carries its own ``TrackProfile``. Season-level
aggregation is computed by the caller when needed.
"""
audio_tracks: tuple[AudioTrack, ...] = ()
subtitle_tracks: tuple[SubtitleTrack, ...] = ()
@dataclass(frozen=True)
class EpisodeRelease:
"""
One physical episode file (or multi-episode file) on disk.
:attr:`episodes` is an :class:`EpisodeRange` — a single ``.mkv``
that covers ``S01E02E03`` carries ``EpisodeRange(start=E02, end=E03)``
and is recorded once. The library index lists it under each covered
slot (``E02``, ``E03``) for symmetric lookups.
:attr:`file_path` is **relative to the show root** (e.g.
``"Show.S01/Show.S01E02.mkv"`` for PACK,
``"Show.S01/Show.S01E02-RG/Show.S01E02-RG.mkv"`` for EPISODIC).
The caller (repository) prepends the absolute show root when
needed.
"""
episodes: EpisodeRange
file_path: FilePath
tracks: TrackProfile = TrackProfile()
@dataclass(frozen=True)
class SeasonRelease:
"""
All physical files on disk for one season of a show.
The :attr:`mode` flag records the filesystem layout:
* :attr:`ReleaseMode.PACK` — the season folder contains N video
files directly. ``episodes`` lists each ``.mkv`` in the folder.
* :attr:`ReleaseMode.EPISODIC` — the season folder contains N
sub-folders, each with one episode. ``episodes`` lists each
``(subfolder, file)`` pair.
:attr:`folder` is the season folder name, relative to the show root.
Invariant: every ``EpisodeRelease.episodes`` range stays within
sane bounds (validated at construction). Cross-episode duplicate
detection (two files claiming the same TMDB slot) is the
builder's job, not the entity's.
"""
season_number: SeasonNumber
folder: str
mode: ReleaseMode
episodes: tuple[EpisodeRelease, ...] = ()
def __post_init__(self) -> None:
if not isinstance(self.season_number, SeasonNumber):
raise ValidationError(
f"SeasonRelease.season_number must be SeasonNumber, "
f"got {type(self.season_number)}"
)
if not isinstance(self.mode, ReleaseMode):
raise ValidationError(
f"SeasonRelease.mode must be ReleaseMode, got {type(self.mode)}"
)
if not isinstance(self.folder, str) or not self.folder:
raise ValidationError(
f"SeasonRelease.folder must be a non-empty string, "
f"got {self.folder!r}"
)
def episode_count(self) -> int:
"""
Total number of TMDB episode slots covered by all physical files.
Sums each :meth:`EpisodeRange.count` — a season with two files
``E01`` + ``E02-E03`` returns ``3`` (one slot from the first
file, two from the second).
Compared by the caller against the library index's TMDB
``episode_count`` to detect incomplete seasons.
"""
return sum(ep.episodes.count() for ep in self.episodes)
@dataclass(frozen=True)
class SeriesRelease:
"""
All physical seasons on disk for one show.
Anchored to TMDB by :attr:`tmdb_id` (primary key). :attr:`imdb_id`
is optional and stored as a secondary anchor — useful for the
occasional show without TMDB coverage, and for cross-checking
when both ids are known.
Seasons are exposed sorted by ``season_number`` (the builder
enforces this on emit). No duplicate ``season_number`` is
permitted across :attr:`seasons`.
"""
tmdb_id: TmdbId
imdb_id: ImdbId | None
seasons: tuple[SeasonRelease, ...] = ()
def __post_init__(self) -> None:
if not isinstance(self.tmdb_id, TmdbId):
raise ValidationError(
f"SeriesRelease.tmdb_id must be TmdbId, got {type(self.tmdb_id)}"
)
if self.imdb_id is not None and not isinstance(self.imdb_id, ImdbId):
raise ValidationError(
f"SeriesRelease.imdb_id must be ImdbId or None, "
f"got {type(self.imdb_id)}"
)
seen: set[int] = set()
for s in self.seasons:
if s.season_number.value in seen:
raise ValidationError(
f"SeriesRelease has duplicate season "
f"{s.season_number}"
)
seen.add(s.season_number.value)
def get_season(self, season_number: SeasonNumber) -> SeasonRelease | None:
"""Return the :class:`SeasonRelease` for ``season_number`` or ``None``."""
for s in self.seasons:
if s.season_number == season_number:
return s
return None
@dataclass(frozen=True)
class MovieRelease:
"""
A single physical movie file on disk.
Anchored to TMDB by :attr:`tmdb_id`; :attr:`imdb_id` optional
secondary anchor.
:attr:`folder` is the movie folder name relative to the
``movies/`` library root. :attr:`file_path` is the video file
name relative to the folder (movies are one folder, one file in
Alfred's layout — no sub-folders).
:attr:`added_at` is the UTC timestamp at which the release was
first observed in the library — set by the caller (organizer /
rescan) when the aggregate is built. Persisted by the v2 movie
sidecar; not derived from the filesystem (mtime drifts across
moves and hard-links).
"""
tmdb_id: TmdbId
imdb_id: ImdbId | None
folder: str
file_path: FilePath
added_at: datetime
tracks: TrackProfile = TrackProfile()
def __post_init__(self) -> None:
if not isinstance(self.tmdb_id, TmdbId):
raise ValidationError(
f"MovieRelease.tmdb_id must be TmdbId, got {type(self.tmdb_id)}"
)
if self.imdb_id is not None and not isinstance(self.imdb_id, ImdbId):
raise ValidationError(
f"MovieRelease.imdb_id must be ImdbId or None, "
f"got {type(self.imdb_id)}"
)
if not isinstance(self.folder, str) or not self.folder:
raise ValidationError(
f"MovieRelease.folder must be a non-empty string, "
f"got {self.folder!r}"
)
if not isinstance(self.added_at, datetime):
raise ValidationError(
f"MovieRelease.added_at must be datetime, "
f"got {type(self.added_at)}"
)
@@ -1,27 +0,0 @@
"""Release parser v2 — annotate-based pipeline.
This package is the future home of ``parse_release``. It restructures the
parsing logic around a **tokenize → annotate → assemble** pipeline:
1. **tokenize**: split the release name into atomic tokens.
2. **annotate**: walk tokens left-to-right, assigning each one a
:class:`TokenRole` (TITLE, YEAR, SEASON, RESOLUTION, …) using the
injected :class:`~alfred.domain.release.ports.knowledge.ReleaseKnowledge`.
3. **assemble**: fold the annotated tokens into a :class:`ParsedRelease`.
The pipeline has three internal paths driven by the detected release group:
- **EASY**: known group (KONTRAST, RARBG, …) with a schema-driven layout
declared in ``knowledge/release/release_groups/<group>.yaml``.
- **SHITTY**: unknown group, best-effort matching against the global
knowledge sets, with a 0-100 confidence score.
- **PATH OF PAIN**: score below threshold OR critical chunks missing —
signaled to the caller, who decides whether to involve the LLM/user.
"""
from __future__ import annotations
from .schema import GroupSchema, SchemaChunk
from .tokens import Token, TokenRole
__all__ = ["GroupSchema", "SchemaChunk", "Token", "TokenRole"]
@@ -1,762 +0,0 @@
"""Annotate-based pipeline.
Three stages:
1. :func:`tokenize` — release name → ``list[Token]`` (all UNKNOWN), plus
a separately-returned site tag (e.g. ``[YTS.MX]``) that is never
tokenized.
2. :func:`annotate` — promote each token's :class:`TokenRole` using the
injected knowledge base. Two sub-passes:
a. **Structural** (schema-driven, EASY only). Detects the group at
the right end, looks up its :class:`GroupSchema`, then matches
the schema's chunk sequence against the token stream. Between
two structural chunks, any number of unmatched tokens may
remain — they are left UNKNOWN for the enricher pass to handle.
b. **Enrichers** (non-positional). Walks UNKNOWN tokens and tags
audio / video-meta / edition / language roles. Multi-token
sequences (``DTS.HD.MA``, ``DV.HDR10``, ``DIRECTORS.CUT``) are
matched first, single tokens after.
3. :func:`assemble` — fold annotated tokens into a
:class:`~alfred.domain.release.value_objects.ParsedRelease`-compatible
dict.
The pipeline is **pure**: no I/O, no TMDB, no probe. All knowledge
arrives through ``kb: ReleaseKnowledge``.
"""
from __future__ import annotations
from ..ports.knowledge import ReleaseKnowledge
from alfred.domain.releases_TO_CHECK.value_objects_old_question_mark import MediaTypeToken
from .schema import GroupSchema
from .tokens import Token, TokenRole
# ---------------------------------------------------------------------------
# Stage 1 — tokenize
# ---------------------------------------------------------------------------
def strip_site_tag(name: str) -> tuple[str, str | None]:
"""Split off a ``[site.tag]`` prefix or suffix.
Returns ``(clean_name, tag)``. If no tag is found, returns
``(name.strip(), None)``.
"""
s = name.strip()
if s.startswith("["):
close = s.find("]")
if close != -1:
tag = s[1:close].strip()
remainder = s[close + 1 :].strip()
if tag and remainder:
return remainder, tag
if s.endswith("]"):
open_bracket = s.rfind("[")
if open_bracket != -1:
tag = s[open_bracket + 1 : -1].strip()
remainder = s[:open_bracket].strip()
if tag and remainder:
return remainder, tag
return s, None
def tokenize(name: str, kb: ReleaseKnowledge) -> tuple[list[Token], str | None]:
"""Split ``name`` into tokens after stripping any site tag.
String-ops style: replace every configured separator with a single
NUL byte then split. NUL cannot legally appear in a release name, so
it's a safe sentinel.
"""
clean, site_tag = strip_site_tag(name)
DELIM = "\x00"
buf = clean
for sep in kb.separators:
if sep != DELIM:
buf = buf.replace(sep, DELIM)
pieces = [p for p in buf.split(DELIM) if p]
tokens = [Token(text=p, index=i) for i, p in enumerate(pieces)]
return tokens, site_tag
# ---------------------------------------------------------------------------
# Helpers shared across passes
# ---------------------------------------------------------------------------
def _parse_season_episode(text: str) -> tuple[int, int | None, int | None] | None:
"""Parse a single token as ``SxxExx`` / ``SxxExxExx`` / ``Sxx`` /
``Sxx-yy`` (season range) / ``NxNN``.
Returns ``(season, episode, episode_end)`` or ``None`` if the token
is not a season/episode marker. For ``Sxx-yy``, returns the first
season with no episode info — the caller is expected to detect the
range form and promote ``media_type`` to ``tv_complete`` separately.
"""
upper = text.upper()
# SxxExx form (and Sxx, Sxx-yy)
if len(upper) >= 3 and upper[0] == "S" and upper[1:3].isdigit():
season = int(upper[1:3])
rest = upper[3:]
if not rest:
return season, None, None
# Sxx-yy season-range form: capture the first season, treat as a
# complete-series marker (no episode info).
if (
len(rest) == 3
and rest[0] == "-"
and rest[1:3].isdigit()
):
return season, None, None
episodes: list[int] = []
while rest.startswith("E") and len(rest) >= 3 and rest[1:3].isdigit():
episodes.append(int(rest[1:3]))
rest = rest[3:]
if not episodes:
return None
# For chained multi-episode markers (E09E10E11), the range is the
# first → last episode. Intermediate values are implied.
return season, episodes[0], episodes[-1] if len(episodes) >= 2 else None
# NxNN form
if "X" in upper:
parts = upper.split("X")
if len(parts) >= 2 and all(p.isdigit() and p for p in parts):
season = int(parts[0])
episode = int(parts[1])
episode_end = int(parts[2]) if len(parts) >= 3 else None
return season, episode, episode_end
return None
def _is_year(text: str) -> bool:
"""Return True if ``text`` is a 4-digit year in [1900, 2099]."""
return len(text) == 4 and text.isdigit() and 1900 <= int(text) <= 2099
def _split_codec_group(text: str, kb: ReleaseKnowledge) -> tuple[str, str] | None:
"""Split a ``codec-GROUP`` token into ``(codec, group)`` if it fits.
Returns ``None`` if the token doesn't match the ``codec-GROUP``
shape. Handles the empty-group case (``x265-``) as ``(codec, "")``.
"""
if "-" not in text:
return None
head, _, tail = text.rpartition("-")
if head.lower() in kb.codecs:
return head, tail
return None
def _match_role(text: str, role: TokenRole, kb: ReleaseKnowledge) -> TokenRole | None:
"""Return ``role`` if ``text`` matches it under ``kb``, else ``None``."""
lower = text.lower()
if role is TokenRole.YEAR:
return TokenRole.YEAR if _is_year(text) else None
if role is TokenRole.SEASON_EPISODE:
return (
TokenRole.SEASON_EPISODE
if _parse_season_episode(text) is not None
else None
)
if role is TokenRole.RESOLUTION:
return TokenRole.RESOLUTION if lower in kb.resolutions else None
if role is TokenRole.SOURCE:
return TokenRole.SOURCE if lower in kb.sources else None
if role is TokenRole.CODEC:
return TokenRole.CODEC if lower in kb.codecs else None
return None
# ---------------------------------------------------------------------------
# Stage 2a — group detection
# ---------------------------------------------------------------------------
def _detect_group(tokens: list[Token], kb: ReleaseKnowledge) -> tuple[str, int | None]:
"""Identify the release group by walking tokens right-to-left.
Returns ``(group_name, token_index_carrying_group)``. ``index`` is
``None`` when the group is absent (no trailing ``-`` in the stream).
"""
# Priority 1: codec-GROUP shape (clearest signal).
for tok in reversed(tokens):
split = _split_codec_group(tok.text, kb)
if split is not None:
_, group = split
return (group or "UNKNOWN"), tok.index
# Priority 2: rightmost dash, excluding dashed sources (Web-DL, etc.).
for tok in reversed(tokens):
if "-" not in tok.text:
continue
head, _, tail = tok.text.rpartition("-")
if (
head.lower() in kb.sources
or tok.text.lower().replace("-", "") in kb.sources
):
continue
if tail:
return tail, tok.index
return "UNKNOWN", None
# ---------------------------------------------------------------------------
# Stage 2b — structural annotation (schema-driven)
# ---------------------------------------------------------------------------
def _annotate_structural(
tokens: list[Token],
kb: ReleaseKnowledge,
schema: GroupSchema,
group_token_index: int,
) -> list[Token] | None:
"""Annotate structural tokens following a known group schema.
Walks the schema's chunks against the body (tokens up to the group
token). For each chunk, scans forward in the body for a matching
token — tokens passed over without match are left UNKNOWN (the
enricher pass will handle them).
Returns ``None`` if any mandatory chunk fails to find a match.
"""
result = list(tokens)
# The codec-GROUP token carries CODEC + GROUP. Split it now so the
# schema walk knows the codec is "pre-consumed" at the end.
group_token = result[group_token_index]
cg_split = _split_codec_group(group_token.text, kb)
codec_pre_consumed = False
if cg_split is not None:
codec, group = cg_split
result[group_token_index] = group_token.with_role(
TokenRole.CODEC, codec=codec, group=group or "UNKNOWN"
)
codec_pre_consumed = True
else:
head, _, tail = group_token.text.rpartition("-")
result[group_token_index] = group_token.with_role(
TokenRole.GROUP, group=tail or "UNKNOWN", prefix=head
)
body_end = group_token_index # exclusive
tok_idx = 0
chunk_idx = 0
# 1) TITLE — leftmost contiguous tokens up to the first structural
# boundary. Title is special because it can be multi-token.
while (
chunk_idx < len(schema.chunks)
and schema.chunks[chunk_idx].role is TokenRole.TITLE
):
title_end = _find_title_end(result, body_end, kb)
for i in range(tok_idx, title_end):
result[i] = result[i].with_role(TokenRole.TITLE)
tok_idx = title_end
chunk_idx += 1
# 2) Remaining structural chunks. For each, scan forward in the body
# for a matching token; tokens passed over remain UNKNOWN.
for chunk in schema.chunks[chunk_idx:]:
if chunk.role is TokenRole.GROUP:
continue
if chunk.role is TokenRole.CODEC and codec_pre_consumed:
continue
match_idx = _find_chunk(result, tok_idx, body_end, chunk.role, kb)
if match_idx is None:
if chunk.optional:
continue
return None
result[match_idx] = result[match_idx].with_role(chunk.role)
tok_idx = match_idx + 1
return result
def _find_title_end(
tokens: list[Token], body_end: int, kb: ReleaseKnowledge
) -> int:
"""Return the exclusive index where the title ends.
The title is the leftmost run of tokens whose text does not match
any structural role (year, season/episode, resolution, source,
codec). Enricher tokens (audio, HDR, language) are *not* boundaries
because they can appear in the middle of the structural sequence;
however, in canonical scene names they don't appear inside the title
itself, so this heuristic holds in practice.
"""
for i in range(body_end):
text = tokens[i].text
if _parse_season_episode(text) is not None:
return i
if _is_year(text):
return i
lower = text.lower()
if lower in kb.resolutions:
return i
if lower in kb.sources:
return i
if lower in kb.codecs:
return i
# codec-GROUP token (e.g. "x265-KONTRAST") or dashed source (Web-DL).
if "-" in text:
head, _, _ = text.rpartition("-")
if (
head.lower() in kb.codecs
or head.lower() in kb.sources
or text.lower().replace("-", "") in kb.sources
):
return i
return body_end
def _find_chunk(
tokens: list[Token],
start: int,
end: int,
role: TokenRole,
kb: ReleaseKnowledge,
) -> int | None:
"""Return the first index in ``[start, end)`` whose token matches ``role``.
Returns ``None`` if no token in the range matches. Tokens already
annotated (non-UNKNOWN) are skipped — they belong to another chunk.
"""
for i in range(start, end):
if tokens[i].role is not TokenRole.UNKNOWN:
continue
if _match_role(tokens[i].text, role, kb) is not None:
return i
return None
# ---------------------------------------------------------------------------
# Stage 2b' — SHITTY annotation (schema-less heuristic)
# ---------------------------------------------------------------------------
def _annotate_shitty(
tokens: list[Token],
kb: ReleaseKnowledge,
group_index: int | None,
) -> list[Token]:
"""Schema-less, dictionary-driven annotation.
SHITTY's job is narrow: for releases that *look* like scene names
but don't have a registered group schema, tag every token whose text
falls into a known YAML bucket (resolutions, codecs, sources, …).
Anything we can't classify stays UNKNOWN. The leftmost run of
UNKNOWN tokens becomes the title. Done.
Anything that requires more reasoning (parenthesized tech blocks,
bare-dashed title fragments, year-disguised slug suffixes, …) is
PATH OF PAIN territory and stays out of here on purpose.
"""
result = list(tokens)
# 1) Group token — split codec-GROUP or tag GROUP. Same logic as EASY.
if group_index is not None:
gt = result[group_index]
cg_split = _split_codec_group(gt.text, kb)
if cg_split is not None:
codec, group = cg_split
result[group_index] = gt.with_role(
TokenRole.CODEC, codec=codec, group=group or "UNKNOWN"
)
else:
_, _, tail = gt.text.rpartition("-")
result[group_index] = gt.with_role(
TokenRole.GROUP, group=tail or "UNKNOWN"
)
# 2) Enrichers (audio / video-meta / edition / language).
result = _annotate_enrichers(result, kb)
# 3) Single pass: tag each UNKNOWN token by looking it up in the kb
# buckets. First match wins per token, first occurrence wins per
# role (we don't overwrite an already-tagged role).
matchers: list[tuple[TokenRole, callable]] = [
(TokenRole.SEASON_EPISODE, lambda t: _parse_season_episode(t) is not None),
(TokenRole.YEAR, _is_year),
(TokenRole.RESOLUTION, lambda t: t.lower() in kb.resolutions),
(TokenRole.DISTRIBUTOR, lambda t: t.upper() in kb.distributors),
(TokenRole.SOURCE, lambda t: t.lower() in kb.sources),
(TokenRole.CODEC, lambda t: t.lower() in kb.codecs),
]
seen: set[TokenRole] = set()
for i, tok in enumerate(result):
if tok.role is not TokenRole.UNKNOWN:
continue
for role, matches in matchers:
if role in seen:
continue
if matches(tok.text):
result[i] = tok.with_role(role)
seen.add(role)
break
# 4) Title = leftmost contiguous UNKNOWN tokens.
for i, tok in enumerate(result):
if tok.role is not TokenRole.UNKNOWN:
break
result[i] = tok.with_role(TokenRole.TITLE)
return result
# ---------------------------------------------------------------------------
# Stage 2c — enricher pass (non-positional roles)
# ---------------------------------------------------------------------------
def _annotate_enrichers(tokens: list[Token], kb: ReleaseKnowledge) -> list[Token]:
"""Tag the remaining UNKNOWN tokens with non-positional roles.
Multi-token sequences are matched first (so ``DTS.HD.MA`` wins over
a single-token ``DTS``). For each sequence match, the first token
receives the role + ``extra["sequence"]`` (the canonical joined
value), and the trailing members are marked with the same role +
``extra["sequence_member"]=True`` so :func:`assemble` extracts the
value only from the primary.
"""
result = list(tokens)
# Multi-token sequences first.
_apply_sequences(
result, kb.audio.get("sequences", []), "codec", TokenRole.AUDIO_CODEC
)
_apply_sequences(
result, kb.video_meta.get("sequences", []), "hdr", TokenRole.HDR
)
_apply_sequences(
result, kb.editions.get("sequences", []), "edition", TokenRole.EDITION
)
# Single tokens.
known_audio_codecs = {c.upper() for c in kb.audio.get("codecs", [])}
known_audio_channels = set(kb.audio.get("channels", []))
known_hdr = {h.upper() for h in kb.video_meta.get("hdr", [])} | kb.hdr_extra
known_bit_depth = {d.lower() for d in kb.video_meta.get("bit_depth", [])}
known_editions = {t.upper() for t in kb.editions.get("tokens", [])}
# Channel layouts like "5.1" are tokenized as two tokens ("5", "1")
# because "." is a separator. Detect consecutive pairs whose joined
# value (without any trailing "-GROUP") is in the channel set.
_detect_channel_pairs(result, known_audio_channels)
for i, tok in enumerate(result):
if tok.role is not TokenRole.UNKNOWN:
continue
text = tok.text
upper = text.upper()
lower = text.lower()
if upper in known_audio_codecs:
result[i] = tok.with_role(TokenRole.AUDIO_CODEC)
continue
if text in known_audio_channels:
result[i] = tok.with_role(TokenRole.AUDIO_CHANNELS)
continue
if upper in known_hdr:
result[i] = tok.with_role(TokenRole.HDR)
continue
if lower in known_bit_depth:
result[i] = tok.with_role(TokenRole.BIT_DEPTH)
continue
if upper in known_editions:
result[i] = tok.with_role(TokenRole.EDITION)
continue
if upper in kb.language_tokens:
result[i] = tok.with_role(TokenRole.LANGUAGE)
continue
if upper in kb.distributors:
result[i] = tok.with_role(TokenRole.DISTRIBUTOR)
continue
return result
def _apply_sequences(
tokens: list[Token],
sequences: list[dict],
value_key: str,
role: TokenRole,
) -> None:
"""Mark the first occurrence of each sequence in place.
Mutates ``tokens`` (replacing entries with new role-tagged Token
instances). Sequences in the YAML must be ordered most-specific
first; the first match wins per starting position.
"""
if not sequences:
return
upper_texts = [t.text.upper() for t in tokens]
consumed: set[int] = set()
for seq in sequences:
seq_upper = [s.upper() for s in seq["tokens"]]
n = len(seq_upper)
for start in range(len(tokens) - n + 1):
if any(idx in consumed for idx in range(start, start + n)):
continue
if any(
tokens[start + k].role is not TokenRole.UNKNOWN for k in range(n)
):
continue
if upper_texts[start : start + n] == seq_upper:
tokens[start] = tokens[start].with_role(
role, sequence=seq[value_key]
)
for k in range(1, n):
tokens[start + k] = tokens[start + k].with_role(
role, sequence_member="True"
)
consumed.update(range(start, start + n))
def _detect_channel_pairs(
tokens: list[Token], known_channels: set[str]
) -> None:
"""Spot two consecutive numeric tokens that form a channel layout.
Example: ``["5", "1-KTH"]`` → joined ``"5.1"`` (after stripping the
``-GROUP`` suffix on the second). The second token may be the trailing
codec-GROUP token, in which case it's already tagged CODEC and we
skip — we'd corrupt its role.
"""
for i in range(len(tokens) - 1):
first = tokens[i]
second = tokens[i + 1]
if first.role is not TokenRole.UNKNOWN:
continue
# Strip a "-GROUP" suffix on the second token before joining.
second_text = second.text.split("-")[0]
candidate = f"{first.text}.{second_text}"
if candidate not in known_channels:
continue
# Only tag the first token (carries the channel value). The
# second token may legitimately remain UNKNOWN (or be the
# codec-GROUP token, already tagged CODEC).
tokens[i] = first.with_role(
TokenRole.AUDIO_CHANNELS, sequence=candidate
)
if second.role is TokenRole.UNKNOWN:
tokens[i + 1] = second.with_role(
TokenRole.AUDIO_CHANNELS, sequence_member="True"
)
# ---------------------------------------------------------------------------
# Stage 2 entry point
# ---------------------------------------------------------------------------
def annotate(tokens: list[Token], kb: ReleaseKnowledge) -> list[Token]:
"""Annotate token roles.
Dispatch:
* If a group is detected AND has a known schema, run the EASY
structural walk. If the schema walk aborts on a mandatory chunk
mismatch, fall through to SHITTY (the heuristic still does better
than giving up).
* Otherwise run SHITTY — schema-less, best-effort, never aborts.
The enricher pass runs in both cases. The pipeline always returns a
populated token list; downstream callers don't need to distinguish
EASY vs SHITTY at this layer (the parse_path is decided in the
service based on whether a schema matched).
"""
group_name, group_index = _detect_group(tokens, kb)
schema = kb.group_schema(group_name) if group_index is not None else None
if schema is not None and group_index is not None:
structural = _annotate_structural(tokens, kb, schema, group_index)
if structural is not None:
return _annotate_enrichers(structural, kb)
# SHITTY fallback — heuristic positional pass. ``_annotate_shitty``
# runs its own enricher pass internally (it has to, so the title
# scan can skip enricher-tagged tokens).
return _annotate_shitty(tokens, kb, group_index)
def has_known_schema(tokens: list[Token], kb: ReleaseKnowledge) -> bool:
"""Return True if ``tokens`` would take the EASY path in :func:`annotate`."""
group_name, group_index = _detect_group(tokens, kb)
if group_index is None:
return False
return kb.group_schema(group_name) is not None
# ---------------------------------------------------------------------------
# Stage 3 — assemble
# ---------------------------------------------------------------------------
def assemble(
annotated: list[Token],
site_tag: str | None,
raw_name: str,
kb: ReleaseKnowledge,
) -> dict:
"""Fold annotated tokens into a ``ParsedRelease``-compatible dict.
Returns a dict (not a ``ParsedRelease`` instance) so the caller can
layer in additional fields (``parse_path``, ``raw``, …) before
instantiation.
"""
# Pure-punctuation tokens (e.g. a stray "-" left by ` - ` separators in
# human-friendly release names) carry no title content and would leak
# into the joined title as ``"Show.-.Episode"``. Drop them here.
title_parts = [
t.text
for t in annotated
if t.role is TokenRole.TITLE and any(c.isalnum() for c in t.text)
]
title = ".".join(title_parts) if title_parts else (
annotated[0].text if annotated else raw_name
)
year: int | None = None
season: int | None = None
episode: int | None = None
episode_end: int | None = None
quality: str | None = None
source: str | None = None
codec: str | None = None
group = "UNKNOWN"
audio_codec: str | None = None
audio_channels: str | None = None
bit_depth: str | None = None
hdr_format: str | None = None
edition: str | None = None
distributor: str | None = None
languages: list[str] = []
is_season_range = False
for tok in annotated:
# Skip non-primary members of a multi-token sequence.
if tok.extra.get("sequence_member") == "True":
continue
role = tok.role
if role is TokenRole.YEAR:
year = int(tok.text)
elif role is TokenRole.SEASON_EPISODE:
parsed = _parse_season_episode(tok.text)
if parsed is not None:
season, episode, episode_end = parsed
# Detect Sxx-yy range form to flag it as a multi-season pack.
upper = tok.text.upper()
if (
len(upper) == 6
and upper[0] == "S"
and upper[1:3].isdigit()
and upper[3] == "-"
and upper[4:6].isdigit()
):
is_season_range = True
elif role is TokenRole.RESOLUTION:
quality = tok.text
elif role is TokenRole.SOURCE:
source = tok.text
elif role is TokenRole.CODEC:
codec = tok.extra.get("codec", tok.text)
if "group" in tok.extra:
group = tok.extra["group"] or "UNKNOWN"
elif role is TokenRole.GROUP:
group = tok.extra.get("group", tok.text) or "UNKNOWN"
elif role is TokenRole.AUDIO_CODEC:
if audio_codec is None:
audio_codec = tok.extra.get("sequence", tok.text)
elif role is TokenRole.AUDIO_CHANNELS:
if audio_channels is None:
audio_channels = tok.extra.get("sequence", tok.text)
elif role is TokenRole.BIT_DEPTH:
if bit_depth is None:
bit_depth = tok.text.lower()
elif role is TokenRole.HDR:
if hdr_format is None:
hdr_format = tok.extra.get("sequence", tok.text.upper())
elif role is TokenRole.EDITION:
if edition is None:
edition = tok.extra.get("sequence", tok.text.upper())
elif role is TokenRole.LANGUAGE:
languages.append(tok.text.upper())
elif role is TokenRole.DISTRIBUTOR:
if distributor is None:
distributor = tok.text.upper()
# Media type heuristic. Doc/concert/integrale tokens win over the
# generic tech-based fallback. We look across all tokens (not just
# annotated ones) because these markers may be tagged UNKNOWN by the
# structural pass — only the assemble step cares about them.
upper_tokens = {tok.text.upper() for tok in annotated}
doc_tokens = {t.upper() for t in kb.media_type_tokens.get("doc", [])}
concert_tokens = {t.upper() for t in kb.media_type_tokens.get("concert", [])}
integrale_tokens = {t.upper() for t in kb.media_type_tokens.get("integrale", [])}
if upper_tokens & doc_tokens:
media_type = MediaTypeToken.DOCUMENTARY
elif upper_tokens & concert_tokens:
media_type = MediaTypeToken.CONCERT
elif is_season_range:
media_type = MediaTypeToken.TV_COMPLETE
elif (
edition in {"COMPLETE", "INTEGRALE", "COLLECTION"}
or upper_tokens & integrale_tokens
) and season is None:
media_type = MediaTypeToken.TV_COMPLETE
elif season is not None:
media_type = MediaTypeToken.TV_SHOW
elif any((quality, source, codec, year)):
media_type = MediaTypeToken.MOVIE
else:
media_type = MediaTypeToken.UNKNOWN
return {
"title": title,
"title_sanitized": kb.sanitize_for_fs(title),
"year": year,
"season": season,
"episode": episode,
"episode_end": episode_end,
"quality": quality,
"source": source,
"codec": codec,
"group": group,
"media_type": media_type,
"site_tag": site_tag,
"languages": tuple(languages),
"audio_codec": audio_codec,
"audio_channels": audio_channels,
"bit_depth": bit_depth,
"hdr_format": hdr_format,
"edition": edition,
"distributor": distributor,
}
@@ -1,47 +0,0 @@
"""Group schema value objects.
A :class:`GroupSchema` describes the canonical chunk layout of releases
from a known group (KONTRAST, RARBG, ELiTE, …). It is the EASY-road
contract: when a release ends in ``-<GROUP>`` and we know the group,
the annotator walks the schema instead of running the heuristic SHITTY
matchers.
Schemas are loaded from ``knowledge/release/release_groups/<group>.yaml``
by an infrastructure adapter and surfaced via the
:class:`~alfred.domain.release.ports.knowledge.ReleaseKnowledge` port.
"""
from __future__ import annotations
from dataclasses import dataclass
from .tokens import TokenRole
@dataclass(frozen=True)
class SchemaChunk:
"""One entry in a group's chunk order.
``role`` is the :class:`TokenRole` the chunk maps to. ``optional``
is True for chunks that may be absent (e.g. ``year`` on TV releases,
``source`` on bare ELiTE TV releases).
"""
role: TokenRole
optional: bool = False
@dataclass(frozen=True)
class GroupSchema:
"""Schema for a known release group.
``chunks`` is the left-to-right canonical order. The annotator walks
tokens and chunks in lockstep: an optional chunk that doesn't match
the current token is skipped (the chunk index advances, the token
index stays), a mandatory chunk that doesn't match aborts the EASY
path and falls back to SHITTY.
"""
name: str
separator: str
chunks: tuple[SchemaChunk, ...]
@@ -1,139 +0,0 @@
"""Parse-confidence scoring.
``parse_release`` returns a :class:`ParseReport` alongside its
:class:`ParsedRelease`. The report carries:
- ``confidence``: integer 0100 derived from which structural and
technical fields got populated, minus a penalty per UNKNOWN token
left in the annotated stream.
- ``road``: which of the three roads the parse took
(:class:`Road.EASY` / :class:`Road.SHITTY` / :class:`Road.PATH_OF_PAIN`).
- ``unknown_tokens``: textual residue, useful for diagnostics.
- ``missing_critical``: structural fields the score-tally found absent
(e.g. ``("year", "media_type")``) — the caller can use this to drive
PoP recovery (questions, LLM call).
All weights, penalties and thresholds come from the injected knowledge
base (``kb.scoring``), itself loaded from
``alfred/knowledge/release/scoring.yaml``. No magic numbers here.
The scoring functions are pure — they consume the annotated token list
and the resulting :class:`ParsedRelease` and return the report. They are
called by ``services.parse_release`` after ``assemble`` has run.
"""
from __future__ import annotations
from enum import Enum
from ..ports.knowledge import ReleaseKnowledge
from alfred.domain.releases_TO_CHECK.value_objects_old_question_mark import ParsedRelease
from .tokens import Token, TokenRole
class Road(str, Enum):
"""How the parser handled a given release name.
Distinct from :class:`~alfred.domain.release.value_objects.TokenizationRoute`,
which records the tokenization route (DIRECT / SANITIZED / AI). Road
is about confidence in the *result*, not the *method*.
"""
EASY = "easy" # group schema matched — structural annotation
SHITTY = "shitty" # no schema, dict-driven annotation, score ≥ threshold
PATH_OF_PAIN = "path_of_pain" # score below threshold, needs help
# Critical structural fields — their absence drives the
# ``missing_critical`` list in the report.
_CRITICAL_FIELDS: tuple[str, ...] = ("title", "media_type", "year")
def _is_tv_shaped(parsed: ParsedRelease) -> bool:
"""Season/episode weights only count for releases that *look* like TV."""
return parsed.season is not None
def compute_score(
parsed: ParsedRelease,
annotated: list[Token],
kb: ReleaseKnowledge,
) -> int:
"""Compute a 0100 confidence score for the parse.
Each populated field contributes its weight from
``kb.scoring["weights"]``. Season/episode only count when the parse
looks like TV. ``group == "UNKNOWN"`` is treated as absent.
Then a penalty is subtracted per residual UNKNOWN token in
``annotated``, capped at ``penalties["max_unknown_penalty"]``.
Result is clamped to ``[0, 100]``.
"""
weights = kb.scoring["weights"]
penalties = kb.scoring["penalties"]
score = 0
if parsed.title:
score += weights.get("title", 0)
if parsed.media_type and parsed.media_type.value != "unknown":
score += weights.get("media_type", 0)
if parsed.year is not None:
score += weights.get("year", 0)
if _is_tv_shaped(parsed):
if parsed.season is not None:
score += weights.get("season", 0)
if parsed.episode is not None:
score += weights.get("episode", 0)
if parsed.quality:
score += weights.get("resolution", 0)
if parsed.source:
score += weights.get("source", 0)
if parsed.codec:
score += weights.get("codec", 0)
if parsed.group and parsed.group != "UNKNOWN":
score += weights.get("group", 0)
unknown_count = sum(1 for t in annotated if t.role is TokenRole.UNKNOWN)
raw_penalty = unknown_count * penalties.get("unknown_token", 0)
capped_penalty = min(raw_penalty, penalties.get("max_unknown_penalty", 0))
score -= capped_penalty
return max(0, min(100, score))
def collect_unknown_tokens(annotated: list[Token]) -> tuple[str, ...]:
"""Return the text of every token still tagged UNKNOWN."""
return tuple(t.text for t in annotated if t.role is TokenRole.UNKNOWN)
def collect_missing_critical(parsed: ParsedRelease) -> tuple[str, ...]:
"""Return the names of critical structural fields that are absent."""
missing: list[str] = []
if not parsed.title:
missing.append("title")
if not parsed.media_type or parsed.media_type.value == "unknown":
missing.append("media_type")
if parsed.year is None:
missing.append("year")
return tuple(missing)
def decide_road(
score: int,
has_schema: bool,
kb: ReleaseKnowledge,
) -> Road:
"""Pick the road the parse took.
EASY is decided structurally: if a known group schema matched, the
annotation walked the schema, and that's enough — the score does not
veto EASY. Otherwise the score decides between SHITTY and
PATH_OF_PAIN using ``kb.scoring["thresholds"]["shitty_min"]``.
"""
if has_schema:
return Road.EASY
threshold = kb.scoring["thresholds"].get("shitty_min", 60)
if score >= threshold:
return Road.SHITTY
return Road.PATH_OF_PAIN
@@ -1,120 +0,0 @@
"""Release domain — parsing service.
Thin orchestrator over the annotate-based pipeline in
:mod:`alfred.domain.release.parser.pipeline`. Responsibilities:
* Strip a leading/trailing ``[site.tag]`` and decide ``parse_path``.
* Reject malformed names (forbidden characters) → ``parse_path=AI`` so
the LLM can clean them up.
* Otherwise call the v2 pipeline (tokenize → annotate → assemble) and
wrap the result in :class:`ParsedRelease`.
* Score the result and decide the road (EASY / SHITTY / PATH_OF_PAIN)
via :mod:`alfred.domain.release.parser.scoring`.
The public entry point is :func:`parse_release`, which returns
``(ParsedRelease, ParseReport)``. The report carries the confidence
score, the road, and diagnostic info for downstream callers.
"""
from __future__ import annotations
from alfred.domain.releases_TO_CHECK.parser import scoring as _scoring, pipeline as _v2
from alfred.domain.releases_TO_CHECK.ports import ReleaseKnowledge
from alfred.domain.releases_TO_CHECK.value_objects_old_question_mark import MediaTypeToken, ParsedRelease, ParseReport, TokenizationRoute
def parse_release(
name: str, kb: ReleaseKnowledge
) -> tuple[ParsedRelease, ParseReport]:
"""Parse a release name.
Returns a tuple ``(ParsedRelease, ParseReport)``. The structural VO
is unchanged from the previous single-return contract; the report
is new and carries the confidence score + road decision.
Flow:
1. Strip a leading/trailing ``[site.tag]`` if present (sets
``parse_path="sanitized"``).
2. If the remainder still contains truly forbidden chars (anything
not in the configured separators), short-circuit to
``media_type="unknown"`` / ``parse_path="ai"`` and emit a
PATH_OF_PAIN report — the LLM handles these.
3. Otherwise run the v2 pipeline: tokenize → annotate (EASY when a
group schema is known, SHITTY otherwise) → assemble → score.
"""
parse_path = TokenizationRoute.DIRECT
# Apostrophes inside titles ("Don't", "L'avare") are common and should
# not push the release through the AI fallback. Strip them up front so
# both strip_site_tag and tokenize see "Dont" / "Lavare", which is good
# enough for token-level matching. The raw name is preserved on the VO.
working_name = name
if "'" in working_name:
working_name = working_name.replace("'", "")
parse_path = TokenizationRoute.SANITIZED
clean, site_tag = _v2.strip_site_tag(working_name)
if site_tag is not None:
parse_path = TokenizationRoute.SANITIZED
if not _is_well_formed(clean, kb):
parsed = ParsedRelease(
raw=name,
clean=clean,
title=clean,
title_sanitized=kb.sanitize_for_fs(clean),
year=None,
season=None,
episode=None,
episode_end=None,
quality=None,
source=None,
codec=None,
group="UNKNOWN",
media_type=MediaTypeToken.UNKNOWN,
site_tag=site_tag,
parse_path=TokenizationRoute.AI,
)
report = ParseReport(
confidence=0,
road=_scoring.Road.PATH_OF_PAIN.value,
unknown_tokens=(clean,),
missing_critical=("title", "media_type", "year"),
)
return parsed, report
tokens, v2_tag = _v2.tokenize(working_name, kb)
annotated = _v2.annotate(tokens, kb)
fields = _v2.assemble(annotated, v2_tag, name, kb)
parsed = ParsedRelease(
raw=name,
clean=clean,
parse_path=parse_path,
**fields,
)
has_schema = _v2.has_known_schema(tokens, kb)
score = _scoring.compute_score(parsed, annotated, kb)
road = _scoring.decide_road(score, has_schema, kb)
report = ParseReport(
confidence=score,
road=road.value,
unknown_tokens=_scoring.collect_unknown_tokens(annotated),
missing_critical=_scoring.collect_missing_critical(parsed),
)
return parsed, report
def _is_well_formed(name: str, kb: ReleaseKnowledge) -> bool:
"""Return True if ``name`` contains no forbidden characters per scene
naming rules.
Characters listed as token separators (spaces, brackets, parens, …)
are NOT considered malforming — the tokenizer handles them. Only
truly broken chars like ``@``, ``#``, ``!``, ``%`` make a name
malformed.
"""
tokenizable = set(kb.separators)
return not any(c in name for c in kb.forbidden_chars if c not in tokenizable)
@@ -1,90 +0,0 @@
"""Token value objects for the annotate-based parser.
A :class:`Token` carries both the original substring and its position in
the original release name's token stream. A :class:`TokenRole` is the
semantic tag assigned by the annotator.
Why VOs instead of bare ``str``: the annotate step needs to flag tokens
without consuming them (a token may carry residual info — e.g. a
``codec-GROUP`` token contributes both a CODEC and a GROUP role). Tracking
the index also lets later stages reason about *order* (year must come
after title, group must be rightmost, etc.) without re-scanning the list.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
class TokenRole(str, Enum):
"""Semantic role a token can take after annotation.
A token starts as ``UNKNOWN`` and may be promoted by the annotator.
``str``-backed for cheap comparisons and YAML/JSON interop.
Roles split into three families:
- **structural**: TITLE / YEAR / SEASON_EPISODE / GROUP — drive folder
and filename naming.
- **technical**: RESOLUTION / SOURCE / CODEC / AUDIO_CODEC /
AUDIO_CHANNELS / BIT_DEPTH / HDR / EDITION / LANGUAGE — feed
``tech_string`` and metadata fields.
- **meta**: SITE_TAG (stripped pre-tokenize), SEPARATOR (kept for the
assemble step if a release uses spaces that need preservation in the
title), UNKNOWN (residual, contributes to the SHITTY score penalty).
"""
UNKNOWN = "unknown"
# Structural
TITLE = "title"
YEAR = "year"
SEASON_EPISODE = "season_episode"
GROUP = "group"
# Technical
RESOLUTION = "resolution"
SOURCE = "source"
CODEC = "codec"
AUDIO_CODEC = "audio_codec"
AUDIO_CHANNELS = "audio_channels"
BIT_DEPTH = "bit_depth"
HDR = "hdr"
EDITION = "edition"
LANGUAGE = "language"
DISTRIBUTOR = "distributor"
# Meta
SITE_TAG = "site_tag"
@dataclass(frozen=True)
class Token:
"""An atomic token from a release name.
``text`` is the substring exactly as it appeared after tokenization
(case preserved — uppercase comparisons happen at match time).
``index`` is the 0-based position in the tokenized stream, used by
downstream stages to enforce ordering invariants.
``role`` defaults to :attr:`TokenRole.UNKNOWN`. The annotator returns
new :class:`Token` instances with the role set rather than mutating
(the dataclass is frozen). ``extra`` carries role-specific payload
when the token text alone isn't enough (e.g. a ``codec-GROUP`` token
annotated as CODEC may record the group name in ``extra["group"]``).
"""
text: str
index: int
role: TokenRole = TokenRole.UNKNOWN
extra: dict[str, str] = field(default_factory=dict)
def with_role(self, role: TokenRole, **extra: str) -> Token:
"""Return a copy of this token with ``role`` (and optional ``extra``)."""
merged = {**self.extra, **extra} if extra else self.extra
return Token(text=self.text, index=self.index, role=role, extra=merged)
@property
def is_annotated(self) -> bool:
return self.role is not TokenRole.UNKNOWN
@@ -1,10 +0,0 @@
"""Domain ports for the release domain.
Protocol-based abstractions that decouple ``parse_release`` and
``ParsedRelease`` from any concrete knowledge-base loader. The
infrastructure layer provides the adapter that satisfies this contract.
"""
from .knowledge import ReleaseKnowledge
__all__ = ["ReleaseKnowledge"]
@@ -1,91 +0,0 @@
"""ReleaseKnowledge port — the read-only query surface that
``parse_release`` and ``ParsedRelease`` need from the release knowledge
base, expressed as a structural Protocol so the domain never imports any
concrete loader.
The concrete YAML-backed implementation lives in
``alfred/infrastructure/knowledge/release_kb.py``. Tests can supply any
object that satisfies this shape (e.g. a simple dataclass).
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Protocol
if TYPE_CHECKING:
from ..parser.schema import GroupSchema
class ReleaseKnowledge(Protocol):
"""Read-only snapshot of release-name parsing knowledge."""
# --- Token sets used by the tokenizer / matchers ---
resolutions: set[str]
sources: set[str]
codecs: set[str]
distributors: set[str]
language_tokens: set[str]
forbidden_chars: set[str]
hdr_extra: set[str]
# --- Structured knowledge (loaded from YAML as dicts) ---
audio: dict
video_meta: dict
editions: dict
media_type_tokens: dict
# --- Tokenizer separators ---
separators: list[str]
# --- Parse scoring (Phase A) ---
#
# ``scoring`` is a dict with three keys:
# - ``weights``: dict[field_name, int] field weight contribution
# - ``penalties``: {"unknown_token": int, "max_unknown_penalty": int}
# - ``thresholds``: {"shitty_min": int} SHITTY vs PATH_OF_PAIN cutoff
#
# Concrete values come from ``alfred/knowledge/release/scoring.yaml``.
# The loader fills in safe defaults so this dict is always populated.
scoring: dict
# --- ffprobe → scene-token translation tables (consumed by
# ``application.release.enrich_from_probe``). Domain parsing itself
# doesn't touch these — exposed on the same KB to keep release
# knowledge in a single ownership point.
#
# Shape:
# - ``video_codec``: dict[str, str] ffprobe lower → scene token
# - ``audio_codec``: dict[str, str] ffprobe lower → scene token
# - ``audio_channels``: dict[int, str] channel count → layout ---
probe_mappings: dict
# --- File-extension sets (used by application/infra modules that work
# directly with filesystem paths, e.g. media-type detection, video
# lookup). Domain parsing itself doesn't touch these. ---
video_extensions: set[str]
non_video_extensions: set[str]
subtitle_extensions: set[str]
metadata_extensions: set[str]
# --- Filesystem sanitization (Option B: pre-sanitize at parse time) ---
def sanitize_for_fs(self, text: str) -> str:
"""Strip filesystem-forbidden characters from ``text``."""
...
# --- Release group schemas (EASY path) ---
def group_schema(self, name: str) -> GroupSchema | None:
"""Return the parsing schema for the named release group, or
``None`` if the group is unknown (caller falls back to SHITTY).
Lookup is case-insensitive: ``"KONTRAST"``, ``"kontrast"`` and
``"Kontrast"`` all resolve to the same schema.
"""
...
@@ -1,79 +0,0 @@
"""Repository ports for the filesystem release domain.
One repository per aggregate root:
* :class:`SeriesReleaseRepository` — persists :class:`SeriesRelease`
(one per TV show).
* :class:`MovieReleaseRepository` — persists :class:`MovieRelease`
(one per movie).
Implementations live in the infrastructure layer. The
``DotAlfred*ReleaseRepository`` concrete classes write the per-show /
per-movie ``.alfred`` sidecar (the release-only sidecar — the
``.alfred.index`` library file is handled by separate
``LibraryIndex`` repositories defined alongside the TMDB-side
aggregates).
"""
from abc import ABC, abstractmethod
from ..shared_TO_CHECK.value_objects import TmdbId
from .entities import MovieRelease, SeriesRelease
class SeriesReleaseRepository(ABC):
"""
Abstract repository for :class:`SeriesRelease` aggregates.
Persistence is per-show: each call to :meth:`save` writes the full
aggregate (all seasons + all episode files + tracks) atomically.
"""
@abstractmethod
def save(self, release: SeriesRelease) -> None:
"""Persist the full SeriesRelease aggregate."""
@abstractmethod
def find_by_tmdb_id(self, tmdb_id: TmdbId) -> SeriesRelease | None:
"""Load the SeriesRelease for ``tmdb_id``, or None if absent."""
@abstractmethod
def find_all(self) -> list[SeriesRelease]:
"""Load all SeriesRelease aggregates known to the store."""
@abstractmethod
def delete(self, tmdb_id: TmdbId) -> bool:
"""Remove the aggregate. Returns True if it existed and was deleted."""
@abstractmethod
def exists(self, tmdb_id: TmdbId) -> bool:
"""True if the aggregate exists in the store."""
class MovieReleaseRepository(ABC):
"""
Abstract repository for :class:`MovieRelease` aggregates.
Mirrors :class:`SeriesReleaseRepository`; the movie aggregate is a
single file so persistence is naturally atomic per movie.
"""
@abstractmethod
def save(self, release: MovieRelease) -> None:
"""Persist the MovieRelease aggregate."""
@abstractmethod
def find_by_tmdb_id(self, tmdb_id: TmdbId) -> MovieRelease | None:
"""Load the MovieRelease for ``tmdb_id``, or None if absent."""
@abstractmethod
def find_all(self) -> list[MovieRelease]:
"""Load all MovieRelease aggregates known to the store."""
@abstractmethod
def delete(self, tmdb_id: TmdbId) -> bool:
"""Remove the aggregate. Returns True if it existed and was deleted."""
@abstractmethod
def exists(self, tmdb_id: TmdbId) -> bool:
"""True if the aggregate exists in the store."""
@@ -1,89 +0,0 @@
"""Value objects for the filesystem release domain."""
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from ..shared_TO_CHECK.exceptions import ValidationError
from ..tv_shows.value_objects import EpisodeNumber
class ReleaseMode(str, Enum):
"""
Filesystem layout of a season release.
Determined structurally by the walker — not a guess:
* ``PACK`` — the season folder contains N video files directly.
A single release group posted the whole season as N files in
one folder.
* ``EPISODIC`` — the season folder contains N sub-folders, each
holding one episode (and its adjacent subs / nfo / etc.).
Episodes were acquired one-by-one, possibly from different
release groups.
The mode is stored explicitly on :class:`SeasonRelease`; the
walker never has to re-derive it after the first scan.
"""
PACK = "pack"
EPISODIC = "episodic"
@dataclass(frozen=True)
class EpisodeRange:
"""
Episode coverage of one physical release file.
A single-episode file (``Show.S01E02.mkv``) is represented as
``EpisodeRange(start=E02, end=E02)``.
A multi-episode file (``Show.S01E02E03E04.mkv``) is represented as
``EpisodeRange(start=E02, end=E04)``. Ranges are inclusive on both
ends and must satisfy ``end >= start``.
The VO carries no opinion about file paths or tracks — those live on
:class:`EpisodeRelease`. ``EpisodeRange`` is purely about which TMDB
episode slots a given physical file covers.
"""
start: EpisodeNumber
end: EpisodeNumber
def __post_init__(self) -> None:
if not isinstance(self.start, EpisodeNumber):
raise ValidationError(
f"EpisodeRange.start must be EpisodeNumber, got {type(self.start)}"
)
if not isinstance(self.end, EpisodeNumber):
raise ValidationError(
f"EpisodeRange.end must be EpisodeNumber, got {type(self.end)}"
)
if self.end.value < self.start.value:
raise ValidationError(
f"EpisodeRange end ({self.end}) must be >= start ({self.start})"
)
def count(self) -> int:
"""Number of TMDB episodes covered by this range (inclusive)."""
return self.end.value - self.start.value + 1
def numbers(self) -> tuple[EpisodeNumber, ...]:
"""All :class:`EpisodeNumber` values covered, in ascending order."""
return tuple(
EpisodeNumber(n)
for n in range(self.start.value, self.end.value + 1)
)
def is_single(self) -> bool:
"""True if the range covers exactly one episode (``start == end``)."""
return self.start == self.end
def __str__(self) -> str:
if self.is_single():
return f"E{self.start.value:02d}"
return f"E{self.start.value:02d}-E{self.end.value:02d}"
def __repr__(self) -> str:
return f"EpisodeRange({self.start.value}, {self.end.value})"
@@ -1,183 +0,0 @@
"""Release domain — value objects.
This module is **pure**: no I/O, no YAML loading, no knowledge-base
imports. All knowledge that the parser consumes is injected at runtime
via the ``ReleaseKnowledge`` port (see ``ports/knowledge.py``).
``ParsedRelease`` follows Option B of the snapshot-VO design: filesystem
sanitization is performed once at parse time and stored in
``title_sanitized``. The builder methods (``show_folder_name``,
``episode_filename``, etc.) are therefore pure string-formatting and do
**not** need access to any knowledge base — but they require the caller
to pass already-sanitized TMDB strings. The use case is responsible for
calling ``kb.sanitize_for_fs(tmdb_title)`` before invoking the builders.
"""
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from alfred.domain.shared_TO_CHECK.exceptions import ValidationError
class MediaTypeToken(str, Enum):
"""
Canonical values for ``ParsedRelease.media_type``.
Inherits from ``str`` so existing string-based comparisons (``== "movie"``,
JSON serialization, TMDB DTO interop) keep working unchanged. The enum
serves both as documentation and as the set of valid values for
``__post_init__`` validation.
"""
MOVIE = "movie"
TV_SHOW = "tv_show"
TV_COMPLETE = "tv_complete"
DOCUMENTARY = "documentary"
CONCERT = "concert"
OTHER = "other"
UNKNOWN = "unknown"
class TokenizationRoute(str, Enum):
"""How a ``ParsedRelease`` was produced.
Records the **tokenization route** — i.e. whether the release name
was tokenized as-is (``DIRECT``), after a sanitization pass like
site-tag stripping or apostrophe removal (``SANITIZED``), or whether
structural parsing failed and an LLM rebuild is needed (``AI``).
This is **orthogonal** to :class:`~alfred.domain.release.parser.scoring.Road`
(EASY / SHITTY / PATH_OF_PAIN), which captures parser confidence and
is recorded on :class:`ParseReport`. Both can vary independently —
a SANITIZED name can still land on the EASY road if a group schema
matches the tokens after stripping.
``str``-backed for the same reasons as :class:`MediaTypeToken`."""
DIRECT = "direct"
SANITIZED = "sanitized"
AI = "ai"
def _strip_episode_from_normalized(normalized: str) -> str:
"""
Remove all episode parts (Exx) from a normalized release name, keeping Sxx.
Oz.S03E01.1080p... → Oz.S03.1080p...
Archer.S14E09E10E11.1080p... → Archer.S14.1080p...
"""
tokens = normalized.split(".")
result = []
for tok in tokens:
upper = tok.upper()
# Token is SxxExx... — keep only the Sxx part
if len(upper) >= 3 and upper[0] == "S" and upper[1:3].isdigit():
result.append(tok[:3]) # "S" + two digits
else:
result.append(tok)
return ".".join(result)
@dataclass(frozen=True)
class ParseReport:
"""Diagnostic report attached to a :class:`ParsedRelease`.
``parse_release`` returns ``(ParsedRelease, ParseReport)``. The
report describes *how confident* the parser is in the result and
*which road* produced it. It is intentionally separate from
``ParsedRelease`` so the structural VO stays free of meta-concerns
about its own quality.
Fields:
- ``confidence``: integer 0100 (see :func:`parser.scoring.compute_score`).
- ``road``: ``"easy"`` / ``"shitty"`` / ``"path_of_pain"`` — distinct
from ``ParsedRelease.parse_path`` (which describes the
tokenization route, not the confidence tier).
- ``unknown_tokens``: tokens that finished annotation with role
UNKNOWN, in order of appearance.
- ``missing_critical``: names of critical structural fields the
parser couldn't fill (subset of ``{"title", "media_type", "year"}``).
"""
confidence: int
road: str # one of parser.scoring.Road values
unknown_tokens: tuple[str, ...] = ()
missing_critical: tuple[str, ...] = ()
def __post_init__(self) -> None:
if not (0 <= self.confidence <= 100):
raise ValidationError(
f"ParseReport.confidence out of range: {self.confidence}"
)
@dataclass(frozen=True)
class ParsedRelease:
"""Structured representation of a parsed release name.
``title_sanitized`` carries the filesystem-safe form of ``title`` (computed
by the parser at construction time using the injected knowledge base).
Builder methods rely on it being already-sanitized — see module docstring.
Frozen: enrichment passes (``detect_media_type``, ``enrich_from_probe``)
return a **new** ``ParsedRelease`` via ``dataclasses.replace`` rather
than mutating in place. ``languages`` is a tuple for the same reason.
"""
raw: str # original release name (untouched)
title: str # show/movie title (dots, no year/season/tech)
title_sanitized: str # title with filesystem-forbidden chars stripped
year: int | None # movie year or show start year (from TMDB)
season: int | None # season number (None for movies)
episode: int | None # first episode number (None if season-pack)
episode_end: int | None # last episode for multi-ep (None otherwise)
quality: str | None # 1080p, 2160p, …
source: str | None # WEBRip, BluRay, …
codec: str | None # x265, HEVC, …
group: str # release group, "UNKNOWN" if missing
media_type: MediaTypeToken = MediaTypeToken.UNKNOWN
site_tag: str | None = (
None # site watermark stripped from name, e.g. "TGx", "OxTorrent.vc"
)
parse_path: TokenizationRoute = TokenizationRoute.DIRECT
languages: tuple[str, ...] = () # ("MULTI", "VFF"), ("FRENCH",), …
audio_codec: str | None = None # "DTS-HD.MA", "DDP", "EAC3", …
audio_channels: str | None = None # "5.1", "7.1", "2.0", …
bit_depth: str | None = None # "10bit", "8bit", …
hdr_format: str | None = None # "DV", "HDR10", "DV.HDR10", …
edition: str | None = None # "UNRATED", "EXTENDED", "DIRECTORS.CUT", …
distributor: str | None = None # "NF", "AMZN", "DSNP", … (streaming origin)
def __post_init__(self) -> None:
if not self.raw:
raise ValidationError("ParsedRelease.raw cannot be empty")
if not self.group:
raise ValidationError("ParsedRelease.group cannot be empty")
if self.episode_end is not None:
if not (0 <= self.episode_end <= 9999):
raise ValidationError(
f"ParsedRelease.episode_end out of range: {self.episode_end}"
)
if self.episode is not None and self.episode_end < self.episode:
raise ValidationError(
f"ParsedRelease.episode_end ({self.episode_end}) < "
f"episode ({self.episode})"
)
if not isinstance(self.media_type, MediaTypeToken):
raise ValidationError(
f"ParsedRelease.media_type must be a MediaTypeToken, "
f"got {type(self.media_type).__name__}: {self.media_type!r}"
)
if not isinstance(self.parse_path, TokenizationRoute):
raise ValidationError(
f"ParsedRelease.parse_path must be a TokenizationRoute, "
f"got {type(self.parse_path).__name__}: {self.parse_path!r}"
)
@property
def is_season_pack(self) -> bool:
return self.season is not None and self.episode is None
@@ -1,39 +0,0 @@
"""FileEntry — frozen snapshot of one filesystem entry.
Produced by a ``FilesystemScanner`` adapter and consumed by the domain.
The domain never calls ``Path.iterdir``, ``Path.is_file``, ``Path.stat``
or ``open()`` directly; it reasons from these snapshots only. One scan =
one I/O round-trip; no callbacks back to disk.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
@dataclass(frozen=True)
class FileEntry:
"""Frozen snapshot of one filesystem entry, taken at scan time.
The entry carries enough metadata for the domain to classify and order
files without re-querying the OS. ``size`` is expressed in bytes and is
``None`` for directories and for files whose size could not be read.
"""
path: Path
is_file: bool
is_dir: bool
size: int | None
@property
def name(self) -> str:
return self.path.name
@property
def stem(self) -> str:
return self.path.stem
@property
def suffix(self) -> str:
return self.path.suffix
-192
View File
@@ -1,192 +0,0 @@
"""Media — file-level track types (video/audio/subtitle) and MediaInfo container.
These are the **container-view** dataclasses, populated from ffprobe output and
used across the project to describe the content of a media file.
Not to be confused with ``alfred.domain.subtitles.entities.SubtitleScanResult``
which models a subtitle being **scanned/matched** (with confidence, raw tokens,
file path, etc.). The two coexist by design — they describe the same real-world
concept seen from two different bounded contexts.
"""
from __future__ import annotations
from dataclasses import dataclass, field
__all__ = [
"AudioTrack",
"MediaInfo",
"SubtitleTrack",
"VideoTrack",
]
# ─────────────────────────────────────────────────────────────────────────────
# Track types — one frozen dataclass per stream kind
# ─────────────────────────────────────────────────────────────────────────────
@dataclass(frozen=True)
class AudioTrack:
"""A single audio track as reported by ffprobe."""
index: int
codec: str | None # aac, ac3, eac3, dts, truehd, flac, …
channels: int | None # 2, 6 (5.1), 8 (7.1), …
channel_layout: str | None # stereo, 5.1, 7.1, …
language: str | None # ISO 639-2: fre, eng, und, …
is_default: bool = False
@dataclass(frozen=True)
class SubtitleTrack:
"""A single embedded subtitle track as reported by ffprobe.
ffprobe reports ``forced`` / ``default`` / ``hearing_impaired`` as
independent disposition flags — we mirror that shape directly.
``is_sdh`` flags hearing-impaired tracks (called "SDH" in the
Anglo-Saxon distribution world: subtitles for the deaf and hard of
hearing, with non-speech audio cues). v2 ``.alfred`` sidecars
persist this flag explicitly; v1's ``type: "sdh"`` string overload
is gone.
"""
index: int
codec: str | None # subrip, ass, hdmv_pgs_subtitle, …
language: str | None # ISO 639-2: fre, eng, und, …
is_default: bool = False
is_forced: bool = False
is_sdh: bool = False
@dataclass(frozen=True)
class VideoTrack:
"""A single video track as reported by ffprobe.
A media file typically has one video track but can have several (alt
camera angles, attached thumbnail images reported as still-image streams,
etc.), hence the list[VideoTrack] on MediaInfo.
"""
index: int
codec: str | None # h264, hevc, av1, …
width: int | None
height: int | None
is_default: bool = False
@property
def resolution(self) -> str | None:
"""
Best-effort resolution string: 2160p, 1080p, 720p, …
Width takes priority over height to handle widescreen/cinema crops
(e.g. 1920×960 scope → 1080p, not 720p). Falls back to height when
width is unavailable.
"""
match (self.width, self.height):
case (None, None):
return None
case (w, h) if w is not None:
match True:
case _ if w >= 3840:
return "2160p"
case _ if w >= 1920:
return "1080p"
case _ if w >= 1280:
return "720p"
case _ if w >= 720:
return "576p"
case _ if w >= 640:
return "480p"
case _:
return f"{h}p" if h else f"{w}w"
case (None, h):
match True:
case _ if h >= 2160:
return "2160p"
case _ if h >= 1080:
return "1080p"
case _ if h >= 720:
return "720p"
case _ if h >= 576:
return "576p"
case _ if h >= 480:
return "480p"
case _:
return f"{h}p"
# ─────────────────────────────────────────────────────────────────────────────
# MediaInfo — assembles video/audio/subtitle tracks for a media file
# ─────────────────────────────────────────────────────────────────────────────
@dataclass(frozen=True)
class MediaInfo:
"""
File-level media metadata extracted by ffprobe — immutable snapshot.
Symmetric design: every stream type is a tuple of typed track objects
(immutable on purpose — a MediaInfo is a frozen view of one ffprobe run,
not a mutable collection to append to).
Backwards-compatible flat accessors (``resolution``, ``width``, …) read
from the first video track when present.
"""
video_tracks: tuple[VideoTrack, ...] = field(default_factory=tuple)
audio_tracks: tuple[AudioTrack, ...] = field(default_factory=tuple)
subtitle_tracks: tuple[SubtitleTrack, ...] = field(default_factory=tuple)
# File-level (from ffprobe ``format`` block, not from any single stream)
duration_seconds: float | None = None
bitrate_kbps: int | None = None
# ──────────────────────────────────────────────────────────────────────
# Video conveniences — read the first video track
# ──────────────────────────────────────────────────────────────────────
@property
def primary_video(self) -> VideoTrack | None:
return self.video_tracks[0] if self.video_tracks else None
@property
def width(self) -> int | None:
v = self.primary_video
return v.width if v else None
@property
def height(self) -> int | None:
v = self.primary_video
return v.height if v else None
@property
def video_codec(self) -> str | None:
v = self.primary_video
return v.codec if v else None
@property
def resolution(self) -> str | None:
v = self.primary_video
return v.resolution if v else None
# ──────────────────────────────────────────────────────────────────────
# Audio conveniences
# ──────────────────────────────────────────────────────────────────────
@property
def audio_languages(self) -> list[str]:
"""Unique audio languages across all tracks (ISO 639-2)."""
seen: set[str] = set()
result: list[str] = []
for track in self.audio_tracks:
if track.language and track.language not in seen:
seen.add(track.language)
result.append(track.language)
return result
@property
def is_multi_audio(self) -> bool:
"""True if more than one audio language is present."""
return len(self.audio_languages) > 1
@@ -1,18 +0,0 @@
"""Ports — Protocol interfaces the domain depends on.
Adapters live in ``alfred/infrastructure/`` and implement these protocols.
Domain code never imports infrastructure; it accepts a port via constructor
injection and calls it. Tests can pass in-memory fakes that satisfy the
Protocol without going through real I/O.
"""
from .filesystem_scanner import FilesystemScanner
from .language_repository import LanguageRepository
from .media_prober import MediaProber, SubtitleStreamInfo
__all__ = [
"FilesystemScanner",
"LanguageRepository",
"MediaProber",
"SubtitleStreamInfo",
]
@@ -1,33 +0,0 @@
"""FilesystemScanner port — abstracts filesystem inspection.
The domain never calls ``Path.iterdir``, ``Path.is_file``, ``Path.stat`` or
``open()`` directly. It asks the scanner for a ``FileEntry`` snapshot and
reasons from there. One scan = one I/O round-trip; no callbacks back to disk.
"""
from __future__ import annotations
from pathlib import Path
from typing import Protocol
from alfred.domain.shared_TO_CHECK.file_entry import FileEntry
class FilesystemScanner(Protocol):
"""Read-only filesystem inspection."""
def scan_dir(self, path: Path) -> list[FileEntry]:
"""Return sorted entries directly inside ``path``.
Returns an empty list when ``path`` is not a directory or is
unreadable. Adapters must not raise.
"""
...
def stat(self, path: Path) -> FileEntry | None:
"""Stat a single path; ``None`` when it doesn't exist or is unreadable."""
...
def read_text(self, path: Path, encoding: str = "utf-8") -> str | None:
"""Read a text file in one go; ``None`` on any error."""
...
@@ -1,36 +0,0 @@
"""LanguageRepository port — abstracts canonical language lookup.
The adapter (typically loading from ISO 639 YAML knowledge) maps a wide
range of raw forms (codes, English/native names, aliases) onto the
canonical :class:`Language` value object. Domain code accepts the port
via constructor injection; tests can pass a small in-memory fake.
"""
from __future__ import annotations
from typing import Protocol
from alfred.domain.shared_TO_CHECK.value_objects import Language
class LanguageRepository(Protocol):
"""Canonical language lookup."""
def from_iso(self, code: str) -> Language | None:
"""Look up by canonical ISO 639-2/B code (case-insensitive)."""
...
def from_any(self, raw: str) -> Language | None:
"""Look up by any known representation: ISO code, name, alias.
Case-insensitive. Returns ``None`` when the raw form is unknown.
"""
...
def all(self) -> list[Language]:
"""Return all known languages, in a stable order."""
...
def __contains__(self, raw: str) -> bool: ...
def __len__(self) -> int: ...
@@ -1,52 +0,0 @@
"""MediaProber port — abstracts media stream inspection (e.g. ffprobe).
The adapter (typically wrapping ffprobe) maps low-level container metadata
into the small set of stream attributes the domain reasons about. Replacing
ffprobe with another tool only requires a new adapter — domain stays put.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Protocol
if TYPE_CHECKING:
from alfred.domain.shared_TO_CHECK.media import MediaInfo
@dataclass(frozen=True)
class SubtitleStreamInfo:
"""A single embedded subtitle stream, as seen by the prober.
``language`` is the raw language tag emitted by the container (typically
ISO 639-2 like ``"fre"``, ``"eng"``); may be empty/None when the stream
has no language tag. The domain resolves it to a canonical ``Language``
via the knowledge base.
"""
language: str | None
is_hearing_impaired: bool
is_forced: bool
class MediaProber(Protocol):
"""Inspect a media file's stream metadata."""
def list_subtitle_streams(self, video: Path) -> list[SubtitleStreamInfo]:
"""Return all subtitle streams in ``video``.
Returns an empty list when the file is missing, unreadable, or has
no subtitle streams. Adapters must not raise.
"""
...
def probe(self, video: Path) -> MediaInfo | None:
"""Return the full :class:`MediaInfo` for ``video``, or ``None``.
Covers all stream families (video, audio, subtitle) plus
file-level duration / bitrate. ``None`` signals that ffprobe is
unavailable or the file can't be read — adapters must not
raise.
"""
...

Some files were not shown because too many files have changed in this diff Show More