feat: Added shorts_generator.py with 3 format presets and stage_generat…

- "backend/pipeline/shorts_generator.py"
- "backend/pipeline/stages.py"

GSD-Task: S03/T02
This commit is contained in:
jlightner 2026-04-04 09:47:40 +00:00
parent 11d58e09dd
commit e0a6458bdc
2 changed files with 321 additions and 0 deletions

View file

@ -0,0 +1,132 @@
"""FFmpeg clip extraction with format presets for shorts generation.
Pure functions no DB access, no Celery dependency. Tested independently.
"""
from __future__ import annotations
import logging
import subprocess
from dataclasses import dataclass
from pathlib import Path
from models import FormatPreset
logger = logging.getLogger(__name__)
FFMPEG_TIMEOUT_SECS = 300
@dataclass(frozen=True)
class PresetSpec:
"""Resolution and ffmpeg video filter for a format preset."""
width: int
height: int
vf_filter: str
PRESETS: dict[FormatPreset, PresetSpec] = {
FormatPreset.vertical: PresetSpec(
width=1080,
height=1920,
vf_filter="scale=1080:-2,pad=1080:1920:(ow-iw)/2:(oh-ih)/2:black",
),
FormatPreset.square: PresetSpec(
width=1080,
height=1080,
vf_filter="crop=min(iw\\,ih):min(iw\\,ih),scale=1080:1080",
),
FormatPreset.horizontal: PresetSpec(
width=1920,
height=1080,
vf_filter="scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2:black",
),
}
def resolve_video_path(video_source_root: str, file_path: str) -> Path:
"""Join root + relative path and validate the file exists.
Args:
video_source_root: Base directory for video files (e.g. /videos).
file_path: Relative path stored in SourceVideo.file_path.
Returns:
Resolved absolute Path.
Raises:
FileNotFoundError: If the resolved path doesn't exist or isn't a file.
"""
resolved = Path(video_source_root) / file_path
if not resolved.is_file():
raise FileNotFoundError(
f"Video file not found: {resolved} "
f"(root={video_source_root!r}, relative={file_path!r})"
)
return resolved
def extract_clip(
input_path: Path | str,
output_path: Path | str,
start_secs: float,
end_secs: float,
vf_filter: str,
) -> None:
"""Extract a clip from a video file using ffmpeg.
Seeks to *start_secs*, encodes until *end_secs*, and applies *vf_filter*.
Uses ``-c:v libx264 -preset fast -crf 23`` for reasonable quality/speed.
Args:
input_path: Source video file.
output_path: Destination mp4 file (parent dir must exist).
start_secs: Start time in seconds.
end_secs: End time in seconds.
vf_filter: ffmpeg ``-vf`` filter string.
Raises:
subprocess.CalledProcessError: If ffmpeg exits non-zero.
subprocess.TimeoutExpired: If ffmpeg exceeds the timeout.
ValueError: If start >= end.
"""
duration = end_secs - start_secs
if duration <= 0:
raise ValueError(
f"Invalid clip range: start={start_secs}s end={end_secs}s "
f"(duration={duration}s)"
)
cmd = [
"ffmpeg",
"-y", # overwrite output
"-ss", str(start_secs), # seek before input (fast)
"-i", str(input_path),
"-t", str(duration),
"-vf", vf_filter,
"-c:v", "libx264",
"-preset", "fast",
"-crf", "23",
"-c:a", "aac",
"-b:a", "128k",
"-movflags", "+faststart", # web-friendly mp4
str(output_path),
]
logger.info(
"ffmpeg: extracting %.1fs clip from %s%s",
duration, input_path, output_path,
)
result = subprocess.run(
cmd,
capture_output=True,
timeout=FFMPEG_TIMEOUT_SECS,
)
if result.returncode != 0:
stderr_text = result.stderr.decode("utf-8", errors="replace")[-2000:]
logger.error("ffmpeg failed (rc=%d): %s", result.returncode, stderr_text)
raise subprocess.CalledProcessError(
result.returncode, cmd, output=result.stdout, stderr=result.stderr,
)

View file

