refactor(filesystem): split into 5 atomic free-function ops + use cases

Replaces the monolithic FileManager class + scattered helpers in
alfred/infrastructure/filesystem with five free functions, each
single-responsibility and pathlib-native:

  list_dir / create_dir / link_file / move_file / move_dir

The infra layer now raises typed exceptions (FilesystemError base
+ SourceNotFound / DestinationExists / NotADirectory / NotAFile /
PermissionDenied / CrossDevice / FilesystemOSError) instead of
returning {status: ok|error} dicts. No more get_memory() reads
from infra.

Application layer mirrors the same split: five free use cases
(<op>_use_case) wrap each infra op, guard inputs against escaping
the new DirectoryRoots VO (downloads / torrents / movies /
tv_shows), catch infra exceptions, and return frozen DTOs. Roots
are injected — no global state.

Legacy files kept on disk with _OLD suffix for reference during
the follow-up rewiring (FileManager, MediaOrganizer,
create_folder/move helpers; CreateSeedLinks/ListFolder/MoveMedia/
ManageSubtitles use cases, resolve_destination). They are no
longer exported from __init__, which intentionally breaks current
agent tool wrappers and downstream tests — re-wiring is the next
chunk of work on the unfuck branch.
This commit is contained in:
2026-05-26 19:22:09 +02:00
parent 28304bb162
commit 2df7843d8b
26 changed files with 951 additions and 199 deletions
+32
View File
@@ -15,6 +15,38 @@ callers).
## [Unreleased]
### Changed
- **`filesystem` infra + application rewritten as 5 atomic free
functions.** On branch `unfuck`. Replaces the monolithic
`FileManager` class + scattered helpers with five small, pure ops in
`alfred/infrastructure/filesystem/`: `list_dir`, `create_dir`,
`link_file`, `move_file`, `move_dir`. Each takes `pathlib.Path`
arguments and raises typed exceptions from a dedicated hierarchy
(`FilesystemError``SourceNotFound` / `DestinationExists` /
`NotADirectory` / `NotAFile` / `PermissionDenied` / `CrossDevice` /
`FilesystemOSError`) — no more `{"status": "ok" | "error"}` dicts at
the infra boundary, no more `get_memory()` reads.
- **`filesystem` application: 5 use cases as free functions.** A
matching `<op>_use_case(path, …, roots: DirectoryRoots)` wraps each
infra op, guards inputs against escaping a new `DirectoryRoots` VO
(downloads / torrents / movies / tv_shows), catches infra exceptions,
and returns a frozen `<Op>Response` DTO. Roots are now injected, not
pulled from the global memory singleton.
### Removed
- `FileManager` / `MediaOrganizer` / `create_folder` / `move` from the
public API of `alfred.infrastructure.filesystem`. Their files remain
on disk renamed with an `_OLD` suffix (e.g. `file_manager_OLD.py`) so
the migration can finish on a follow-up commit without losing
reference material. They are no longer re-exported from `__init__`.
- `CreateSeedLinksUseCase` / `ListFolderUseCase` / `MoveMediaUseCase` /
`ManageSubtitlesUseCase` / `resolve_destination` from the public API
of `alfred.application.filesystem`. Same `_OLD` rename treatment.
This intentionally breaks current tool wrappers and tests downstream
— re-wiring is the next chunk of work on this branch.
### Added
- **`.alfred` v2 — Phase 4: v2-shaped `rescan_show` + new
+36 -37
View File
@@ -1,43 +1,42 @@
"""Filesystem use cases."""
"""Filesystem application layer — 5 atomic use cases as free functions.
from .create_seed_links import CreateSeedLinksUseCase
Each use case:
- accepts :class:`pathlib.Path` inputs plus a :class:`DirectoryRoots` VO,
- guards inputs against escaping configured roots,
- calls the matching infra op,
- catches :class:`~alfred.infrastructure.filesystem.FilesystemError` and
returns a frozen DTO with a normalized error code.
No global state, no ``get_memory()``. Roots are injected.
"""
from .create_dir import create_dir_use_case
from .directory_roots import DirectoryRoots
from .dto import (
CreateSeedLinksResponse,
ListFolderResponse,
ManageSubtitlesResponse,
MoveMediaResponse,
PlacedSubtitle,
)
from .list_folder import ListFolderUseCase
from .manage_subtitles import ManageSubtitlesUseCase
from .move_media import MoveMediaUseCase
from .resolve_destination import (
ResolvedEpisodeDestination,
ResolvedMovieDestination,
ResolvedSeasonDestination,
ResolvedSeriesDestination,
resolve_episode_destination,
resolve_movie_destination,
resolve_season_destination,
resolve_series_destination,
CreateDirResponse,
LinkFileResponse,
ListDirResponse,
MoveDirResponse,
MoveFileResponse,
)
from .link_file import link_file_use_case
from .list_dir import list_dir_use_case
from .move_dir import move_dir_use_case
from .move_file import move_file_use_case
__all__ = [
"ListFolderUseCase",
"CreateSeedLinksUseCase",
"MoveMediaUseCase",
"ManageSubtitlesUseCase",
"ResolvedSeasonDestination",
"ResolvedEpisodeDestination",
"ResolvedMovieDestination",
"ResolvedSeriesDestination",
"resolve_season_destination",
"resolve_episode_destination",
"resolve_movie_destination",
"resolve_series_destination",
"ListFolderResponse",
"CreateSeedLinksResponse",
"MoveMediaResponse",
"ManageSubtitlesResponse",
"PlacedSubtitle",
# use cases
"list_dir_use_case",
"create_dir_use_case",
"link_file_use_case",
"move_file_use_case",
"move_dir_use_case",
# VO
"DirectoryRoots",
# DTOs
"ListDirResponse",
"CreateDirResponse",
"LinkFileResponse",
"MoveFileResponse",
"MoveDirResponse",
]
+41
View File
@@ -0,0 +1,41 @@
"""Internal helpers: mapping infra exceptions → error codes.
Kept private (``_errors``) — only the 5 use cases in this package use
it. Centralizes the exception → code translation so every use case
returns consistent error payloads.
"""
from __future__ import annotations
from alfred.infrastructure.filesystem import (
CrossDevice,
DestinationExists,
FilesystemError,
FilesystemOSError,
NotADirectory,
NotAFile,
PermissionDenied,
SourceNotFound,
)
# Application-layer error codes (guard violations, not infra).
PATH_NOT_ALLOWED = "path_not_allowed"
def code_for(exc: FilesystemError) -> str:
"""Return the snake-case error code for an infra exception."""
if isinstance(exc, SourceNotFound):
return "source_not_found"
if isinstance(exc, DestinationExists):
return "destination_exists"
if isinstance(exc, NotADirectory):
return "not_a_directory"
if isinstance(exc, NotAFile):
return "not_a_file"
if isinstance(exc, PermissionDenied):
return "permission_denied"
if isinstance(exc, CrossDevice):
return "cross_device"
if isinstance(exc, FilesystemOSError):
return "filesystem_os_error"
return "filesystem_error"
@@ -0,0 +1,33 @@
"""create_dir use case — create a directory under one of the configured roots."""
from __future__ import annotations
from pathlib import Path
from alfred.infrastructure.filesystem import FilesystemError, create_dir
from ._errors import PATH_NOT_ALLOWED, code_for
from .directory_roots import DirectoryRoots
from .dto import CreateDirResponse
def create_dir_use_case(path: Path, roots: DirectoryRoots) -> CreateDirResponse:
"""Create directory ``path`` (and any missing parents) provided it
lives under one of the configured roots.
Idempotent on the infra side: re-running on an existing directory
returns ``status="ok"``.
"""
if not roots.contains(path):
return CreateDirResponse(
status="error",
error=PATH_NOT_ALLOWED,
message=f"Path is outside configured roots: {path}",
)
try:
create_dir(path)
except FilesystemError as e:
return CreateDirResponse(status="error", error=code_for(e), message=str(e))
return CreateDirResponse(status="ok", path=path)
@@ -0,0 +1,56 @@
"""DirectoryRoots — VO carrying the configured filesystem roots.
Replaces the ad-hoc ``get_memory().ltm.workspace.<x>`` lookups that were
sprinkled across the filesystem use cases. By making roots an explicit
input, use cases become pure (no global state read) and easy to test.
The roots are read once at the tool wrapper boundary (where the agent
config lives) and threaded through the use cases.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
@dataclass(frozen=True)
class DirectoryRoots:
"""Configured roots of Alfred's filesystem.
All paths must be absolute and existing directories — validation is
expected at the boundary that builds this VO.
Attributes:
downloads: where qBittorrent drops finished torrents.
torrents: where seeding hard-links live (mirrors downloads/).
movies: library root for movies.
tv_shows: library root for TV shows.
"""
downloads: Path
torrents: Path
movies: Path
tv_shows: Path
def all(self) -> tuple[Path, ...]:
"""Return every configured root, in declaration order."""
return (self.downloads, self.torrents, self.movies, self.tv_shows)
def contains(self, path: Path) -> bool:
"""Return True if ``path`` is inside one of the configured roots.
Uses ``Path.resolve()`` to handle symlinks and ``..`` segments,
then ``relative_to`` for an exact within-root check.
"""
try:
resolved = path.resolve()
except OSError:
return False
for root in self.all():
try:
resolved.relative_to(root.resolve())
return True
except (ValueError, OSError):
continue
return False
+62 -139
View File
@@ -1,19 +1,28 @@
"""Filesystem application DTOs."""
"""DTOs for the 5 atomic filesystem use cases.
Each use case returns a small frozen dataclass tagged with a ``status``
field. On error, ``error`` (machine-readable code) and ``message``
(human-readable) are populated; on success, the relevant payload
fields are.
Error codes mirror the infrastructure exception types (lowercased,
snake-cased) — e.g. ``SourceNotFound`` → ``"source_not_found"`` — plus
the application-layer ``"path_not_allowed"`` for guard violations.
"""
from __future__ import annotations
from dataclasses import dataclass
from dataclasses import dataclass, field
from pathlib import Path
@dataclass
class CopyMediaResponse:
"""Response from copying a media file."""
@dataclass(frozen=True)
class ListDirResponse:
"""Response from ``list_dir_use_case``."""
status: str
source: str | None = None
destination: str | None = None
filename: str | None = None
size: int | None = None
status: str # "ok" | "error"
path: Path | None = None
entries: tuple[Path, ...] = ()
error: str | None = None
message: str | None = None
@@ -22,22 +31,33 @@ class CopyMediaResponse:
return {"status": self.status, "error": self.error, "message": self.message}
return {
"status": self.status,
"source": self.source,
"destination": self.destination,
"filename": self.filename,
"size": self.size,
"path": str(self.path) if self.path else None,
"entries": [str(p) for p in self.entries],
}
@dataclass
class MoveMediaResponse:
"""Response from moving a media file."""
@dataclass(frozen=True)
class CreateDirResponse:
"""Response from ``create_dir_use_case``."""
status: str
source: str | None = None
destination: str | None = None
filename: str | None = None
size: int | None = None
path: Path | None = None
error: str | None = None
message: str | None = None
def to_dict(self) -> dict:
if self.error:
return {"status": self.status, "error": self.error, "message": self.message}
return {"status": self.status, "path": str(self.path) if self.path else None}
@dataclass(frozen=True)
class LinkFileResponse:
"""Response from ``link_file_use_case``."""
status: str
source: Path | None = None
destination: Path | None = None
error: str | None = None
message: str | None = None
@@ -46,100 +66,18 @@ class MoveMediaResponse:
return {"status": self.status, "error": self.error, "message": self.message}
return {
"status": self.status,
"source": self.source,
"destination": self.destination,
"filename": self.filename,
"size": self.size,
"source": str(self.source) if self.source else None,
"destination": str(self.destination) if self.destination else None,
}
@dataclass
class PlacedSubtitle:
"""One subtitle file successfully placed."""
source: str
destination: str
filename: str
def to_dict(self) -> dict:
return {
"source": self.source,
"destination": self.destination,
"filename": self.filename,
}
@dataclass
class UnresolvedTrack:
"""A subtitle track that needs agent clarification before placement."""
raw_tokens: list[str]
file_path: str | None = None
file_size_kb: float | None = None
reason: str = "" # "unknown_language" | "low_confidence"
def to_dict(self) -> dict:
return {
"raw_tokens": self.raw_tokens,
"file_path": self.file_path,
"file_size_kb": self.file_size_kb,
"reason": self.reason,
}
@dataclass
class AvailableSubtitle:
"""One subtitle track available on an embedded media item."""
language: str # ISO 639-2 code
subtitle_type: str # "standard" | "sdh" | "forced" | "unknown"
def to_dict(self) -> dict:
return {"language": self.language, "type": self.subtitle_type}
@dataclass
class ManageSubtitlesResponse:
"""Response from the manage_subtitles use case."""
status: str # "ok" | "needs_clarification" | "error"
video_path: str | None = None
placed: list[PlacedSubtitle] | None = None
skipped_count: int = 0
unresolved: list[UnresolvedTrack] | None = None
available: list[AvailableSubtitle] | None = None # embedded tracks summary
error: str | None = None
message: str | None = None
def to_dict(self) -> dict:
if self.error:
return {"status": self.status, "error": self.error, "message": self.message}
result = {
"status": self.status,
"video_path": self.video_path,
"placed": [p.to_dict() for p in (self.placed or [])],
"placed_count": len(self.placed or []),
"skipped_count": self.skipped_count,
}
if self.unresolved:
result["unresolved"] = [u.to_dict() for u in self.unresolved]
result["unresolved_count"] = len(self.unresolved)
if self.available:
result["available"] = [a.to_dict() for a in self.available]
return result
@dataclass
class CreateSeedLinksResponse:
"""Response from creating seed links for a torrent."""
@dataclass(frozen=True)
class MoveFileResponse:
"""Response from ``move_file_use_case``."""
status: str
torrent_subfolder: str | None = None
linked_file: str | None = None
copied_files: list[str] | None = None
copied_count: int = 0
skipped: list[str] | None = None
source: Path | None = None
destination: Path | None = None
error: str | None = None
message: str | None = None
@@ -148,41 +86,26 @@ class CreateSeedLinksResponse:
return {"status": self.status, "error": self.error, "message": self.message}
return {
"status": self.status,
"torrent_subfolder": self.torrent_subfolder,
"linked_file": self.linked_file,
"copied_files": self.copied_files or [],
"copied_count": self.copied_count,
"skipped": self.skipped or [],
"source": str(self.source) if self.source else None,
"destination": str(self.destination) if self.destination else None,
}
@dataclass
class ListFolderResponse:
"""Response from listing a folder."""
@dataclass(frozen=True)
class MoveDirResponse:
"""Response from ``move_dir_use_case``."""
status: str
folder_type: str | None = None # SHOULD BE A PROPERTY
path: str | None = None # NOT NONE - Should be path
entries: list[str] | None = None # NOT NONE - Empty list of path
count: int | None = None # USELESS
source: Path | None = None
destination: Path | None = None
error: str | None = None
message: str | None = None
def to_dict(self):
"""Convert to dict for agent compatibility."""
result = {"status": self.status}
def to_dict(self) -> dict:
if self.error:
result["error"] = self.error
result["message"] = self.message
else:
if self.folder_type:
result["folder_type"] = self.folder_type
if self.path:
result["path"] = self.path
if self.entries is not None:
result["entries"] = self.entries
if self.count is not None:
result["count"] = self.count
return result
return {"status": self.status, "error": self.error, "message": self.message}
return {
"status": self.status,
"source": str(self.source) if self.source else None,
"destination": str(self.destination) if self.destination else None,
}
+188
View File
@@ -0,0 +1,188 @@
"""Filesystem application DTOs."""
from __future__ import annotations
from dataclasses import dataclass
@dataclass
class CopyMediaResponse:
"""Response from copying a media file."""
status: str
source: str | None = None
destination: str | None = None
filename: str | None = None
size: int | None = None
error: str | None = None
message: str | None = None
def to_dict(self) -> dict:
if self.error:
return {"status": self.status, "error": self.error, "message": self.message}
return {
"status": self.status,
"source": self.source,
"destination": self.destination,
"filename": self.filename,
"size": self.size,
}
@dataclass
class MoveMediaResponse:
"""Response from moving a media file."""
status: str
source: str | None = None
destination: str | None = None
filename: str | None = None
size: int | None = None
error: str | None = None
message: str | None = None
def to_dict(self) -> dict:
if self.error:
return {"status": self.status, "error": self.error, "message": self.message}
return {
"status": self.status,
"source": self.source,
"destination": self.destination,
"filename": self.filename,
"size": self.size,
}
@dataclass
class PlacedSubtitle:
"""One subtitle file successfully placed."""
source: str
destination: str
filename: str
def to_dict(self) -> dict:
return {
"source": self.source,
"destination": self.destination,
"filename": self.filename,
}
@dataclass
class UnresolvedTrack:
"""A subtitle track that needs agent clarification before placement."""
raw_tokens: list[str]
file_path: str | None = None
file_size_kb: float | None = None
reason: str = "" # "unknown_language" | "low_confidence"
def to_dict(self) -> dict:
return {
"raw_tokens": self.raw_tokens,
"file_path": self.file_path,
"file_size_kb": self.file_size_kb,
"reason": self.reason,
}
@dataclass
class AvailableSubtitle:
"""One subtitle track available on an embedded media item."""
language: str # ISO 639-2 code
subtitle_type: str # "standard" | "sdh" | "forced" | "unknown"
def to_dict(self) -> dict:
return {"language": self.language, "type": self.subtitle_type}
@dataclass
class ManageSubtitlesResponse:
"""Response from the manage_subtitles use case."""
status: str # "ok" | "needs_clarification" | "error"
video_path: str | None = None
placed: list[PlacedSubtitle] | None = None
skipped_count: int = 0
unresolved: list[UnresolvedTrack] | None = None
available: list[AvailableSubtitle] | None = None # embedded tracks summary
error: str | None = None
message: str | None = None
def to_dict(self) -> dict:
if self.error:
return {"status": self.status, "error": self.error, "message": self.message}
result = {
"status": self.status,
"video_path": self.video_path,
"placed": [p.to_dict() for p in (self.placed or [])],
"placed_count": len(self.placed or []),
"skipped_count": self.skipped_count,
}
if self.unresolved:
result["unresolved"] = [u.to_dict() for u in self.unresolved]
result["unresolved_count"] = len(self.unresolved)
if self.available:
result["available"] = [a.to_dict() for a in self.available]
return result
@dataclass
class CreateSeedLinksResponse:
"""Response from creating seed links for a torrent."""
status: str
torrent_subfolder: str | None = None
linked_file: str | None = None
copied_files: list[str] | None = None
copied_count: int = 0
skipped: list[str] | None = None
error: str | None = None
message: str | None = None
def to_dict(self) -> dict:
if self.error:
return {"status": self.status, "error": self.error, "message": self.message}
return {
"status": self.status,
"torrent_subfolder": self.torrent_subfolder,
"linked_file": self.linked_file,
"copied_files": self.copied_files or [],
"copied_count": self.copied_count,
"skipped": self.skipped or [],
}
@dataclass
class ListFolderResponse:
"""Response from listing a folder."""
status: str
folder_type: str | None = None # SHOULD BE A PROPERTY
path: str | None = None # NOT NONE - Should be path
entries: list[str] | None = None # NOT NONE - Empty list of path
count: int | None = None # USELESS
error: str | None = None
message: str | None = None
def to_dict(self):
"""Convert to dict for agent compatibility."""
result = {"status": self.status}
if self.error:
result["error"] = self.error
result["message"] = self.message
else:
if self.folder_type:
result["folder_type"] = self.folder_type
if self.path:
result["path"] = self.path
if self.entries is not None:
result["entries"] = self.entries
if self.count is not None:
result["count"] = self.count
return result
@@ -0,0 +1,40 @@
"""link_file use case — hard-link a file from one root to another."""
from __future__ import annotations
from pathlib import Path
from alfred.infrastructure.filesystem import FilesystemError, link_file
from ._errors import PATH_NOT_ALLOWED, code_for
from .directory_roots import DirectoryRoots
from .dto import LinkFileResponse
def link_file_use_case(
src: Path, dst: Path, roots: DirectoryRoots
) -> LinkFileResponse:
"""Hard-link ``src`` to ``dst``. Both must be under configured roots.
The destination parent must already exist — the caller is expected
to have created it via ``create_dir_use_case`` if needed.
"""
if not roots.contains(src):
return LinkFileResponse(
status="error",
error=PATH_NOT_ALLOWED,
message=f"Source is outside configured roots: {src}",
)
if not roots.contains(dst):
return LinkFileResponse(
status="error",
error=PATH_NOT_ALLOWED,
message=f"Destination is outside configured roots: {dst}",
)
try:
link_file(src, dst)
except FilesystemError as e:
return LinkFileResponse(status="error", error=code_for(e), message=str(e))
return LinkFileResponse(status="ok", source=src, destination=dst)
+34
View File
@@ -0,0 +1,34 @@
"""list_dir use case — list a directory after guarding it within roots."""
from __future__ import annotations
from pathlib import Path
from alfred.infrastructure.filesystem import FilesystemError, list_dir
from ._errors import PATH_NOT_ALLOWED, code_for
from .directory_roots import DirectoryRoots
from .dto import ListDirResponse
def list_dir_use_case(path: Path, roots: DirectoryRoots) -> ListDirResponse:
"""List the immediate children of ``path`` if it lives under one of
the configured roots.
Returns a :class:`ListDirResponse`. On guard failure, status is
``"error"`` with ``error="path_not_allowed"``. On infra failure,
status is ``"error"`` with a code mapped from the raised exception.
"""
if not roots.contains(path):
return ListDirResponse(
status="error",
error=PATH_NOT_ALLOWED,
message=f"Path is outside configured roots: {path}",
)
try:
entries = list_dir(path)
except FilesystemError as e:
return ListDirResponse(status="error", error=code_for(e), message=str(e))
return ListDirResponse(status="ok", path=path, entries=tuple(entries))
+36
View File
@@ -0,0 +1,36 @@
"""move_dir use case — move a directory tree between configured roots."""
from __future__ import annotations
from pathlib import Path
from alfred.infrastructure.filesystem import FilesystemError, move_dir
from ._errors import PATH_NOT_ALLOWED, code_for
from .directory_roots import DirectoryRoots
from .dto import MoveDirResponse
def move_dir_use_case(
src: Path, dst: Path, roots: DirectoryRoots
) -> MoveDirResponse:
"""Move directory ``src`` to ``dst``. Both must be under configured roots."""
if not roots.contains(src):
return MoveDirResponse(
status="error",
error=PATH_NOT_ALLOWED,
message=f"Source is outside configured roots: {src}",
)
if not roots.contains(dst):
return MoveDirResponse(
status="error",
error=PATH_NOT_ALLOWED,
message=f"Destination is outside configured roots: {dst}",
)
try:
move_dir(src, dst)
except FilesystemError as e:
return MoveDirResponse(status="error", error=code_for(e), message=str(e))
return MoveDirResponse(status="ok", source=src, destination=dst)
@@ -0,0 +1,36 @@
"""move_file use case — move a file between configured roots."""
from __future__ import annotations
from pathlib import Path
from alfred.infrastructure.filesystem import FilesystemError, move_file
from ._errors import PATH_NOT_ALLOWED, code_for
from .directory_roots import DirectoryRoots
from .dto import MoveFileResponse
def move_file_use_case(
src: Path, dst: Path, roots: DirectoryRoots
) -> MoveFileResponse:
"""Move file ``src`` to ``dst``. Both must be under configured roots."""
if not roots.contains(src):
return MoveFileResponse(
status="error",
error=PATH_NOT_ALLOWED,
message=f"Source is outside configured roots: {src}",
)
if not roots.contains(dst):
return MoveFileResponse(
status="error",
error=PATH_NOT_ALLOWED,
message=f"Destination is outside configured roots: {dst}",
)
try:
move_file(src, dst)
except FilesystemError as e:
return MoveFileResponse(status="error", error=code_for(e), message=str(e))
return MoveFileResponse(status="ok", source=src, destination=dst)
+39 -10
View File
@@ -1,15 +1,44 @@
"""Filesystem operations."""
"""Filesystem infrastructure — 5 atomic ops as free functions.
from .exceptions import FilesystemError, PathTraversalError
from .file_manager import FileManager
from .filesystem_operations import create_folder, move
from .organizer import MediaOrganizer
All ops use :class:`pathlib.Path` and raise typed exceptions from
:mod:`.exceptions` on failure. They do **not** return status dicts and
do **not** read application state (no ``get_memory()``).
The application layer (``alfred.application.filesystem``) is responsible
for guarding paths against escaping configured roots and for wrapping
exceptions into DTOs.
"""
from .create_dir import create_dir
from .exceptions import (
CrossDevice,
DestinationExists,
FilesystemError,
FilesystemOSError,
NotADirectory,
NotAFile,
PermissionDenied,
SourceNotFound,
)
from .link_file import link_file
from .list_dir import list_dir
from .move_dir import move_dir
from .move_file import move_file
__all__ = [
"FileManager",
"MediaOrganizer",
# ops
"list_dir",
"create_dir",
"link_file",
"move_file",
"move_dir",
# exceptions
"FilesystemError",
"PathTraversalError",
"create_folder",
"move",
"SourceNotFound",
"DestinationExists",
"NotADirectory",
"NotAFile",
"PermissionDenied",
"CrossDevice",
"FilesystemOSError",
]
@@ -0,0 +1,33 @@
"""create_dir — create a directory (and missing parents)."""
from __future__ import annotations
import errno
from pathlib import Path
from .exceptions import FilesystemOSError, NotADirectory, PermissionDenied
def create_dir(path: Path) -> None:
"""Create ``path`` and any missing parents.
Idempotent: if ``path`` already exists **and is a directory**, this
is a no-op. If it exists but is not a directory, raises
:class:`NotADirectory`.
Raises:
NotADirectory: ``path`` exists but is not a directory.
PermissionDenied: parent is not writable.
FilesystemOSError: any other ``OSError`` from ``mkdir``.
"""
if path.exists() and not path.is_dir():
raise NotADirectory(path)
try:
path.mkdir(parents=True, exist_ok=True)
except PermissionError as e:
raise PermissionDenied(path, action="create_dir") from e
except OSError as e:
if e.errno == errno.EACCES:
raise PermissionDenied(path, action="create_dir") from e
raise FilesystemOSError("create_dir", path, e) from e
+80 -13
View File
@@ -1,25 +1,92 @@
"""Filesystem exceptions."""
"""Filesystem infrastructure exceptions.
Convention: the 5 atomic ops (``list_dir``, ``create_dir``, ``link_file``,
``move_file``, ``move_dir``) raise typed exceptions on failure rather
than returning ``T | None`` or status dicts. The application layer
catches these and wraps them into DTOs.
Hierarchy:
FilesystemError
SourceNotFound
DestinationExists
NotADirectory
NotAFile
PermissionDenied
CrossDevice
FilesystemOSError # catch-all for unexpected OSError
"""
from __future__ import annotations
from pathlib import Path
class FilesystemError(Exception):
"""Base exception for filesystem operations."""
pass
"""Base exception for all filesystem infrastructure failures."""
class PathTraversalError(FilesystemError):
"""Raised when path traversal attack is detected."""
class SourceNotFound(FilesystemError):
"""The source path does not exist."""
pass
def __init__(self, path: Path):
super().__init__(f"Source does not exist: {path}")
self.path = path
class FileNotFoundError(FilesystemError):
"""Raised when a file is not found."""
class DestinationExists(FilesystemError):
"""The destination already exists (refuse to overwrite)."""
pass
def __init__(self, path: Path):
super().__init__(f"Destination already exists: {path}")
self.path = path
class PermissionDeniedError(FilesystemError):
"""Raised when permission is denied."""
class NotADirectory(FilesystemError):
"""The path exists but is not a directory."""
pass
def __init__(self, path: Path):
super().__init__(f"Not a directory: {path}")
self.path = path
class NotAFile(FilesystemError):
"""The path exists but is not a regular file."""
def __init__(self, path: Path):
super().__init__(f"Not a file: {path}")
self.path = path
class PermissionDenied(FilesystemError):
"""OS denied the requested operation."""
def __init__(self, path: Path, action: str):
super().__init__(f"Permission denied ({action}): {path}")
self.path = path
self.action = action
class CrossDevice(FilesystemError):
"""Source and destination live on different filesystems.
On Alfred's ZFS pool this should never happen. If it does, it is a
configuration bug, not something to silently fall back from.
"""
def __init__(self, source: Path, destination: Path):
super().__init__(
f"Cross-device operation refused: {source}{destination}"
)
self.source = source
self.destination = destination
class FilesystemOSError(FilesystemError):
"""Wraps an unexpected ``OSError`` from the underlying syscall."""
def __init__(self, action: str, path: Path, cause: OSError):
super().__init__(f"{action} failed for {path}: {cause}")
self.action = action
self.path = path
self.cause = cause
@@ -0,0 +1,61 @@
"""link_file — create a hard link from ``src`` to ``dst``.
Hard-link semantics: ``dst`` becomes a second name for the same inode
as ``src``. Zero data copy, instant on any filesystem, and qBittorrent
keeps seeding the original path unaffected since the inode is unchanged.
Requires ``src`` and ``dst`` to live on the same filesystem. On Alfred's
ZFS pool this is always the case; if not, :class:`CrossDevice` is
raised.
"""
from __future__ import annotations
import errno
import os
from pathlib import Path
from .exceptions import (
CrossDevice,
DestinationExists,
FilesystemOSError,
NotAFile,
PermissionDenied,
SourceNotFound,
)
def link_file(src: Path, dst: Path) -> None:
"""Hard-link ``src`` to ``dst``.
The destination parent directory **must already exist** — caller is
responsible for creating it (use ``create_dir`` first if needed).
Raises:
SourceNotFound: ``src`` does not exist.
NotAFile: ``src`` is not a regular file.
SourceNotFound: parent of ``dst`` does not exist.
DestinationExists: ``dst`` already exists.
CrossDevice: source and destination are on different filesystems.
PermissionDenied: cannot create link (permission).
FilesystemOSError: any other ``OSError`` from ``os.link``.
"""
if not src.exists():
raise SourceNotFound(src)
if not src.is_file():
raise NotAFile(src)
if not dst.parent.exists():
raise SourceNotFound(dst.parent)
if dst.exists():
raise DestinationExists(dst)
try:
os.link(src, dst)
except PermissionError as e:
raise PermissionDenied(dst, action="link_file") from e
except OSError as e:
if e.errno == errno.EXDEV:
raise CrossDevice(src, dst) from e
if e.errno == errno.EACCES:
raise PermissionDenied(dst, action="link_file") from e
raise FilesystemOSError("link_file", dst, e) from e
@@ -0,0 +1,40 @@
"""list_dir — return the immediate children of a directory."""
from __future__ import annotations
import errno
from pathlib import Path
from .exceptions import (
FilesystemOSError,
NotADirectory,
PermissionDenied,
SourceNotFound,
)
def list_dir(path: Path) -> list[Path]:
"""Return the immediate children of ``path``, sorted by name.
Returns absolute :class:`~pathlib.Path` objects for both files and
directories. Does not recurse.
Raises:
SourceNotFound: ``path`` does not exist.
NotADirectory: ``path`` exists but is not a directory.
PermissionDenied: the directory is not readable.
FilesystemOSError: any other ``OSError`` from ``iterdir``.
"""
if not path.exists():
raise SourceNotFound(path)
if not path.is_dir():
raise NotADirectory(path)
try:
return sorted(path.iterdir())
except PermissionError as e:
raise PermissionDenied(path, action="list_dir") from e
except OSError as e:
if e.errno == errno.EACCES:
raise PermissionDenied(path, action="list_dir") from e
raise FilesystemOSError("list_dir", path, e) from e
@@ -0,0 +1,52 @@
"""move_dir — move a directory tree."""
from __future__ import annotations
import errno
import os
from pathlib import Path
from .exceptions import (
CrossDevice,
DestinationExists,
FilesystemOSError,
NotADirectory,
PermissionDenied,
SourceNotFound,
)
def move_dir(src: Path, dst: Path) -> None:
"""Move directory ``src`` to ``dst``.
Same-filesystem rename — instant on ZFS, even for huge trees. The
destination parent must already exist; ``dst`` itself must not.
Raises:
SourceNotFound: ``src`` does not exist.
NotADirectory: ``src`` is not a directory.
SourceNotFound: parent of ``dst`` does not exist.
DestinationExists: ``dst`` already exists.
CrossDevice: source and destination are on different filesystems.
PermissionDenied: cannot perform the rename (permission).
FilesystemOSError: any other ``OSError`` from ``os.rename``.
"""
if not src.exists():
raise SourceNotFound(src)
if not src.is_dir():
raise NotADirectory(src)
if not dst.parent.exists():
raise SourceNotFound(dst.parent)
if dst.exists():
raise DestinationExists(dst)
try:
os.rename(src, dst)
except PermissionError as e:
raise PermissionDenied(dst, action="move_dir") from e
except OSError as e:
if e.errno == errno.EXDEV:
raise CrossDevice(src, dst) from e
if e.errno == errno.EACCES:
raise PermissionDenied(dst, action="move_dir") from e
raise FilesystemOSError("move_dir", dst, e) from e
@@ -0,0 +1,52 @@
"""move_file — move a single regular file."""
from __future__ import annotations
import errno
import os
from pathlib import Path
from .exceptions import (
CrossDevice,
DestinationExists,
FilesystemOSError,
NotAFile,
PermissionDenied,
SourceNotFound,
)
def move_file(src: Path, dst: Path) -> None:
"""Move ``src`` to ``dst``.
Same-filesystem rename — instant on ZFS. The destination parent
must already exist.
Raises:
SourceNotFound: ``src`` does not exist.
NotAFile: ``src`` is not a regular file.
SourceNotFound: parent of ``dst`` does not exist.
DestinationExists: ``dst`` already exists.
CrossDevice: source and destination are on different filesystems.
PermissionDenied: cannot perform the rename (permission).
FilesystemOSError: any other ``OSError`` from ``os.rename``.
"""
if not src.exists():
raise SourceNotFound(src)
if not src.is_file():
raise NotAFile(src)
if not dst.parent.exists():
raise SourceNotFound(dst.parent)
if dst.exists():
raise DestinationExists(dst)
try:
os.rename(src, dst)
except PermissionError as e:
raise PermissionDenied(dst, action="move_file") from e
except OSError as e:
if e.errno == errno.EXDEV:
raise CrossDevice(src, dst) from e
if e.errno == errno.EACCES:
raise PermissionDenied(dst, action="move_file") from e
raise FilesystemOSError("move_file", dst, e) from e