- "backend/pipeline/card_renderer.py" - "backend/pipeline/shorts_generator.py" - "backend/pipeline/stages.py" - "backend/models.py" - "alembic/versions/028_add_shorts_template.py" - "backend/pipeline/test_card_renderer.py" GSD-Task: S04/T02
222 lines
6.8 KiB
Python
222 lines
6.8 KiB
Python
"""FFmpeg clip extraction with format presets for shorts generation.
|
|
|
|
Pure functions — no DB access, no Celery dependency. Tested independently.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import subprocess
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
from models import FormatPreset
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
FFMPEG_TIMEOUT_SECS = 300
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class PresetSpec:
|
|
"""Resolution and ffmpeg video filter for a format preset."""
|
|
width: int
|
|
height: int
|
|
vf_filter: str
|
|
|
|
|
|
PRESETS: dict[FormatPreset, PresetSpec] = {
|
|
FormatPreset.vertical: PresetSpec(
|
|
width=1080,
|
|
height=1920,
|
|
vf_filter="scale=1080:-2,pad=1080:1920:(ow-iw)/2:(oh-ih)/2:black",
|
|
),
|
|
FormatPreset.square: PresetSpec(
|
|
width=1080,
|
|
height=1080,
|
|
vf_filter="crop=min(iw\\,ih):min(iw\\,ih),scale=1080:1080",
|
|
),
|
|
FormatPreset.horizontal: PresetSpec(
|
|
width=1920,
|
|
height=1080,
|
|
vf_filter="scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2:black",
|
|
),
|
|
}
|
|
|
|
|
|
def resolve_video_path(video_source_root: str, file_path: str) -> Path:
|
|
"""Join root + relative path and validate the file exists.
|
|
|
|
Args:
|
|
video_source_root: Base directory for video files (e.g. /videos).
|
|
file_path: Relative path stored in SourceVideo.file_path.
|
|
|
|
Returns:
|
|
Resolved absolute Path.
|
|
|
|
Raises:
|
|
FileNotFoundError: If the resolved path doesn't exist or isn't a file.
|
|
"""
|
|
resolved = Path(video_source_root) / file_path
|
|
if not resolved.is_file():
|
|
raise FileNotFoundError(
|
|
f"Video file not found: {resolved} "
|
|
f"(root={video_source_root!r}, relative={file_path!r})"
|
|
)
|
|
return resolved
|
|
|
|
|
|
def extract_clip(
|
|
input_path: Path | str,
|
|
output_path: Path | str,
|
|
start_secs: float,
|
|
end_secs: float,
|
|
vf_filter: str,
|
|
ass_path: Path | str | None = None,
|
|
) -> None:
|
|
"""Extract a clip from a video file using ffmpeg.
|
|
|
|
Seeks to *start_secs*, encodes until *end_secs*, and applies *vf_filter*.
|
|
Uses ``-c:v libx264 -preset fast -crf 23`` for reasonable quality/speed.
|
|
|
|
When *ass_path* is provided, the ASS subtitle filter is appended to the
|
|
video filter chain so that captions are burned into the output video.
|
|
|
|
Args:
|
|
input_path: Source video file.
|
|
output_path: Destination mp4 file (parent dir must exist).
|
|
start_secs: Start time in seconds.
|
|
end_secs: End time in seconds.
|
|
vf_filter: ffmpeg ``-vf`` filter string.
|
|
ass_path: Optional path to an ASS subtitle file. When provided,
|
|
``ass=<path>`` is appended to the filter chain.
|
|
|
|
Raises:
|
|
subprocess.CalledProcessError: If ffmpeg exits non-zero.
|
|
subprocess.TimeoutExpired: If ffmpeg exceeds the timeout.
|
|
ValueError: If start >= end.
|
|
"""
|
|
duration = end_secs - start_secs
|
|
if duration <= 0:
|
|
raise ValueError(
|
|
f"Invalid clip range: start={start_secs}s end={end_secs}s "
|
|
f"(duration={duration}s)"
|
|
)
|
|
|
|
# Build the video filter chain — ASS burn-in comes after scale/pad
|
|
effective_vf = vf_filter
|
|
if ass_path is not None:
|
|
# Escape colons and backslashes in the path for ffmpeg filter syntax
|
|
escaped = str(ass_path).replace("\\", "\\\\").replace(":", "\\:")
|
|
effective_vf = f"{vf_filter},ass={escaped}"
|
|
|
|
cmd = [
|
|
"ffmpeg",
|
|
"-y", # overwrite output
|
|
"-ss", str(start_secs), # seek before input (fast)
|
|
"-i", str(input_path),
|
|
"-t", str(duration),
|
|
"-vf", effective_vf,
|
|
"-c:v", "libx264",
|
|
"-preset", "fast",
|
|
"-crf", "23",
|
|
"-c:a", "aac",
|
|
"-b:a", "128k",
|
|
"-movflags", "+faststart", # web-friendly mp4
|
|
str(output_path),
|
|
]
|
|
|
|
logger.info(
|
|
"ffmpeg: extracting %.1fs clip from %s → %s",
|
|
duration, input_path, output_path,
|
|
)
|
|
|
|
result = subprocess.run(
|
|
cmd,
|
|
capture_output=True,
|
|
timeout=FFMPEG_TIMEOUT_SECS,
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
stderr_text = result.stderr.decode("utf-8", errors="replace")[-2000:]
|
|
logger.error("ffmpeg failed (rc=%d): %s", result.returncode, stderr_text)
|
|
raise subprocess.CalledProcessError(
|
|
result.returncode, cmd, output=result.stdout, stderr=result.stderr,
|
|
)
|
|
|
|
|
|
def extract_clip_with_template(
|
|
input_path: Path | str,
|
|
output_path: Path | str,
|
|
start_secs: float,
|
|
end_secs: float,
|
|
vf_filter: str,
|
|
ass_path: Path | str | None = None,
|
|
intro_path: Path | str | None = None,
|
|
outro_path: Path | str | None = None,
|
|
) -> None:
|
|
"""Extract a clip and optionally prepend/append intro/outro cards.
|
|
|
|
If neither intro nor outro is provided, delegates directly to
|
|
:func:`extract_clip`. When cards are provided, the main clip is
|
|
extracted to a temp file, then all segments are concatenated via
|
|
:func:`~pipeline.card_renderer.concat_segments`.
|
|
|
|
Args:
|
|
input_path: Source video file.
|
|
output_path: Final destination mp4 file.
|
|
start_secs: Start time in seconds.
|
|
end_secs: End time in seconds.
|
|
vf_filter: ffmpeg ``-vf`` filter string.
|
|
ass_path: Optional ASS subtitle file path.
|
|
intro_path: Optional intro card mp4 path.
|
|
outro_path: Optional outro card mp4 path.
|
|
|
|
Raises:
|
|
subprocess.CalledProcessError: If any ffmpeg command fails.
|
|
ValueError: If clip range is invalid.
|
|
"""
|
|
has_cards = intro_path is not None or outro_path is not None
|
|
|
|
if not has_cards:
|
|
# No template cards — simple extraction
|
|
extract_clip(
|
|
input_path=input_path,
|
|
output_path=output_path,
|
|
start_secs=start_secs,
|
|
end_secs=end_secs,
|
|
vf_filter=vf_filter,
|
|
ass_path=ass_path,
|
|
)
|
|
return
|
|
|
|
# Extract main clip to a temp file for concatenation
|
|
main_clip_path = Path(str(output_path) + ".main.mp4")
|
|
try:
|
|
extract_clip(
|
|
input_path=input_path,
|
|
output_path=main_clip_path,
|
|
start_secs=start_secs,
|
|
end_secs=end_secs,
|
|
vf_filter=vf_filter,
|
|
ass_path=ass_path,
|
|
)
|
|
|
|
# Build segment list in order: intro → main → outro
|
|
segments: list[Path] = []
|
|
if intro_path is not None:
|
|
segments.append(Path(intro_path))
|
|
segments.append(main_clip_path)
|
|
if outro_path is not None:
|
|
segments.append(Path(outro_path))
|
|
|
|
from pipeline.card_renderer import concat_segments
|
|
concat_segments(segments=segments, output_path=Path(output_path))
|
|
|
|
finally:
|
|
# Clean up temp main clip
|
|
if main_clip_path.exists():
|
|
try:
|
|
main_clip_path.unlink()
|
|
except OSError:
|
|
pass
|