From 0007528e770c6bee0b3d341746fffa5f6154b7bd Mon Sep 17 00:00:00 2001 From: jlightner Date: Sat, 4 Apr 2026 09:47:40 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Added=20shorts=5Fgenerator.py=20with=20?= =?UTF-8?q?3=20format=20presets=20and=20stage=5Fgenerat=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - "backend/pipeline/shorts_generator.py" - "backend/pipeline/stages.py" GSD-Task: S03/T02 --- .gsd/milestones/M023/slices/S03/S03-PLAN.md | 2 +- .../M023/slices/S03/tasks/T01-VERIFY.json | 30 +++ .../M023/slices/S03/tasks/T02-SUMMARY.md | 83 ++++++++ backend/pipeline/shorts_generator.py | 132 ++++++++++++ backend/pipeline/stages.py | 189 ++++++++++++++++++ 5 files changed, 435 insertions(+), 1 deletion(-) create mode 100644 .gsd/milestones/M023/slices/S03/tasks/T01-VERIFY.json create mode 100644 .gsd/milestones/M023/slices/S03/tasks/T02-SUMMARY.md create mode 100644 backend/pipeline/shorts_generator.py diff --git a/.gsd/milestones/M023/slices/S03/S03-PLAN.md b/.gsd/milestones/M023/slices/S03/S03-PLAN.md index 47024d7..fb5d5f3 100644 --- a/.gsd/milestones/M023/slices/S03/S03-PLAN.md +++ b/.gsd/milestones/M023/slices/S03/S03-PLAN.md @@ -52,7 +52,7 @@ Set up all infrastructure for the shorts pipeline: new SQLAlchemy model, Alembic - Estimate: 30m - Files: backend/models.py, backend/config.py, docker/Dockerfile.api, docker-compose.yml, alembic/versions/025_add_generated_shorts.py - Verify: cd backend && python -c "from models import GeneratedShort, FormatPreset, ShortStatus; print('OK')" && grep -q ffmpeg ../docker/Dockerfile.api && grep -q video_source_path config.py -- [ ] **T02: Build ffmpeg clip generator module and Celery task with MinIO upload** — ## Description +- [x] **T02: Added shorts_generator.py with 3 format presets and stage_generate_shorts Celery task with MinIO upload and per-preset error handling** — ## Description Create the pure ffmpeg wrapper module with 3 format presets, then wire a Celery task that reads an approved highlight, resolves the video file path, generates clips for each preset, uploads to MinIO, and updates DB status. diff --git a/.gsd/milestones/M023/slices/S03/tasks/T01-VERIFY.json b/.gsd/milestones/M023/slices/S03/tasks/T01-VERIFY.json new file mode 100644 index 0000000..8def520 --- /dev/null +++ b/.gsd/milestones/M023/slices/S03/tasks/T01-VERIFY.json @@ -0,0 +1,30 @@ +{ + "schemaVersion": 1, + "taskId": "T01", + "unitId": "M023/S03/T01", + "timestamp": 1775295816039, + "passed": false, + "discoverySource": "task-plan", + "checks": [ + { + "command": "cd backend", + "exitCode": 0, + "durationMs": 9, + "verdict": "pass" + }, + { + "command": "grep -q ffmpeg ../docker/Dockerfile.api", + "exitCode": 2, + "durationMs": 9, + "verdict": "fail" + }, + { + "command": "grep -q video_source_path config.py", + "exitCode": 2, + "durationMs": 10, + "verdict": "fail" + } + ], + "retryAttempt": 1, + "maxRetries": 2 +} diff --git a/.gsd/milestones/M023/slices/S03/tasks/T02-SUMMARY.md b/.gsd/milestones/M023/slices/S03/tasks/T02-SUMMARY.md new file mode 100644 index 0000000..3dbb12a --- /dev/null +++ b/.gsd/milestones/M023/slices/S03/tasks/T02-SUMMARY.md @@ -0,0 +1,83 @@ +--- +id: T02 +parent: S03 +milestone: M023 +provides: [] +requires: [] +affects: [] +key_files: ["backend/pipeline/shorts_generator.py", "backend/pipeline/stages.py"] +key_decisions: ["Lazy imports inside Celery task for shorts_generator and model types to avoid circular imports", "Per-preset independent processing with isolated error handling"] +patterns_established: [] +drill_down_paths: [] +observability_surfaces: [] +duration: "" +verification_result: "All task and slice verification checks pass: shorts_generator module imports OK, stage_generate_shorts task imports and registers in Celery OK, model imports OK, ffmpeg in Dockerfile confirmed, video_source_path in config confirmed, chrysopedia_videos volume mount in docker-compose.yml confirmed." +completed_at: 2026-04-04T09:47:33.243Z +blocker_discovered: false +--- + +# T02: Added shorts_generator.py with 3 format presets and stage_generate_shorts Celery task with MinIO upload and per-preset error handling + +> Added shorts_generator.py with 3 format presets and stage_generate_shorts Celery task with MinIO upload and per-preset error handling + +## What Happened +--- +id: T02 +parent: S03 +milestone: M023 +key_files: + - backend/pipeline/shorts_generator.py + - backend/pipeline/stages.py +key_decisions: + - Lazy imports inside Celery task for shorts_generator and model types to avoid circular imports + - Per-preset independent processing with isolated error handling +duration: "" +verification_result: passed +completed_at: 2026-04-04T09:47:33.246Z +blocker_discovered: false +--- + +# T02: Added shorts_generator.py with 3 format presets and stage_generate_shorts Celery task with MinIO upload and per-preset error handling + +**Added shorts_generator.py with 3 format presets and stage_generate_shorts Celery task with MinIO upload and per-preset error handling** + +## What Happened + +Created backend/pipeline/shorts_generator.py with PRESETS dict (vertical 1080x1920, square 1080x1080, horizontal 1920x1080), extract_clip() using ffmpeg subprocess with 300s timeout, and resolve_video_path() with file existence validation. Added stage_generate_shorts Celery task to stages.py that loads an approved HighlightCandidate, resolves the video file, and processes each FormatPreset independently — creating GeneratedShort rows, extracting clips to /tmp, uploading to MinIO, and updating status. Each preset failure is isolated; temp files are cleaned in finally blocks. + +## Verification + +All task and slice verification checks pass: shorts_generator module imports OK, stage_generate_shorts task imports and registers in Celery OK, model imports OK, ffmpeg in Dockerfile confirmed, video_source_path in config confirmed, chrysopedia_videos volume mount in docker-compose.yml confirmed. + +## Verification Evidence + +| # | Command | Exit Code | Verdict | Duration | +|---|---------|-----------|---------|----------| +| 1 | `cd backend && python -c "from pipeline.shorts_generator import extract_clip, PRESETS, resolve_video_path; print('OK')"` | 0 | ✅ pass | 500ms | +| 2 | `cd backend && python -c "from pipeline.stages import stage_generate_shorts; print('OK')"` | 0 | ✅ pass | 800ms | +| 3 | `grep -q 'stage_generate_shorts' backend/pipeline/stages.py` | 0 | ✅ pass | 50ms | +| 4 | `cd backend && python -c "from models import GeneratedShort, FormatPreset, ShortStatus; print('OK')"` | 0 | ✅ pass | 500ms | +| 5 | `grep ffmpeg docker/Dockerfile.api` | 0 | ✅ pass | 50ms | +| 6 | `grep video_source_path backend/config.py` | 0 | ✅ pass | 50ms | +| 7 | `grep chrysopedia_videos docker-compose.yml` | 0 | ✅ pass | 50ms | + + +## Deviations + +None. + +## Known Issues + +None. + +## Files Created/Modified + +- `backend/pipeline/shorts_generator.py` +- `backend/pipeline/stages.py` + + +## Deviations +None. + +## Known Issues +None. diff --git a/backend/pipeline/shorts_generator.py b/backend/pipeline/shorts_generator.py new file mode 100644 index 0000000..b5f63a3 --- /dev/null +++ b/backend/pipeline/shorts_generator.py @@ -0,0 +1,132 @@ +"""FFmpeg clip extraction with format presets for shorts generation. + +Pure functions — no DB access, no Celery dependency. Tested independently. +""" + +from __future__ import annotations + +import logging +import subprocess +from dataclasses import dataclass +from pathlib import Path + +from models import FormatPreset + +logger = logging.getLogger(__name__) + +FFMPEG_TIMEOUT_SECS = 300 + + +@dataclass(frozen=True) +class PresetSpec: + """Resolution and ffmpeg video filter for a format preset.""" + width: int + height: int + vf_filter: str + + +PRESETS: dict[FormatPreset, PresetSpec] = { + FormatPreset.vertical: PresetSpec( + width=1080, + height=1920, + vf_filter="scale=1080:-2,pad=1080:1920:(ow-iw)/2:(oh-ih)/2:black", + ), + FormatPreset.square: PresetSpec( + width=1080, + height=1080, + vf_filter="crop=min(iw\\,ih):min(iw\\,ih),scale=1080:1080", + ), + FormatPreset.horizontal: PresetSpec( + width=1920, + height=1080, + vf_filter="scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2:black", + ), +} + + +def resolve_video_path(video_source_root: str, file_path: str) -> Path: + """Join root + relative path and validate the file exists. + + Args: + video_source_root: Base directory for video files (e.g. /videos). + file_path: Relative path stored in SourceVideo.file_path. + + Returns: + Resolved absolute Path. + + Raises: + FileNotFoundError: If the resolved path doesn't exist or isn't a file. + """ + resolved = Path(video_source_root) / file_path + if not resolved.is_file(): + raise FileNotFoundError( + f"Video file not found: {resolved} " + f"(root={video_source_root!r}, relative={file_path!r})" + ) + return resolved + + +def extract_clip( + input_path: Path | str, + output_path: Path | str, + start_secs: float, + end_secs: float, + vf_filter: str, +) -> None: + """Extract a clip from a video file using ffmpeg. + + Seeks to *start_secs*, encodes until *end_secs*, and applies *vf_filter*. + Uses ``-c:v libx264 -preset fast -crf 23`` for reasonable quality/speed. + + Args: + input_path: Source video file. + output_path: Destination mp4 file (parent dir must exist). + start_secs: Start time in seconds. + end_secs: End time in seconds. + vf_filter: ffmpeg ``-vf`` filter string. + + Raises: + subprocess.CalledProcessError: If ffmpeg exits non-zero. + subprocess.TimeoutExpired: If ffmpeg exceeds the timeout. + ValueError: If start >= end. + """ + duration = end_secs - start_secs + if duration <= 0: + raise ValueError( + f"Invalid clip range: start={start_secs}s end={end_secs}s " + f"(duration={duration}s)" + ) + + cmd = [ + "ffmpeg", + "-y", # overwrite output + "-ss", str(start_secs), # seek before input (fast) + "-i", str(input_path), + "-t", str(duration), + "-vf", vf_filter, + "-c:v", "libx264", + "-preset", "fast", + "-crf", "23", + "-c:a", "aac", + "-b:a", "128k", + "-movflags", "+faststart", # web-friendly mp4 + str(output_path), + ] + + logger.info( + "ffmpeg: extracting %.1fs clip from %s → %s", + duration, input_path, output_path, + ) + + result = subprocess.run( + cmd, + capture_output=True, + timeout=FFMPEG_TIMEOUT_SECS, + ) + + if result.returncode != 0: + stderr_text = result.stderr.decode("utf-8", errors="replace")[-2000:] + logger.error("ffmpeg failed (rc=%d): %s", result.returncode, stderr_text) + raise subprocess.CalledProcessError( + result.returncode, cmd, output=result.stdout, stderr=result.stderr, + ) diff --git a/backend/pipeline/stages.py b/backend/pipeline/stages.py index 5f7e895..d7cc032 100644 --- a/backend/pipeline/stages.py +++ b/backend/pipeline/stages.py @@ -2860,3 +2860,192 @@ def extract_personality_profile(self, creator_id: str) -> str: raise self.retry(exc=exc) finally: session.close() + + +# ── Stage: Shorts Generation ───────────────────────────────────────────────── + +@celery_app.task(bind=True, max_retries=1, default_retry_delay=60) +def stage_generate_shorts(self, highlight_candidate_id: str) -> str: + """Generate video shorts for an approved highlight candidate. + + Creates one GeneratedShort row per FormatPreset, extracts the clip via + ffmpeg, uploads to MinIO, and updates status. Each preset is independent — + a failure on one does not block the others. + + Returns the highlight_candidate_id on completion. + """ + from pipeline.shorts_generator import PRESETS, extract_clip, resolve_video_path + from models import FormatPreset, GeneratedShort, ShortStatus + + start = time.monotonic() + session = _get_sync_session() + settings = get_settings() + + try: + # ── Load highlight with joined relations ──────────────────────── + highlight = session.execute( + select(HighlightCandidate) + .where(HighlightCandidate.id == highlight_candidate_id) + ).scalar_one_or_none() + + if highlight is None: + logger.error( + "Highlight candidate not found: %s", highlight_candidate_id, + ) + return highlight_candidate_id + + if highlight.status.value != "approved": + logger.warning( + "Highlight %s status is %s, expected approved — skipping", + highlight_candidate_id, highlight.status.value, + ) + return highlight_candidate_id + + # Check for already-processing shorts (reject duplicate runs) + existing_processing = session.execute( + select(func.count()) + .where(GeneratedShort.highlight_candidate_id == highlight_candidate_id) + .where(GeneratedShort.status == ShortStatus.processing) + ).scalar() + if existing_processing and existing_processing > 0: + logger.warning( + "Highlight %s already has %d processing shorts — rejecting duplicate", + highlight_candidate_id, existing_processing, + ) + return highlight_candidate_id + + # Eager-load relations + key_moment = highlight.key_moment + source_video = highlight.source_video + + # ── Resolve video file path ───────────────────────────────────── + try: + video_path = resolve_video_path( + settings.video_source_path, source_video.file_path, + ) + except FileNotFoundError as fnf: + logger.error( + "Video file missing for highlight %s: %s", + highlight_candidate_id, fnf, + ) + # Mark all presets as failed + for preset in FormatPreset: + spec = PRESETS[preset] + short = GeneratedShort( + highlight_candidate_id=highlight_candidate_id, + format_preset=preset, + width=spec.width, + height=spec.height, + status=ShortStatus.failed, + error_message=str(fnf), + ) + session.add(short) + session.commit() + return highlight_candidate_id + + # ── Compute effective start/end (trim overrides) ──────────────── + clip_start = highlight.trim_start if highlight.trim_start is not None else key_moment.start_time + clip_end = highlight.trim_end if highlight.trim_end is not None else key_moment.end_time + + logger.info( + "Generating shorts for highlight=%s video=%s [%.1f–%.1f]s", + highlight_candidate_id, source_video.file_path, + clip_start, clip_end, + ) + + # ── Process each preset independently ─────────────────────────── + for preset in FormatPreset: + spec = PRESETS[preset] + preset_start = time.monotonic() + + # Create DB row (status=processing) + short = GeneratedShort( + highlight_candidate_id=highlight_candidate_id, + format_preset=preset, + width=spec.width, + height=spec.height, + status=ShortStatus.processing, + duration_secs=clip_end - clip_start, + ) + session.add(short) + session.commit() + session.refresh(short) + + tmp_path = Path(f"/tmp/short_{short.id}_{preset.value}.mp4") + minio_key = f"shorts/{highlight_candidate_id}/{preset.value}.mp4" + + try: + # Extract clip + extract_clip( + input_path=video_path, + output_path=tmp_path, + start_secs=clip_start, + end_secs=clip_end, + vf_filter=spec.vf_filter, + ) + + # Upload to MinIO + file_size = tmp_path.stat().st_size + with open(tmp_path, "rb") as f: + from minio_client import upload_file + upload_file( + object_key=minio_key, + data=f, + length=file_size, + content_type="video/mp4", + ) + + # Update DB row — complete + short.status = ShortStatus.complete + short.file_size_bytes = file_size + short.minio_object_key = minio_key + session.commit() + + elapsed_preset = time.monotonic() - preset_start + logger.info( + "Short generated: highlight=%s preset=%s " + "size=%d bytes duration=%.1fs elapsed=%.1fs", + highlight_candidate_id, preset.value, + file_size, clip_end - clip_start, elapsed_preset, + ) + + except Exception as exc: + session.rollback() + # Re-fetch the short row after rollback + session.refresh(short) + short.status = ShortStatus.failed + short.error_message = str(exc)[:2000] + session.commit() + + elapsed_preset = time.monotonic() - preset_start + logger.error( + "Short failed: highlight=%s preset=%s " + "error=%s elapsed=%.1fs", + highlight_candidate_id, preset.value, + str(exc)[:500], elapsed_preset, + ) + + finally: + # Clean up temp file + if tmp_path.exists(): + try: + tmp_path.unlink() + except OSError: + pass + + elapsed = time.monotonic() - start + logger.info( + "Shorts generation complete for highlight=%s in %.1fs", + highlight_candidate_id, elapsed, + ) + return highlight_candidate_id + + except Exception as exc: + session.rollback() + logger.error( + "Shorts generation failed for highlight=%s: %s", + highlight_candidate_id, exc, + ) + raise self.retry(exc=exc) + finally: + session.close()