Auto-mode commit 7aa33cd accidentally deleted 78 files (14,814 lines) during M005
execution. Subsequent commits rebuilt some frontend files but backend/, alembic/,
tests/, whisper/, docker configs, and prompts were never restored in this repo.
This commit restores the full project tree by syncing from ub01's working directory,
which has all M001-M007 features running in production containers.
Restored: backend/ (config, models, routers, database, redis, search_service, worker),
alembic/ (6 migrations), docker/ (Dockerfiles, nginx, compose), prompts/ (4 stages),
tests/, whisper/, README.md, .env.example, chrysopedia-spec.md
393 lines · 12 KiB · Python
#!/usr/bin/env python3
|
|
"""
|
|
Chrysopedia — Whisper Transcription Script
|
|
|
|
Desktop transcription tool for extracting timestamped text from video files
|
|
using OpenAI's Whisper model (large-v3). Designed to run on a machine with
|
|
an NVIDIA GPU (e.g., RTX 4090).
|
|
|
|
Outputs JSON matching the Chrysopedia spec format:
|
|
{
|
|
"source_file": "filename.mp4",
|
|
"creator_folder": "CreatorName",
|
|
"duration_seconds": 7243,
|
|
"segments": [
|
|
{
|
|
"start": 0.0,
|
|
"end": 4.52,
|
|
"text": "...",
|
|
"words": [{"word": "Hey", "start": 0.0, "end": 0.28}, ...]
|
|
}
|
|
]
|
|
}
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
from pathlib import Path
|
|
|
|
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------

# Shared log line format: timestamp, level, message.
LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
# Namespaced logger used by every function in this script; --verbose raises
# the root level to DEBUG in main().
logger = logging.getLogger("chrysopedia.transcribe")
|
|
|
|
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Video container extensions accepted for transcription (compared lowercase).
SUPPORTED_EXTENSIONS = {".mp4", ".mkv", ".avi", ".mov", ".webm", ".flv", ".wmv"}
# Default Whisper model name (see module docstring for hardware expectations).
DEFAULT_MODEL = "large-v3"
# Default compute device passed to whisper.load_model.
DEFAULT_DEVICE = "cuda"
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def check_ffmpeg() -> bool:
    """Check whether the ffmpeg binary can be located on PATH."""
    ffmpeg_location = shutil.which("ffmpeg")
    return ffmpeg_location is not None
|
|
|
|
|
|
def get_audio_duration(video_path: Path) -> float | None:
    """Use ffprobe to get the media duration in seconds.

    Args:
        video_path: Path to the video file to probe.

    Returns:
        Duration in seconds, or None when ffprobe is missing, times out,
        fails, or emits unparsable output.
    """
    ffprobe = shutil.which("ffprobe")
    if ffprobe is None:
        # No ffprobe on PATH; callers fall back to Whisper-reported duration.
        return None
    try:
        result = subprocess.run(
            [
                ffprobe,
                "-v", "error",
                "-show_entries", "format=duration",
                "-of", "default=noprint_wrappers=1:nokey=1",
                str(video_path),
            ],
            capture_output=True,
            text=True,
            timeout=30,
        )
        # Fix: check the exit status explicitly instead of relying on
        # float("") raising ValueError when ffprobe fails — this also
        # surfaces the actual exit code in the log.
        if result.returncode != 0:
            logger.warning(
                "ffprobe failed for %s (exit %d)",
                video_path.name,
                result.returncode,
            )
            return None
        return float(result.stdout.strip())
    except (subprocess.TimeoutExpired, ValueError, OSError) as exc:
        logger.warning("Could not determine duration for %s: %s", video_path.name, exc)
        return None
|
|
|
|
|
|
def extract_audio(video_path: Path, audio_path: Path) -> None:
    """Extract the audio track to a 16 kHz mono 16-bit WAV using ffmpeg.

    Raises RuntimeError when ffmpeg exits non-zero.
    """
    logger.info("Extracting audio: %s -> %s", video_path.name, audio_path.name)
    ffmpeg_args = ["ffmpeg", "-i", str(video_path)]
    ffmpeg_args += ["-vn"]                   # drop the video stream
    ffmpeg_args += ["-acodec", "pcm_s16le"]  # 16-bit PCM samples
    ffmpeg_args += ["-ar", "16000"]          # 16 kHz sample rate (Whisper's input rate)
    ffmpeg_args += ["-ac", "1"]              # single (mono) channel
    ffmpeg_args += ["-y"]                    # overwrite any existing output
    ffmpeg_args.append(str(audio_path))
    proc = subprocess.run(ffmpeg_args, capture_output=True, text=True, timeout=600)
    if proc.returncode != 0:
        raise RuntimeError(
            f"ffmpeg audio extraction failed (exit {proc.returncode}): {proc.stderr[:500]}"
        )
|
|
|
|
|
|
def transcribe_audio(
    audio_path: Path,
    model_name: str = DEFAULT_MODEL,
    device: str = DEFAULT_DEVICE,
) -> dict:
    """Run Whisper on the audio file and return the raw result dict.

    Args:
        audio_path: Path to the extracted WAV file.
        model_name: Whisper model identifier (e.g. "large-v3").
        device: Compute device ("cuda" or "cpu").

    Returns:
        The raw dict returned by whisper's transcribe() call.
    """
    # Import whisper here so --help works without the dependency installed
    try:
        import whisper  # type: ignore[import-untyped]
    except ImportError:
        logger.error(
            "openai-whisper is not installed. "
            "Install it with: pip install openai-whisper"
        )
        sys.exit(1)

    logger.info("Loading Whisper model '%s' on device '%s'...", model_name, device)
    t0 = time.time()
    model = whisper.load_model(model_name, device=device)
    logger.info("Model loaded in %.1f s", time.time() - t0)

    logger.info("Transcribing %s ...", audio_path.name)
    t0 = time.time()
    result = model.transcribe(
        str(audio_path),
        word_timestamps=True,  # per-word timings are required by the spec format
        verbose=False,
    )
    elapsed = time.time() - t0

    # Fix: the old fallback `result.get("duration", elapsed)` made the
    # real-time factor read 1.0x whenever the result carried no "duration"
    # key. Derive the audio duration from the last segment's end time when
    # the key is absent. (Assumes segments carry "end" timestamps, as the
    # rest of this script does — confirm against the whisper version used.)
    audio_duration = result.get("duration")
    if audio_duration is None:
        segments = result.get("segments") or []
        audio_duration = segments[-1].get("end", 0.0) if segments else 0.0
    logger.info(
        "Transcription complete in %.1f s (%.1fx real-time)",
        elapsed,
        (audio_duration / elapsed) if elapsed > 0 else 0,
    )
    return result
|
|
|
|
|
|
def format_output(
    whisper_result: dict,
    source_file: str,
    creator_folder: str,
    duration_seconds: float | None,
) -> dict:
    """Convert a raw Whisper result into the Chrysopedia spec JSON format."""

    def _word_entry(raw_word: dict) -> dict:
        # One word with trimmed text and timings rounded to centiseconds.
        return {
            "word": raw_word.get("word", "").strip(),
            "start": round(raw_word.get("start", 0.0), 2),
            "end": round(raw_word.get("end", 0.0), 2),
        }

    segments = [
        {
            "start": round(seg.get("start", 0.0), 2),
            "end": round(seg.get("end", 0.0), 2),
            "text": seg.get("text", "").strip(),
            "words": [_word_entry(w) for w in seg.get("words", [])],
        }
        for seg in whisper_result.get("segments", [])
    ]

    # Prefer the ffprobe duration; fall back to whatever Whisper reported.
    if duration_seconds is None:
        duration_seconds = whisper_result.get("duration", 0.0)

    return {
        "source_file": source_file,
        "creator_folder": creator_folder,
        "duration_seconds": round(duration_seconds),
        "segments": segments,
    }
|
|
|
|
|
|
def infer_creator_folder(video_path: Path) -> str:
    """
    Infer the creator folder name from the directory structure.

    Expected layout: /path/to/<CreatorName>/video.mp4
    Falls back to the parent directory name.
    """
    parent_dir = video_path.parent
    return parent_dir.name
|
|
|
|
|
|
def output_path_for(video_path: Path, output_dir: Path) -> Path:
    """Compute the transcript JSON path for a given video file."""
    json_name = f"{video_path.stem}.json"
    return output_dir / json_name
|
|
|
|
|
|
def process_single(
    video_path: Path,
    output_dir: Path,
    model_name: str,
    device: str,
    creator_folder: str | None = None,
) -> Path | None:
    """
    Transcribe one video file end to end.

    Returns the output JSON path on success, or None when the transcript
    already exists (resumability).
    """
    destination = output_path_for(video_path, output_dir)

    # Resumability: an existing transcript means there is nothing to do.
    if destination.exists():
        logger.info("SKIP (output exists): %s", destination)
        return None

    logger.info("Processing: %s", video_path)

    # Explicit --creator override wins; otherwise derive from the path.
    folder = creator_folder if creator_folder else infer_creator_folder(video_path)

    # Probe the duration up front so it can be logged and embedded in output.
    duration = get_audio_duration(video_path)
    if duration is not None:
        logger.info("Video duration: %.0f s (%.1f min)", duration, duration / 60)

    # Work inside a throwaway directory so the WAV never outlives the run.
    with tempfile.TemporaryDirectory(prefix="chrysopedia_") as tmpdir:
        wav_path = Path(tmpdir) / "audio.wav"
        extract_audio(video_path, wav_path)

        whisper_result = transcribe_audio(wav_path, model_name, device)

        output = format_output(whisper_result, video_path.name, folder, duration)

        output_dir.mkdir(parents=True, exist_ok=True)
        with open(destination, "w", encoding="utf-8") as f:
            json.dump(output, f, indent=2, ensure_ascii=False)

        logger.info("Wrote %s (%d segments)", destination, len(output["segments"]))
        return destination
|
|
|
|
|
|
def find_videos(input_path: Path) -> list[Path]:
    """List supported video files directly inside a directory (non-recursive)."""
    candidates = [
        entry
        for entry in input_path.iterdir()
        if entry.is_file() and entry.suffix.lower() in SUPPORTED_EXTENSIONS
    ]
    candidates.sort()
    return candidates
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Build the CLI argument parser for the transcription tool."""
    parser = argparse.ArgumentParser(
        prog="transcribe",
        description=(
            "Chrysopedia Whisper Transcription — extract timestamped transcripts "
            "from video files using OpenAI's Whisper model."
        ),
        # Raw formatter preserves the example layout in the epilog below.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Examples:\n"
            "  # Single file\n"
            "  python transcribe.py --input video.mp4 --output-dir ./transcripts\n"
            "\n"
            "  # Batch mode (all videos in directory)\n"
            "  python transcribe.py --input ./videos/ --output-dir ./transcripts\n"
            "\n"
            "  # Use a smaller model on CPU\n"
            "  python transcribe.py --input video.mp4 --model base --device cpu\n"
        ),
    )
    parser.add_argument(
        "--input",
        required=True,
        type=str,
        help="Path to a video file or directory of video files",
    )
    parser.add_argument(
        "--output-dir",
        required=True,
        type=str,
        help="Directory to write transcript JSON files",
    )
    parser.add_argument(
        "--model",
        default=DEFAULT_MODEL,
        type=str,
        help=f"Whisper model name (default: {DEFAULT_MODEL})",
    )
    parser.add_argument(
        "--device",
        default=DEFAULT_DEVICE,
        type=str,
        help=f"Compute device: cuda, cpu (default: {DEFAULT_DEVICE})",
    )
    parser.add_argument(
        "--creator",
        default=None,
        type=str,
        help="Override creator folder name (default: inferred from parent directory)",
    )
    parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Enable debug logging",
    )
    return parser
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """CLI entry point.

    Args:
        argv: Argument list for testing; defaults to sys.argv[1:].

    Returns:
        Process exit code: 0 on success (including "nothing to do"),
        1 on invalid input, missing ffmpeg, or any batch failure.
    """
    parser = build_parser()
    args = parser.parse_args(argv)

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Validate ffmpeg availability
    if not check_ffmpeg():
        logger.error(
            "ffmpeg is not installed or not on PATH. "
            "Install it with: sudo apt install ffmpeg (or equivalent)"
        )
        return 1

    input_path = Path(args.input).resolve()
    output_dir = Path(args.output_dir).resolve()

    if not input_path.exists():
        logger.error("Input path does not exist: %s", input_path)
        return 1

    # Single file mode
    if input_path.is_file():
        if input_path.suffix.lower() not in SUPPORTED_EXTENSIONS:
            logger.error(
                "Unsupported file type '%s'. Supported: %s",
                input_path.suffix,
                ", ".join(sorted(SUPPORTED_EXTENSIONS)),
            )
            return 1
        result = process_single(
            input_path, output_dir, args.model, args.device, args.creator
        )
        # None means the transcript already existed (skip is a success).
        if result is None:
            logger.info("Nothing to do (output already exists).")
        return 0

    # Batch mode (directory)
    if input_path.is_dir():
        videos = find_videos(input_path)
        if not videos:
            logger.warning("No supported video files found in %s", input_path)
            return 0

        logger.info("Found %d video(s) in %s", len(videos), input_path)
        processed = 0
        skipped = 0
        failed = 0

        for i, video in enumerate(videos, 1):
            logger.info("--- [%d/%d] %s ---", i, len(videos), video.name)
            try:
                result = process_single(
                    video, output_dir, args.model, args.device, args.creator
                )
                if result is not None:
                    processed += 1
                else:
                    skipped += 1
            except Exception:
                # One bad video must not abort the whole batch; record and continue.
                logger.exception("FAILED: %s", video.name)
                failed += 1

        logger.info(
            "Batch complete: %d processed, %d skipped, %d failed",
            processed, skipped, failed,
        )
        return 1 if failed > 0 else 0

    logger.error("Input is neither a file nor a directory: %s", input_path)
    return 1
|
|
|
|
|
|
# Script entry point: exit with main()'s status code.
if __name__ == "__main__":
    sys.exit(main())
|