"""FFmpeg clip extraction with format presets for shorts generation. Pure functions — no DB access, no Celery dependency. Tested independently. """ from __future__ import annotations import logging import subprocess from dataclasses import dataclass from pathlib import Path from models import FormatPreset logger = logging.getLogger(__name__) FFMPEG_TIMEOUT_SECS = 300 @dataclass(frozen=True) class PresetSpec: """Resolution and ffmpeg video filter for a format preset.""" width: int height: int vf_filter: str PRESETS: dict[FormatPreset, PresetSpec] = { FormatPreset.vertical: PresetSpec( width=1080, height=1920, vf_filter="scale=1080:-2,pad=1080:1920:(ow-iw)/2:(oh-ih)/2:black", ), FormatPreset.square: PresetSpec( width=1080, height=1080, vf_filter="crop=min(iw\\,ih):min(iw\\,ih),scale=1080:1080", ), FormatPreset.horizontal: PresetSpec( width=1920, height=1080, vf_filter="scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2:black", ), } def resolve_video_path(video_source_root: str, file_path: str) -> Path: """Join root + relative path and validate the file exists. Args: video_source_root: Base directory for video files (e.g. /videos). file_path: Relative path stored in SourceVideo.file_path. Returns: Resolved absolute Path. Raises: FileNotFoundError: If the resolved path doesn't exist or isn't a file. """ resolved = Path(video_source_root) / file_path if not resolved.is_file(): raise FileNotFoundError( f"Video file not found: {resolved} " f"(root={video_source_root!r}, relative={file_path!r})" ) return resolved def extract_clip( input_path: Path | str, output_path: Path | str, start_secs: float, end_secs: float, vf_filter: str, ass_path: Path | str | None = None, ) -> None: """Extract a clip from a video file using ffmpeg. Seeks to *start_secs*, encodes until *end_secs*, and applies *vf_filter*. Uses ``-c:v libx264 -preset fast -crf 23`` for reasonable quality/speed. When *ass_path* is provided, the ASS subtitle filter is appended to the video filter chain so that captions are burned into the output video. Args: input_path: Source video file. output_path: Destination mp4 file (parent dir must exist). start_secs: Start time in seconds. end_secs: End time in seconds. vf_filter: ffmpeg ``-vf`` filter string. ass_path: Optional path to an ASS subtitle file. When provided, ``ass=`` is appended to the filter chain. Raises: subprocess.CalledProcessError: If ffmpeg exits non-zero. subprocess.TimeoutExpired: If ffmpeg exceeds the timeout. ValueError: If start >= end. """ duration = end_secs - start_secs if duration <= 0: raise ValueError( f"Invalid clip range: start={start_secs}s end={end_secs}s " f"(duration={duration}s)" ) # Build the video filter chain — ASS burn-in comes after scale/pad effective_vf = vf_filter if ass_path is not None: # Escape colons and backslashes in the path for ffmpeg filter syntax escaped = str(ass_path).replace("\\", "\\\\").replace(":", "\\:") effective_vf = f"{vf_filter},ass={escaped}" cmd = [ "ffmpeg", "-y", # overwrite output "-ss", str(start_secs), # seek before input (fast) "-i", str(input_path), "-t", str(duration), "-vf", effective_vf, "-c:v", "libx264", "-preset", "fast", "-crf", "23", "-c:a", "aac", "-b:a", "128k", "-movflags", "+faststart", # web-friendly mp4 str(output_path), ] logger.info( "ffmpeg: extracting %.1fs clip from %s → %s", duration, input_path, output_path, ) result = subprocess.run( cmd, capture_output=True, timeout=FFMPEG_TIMEOUT_SECS, ) if result.returncode != 0: stderr_text = result.stderr.decode("utf-8", errors="replace")[-2000:] logger.error("ffmpeg failed (rc=%d): %s", result.returncode, stderr_text) raise subprocess.CalledProcessError( result.returncode, cmd, output=result.stdout, stderr=result.stderr, ) def extract_clip_with_template( input_path: Path | str, output_path: Path | str, start_secs: float, end_secs: float, vf_filter: str, ass_path: Path | str | None = None, intro_path: Path | str | None = None, outro_path: Path | str | None = None, ) -> None: """Extract a clip and optionally prepend/append intro/outro cards. If neither intro nor outro is provided, delegates directly to :func:`extract_clip`. When cards are provided, the main clip is extracted to a temp file, then all segments are concatenated via :func:`~pipeline.card_renderer.concat_segments`. Args: input_path: Source video file. output_path: Final destination mp4 file. start_secs: Start time in seconds. end_secs: End time in seconds. vf_filter: ffmpeg ``-vf`` filter string. ass_path: Optional ASS subtitle file path. intro_path: Optional intro card mp4 path. outro_path: Optional outro card mp4 path. Raises: subprocess.CalledProcessError: If any ffmpeg command fails. ValueError: If clip range is invalid. """ has_cards = intro_path is not None or outro_path is not None if not has_cards: # No template cards — simple extraction extract_clip( input_path=input_path, output_path=output_path, start_secs=start_secs, end_secs=end_secs, vf_filter=vf_filter, ass_path=ass_path, ) return # Extract main clip to a temp file for concatenation main_clip_path = Path(str(output_path) + ".main.mp4") try: extract_clip( input_path=input_path, output_path=main_clip_path, start_secs=start_secs, end_secs=end_secs, vf_filter=vf_filter, ass_path=ass_path, ) # Build segment list in order: intro → main → outro segments: list[Path] = [] if intro_path is not None: segments.append(Path(intro_path)) segments.append(main_clip_path) if outro_path is not None: segments.append(Path(outro_path)) from pipeline.card_renderer import concat_segments concat_segments(segments=segments, output_path=Path(output_path)) finally: # Clean up temp main clip if main_clip_path.exists(): try: main_clip_path.unlink() except OSError: pass