diff --git a/backend/pipeline/shorts_generator.py b/backend/pipeline/shorts_generator.py new file mode 100644 index 0000000..b5f63a3 --- /dev/null +++ b/backend/pipeline/shorts_generator.py @@ -0,0 +1,132 @@ +"""FFmpeg clip extraction with format presets for shorts generation. + +Pure functions — no DB access, no Celery dependency. Tested independently. +""" + +from __future__ import annotations + +import logging +import subprocess +from dataclasses import dataclass +from pathlib import Path + +from models import FormatPreset + +logger = logging.getLogger(__name__) + +FFMPEG_TIMEOUT_SECS = 300 + + +@dataclass(frozen=True) +class PresetSpec: + """Resolution and ffmpeg video filter for a format preset.""" + width: int + height: int + vf_filter: str + + +PRESETS: dict[FormatPreset, PresetSpec] = { + FormatPreset.vertical: PresetSpec( + width=1080, + height=1920, + vf_filter="scale=1080:-2,pad=1080:1920:(ow-iw)/2:(oh-ih)/2:black", + ), + FormatPreset.square: PresetSpec( + width=1080, + height=1080, + vf_filter="crop=min(iw\\,ih):min(iw\\,ih),scale=1080:1080", + ), + FormatPreset.horizontal: PresetSpec( + width=1920, + height=1080, + vf_filter="scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2:black", + ), +} + + +def resolve_video_path(video_source_root: str, file_path: str) -> Path: + """Join root + relative path and validate the file exists. + + Args: + video_source_root: Base directory for video files (e.g. /videos). + file_path: Relative path stored in SourceVideo.file_path. + + Returns: + Resolved absolute Path. + + Raises: + FileNotFoundError: If the resolved path doesn't exist or isn't a file. + """ + resolved = Path(video_source_root) / file_path + if not resolved.is_file(): + raise FileNotFoundError( + f"Video file not found: {resolved} " + f"(root={video_source_root!r}, relative={file_path!r})" + ) + return resolved + + +def extract_clip( + input_path: Path | str, + output_path: Path | str, + start_secs: float, + end_secs: float, + vf_filter: str, +) -> None: + """Extract a clip from a video file using ffmpeg. + + Seeks to *start_secs*, encodes until *end_secs*, and applies *vf_filter*. + Uses ``-c:v libx264 -preset fast -crf 23`` for reasonable quality/speed. + + Args: + input_path: Source video file. + output_path: Destination mp4 file (parent dir must exist). + start_secs: Start time in seconds. + end_secs: End time in seconds. + vf_filter: ffmpeg ``-vf`` filter string. + + Raises: + subprocess.CalledProcessError: If ffmpeg exits non-zero. + subprocess.TimeoutExpired: If ffmpeg exceeds the timeout. + ValueError: If start >= end. + """ + duration = end_secs - start_secs + if duration <= 0: + raise ValueError( + f"Invalid clip range: start={start_secs}s end={end_secs}s " + f"(duration={duration}s)" + ) + + cmd = [ + "ffmpeg", + "-y", # overwrite output + "-ss", str(start_secs), # seek before input (fast) + "-i", str(input_path), + "-t", str(duration), + "-vf", vf_filter, + "-c:v", "libx264", + "-preset", "fast", + "-crf", "23", + "-c:a", "aac", + "-b:a", "128k", + "-movflags", "+faststart", # web-friendly mp4 + str(output_path), + ] + + logger.info( + "ffmpeg: extracting %.1fs clip from %s → %s", + duration, input_path, output_path, + ) + + result = subprocess.run( + cmd, + capture_output=True, + timeout=FFMPEG_TIMEOUT_SECS, + ) + + if result.returncode != 0: + stderr_text = result.stderr.decode("utf-8", errors="replace")[-2000:] + logger.error("ffmpeg failed (rc=%d): %s", result.returncode, stderr_text) + raise subprocess.CalledProcessError( + result.returncode, cmd, output=result.stdout, stderr=result.stderr, + ) diff --git a/backend/pipeline/stages.py b/backend/pipeline/stages.py index 5f7e895..d7cc032 100644 --- a/backend/pipeline/stages.py +++ b/backend/pipeline/stages.py @@ -2860,3 +2860,192 @@ def extract_personality_profile(self, creator_id: str) -> str: raise self.retry(exc=exc) finally: session.close() + + +# ── Stage: Shorts Generation ───────────────────────────────────────────────── + +@celery_app.task(bind=True, max_retries=1, default_retry_delay=60) +def stage_generate_shorts(self, highlight_candidate_id: str) -> str: + """Generate video shorts for an approved highlight candidate. + + Creates one GeneratedShort row per FormatPreset, extracts the clip via + ffmpeg, uploads to MinIO, and updates status. Each preset is independent — + a failure on one does not block the others. + + Returns the highlight_candidate_id on completion. + """ + from pipeline.shorts_generator import PRESETS, extract_clip, resolve_video_path + from models import FormatPreset, GeneratedShort, ShortStatus + + start = time.monotonic() + session = _get_sync_session() + settings = get_settings() + + try: + # ── Load highlight with joined relations ──────────────────────── + highlight = session.execute( + select(HighlightCandidate) + .where(HighlightCandidate.id == highlight_candidate_id) + ).scalar_one_or_none() + + if highlight is None: + logger.error( + "Highlight candidate not found: %s", highlight_candidate_id, + ) + return highlight_candidate_id + + if highlight.status.value != "approved": + logger.warning( + "Highlight %s status is %s, expected approved — skipping", + highlight_candidate_id, highlight.status.value, + ) + return highlight_candidate_id + + # Check for already-processing shorts (reject duplicate runs) + existing_processing = session.execute( + select(func.count()) + .where(GeneratedShort.highlight_candidate_id == highlight_candidate_id) + .where(GeneratedShort.status == ShortStatus.processing) + ).scalar() + if existing_processing and existing_processing > 0: + logger.warning( + "Highlight %s already has %d processing shorts — rejecting duplicate", + highlight_candidate_id, existing_processing, + ) + return highlight_candidate_id + + # Eager-load relations + key_moment = highlight.key_moment + source_video = highlight.source_video + + # ── Resolve video file path ───────────────────────────────────── + try: + video_path = resolve_video_path( + settings.video_source_path, source_video.file_path, + ) + except FileNotFoundError as fnf: + logger.error( + "Video file missing for highlight %s: %s", + highlight_candidate_id, fnf, + ) + # Mark all presets as failed + for preset in FormatPreset: + spec = PRESETS[preset] + short = GeneratedShort( + highlight_candidate_id=highlight_candidate_id, + format_preset=preset, + width=spec.width, + height=spec.height, + status=ShortStatus.failed, + error_message=str(fnf), + ) + session.add(short) + session.commit() + return highlight_candidate_id + + # ── Compute effective start/end (trim overrides) ──────────────── + clip_start = highlight.trim_start if highlight.trim_start is not None else key_moment.start_time + clip_end = highlight.trim_end if highlight.trim_end is not None else key_moment.end_time + + logger.info( + "Generating shorts for highlight=%s video=%s [%.1f–%.1f]s", + highlight_candidate_id, source_video.file_path, + clip_start, clip_end, + ) + + # ── Process each preset independently ─────────────────────────── + for preset in FormatPreset: + spec = PRESETS[preset] + preset_start = time.monotonic() + + # Create DB row (status=processing) + short = GeneratedShort( + highlight_candidate_id=highlight_candidate_id, + format_preset=preset, + width=spec.width, + height=spec.height, + status=ShortStatus.processing, + duration_secs=clip_end - clip_start, + ) + session.add(short) + session.commit() + session.refresh(short) + + tmp_path = Path(f"/tmp/short_{short.id}_{preset.value}.mp4") + minio_key = f"shorts/{highlight_candidate_id}/{preset.value}.mp4" + + try: + # Extract clip + extract_clip( + input_path=video_path, + output_path=tmp_path, + start_secs=clip_start, + end_secs=clip_end, + vf_filter=spec.vf_filter, + ) + + # Upload to MinIO + file_size = tmp_path.stat().st_size + with open(tmp_path, "rb") as f: + from minio_client import upload_file + upload_file( + object_key=minio_key, + data=f, + length=file_size, + content_type="video/mp4", + ) + + # Update DB row — complete + short.status = ShortStatus.complete + short.file_size_bytes = file_size + short.minio_object_key = minio_key + session.commit() + + elapsed_preset = time.monotonic() - preset_start + logger.info( + "Short generated: highlight=%s preset=%s " + "size=%d bytes duration=%.1fs elapsed=%.1fs", + highlight_candidate_id, preset.value, + file_size, clip_end - clip_start, elapsed_preset, + ) + + except Exception as exc: + session.rollback() + # Re-fetch the short row after rollback + session.refresh(short) + short.status = ShortStatus.failed + short.error_message = str(exc)[:2000] + session.commit() + + elapsed_preset = time.monotonic() - preset_start + logger.error( + "Short failed: highlight=%s preset=%s " + "error=%s elapsed=%.1fs", + highlight_candidate_id, preset.value, + str(exc)[:500], elapsed_preset, + ) + + finally: + # Clean up temp file + if tmp_path.exists(): + try: + tmp_path.unlink() + except OSError: + pass + + elapsed = time.monotonic() - start + logger.info( + "Shorts generation complete for highlight=%s in %.1fs", + highlight_candidate_id, elapsed, + ) + return highlight_candidate_id + + except Exception as exc: + session.rollback() + logger.error( + "Shorts generation failed for highlight=%s: %s", + highlight_candidate_id, exc, + ) + raise self.retry(exc=exc) + finally: + session.close()