@ -2860,3 +2860,192 @@ def extract_personality_profile(self, creator_id: str) -> str:
raise self.retry(exc=exc) raise self.retry(exc=exc)
finally: finally:
session.close() session.close()
# ── Stage: Shorts Generation ─────────────────────────────────────────────────
@celery_app.task(bind=True, max_retries=1, default_retry_delay=60)
def stage_generate_shorts(self, highlight_candidate_id: str) -> str:
"""Generate video shorts for an approved highlight candidate.
Creates one GeneratedShort row per FormatPreset, extracts the clip via
ffmpeg, uploads to MinIO, and updates status. Each preset is independent
a failure on one does not block the others.
Returns the highlight_candidate_id on completion.
"""
from pipeline.shorts_generator import PRESETS, extract_clip, resolve_video_path
from models import FormatPreset, GeneratedShort, ShortStatus
start = time.monotonic()
session = _get_sync_session()
settings = get_settings()
try:
# ── Load highlight with joined relations ────────────────────────
highlight = session.execute(
select(HighlightCandidate)
.where(HighlightCandidate.id == highlight_candidate_id)
).scalar_one_or_none()
if highlight is None:
logger.error(
"Highlight candidate not found: %s", highlight_candidate_id,
)
return highlight_candidate_id
if highlight.status.value != "approved":
logger.warning(
"Highlight %s status is %s, expected approved — skipping",
highlight_candidate_id, highlight.status.value,
)
return highlight_candidate_id
# Check for already-processing shorts (reject duplicate runs)
existing_processing = session.execute(
select(func.count())
.where(GeneratedShort.highlight_candidate_id == highlight_candidate_id)
.where(GeneratedShort.status == ShortStatus.processing)
).scalar()
if existing_processing and existing_processing > 0:
logger.warning(
"Highlight %s already has %d processing shorts — rejecting duplicate",
highlight_candidate_id, existing_processing,
)
return highlight_candidate_id
# Eager-load relations
key_moment = highlight.key_moment
source_video = highlight.source_video
# ── Resolve video file path ─────────────────────────────────────
try:
video_path = resolve_video_path(
settings.video_source_path, source_video.file_path,
)
except FileNotFoundError as fnf:
logger.error(
"Video file missing for highlight %s: %s",
highlight_candidate_id, fnf,
)
# Mark all presets as failed
for preset in FormatPreset:
spec = PRESETS[preset]
short = GeneratedShort(
highlight_candidate_id=highlight_candidate_id,
format_preset=preset,
width=spec.width,
height=spec.height,
status=ShortStatus.failed,
error_message=str(fnf),
)
session.add(short)
session.commit()
return highlight_candidate_id
# ── Compute effective start/end (trim overrides) ────────────────
clip_start = highlight.trim_start if highlight.trim_start is not None else key_moment.start_time
clip_end = highlight.trim_end if highlight.trim_end is not None else key_moment.end_time
logger.info(
"Generating shorts for highlight=%s video=%s [%.1f%.1f]s",
highlight_candidate_id, source_video.file_path,
clip_start, clip_end,
)
# ── Process each preset independently ───────────────────────────
for preset in FormatPreset:
spec = PRESETS[preset]
preset_start = time.monotonic()
# Create DB row (status=processing)
short = GeneratedShort(
highlight_candidate_id=highlight_candidate_id,
format_preset=preset,
width=spec.width,
height=spec.height,
status=ShortStatus.processing,
duration_secs=clip_end - clip_start,
)
session.add(short)
session.commit()
session.refresh(short)
tmp_path = Path(f"/tmp/short_{short.id}_{preset.value}.mp4")
minio_key = f"shorts/{highlight_candidate_id}/{preset.value}.mp4"
try:
# Extract clip
extract_clip(
input_path=video_path,
output_path=tmp_path,
start_secs=clip_start,
end_secs=clip_end,
vf_filter=spec.vf_filter,
)
# Upload to MinIO
file_size = tmp_path.stat().st_size
with open(tmp_path, "rb") as f:
from minio_client import upload_file
upload_file(
object_key=minio_key,
data=f,
length=file_size,
content_type="video/mp4",
)
# Update DB row — complete
short.status = ShortStatus.complete
short.file_size_bytes = file_size
short.minio_object_key = minio_key
session.commit()
elapsed_preset = time.monotonic() - preset_start
logger.info(
"Short generated: highlight=%s preset=%s "
"size=%d bytes duration=%.1fs elapsed=%.1fs",
highlight_candidate_id, preset.value,
file_size, clip_end - clip_start, elapsed_preset,
)
except Exception as exc:
session.rollback()
# Re-fetch the short row after rollback
session.refresh(short)
short.status = ShortStatus.failed
short.error_message = str(exc)[:2000]
session.commit()
elapsed_preset = time.monotonic() - preset_start
logger.error(
"Short failed: highlight=%s preset=%s "
"error=%s elapsed=%.1fs",
highlight_candidate_id, preset.value,
str(exc)[:500], elapsed_preset,
)
finally:
# Clean up temp file
if tmp_path.exists():
try:
tmp_path.unlink()
except OSError:
pass
elapsed = time.monotonic() - start
logger.info(
"Shorts generation complete for highlight=%s in %.1fs",
highlight_candidate_id, elapsed,
)
return highlight_candidate_id
except Exception as exc:
session.rollback()
logger.error(
"Shorts generation failed for highlight=%s: %s",
highlight_candidate_id, exc,
)
raise self.retry(exc=exc)
finally:
session.close()