feat: split resolve_destination, persona-driven prompts, qBittorrent relocation

Destination resolution
- Replace the single ResolveDestinationUseCase with four dedicated
  functions, one per release type:
    resolve_season_destination    (pack season, folder move)
    resolve_episode_destination   (single episode, file move)
    resolve_movie_destination     (movie, file move)
    resolve_series_destination    (multi-season pack, folder move)
- Each returns a dedicated DTO carrying only the fields relevant to
  that release type — no more polymorphic ResolvedDestination with
  half the fields unused depending on the case.
- Looser series folder matching: exact computed-name match is reused
  silently; any deviation (different group, multiple candidates) now
  prompts the user with all options including the computed name.

Agent tools
- Four new tools wrapping the use cases above; old resolve_destination
  removed from the registry.
- New move_to_destination tool: create_folder + move, chained — used
  after a resolve_* call to perform the actual relocation.
- Low-level filesystem_operations module (create_folder, move via mv)
  for instant same-FS renames (ZFS).

Prompt & persona
- New PromptBuilder (alfred/agent/prompt.py) replacing prompts.py:
  identity + personality block, situational expressions, memory
  schema, episodic/STM/config context, tool catalogue.
- Per-user expression system: knowledge/users/common.yaml +
  {username}.yaml are merged at runtime; one phrase per situation
  (greeting/success/error/...) is sampled into the system prompt.

qBittorrent integration
- Credentials now come from settings (qbittorrent_url/username/password)
  instead of hardcoded defaults.
- New client methods: find_by_name, set_location, recheck — the trio
  needed to update a torrent's save path and re-verify after a move.
- Host→container path translation settings (qbittorrent_host_path /
  qbittorrent_container_path) for docker-mounted setups.

Subtitles
- Identifier: strip parenthesized qualifiers (simplified, brazil…) at
  tokenization; new _tokenize_suffix used for the episode_subfolder
  pattern so episode-stem tokens no longer pollute language detection.
- Placer: extract _build_dest_name so it can be reused by the new
  dry_run path in ManageSubtitlesUseCase.
- Knowledge: add yue, ell, ind, msa, rus, vie, heb, tam, tel, tha,
  hin, ukr; add 'fre' to fra; add 'simplified'/'traditional' to zho.

Misc
- LTM workspace: add 'trash' folder slot.
- Default LLM provider switched to deepseek.
- testing/debug_release.py: CLI to parse a release, hit TMDB, and
  dry-run the destination resolution end-to-end.
