chrysopedia/whisper/transcribe.py
jlightner 4b0914b12b fix: restore complete project tree from ub01 canonical state
Auto-mode commit 7aa33cd accidentally deleted 78 files (14,814 lines) during M005
execution. Subsequent commits rebuilt some frontend files but backend/, alembic/,
tests/, whisper/, docker configs, and prompts were never restored in this repo.

This commit restores the full project tree by syncing from ub01's working directory,
which has all M001-M007 features running in production containers.

Restored: backend/ (config, models, routers, database, redis, search_service, worker),
alembic/ (6 migrations), docker/ (Dockerfiles, nginx, compose), prompts/ (4 stages),
tests/, whisper/, README.md, .env.example, chrysopedia-spec.md
2026-03-31 02:10:41 +00:00

393 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Chrysopedia — Whisper Transcription Script
Desktop transcription tool for extracting timestamped text from video files
using OpenAI's Whisper model (large-v3). Designed to run on a machine with
an NVIDIA GPU (e.g., RTX 4090).
Outputs JSON matching the Chrysopedia spec format:
{
"source_file": "filename.mp4",
"creator_folder": "CreatorName",
"duration_seconds": 7243,
"segments": [
{
"start": 0.0,
"end": 4.52,
"text": "...",
"words": [{"word": "Hey", "start": 0.0, "end": 0.28}, ...]
}
]
}
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import shutil
import subprocess
import sys
import tempfile
import time
from pathlib import Path
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
# Configures the root logger at import time; main() raises the root level to
# DEBUG when --verbose is passed.
LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger("chrysopedia.transcribe")
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Video container formats accepted as input; matched case-insensitively
# against the file suffix (see find_videos / main).
SUPPORTED_EXTENSIONS = {".mp4", ".mkv", ".avi", ".mov", ".webm", ".flv", ".wmv"}
# Default Whisper checkpoint and compute device; both overridable via CLI.
DEFAULT_MODEL = "large-v3"
DEFAULT_DEVICE = "cuda"
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def check_ffmpeg() -> bool:
    """Report whether the ffmpeg binary can be located on PATH."""
    ffmpeg_path = shutil.which("ffmpeg")
    return ffmpeg_path is not None
def get_audio_duration(video_path: Path) -> float | None:
"""Use ffprobe to get duration in seconds. Returns None on failure."""
ffprobe = shutil.which("ffprobe")
if ffprobe is None:
return None
try:
result = subprocess.run(
[
ffprobe,
"-v", "error",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
str(video_path),
],
capture_output=True,
text=True,
timeout=30,
)
return float(result.stdout.strip())
except (subprocess.TimeoutExpired, ValueError, OSError) as exc:
logger.warning("Could not determine duration for %s: %s", video_path.name, exc)
return None
def extract_audio(video_path: Path, audio_path: Path) -> None:
    """Convert a video's audio track to a 16 kHz mono 16-bit WAV via ffmpeg.

    Raises:
        RuntimeError: when ffmpeg exits non-zero; the first 500 characters
            of its stderr are included for diagnosis.
    """
    logger.info("Extracting audio: %s -> %s", video_path.name, audio_path.name)
    command = [
        "ffmpeg",
        "-i", str(video_path),
        "-vn",                    # drop the video stream entirely
        "-acodec", "pcm_s16le",   # 16-bit PCM
        "-ar", "16000",           # 16kHz (Whisper expects this)
        "-ac", "1",               # mono
        "-y",                     # overwrite any stale output
        str(audio_path),
    ]
    proc = subprocess.run(command, capture_output=True, text=True, timeout=600)
    if proc.returncode != 0:
        raise RuntimeError(
            f"ffmpeg audio extraction failed (exit {proc.returncode}): {proc.stderr[:500]}"
        )
