refactor(domain): freeze media track value objects

AudioTrack, VideoTrack, SubtitleTrack and MediaInfo are snapshots of a
single ffprobe run — model them as proper immutable value objects.

- @dataclass(frozen=True) on all four
- MediaInfo track collections become tuple[...] instead of list[...]
- ffprobe adapter rewritten to build tuples up-front instead of
  appending/setattr'ing on a constructed instance
This commit is contained in:
2026-05-19 14:17:27 +02:00
parent eb8995cfc3
commit 7cd24f3a31
5 changed files with 32 additions and 20 deletions
+1 -1
View File
@@ -5,7 +5,7 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
@dataclass @dataclass(frozen=True)
class AudioTrack: class AudioTrack:
"""A single audio track as reported by ffprobe.""" """A single audio track as reported by ffprobe."""
+8 -6
View File
@@ -9,19 +9,21 @@ from .subtitle import SubtitleTrack
from .video import VideoTrack from .video import VideoTrack
@dataclass @dataclass(frozen=True)
class MediaInfo: class MediaInfo:
""" """
File-level media metadata extracted by ffprobe. File-level media metadata extracted by ffprobe — immutable snapshot.
Symmetric design: every stream type is a list of typed track objects. Symmetric design: every stream type is a tuple of typed track objects
(immutable on purpose — a MediaInfo is a frozen view of one ffprobe run,
not a mutable collection to append to).
Backwards-compatible flat accessors (``resolution``, ``width``, …) read Backwards-compatible flat accessors (``resolution``, ``width``, …) read
from the first video track when present. from the first video track when present.
""" """
video_tracks: list[VideoTrack] = field(default_factory=list) video_tracks: tuple[VideoTrack, ...] = field(default_factory=tuple)
audio_tracks: list[AudioTrack] = field(default_factory=list) audio_tracks: tuple[AudioTrack, ...] = field(default_factory=tuple)
subtitle_tracks: list[SubtitleTrack] = field(default_factory=list) subtitle_tracks: tuple[SubtitleTrack, ...] = field(default_factory=tuple)
# File-level (from ffprobe ``format`` block, not from any single stream) # File-level (from ffprobe ``format`` block, not from any single stream)
duration_seconds: float | None = None duration_seconds: float | None = None
+1 -1
View File
@@ -14,7 +14,7 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
@dataclass @dataclass(frozen=True)
class SubtitleTrack: class SubtitleTrack:
"""A single embedded subtitle track as reported by ffprobe.""" """A single embedded subtitle track as reported by ffprobe."""
+1 -1
View File
@@ -5,7 +5,7 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
@dataclass @dataclass(frozen=True)
class VideoTrack: class VideoTrack:
"""A single video track as reported by ffprobe. """A single video track as reported by ffprobe.
+21 -11
View File
@@ -57,27 +57,31 @@ def _parse(data: dict) -> MediaInfo:
streams = data.get("streams", []) streams = data.get("streams", [])
fmt = data.get("format", {}) fmt = data.get("format", {})
info = MediaInfo()
# File-level duration/bitrate (ffprobe ``format`` block — independent of streams) # File-level duration/bitrate (ffprobe ``format`` block — independent of streams)
duration_seconds: float | None = None
bitrate_kbps: int | None = None
if "duration" in fmt: if "duration" in fmt:
try: try:
info.duration_seconds = float(fmt["duration"]) duration_seconds = float(fmt["duration"])
except ValueError: except ValueError:
pass pass
if "bit_rate" in fmt: if "bit_rate" in fmt:
try: try:
info.bitrate_kbps = int(fmt["bit_rate"]) // 1000 bitrate_kbps = int(fmt["bit_rate"]) // 1000
except ValueError: except ValueError:
pass pass
video_tracks: list[VideoTrack] = []
audio_tracks: list[AudioTrack] = []
subtitle_tracks: list[SubtitleTrack] = []
for stream in streams: for stream in streams:
codec_type = stream.get("codec_type") codec_type = stream.get("codec_type")
if codec_type == "video": if codec_type == "video":
info.video_tracks.append( video_tracks.append(
VideoTrack( VideoTrack(
index=stream.get("index", len(info.video_tracks)), index=stream.get("index", len(video_tracks)),
codec=stream.get("codec_name"), codec=stream.get("codec_name"),
width=stream.get("width"), width=stream.get("width"),
height=stream.get("height"), height=stream.get("height"),
@@ -86,9 +90,9 @@ def _parse(data: dict) -> MediaInfo:
) )
elif codec_type == "audio": elif codec_type == "audio":
info.audio_tracks.append( audio_tracks.append(
AudioTrack( AudioTrack(
index=stream.get("index", len(info.audio_tracks)), index=stream.get("index", len(audio_tracks)),
codec=stream.get("codec_name"), codec=stream.get("codec_name"),
channels=stream.get("channels"), channels=stream.get("channels"),
channel_layout=stream.get("channel_layout"), channel_layout=stream.get("channel_layout"),
@@ -98,9 +102,9 @@ def _parse(data: dict) -> MediaInfo:
) )
elif codec_type == "subtitle": elif codec_type == "subtitle":
info.subtitle_tracks.append( subtitle_tracks.append(
SubtitleTrack( SubtitleTrack(
index=stream.get("index", len(info.subtitle_tracks)), index=stream.get("index", len(subtitle_tracks)),
codec=stream.get("codec_name"), codec=stream.get("codec_name"),
language=stream.get("tags", {}).get("language"), language=stream.get("tags", {}).get("language"),
is_default=stream.get("disposition", {}).get("default", 0) == 1, is_default=stream.get("disposition", {}).get("default", 0) == 1,
@@ -108,4 +112,10 @@ def _parse(data: dict) -> MediaInfo:
) )
) )
return info return MediaInfo(
video_tracks=tuple(video_tracks),
audio_tracks=tuple(audio_tracks),
subtitle_tracks=tuple(subtitle_tracks),
duration_seconds=duration_seconds,
bitrate_kbps=bitrate_kbps,
)