This commit is contained in:
2026-05-14 05:01:59 +02:00
parent 1723b9fa53
commit e45465d52d
81 changed files with 2904 additions and 896 deletions
+18 -3
View File
@@ -12,7 +12,16 @@ from .dto import (
from .list_folder import ListFolderUseCase
from .manage_subtitles import ManageSubtitlesUseCase
from .move_media import MoveMediaUseCase
from .resolve_destination import ResolveDestinationUseCase, ResolvedDestination
from .resolve_destination import (
ResolvedEpisodeDestination,
ResolvedMovieDestination,
ResolvedSeasonDestination,
ResolvedSeriesDestination,
resolve_episode_destination,
resolve_movie_destination,
resolve_season_destination,
resolve_series_destination,
)
from .set_folder_path import SetFolderPathUseCase
__all__ = [
@@ -21,8 +30,14 @@ __all__ = [
"CreateSeedLinksUseCase",
"MoveMediaUseCase",
"ManageSubtitlesUseCase",
"ResolveDestinationUseCase",
"ResolvedDestination",
"ResolvedSeasonDestination",
"ResolvedEpisodeDestination",
"ResolvedMovieDestination",
"ResolvedSeriesDestination",
"resolve_season_destination",
"resolve_episode_destination",
"resolve_movie_destination",
"resolve_series_destination",
"SetFolderPathResponse",
"ListFolderResponse",
"CreateSeedLinksResponse",
@@ -20,10 +20,10 @@ from __future__ import annotations
from pathlib import Path
from alfred.domain.release.value_objects import (
ParsedRelease,
_METADATA_EXTENSIONS,
_NON_VIDEO_EXTENSIONS,
_VIDEO_EXTENSIONS,
ParsedRelease,
)
+11 -7
View File
@@ -2,7 +2,7 @@
from __future__ import annotations
from dataclasses import dataclass, field
from dataclasses import dataclass
@dataclass
@@ -88,7 +88,11 @@ class PlacedSubtitle:
filename: str
def to_dict(self) -> dict:
return {"source": self.source, "destination": self.destination, "filename": self.filename}
return {
"source": self.source,
"destination": self.destination,
"filename": self.filename,
}
@dataclass
@@ -98,7 +102,7 @@ class UnresolvedTrack:
raw_tokens: list[str]
file_path: str | None = None
file_size_kb: float | None = None
reason: str = "" # "unknown_language" | "low_confidence"
reason: str = "" # "unknown_language" | "low_confidence"
def to_dict(self) -> dict:
return {
@@ -113,8 +117,8 @@ class UnresolvedTrack:
class AvailableSubtitle:
"""One subtitle track available on an embedded media item."""
language: str # ISO 639-2 code
subtitle_type: str # "standard" | "sdh" | "forced" | "unknown"
language: str # ISO 639-2 code
subtitle_type: str # "standard" | "sdh" | "forced" | "unknown"
def to_dict(self) -> dict:
return {"language": self.language, "type": self.subtitle_type}
@@ -124,12 +128,12 @@ class AvailableSubtitle:
class ManageSubtitlesResponse:
"""Response from the manage_subtitles use case."""
status: str # "ok" | "needs_clarification" | "error"
status: str # "ok" | "needs_clarification" | "error"
video_path: str | None = None
placed: list[PlacedSubtitle] | None = None
skipped_count: int = 0
unresolved: list[UnresolvedTrack] | None = None
available: list[AvailableSubtitle] | None = None # embedded tracks summary
available: list[AvailableSubtitle] | None = None # embedded tracks summary
error: str | None = None
message: str | None = None
@@ -10,21 +10,21 @@ _VIDEO_CODEC_MAP = {
"hevc": "x265",
"h264": "x264",
"h265": "x265",
"av1": "AV1",
"vp9": "VP9",
"av1": "AV1",
"vp9": "VP9",
"mpeg4": "XviD",
}
# Map ffprobe audio codec names to scene-style tokens
_AUDIO_CODEC_MAP = {
"eac3": "EAC3",
"ac3": "AC3",
"dts": "DTS",
"truehd": "TrueHD",
"aac": "AAC",
"flac": "FLAC",
"opus": "OPUS",
"mp3": "MP3",
"eac3": "EAC3",
"ac3": "AC3",
"dts": "DTS",
"truehd": "TrueHD",
"aac": "AAC",
"flac": "FLAC",
"opus": "OPUS",
"mp3": "MP3",
"pcm_s16l": "PCM",
"pcm_s24l": "PCM",
}
@@ -50,7 +50,9 @@ def enrich_from_probe(parsed: ParsedRelease, info: MediaInfo) -> None:
parsed.quality = info.resolution
if parsed.codec is None and info.video_codec:
parsed.codec = _VIDEO_CODEC_MAP.get(info.video_codec.lower(), info.video_codec.upper())
parsed.codec = _VIDEO_CODEC_MAP.get(
info.video_codec.lower(), info.video_codec.upper()
)
if parsed.bit_depth is None and info.video_codec:
# ffprobe exposes bit depth via pix_fmt — not in MediaInfo yet, skip for now
@@ -62,10 +64,14 @@ def enrich_from_probe(parsed: ParsedRelease, info: MediaInfo) -> None:
if track:
if parsed.audio_codec is None and track.codec:
parsed.audio_codec = _AUDIO_CODEC_MAP.get(track.codec.lower(), track.codec.upper())
parsed.audio_codec = _AUDIO_CODEC_MAP.get(
track.codec.lower(), track.codec.upper()
)
if parsed.audio_channels is None and track.channels:
parsed.audio_channels = _CHANNEL_MAP.get(track.channels, f"{track.channels}ch")
parsed.audio_channels = _CHANNEL_MAP.get(
track.channels, f"{track.channels}ch"
)
# Languages — merge ffprobe languages with token-level ones
# "und" = undetermined, not useful
@@ -17,7 +17,12 @@ from alfred.infrastructure.persistence.context import get_memory
from alfred.infrastructure.subtitle.metadata_store import SubtitleMetadataStore
from alfred.infrastructure.subtitle.rule_repository import RuleSetRepository
from .dto import AvailableSubtitle, ManageSubtitlesResponse, PlacedSubtitle, UnresolvedTrack
from .dto import (
AvailableSubtitle,
ManageSubtitlesResponse,
PlacedSubtitle,
UnresolvedTrack,
)
logger = logging.getLogger(__name__)
@@ -69,11 +74,12 @@ class ManageSubtitlesUseCase:
season: int | None = None,
episode: int | None = None,
confirmed_pattern_id: str | None = None,
dry_run: bool = False,
) -> ManageSubtitlesResponse:
source_path = Path(source_video)
dest_path = Path(destination_video)
if not source_path.exists():
if not source_path.exists() and not source_path.parent.exists():
return ManageSubtitlesResponse(
status="error",
error="source_not_found",
@@ -108,7 +114,9 @@ class ManageSubtitlesUseCase:
)
if metadata.total_count == 0:
logger.info(f"ManageSubtitles: no subtitle tracks found for {source_path.name}")
logger.info(
f"ManageSubtitles: no subtitle tracks found for {source_path.name}"
)
return ManageSubtitlesResponse(
status="ok",
video_path=destination_video,
@@ -164,6 +172,32 @@ class ManageSubtitlesUseCase:
skipped_count=metadata.total_count,
)
# --- Dry run: skip placement ---
if dry_run:
from alfred.domain.subtitles.services.placer import _build_dest_name
placed_dtos = []
for t in matched:
if not t.file_path:
continue
try:
filename = _build_dest_name(t, dest_path.stem)
except ValueError:
continue
placed_dtos.append(
PlacedSubtitle(
source=str(t.file_path),
destination=str(dest_path.parent / filename),
filename=filename,
)
)
return ManageSubtitlesResponse(
status="ok",
video_path=destination_video,
placed=placed_dtos,
skipped_count=0,
)
# --- Place ---
placer = SubtitlePlacer()
place_result = placer.place(matched, dest_path)
@@ -229,7 +263,9 @@ class ManageSubtitlesUseCase:
return kb.pattern("adjacent")
def _to_unresolved_dto(track: SubtitleTrack, min_confidence: float = 0.7) -> UnresolvedTrack:
def _to_unresolved_dto(
track: SubtitleTrack, min_confidence: float = 0.7
) -> UnresolvedTrack:
reason = "unknown_language" if track.language is None else "low_confidence"
return UnresolvedTrack(
raw_tokens=track.raw_tokens,
@@ -252,7 +288,7 @@ def _pair_placed_with_tracks(
for p in placed:
track = track_by_path.get(p.source)
if track is None and tracks:
track = tracks[0] # positional fallback
track = tracks[0] # positional fallback
if track:
pairs.append((p, track))
return pairs
@@ -1,58 +1,82 @@
"""
ResolveDestinationUseCase — compute the library destination path for a release.
Destination resolution — compute library paths for releases.
Steps:
1. Parse the release name
2. Look up TMDB for title + year (+ episode title if single episode)
3. Scan the library for an existing series folder
4. Apply group-conflict rules
5. Return the computed paths (or needs_clarification if ambiguous)
Four distinct use cases, one per release type:
- resolve_season_destination : season pack (folder move)
- resolve_episode_destination : single episode (file move)
- resolve_movie_destination : movie (file move)
- resolve_series_destination : complete series multi-season pack (folder move)
Each returns a dedicated DTO with only the fields that make sense for that type.
"""
from __future__ import annotations
import logging
import re
from dataclasses import dataclass, field
from dataclasses import dataclass
from pathlib import Path
from alfred.domain.release import ParsedRelease, parse_release
from alfred.domain.release import parse_release
from alfred.infrastructure.persistence import get_memory
logger = logging.getLogger(__name__)
# Characters forbidden on Windows filesystems (served via NFS)
_WIN_FORBIDDEN = re.compile(r'[?:*"<>|\\]')
def _sanitise(text: str) -> str:
def _sanitize(text: str) -> str:
return _WIN_FORBIDDEN.sub("", text)
def _find_existing_tvshow_folders(
tv_root: Path, tmdb_title: str, tmdb_year: int
) -> list[str]:
"""Return folder names in tv_root that match title + year prefix."""
if not tv_root.exists():
return []
clean_title = _sanitize(tmdb_title).replace(" ", ".")
prefix = f"{clean_title}.{tmdb_year}".lower()
return sorted(
entry.name
for entry in tv_root.iterdir()
if entry.is_dir() and entry.name.lower().startswith(prefix)
)
def _get_tv_root() -> Path | None:
memory = get_memory()
tv_root = memory.ltm.library_paths.get("tv_show")
return Path(tv_root) if tv_root else None
# ---------------------------------------------------------------------------
# DTOs
# ---------------------------------------------------------------------------
@dataclass
class ResolvedDestination:
"""All computed paths for a release, ready to hand to move_media."""
class ResolvedSeasonDestination:
"""Paths for a season pack — folder move, no individual file paths."""
status: str # "ok" | "needs_clarification" | "error"
status: str # "ok" | "needs_clarification" | "error"
# Populated on "ok"
library_file: str | None = None # absolute path of the destination video file
series_folder: str | None = None # absolute path of the series root folder
season_folder: str | None = None # absolute path of the season subfolder
series_folder_name: str | None = None # just the folder name (for display)
# ok
series_folder: str | None = (
None # /tv_shows/A.Knight.of.the.Seven.Kingdoms.2024.1080p.WEBRip.x265-KONTRAST
)
season_folder: str | None = (
None # .../A.Knight.of.the.Seven.Kingdoms.S01.1080p.WEBRip.x265-KONTRAST
)
series_folder_name: str | None = None
season_folder_name: str | None = None
filename: str | None = None
is_new_series_folder: bool = False # True if we're creating the folder
is_new_series_folder: bool = False
# Populated on "needs_clarification"
# needs_clarification
question: str | None = None
options: list[str] | None = None # existing group folder names to pick from
options: list[str] | None = None
# Populated on "error"
# error
error: str | None = None
message: str | None = None
@@ -67,189 +91,366 @@ class ResolvedDestination:
}
return {
"status": self.status,
"library_file": self.library_file,
"series_folder": self.series_folder,
"season_folder": self.season_folder,
"series_folder_name": self.series_folder_name,
"season_folder_name": self.season_folder_name,
"is_new_series_folder": self.is_new_series_folder,
}
@dataclass
class ResolvedEpisodeDestination:
"""Paths for a single episode — file move."""
status: str
# ok
series_folder: str | None = None
season_folder: str | None = None
library_file: str | None = None # full path to destination .mkv
series_folder_name: str | None = None
season_folder_name: str | None = None
filename: str | None = None
is_new_series_folder: bool = False
# needs_clarification
question: str | None = None
options: list[str] | None = None
# error
error: str | None = None
message: str | None = None
def to_dict(self) -> dict:
if self.status == "error":
return {"status": self.status, "error": self.error, "message": self.message}
if self.status == "needs_clarification":
return {
"status": self.status,
"question": self.question,
"options": self.options or [],
}
return {
"status": self.status,
"series_folder": self.series_folder,
"season_folder": self.season_folder,
"library_file": self.library_file,
"series_folder_name": self.series_folder_name,
"season_folder_name": self.season_folder_name,
"filename": self.filename,
"is_new_series_folder": self.is_new_series_folder,
}
@dataclass
class ResolvedMovieDestination:
"""Paths for a movie — file move."""
status: str
# ok
movie_folder: str | None = None
library_file: str | None = None
movie_folder_name: str | None = None
filename: str | None = None
is_new_folder: bool = False
# needs_clarification
question: str | None = None
options: list[str] | None = None
# error
error: str | None = None
message: str | None = None
def to_dict(self) -> dict:
if self.status == "error":
return {"status": self.status, "error": self.error, "message": self.message}
if self.status == "needs_clarification":
return {
"status": self.status,
"question": self.question,
"options": self.options or [],
}
return {
"status": self.status,
"movie_folder": self.movie_folder,
"library_file": self.library_file,
"movie_folder_name": self.movie_folder_name,
"filename": self.filename,
"is_new_folder": self.is_new_folder,
}
@dataclass
class ResolvedSeriesDestination:
"""Paths for a complete multi-season series pack — folder move."""
status: str
# ok
series_folder: str | None = None
series_folder_name: str | None = None
is_new_series_folder: bool = False
# needs_clarification
question: str | None = None
options: list[str] | None = None
# error
error: str | None = None
message: str | None = None
def to_dict(self) -> dict:
if self.status == "error":
return {"status": self.status, "error": self.error, "message": self.message}
if self.status == "needs_clarification":
return {
"status": self.status,
"question": self.question,
"options": self.options or [],
}
return {
"status": self.status,
"series_folder": self.series_folder,
"series_folder_name": self.series_folder_name,
"is_new_series_folder": self.is_new_series_folder,
}
# ---------------------------------------------------------------------------
# Use case
# Use cases
# ---------------------------------------------------------------------------
class ResolveDestinationUseCase:
def resolve_season_destination(
release_name: str,
tmdb_title: str,
tmdb_year: int,
confirmed_folder: str | None = None,
) -> ResolvedSeasonDestination:
"""
Compute the full destination path for a media file being organised.
Compute destination paths for a season pack.
The caller provides:
- release_name: the raw release folder/file name
- source_file: path to the actual video file (to get extension)
- tmdb_title: canonical title from TMDB
- tmdb_year: release year from TMDB
- tmdb_episode_title: episode title from TMDB (None for movies / season packs)
- confirmed_folder: if the user already answered needs_clarification, pass
the chosen folder name here to skip the check
Returns a ResolvedDestination.
Returns series_folder + season_folder. No file paths — the whole
source folder is moved as-is into season_folder.
"""
def execute(
self,
release_name: str,
source_file: str,
tmdb_title: str,
tmdb_year: int,
tmdb_episode_title: str | None = None,
confirmed_folder: str | None = None,
) -> ResolvedDestination:
parsed = parse_release(release_name)
ext = Path(source_file).suffix # ".mkv"
if parsed.media_type == "movie":
return self._resolve_movie(parsed, tmdb_title, tmdb_year, ext)
if parsed.media_type == "tv_show":
return self._resolve_tvshow(
parsed, tmdb_title, tmdb_year, tmdb_episode_title, ext, confirmed_folder
)
return ResolvedDestination(
tv_root = _get_tv_root()
if not tv_root:
return ResolvedSeasonDestination(
status="error",
error="unsupported_media_type",
message=(
f"Cannot organize '{release_name}': detected as '{parsed.media_type}'. "
"Only movies and TV shows are supported."
),
error="library_not_set",
message="TV show library path is not configured.",
)
# ------------------------------------------------------------------
# Movie
# ------------------------------------------------------------------
parsed = parse_release(release_name)
computed_name = _sanitize(parsed.show_folder_name(tmdb_title, tmdb_year))
def _resolve_movie(
self, parsed: ParsedRelease, tmdb_title: str, tmdb_year: int, ext: str
) -> ResolvedDestination:
memory = get_memory()
movies_root = memory.ltm.library_paths.get("movie")
if not movies_root:
return ResolvedDestination(
status="error",
error="library_not_set",
message="Movie library path is not configured.",
)
if confirmed_folder:
series_folder_name = confirmed_folder
is_new = not (tv_root / confirmed_folder).exists()
else:
existing = _find_existing_tvshow_folders(tv_root, tmdb_title, tmdb_year)
folder_name = _sanitise(parsed.movie_folder_name(tmdb_title, tmdb_year))
filename = _sanitise(parsed.movie_filename(tmdb_title, tmdb_year, ext))
folder_path = Path(movies_root) / folder_name
file_path = folder_path / filename
return ResolvedDestination(
status="ok",
library_file=str(file_path),
series_folder=str(folder_path),
series_folder_name=folder_name,
filename=filename,
is_new_series_folder=not folder_path.exists(),
)
# ------------------------------------------------------------------
# TV show
# ------------------------------------------------------------------
def _resolve_tvshow(
self,
parsed: ParsedRelease,
tmdb_title: str,
tmdb_year: int,
tmdb_episode_title: str | None,
ext: str,
confirmed_folder: str | None,
) -> ResolvedDestination:
memory = get_memory()
tv_root = memory.ltm.library_paths.get("tv_show")
if not tv_root:
return ResolvedDestination(
status="error",
error="library_not_set",
message="TV show library path is not configured.",
)
tv_root_path = Path(tv_root)
# --- Find existing series folders for this title ---
existing = _find_existing_series_folders(tv_root_path, tmdb_title, tmdb_year)
# --- Determine series folder name ---
if confirmed_folder:
series_folder_name = confirmed_folder
is_new = not (tv_root_path / confirmed_folder).exists()
elif len(existing) == 0:
# No existing folder — create with release group
series_folder_name = _sanitise(parsed.show_folder_name(tmdb_title, tmdb_year))
if len(existing) == 0:
series_folder_name = computed_name
is_new = True
elif len(existing) == 1:
# Exactly one match — use it regardless of group
elif len(existing) == 1 and existing[0] == computed_name:
# Exact match — use it
series_folder_name = existing[0]
is_new = False
else:
# Multiple folders — ask user
return ResolvedDestination(
# One folder with a different name, or multiple — ask user
options = existing + (
[computed_name] if computed_name not in existing else []
)
return ResolvedSeasonDestination(
status="needs_clarification",
question=(
f"Multiple folders found for '{tmdb_title}' in your library. "
f"Which one should I use for this release ({parsed.group})?"
f"Un dossier série existe déjà pour '{tmdb_title}' "
f"mais son nom diffère du nom calculé ({computed_name}). "
f"Lequel utiliser ?"
),
options=existing,
options=options,
)
# --- Build paths ---
season_folder_name = parsed.season_folder_name()
filename = _sanitise(
parsed.episode_filename(tmdb_episode_title, ext)
if not parsed.is_season_pack
else parsed.season_folder_name() + ext
season_folder_name = parsed.season_folder_name()
series_path = tv_root / series_folder_name
season_path = series_path / season_folder_name
return ResolvedSeasonDestination(
status="ok",
series_folder=str(series_path),
season_folder=str(season_path),
series_folder_name=series_folder_name,
season_folder_name=season_folder_name,
is_new_series_folder=is_new,
)
def resolve_episode_destination(
release_name: str,
source_file: str,
tmdb_title: str,
tmdb_year: int,
tmdb_episode_title: str | None = None,
confirmed_folder: str | None = None,
) -> ResolvedEpisodeDestination:
"""
Compute destination paths for a single episode file.
Returns series_folder + season_folder + library_file (full path to .mkv).
"""
tv_root = _get_tv_root()
if not tv_root:
return ResolvedEpisodeDestination(
status="error",
error="library_not_set",
message="TV show library path is not configured.",
)
series_path = tv_root_path / series_folder_name
season_path = series_path / season_folder_name
file_path = season_path / filename
parsed = parse_release(release_name)
ext = Path(source_file).suffix
computed_name = _sanitize(parsed.show_folder_name(tmdb_title, tmdb_year))
return ResolvedDestination(
status="ok",
library_file=str(file_path),
series_folder=str(series_path),
season_folder=str(season_path),
series_folder_name=series_folder_name,
season_folder_name=season_folder_name,
filename=filename,
is_new_series_folder=is_new,
if confirmed_folder:
series_folder_name = confirmed_folder
is_new = not (tv_root / confirmed_folder).exists()
else:
existing = _find_existing_tvshow_folders(tv_root, tmdb_title, tmdb_year)
if len(existing) == 0:
series_folder_name = computed_name
is_new = True
elif len(existing) == 1 and existing[0] == computed_name:
series_folder_name = existing[0]
is_new = False
else:
options = existing + (
[computed_name] if computed_name not in existing else []
)
return ResolvedEpisodeDestination(
status="needs_clarification",
question=(
f"Un dossier série existe déjà pour '{tmdb_title}' "
f"mais son nom diffère du nom calculé ({computed_name}). "
f"Lequel utiliser ?"
),
options=options,
)
season_folder_name = parsed.season_folder_name()
filename = _sanitize(parsed.episode_filename(tmdb_episode_title, ext))
series_path = tv_root / series_folder_name
season_path = series_path / season_folder_name
file_path = season_path / filename
return ResolvedEpisodeDestination(
status="ok",
series_folder=str(series_path),
season_folder=str(season_path),
library_file=str(file_path),
series_folder_name=series_folder_name,
season_folder_name=season_folder_name,
filename=filename,
is_new_series_folder=is_new,
)
def resolve_movie_destination(
release_name: str,
source_file: str,
tmdb_title: str,
tmdb_year: int,
) -> ResolvedMovieDestination:
"""
Compute destination paths for a movie file.
Returns movie_folder + library_file (full path to .mkv).
"""
memory = get_memory()
movies_root = memory.ltm.library_paths.get("movie")
if not movies_root:
return ResolvedMovieDestination(
status="error",
error="library_not_set",
message="Movie library path is not configured.",
)
parsed = parse_release(release_name)
ext = Path(source_file).suffix
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
folder_name = _sanitize(parsed.movie_folder_name(tmdb_title, tmdb_year))
filename = _sanitize(parsed.movie_filename(tmdb_title, tmdb_year, ext))
def _find_existing_series_folders(tv_root: Path, tmdb_title: str, tmdb_year: int) -> list[str]:
folder_path = Path(movies_root) / folder_name
file_path = folder_path / filename
return ResolvedMovieDestination(
status="ok",
movie_folder=str(folder_path),
library_file=str(file_path),
movie_folder_name=folder_name,
filename=filename,
is_new_folder=not folder_path.exists(),
)
def resolve_series_destination(
release_name: str,
tmdb_title: str,
tmdb_year: int,
confirmed_folder: str | None = None,
) -> ResolvedSeriesDestination:
"""
Return names of folders in tv_root that match the given title + year.
Compute destination path for a complete multi-season series pack.
Matching is loose: normalised title (dots, no special chars) + year must
appear at the start of the folder name.
Returns only series_folder — the whole pack lands directly inside it.
"""
if not tv_root.exists():
return []
tv_root = _get_tv_root()
if not tv_root:
return ResolvedSeriesDestination(
status="error",
error="library_not_set",
message="TV show library path is not configured.",
)
# Build a normalised prefix to match against: "Oz.1997"
clean_title = _sanitise(tmdb_title).replace(" ", ".")
prefix = f"{clean_title}.{tmdb_year}".lower()
parsed = parse_release(release_name)
computed_name = _sanitize(parsed.show_folder_name(tmdb_title, tmdb_year))
matches = []
for entry in tv_root.iterdir():
if entry.is_dir() and entry.name.lower().startswith(prefix):
matches.append(entry.name)
if confirmed_folder:
series_folder_name = confirmed_folder
is_new = not (tv_root / confirmed_folder).exists()
else:
existing = _find_existing_tvshow_folders(tv_root, tmdb_title, tmdb_year)
return sorted(matches)
if len(existing) == 0:
series_folder_name = computed_name
is_new = True
elif len(existing) == 1 and existing[0] == computed_name:
series_folder_name = existing[0]
is_new = False
else:
options = existing + (
[computed_name] if computed_name not in existing else []
)
return ResolvedSeriesDestination(
status="needs_clarification",
question=(
f"Un dossier série existe déjà pour '{tmdb_title}' "
f"mais son nom diffère du nom calculé ({computed_name}). "
f"Lequel utiliser ?"
),
options=options,
)
series_path = tv_root / series_folder_name
return ResolvedSeriesDestination(
status="ok",
series_folder=str(series_path),
series_folder_name=series_folder_name,
is_new_series_folder=is_new,
)