def transcribe_audio(
    audio_path: Path,
    model_name: str = DEFAULT_MODEL,
    device: str = DEFAULT_DEVICE,
) -> dict:
    """Load a Whisper model and transcribe *audio_path*, returning the raw result dict."""
    # Deferred import keeps `--help` usable when the dependency is absent.
    try:
        import whisper  # type: ignore[import-untyped]
    except ImportError:
        logger.error(
            "openai-whisper is not installed. "
            "Install it with: pip install openai-whisper"
        )
        sys.exit(1)

    logger.info("Loading Whisper model '%s' on device '%s'...", model_name, device)
    load_started = time.time()
    model = whisper.load_model(model_name, device=device)
    logger.info("Model loaded in %.1f s", time.time() - load_started)

    logger.info("Transcribing %s ...", audio_path.name)
    run_started = time.time()
    result = model.transcribe(
        str(audio_path),
        word_timestamps=True,
        verbose=False,
    )
    elapsed = time.time() - run_started
    # Real-time factor; if the result carries no "duration" key this reports
    # 1.0x (duration defaults to elapsed), and 0 guards a zero elapsed time.
    speed_factor = (result.get("duration", elapsed) / elapsed) if elapsed > 0 else 0
    logger.info(
        "Transcription complete in %.1f s (%.1fx real-time)",
        elapsed,
        speed_factor,
    )
    return result
def format_output(
whisper_result: dict,
source_file: str,
creator_folder: str,
duration_seconds: float | None,
) -> dict:
"""Convert Whisper result to the Chrysopedia spec JSON format."""
segments = []
for seg in whisper_result.get("segments", []):
words = []
for w in seg.get("words", []):
words.append(
{
"word": w.get("word", "").strip(),
"start": round(w.get("start", 0.0), 2),
"end": round(w.get("end", 0.0), 2),
}
)
segments.append(
{
"start": round(seg.get("start", 0.0), 2),
"end": round(seg.get("end", 0.0), 2),
"text": seg.get("text", "").strip(),
"words": words,
}
)
# Use duration from ffprobe if available, otherwise from whisper
if duration_seconds is None:
duration_seconds = whisper_result.get("duration", 0.0)
return {
"source_file": source_file,
"creator_folder": creator_folder,
"duration_seconds": round(duration_seconds),
"segments": segments,
}
def infer_creator_folder(video_path: Path) -> str:
    """Derive the creator name from the video's containing directory.

    The expected layout is /path/to/<CreatorName>/video.mp4; whatever the
    layout, this yields the name of the immediate parent directory.
    """
    return video_path.parent.name
def output_path_for(video_path: Path, output_dir: Path) -> Path:
    """Map a video file to its transcript location: <output_dir>/<stem>.json."""
    # Concatenate on the stem (not Path.with_suffix) so stems containing
    # dots, e.g. "my.video", keep their full name.
    return output_dir.joinpath(video_path.stem + ".json")
def process_single(
    video_path: Path,
    output_dir: Path,
    model_name: str,
    device: str,
    creator_folder: str | None = None,
) -> Path | None:
    """Transcribe one video end-to-end and write its transcript JSON.

    Returns the output path on success, or None when the transcript already
    exists (resumability: re-runs skip completed work).
    """
    out_path = output_path_for(video_path, output_dir)
    if out_path.exists():
        # Resumability: an existing output means this video is already done.
        logger.info("SKIP (output exists): %s", out_path)
        return None

    logger.info("Processing: %s", video_path)
    folder = creator_folder or infer_creator_folder(video_path)

    duration = get_audio_duration(video_path)
    if duration is not None:
        logger.info("Video duration: %.0f s (%.1f min)", duration, duration / 60)

    # Work inside a throwaway directory so the intermediate WAV never
    # outlives the run, even on failure.
    with tempfile.TemporaryDirectory(prefix="chrysopedia_") as scratch:
        wav_path = Path(scratch) / "audio.wav"
        extract_audio(video_path, wav_path)
        raw_result = transcribe_audio(wav_path, model_name, device)

    payload = format_output(raw_result, video_path.name, folder, duration)
    output_dir.mkdir(parents=True, exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as handle:
        json.dump(payload, handle, indent=2, ensure_ascii=False)

    logger.info("Wrote %s (%d segments)", out_path, len(payload["segments"]))
    return out_path
def find_videos(input_path: Path) -> list[Path]:
    """Return the supported video files directly inside *input_path*, sorted.

    Non-recursive: subdirectories are not descended into. Suffix matching is
    case-insensitive against SUPPORTED_EXTENSIONS.
    """
    matches = [
        entry
        for entry in input_path.iterdir()
        if entry.is_file() and entry.suffix.lower() in SUPPORTED_EXTENSIONS
    ]
    matches.sort()
    return matches
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def build_parser() -> argparse.ArgumentParser:
    """Construct the CLI argument parser (pure: no parsing happens here)."""
    p = argparse.ArgumentParser(
        prog="transcribe",
        description=(
            "Chrysopedia Whisper Transcription — extract timestamped transcripts "
            "from video files using OpenAI's Whisper model."
        ),
        # Raw formatter preserves the newlines in the epilog examples below.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Examples:\n"
            " # Single file\n"
            " python transcribe.py --input video.mp4 --output-dir ./transcripts\n"
            "\n"
            " # Batch mode (all videos in directory)\n"
            " python transcribe.py --input ./videos/ --output-dir ./transcripts\n"
            "\n"
            " # Use a smaller model on CPU\n"
            " python transcribe.py --input video.mp4 --model base --device cpu\n"
        ),
    )
    add = p.add_argument
    add(
        "--input",
        required=True,
        type=str,
        help="Path to a video file or directory of video files",
    )
    add(
        "--output-dir",
        required=True,
        type=str,
        help="Directory to write transcript JSON files",
    )
    add(
        "--model",
        default=DEFAULT_MODEL,
        type=str,
        help=f"Whisper model name (default: {DEFAULT_MODEL})",
    )
    add(
        "--device",
        default=DEFAULT_DEVICE,
        type=str,
        help=f"Compute device: cuda, cpu (default: {DEFAULT_DEVICE})",
    )
    add(
        "--creator",
        default=None,
        type=str,
        help="Override creator folder name (default: inferred from parent directory)",
    )
    add(
        "-v", "--verbose",
        action="store_true",
        help="Enable debug logging",
    )
    return p
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: validate environment and input, then run single or batch mode.

    Returns a process exit code: 0 on success (including "nothing to do"),
    1 on a fatal setup error or when any batch item fails.
    """
    args = build_parser().parse_args(argv)

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # ffmpeg is mandatory for audio extraction; bail out early without it.
    if not check_ffmpeg():
        logger.error(
            "ffmpeg is not installed or not on PATH. "
            "Install it with: sudo apt install ffmpeg (or equivalent)"
        )
        return 1

    source = Path(args.input).resolve()
    dest_dir = Path(args.output_dir).resolve()

    if not source.exists():
        logger.error("Input path does not exist: %s", source)
        return 1

    # --- Single file mode --------------------------------------------------
    if source.is_file():
        if source.suffix.lower() not in SUPPORTED_EXTENSIONS:
            logger.error(
                "Unsupported file type '%s'. Supported: %s",
                source.suffix,
                ", ".join(sorted(SUPPORTED_EXTENSIONS)),
            )
            return 1
        outcome = process_single(
            source, dest_dir, args.model, args.device, args.creator
        )
        if outcome is None:
            logger.info("Nothing to do (output already exists).")
        return 0

    # --- Batch mode (directory) --------------------------------------------
    if source.is_dir():
        videos = find_videos(source)
        if not videos:
            logger.warning("No supported video files found in %s", source)
            return 0
        logger.info("Found %d video(s) in %s", len(videos), source)

        counts = {"processed": 0, "skipped": 0, "failed": 0}
        for index, video in enumerate(videos, 1):
            logger.info("--- [%d/%d] %s ---", index, len(videos), video.name)
            try:
                outcome = process_single(
                    video, dest_dir, args.model, args.device, args.creator
                )
            except Exception:
                # One bad video must not abort the whole batch.
                logger.exception("FAILED: %s", video.name)
                counts["failed"] += 1
            else:
                counts["processed" if outcome is not None else "skipped"] += 1
        logger.info(
            "Batch complete: %d processed, %d skipped, %d failed",
            counts["processed"], counts["skipped"], counts["failed"],
        )
        return 1 if counts["failed"] > 0 else 0

    logger.error("Input is neither a file nor a directory: %s", source)
    return 1
# Propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    sys.exit(main())