#!/usr/bin/env python3
"""
Chrysopedia — Whisper Transcription Script

Desktop transcription tool for extracting timestamped text from video files
using OpenAI's Whisper model (large-v3). Designed to run on a machine with
an NVIDIA GPU (e.g., RTX 4090).

Outputs JSON matching the Chrysopedia spec format:

{
  "source_file": "filename.mp4",
  "creator_folder": "CreatorName",
  "duration_seconds": 7243,
  "segments": [
    {
      "start": 0.0,
      "end": 4.52,
      "text": "...",
      "words": [{"word": "Hey", "start": 0.0, "end": 0.28}, ...]
    }
  ]
}
"""

from __future__ import annotations

import argparse
import json
import logging
import os
import shutil
import subprocess
import sys
import tempfile
import time
from pathlib import Path

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------

LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger("chrysopedia.transcribe")

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

SUPPORTED_EXTENSIONS = {".mp4", ".mkv", ".avi", ".mov", ".webm", ".flv", ".wmv"}
DEFAULT_MODEL = "large-v3"
DEFAULT_DEVICE = "cuda"

# Upper bounds (seconds) for the external ffprobe / ffmpeg invocations.
FFPROBE_TIMEOUT_SECONDS = 30
FFMPEG_TIMEOUT_SECONDS = 600

# How many trailing characters of ffmpeg's stderr to include in error messages.
FFMPEG_ERROR_TAIL_CHARS = 500

# Loaded Whisper models keyed by (model_name, device). Batch mode calls
# transcribe_audio() once per video; without this cache every call would
# reload the model from disk.
_MODEL_CACHE: dict[tuple[str, str], object] = {}


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def check_ffmpeg() -> bool:
    """Return True if ffmpeg is available on PATH."""
    return shutil.which("ffmpeg") is not None


def get_audio_duration(video_path: Path) -> float | None:
    """Use ffprobe to get the media duration in seconds.

    Returns None when ffprobe is not on PATH, times out, cannot be executed,
    or prints something that does not parse as a float (which includes the
    empty output produced when ffprobe itself fails).
    """
    ffprobe = shutil.which("ffprobe")
    if ffprobe is None:
        return None
    try:
        result = subprocess.run(
            [
                ffprobe,
                "-v", "error",
                "-show_entries", "format=duration",
                "-of", "default=noprint_wrappers=1:nokey=1",
                str(video_path),
            ],
            capture_output=True,
            text=True,
            timeout=FFPROBE_TIMEOUT_SECONDS,
        )
        return float(result.stdout.strip())
    except (subprocess.TimeoutExpired, ValueError, OSError) as exc:
        logger.warning("Could not determine duration for %s: %s", video_path.name, exc)
        return None


def extract_audio(video_path: Path, audio_path: Path) -> None:
    """Extract audio from video to 16kHz mono WAV using ffmpeg.

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status.
        subprocess.TimeoutExpired: if ffmpeg runs longer than
            FFMPEG_TIMEOUT_SECONDS.
    """
    logger.info("Extracting audio: %s -> %s", video_path.name, audio_path.name)
    cmd = [
        "ffmpeg",
        "-i", str(video_path),
        "-vn",                    # no video
        "-acodec", "pcm_s16le",   # 16-bit PCM
        "-ar", "16000",           # 16kHz (Whisper expects this)
        "-ac", "1",               # mono
        "-y",                     # overwrite
        str(audio_path),
    ]
    result = subprocess.run(
        cmd, capture_output=True, text=True, timeout=FFMPEG_TIMEOUT_SECONDS
    )
    if result.returncode != 0:
        # ffmpeg prints its version/configuration banner first and the actual
        # error last, so report the tail of stderr rather than the head.
        stderr_tail = result.stderr.strip()[-FFMPEG_ERROR_TAIL_CHARS:]
        raise RuntimeError(
            f"ffmpeg audio extraction failed (exit {result.returncode}): {stderr_tail}"
        )


def _load_model(model_name: str, device: str):
    """Return a Whisper model for (model_name, device), loading it at most once.

    Exits the process with status 1 if the openai-whisper package is missing.
    """
    # Import whisper here so --help works without the dependency installed
    try:
        import whisper  # type: ignore[import-untyped]
    except ImportError:
        logger.error(
            "openai-whisper is not installed. "
            "Install it with: pip install openai-whisper"
        )
        sys.exit(1)

    key = (model_name, device)
    model = _MODEL_CACHE.get(key)
    if model is None:
        logger.info("Loading Whisper model '%s' on device '%s'...", model_name, device)
        t0 = time.perf_counter()
        model = whisper.load_model(model_name, device=device)
        logger.info("Model loaded in %.1f s", time.perf_counter() - t0)
        _MODEL_CACHE[key] = model
    return model


def _result_duration(whisper_result: dict) -> float:
    """Best-effort audio duration (seconds) derived from a Whisper result.

    Honours an explicit "duration" key when one is present; otherwise uses the
    latest segment end time. The stock openai-whisper result carries "text",
    "segments" and "language" only, so the segment fallback is the usual path.
    Returns 0.0 when there are no segments.
    """
    explicit = whisper_result.get("duration")
    if explicit is not None:
        return float(explicit)
    segments = whisper_result.get("segments") or []
    return max((float(seg.get("end", 0.0)) for seg in segments), default=0.0)


def transcribe_audio(
    audio_path: Path,
    model_name: str = DEFAULT_MODEL,
    device: str = DEFAULT_DEVICE,
) -> dict:
    """Run Whisper on the audio file and return the raw result dict.

    The model is cached per (model_name, device), so repeated calls in one
    process (batch mode) pay the load cost only once. Word-level timestamps
    are requested.
    """
    model = _load_model(model_name, device)

    logger.info("Transcribing %s ...", audio_path.name)
    t0 = time.perf_counter()
    result = model.transcribe(
        str(audio_path),
        word_timestamps=True,
        verbose=False,
    )
    elapsed = time.perf_counter() - t0

    # Speed factor = seconds of audio covered per second of wall-clock time.
    speed = (_result_duration(result) / elapsed) if elapsed > 0 else 0
    logger.info(
        "Transcription complete in %.1f s (%.1fx real-time)",
        elapsed,
        speed,
    )
    return result


def format_output(
    whisper_result: dict,
    source_file: str,
    creator_folder: str,
    duration_seconds: float | None,
) -> dict:
    """Convert Whisper result to the Chrysopedia spec JSON format.

    Timestamps are rounded to two decimals and text is stripped of surrounding
    whitespace. When duration_seconds is None (ffprobe unavailable or failed),
    the duration is derived from the Whisper result instead. The reported
    duration is rounded to whole seconds.
    """
    segments = [
        {
            "start": round(seg.get("start", 0.0), 2),
            "end": round(seg.get("end", 0.0), 2),
            "text": seg.get("text", "").strip(),
            "words": [
                {
                    "word": w.get("word", "").strip(),
                    "start": round(w.get("start", 0.0), 2),
                    "end": round(w.get("end", 0.0), 2),
                }
                for w in seg.get("words", [])
            ],
        }
        for seg in whisper_result.get("segments", [])
    ]

    # Use duration from ffprobe if available, otherwise from whisper
    if duration_seconds is None:
        duration_seconds = _result_duration(whisper_result)

    return {
        "source_file": source_file,
        "creator_folder": creator_folder,
        "duration_seconds": round(duration_seconds),
        "segments": segments,
    }


def infer_creator_folder(video_path: Path) -> str:
    """
    Infer creator folder name from directory structure.

    Expected layout: /path/to/<CreatorName>/video.mp4
    Falls back to parent directory name.
    """
    return video_path.parent.name


def output_path_for(video_path: Path, output_dir: Path) -> Path:
    """Compute the output JSON path for a given video file."""
    return output_dir / f"{video_path.stem}.json"


def _write_json_atomic(out_path: Path, payload: dict) -> None:
    """Write payload as JSON to out_path without ever exposing a partial file.

    The data is written to a sibling "<name>.tmp" file and then moved into
    place with os.replace. Resumability treats an existing out_path as "done",
    so a half-written file at the final path would be skipped on every rerun.
    """
    tmp_path = out_path.with_name(out_path.name + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2, ensure_ascii=False)
        os.replace(tmp_path, out_path)
    except BaseException:
        # Includes KeyboardInterrupt: never leave a stray temp file behind.
        try:
            tmp_path.unlink()
        except FileNotFoundError:
            pass
        raise


def process_single(
    video_path: Path,
    output_dir: Path,
    model_name: str,
    device: str,
    creator_folder: str | None = None,
) -> Path | None:
    """
    Process a single video file.

    Returns the output path on success, None if skipped because the output
    JSON already exists. Errors from audio extraction or transcription
    propagate to the caller.
    """
    out_path = output_path_for(video_path, output_dir)

    # Resumability: skip if output already exists
    if out_path.exists():
        logger.info("SKIP (output exists): %s", out_path)
        return None

    logger.info("Processing: %s", video_path)

    # Determine creator folder
    folder = creator_folder or infer_creator_folder(video_path)

    # Get duration via ffprobe
    duration = get_audio_duration(video_path)
    if duration is not None:
        logger.info("Video duration: %.0f s (%.1f min)", duration, duration / 60)

    # Extract audio to temp file; the WAV only needs to live until Whisper
    # has finished reading it.
    with tempfile.TemporaryDirectory(prefix="chrysopedia_") as tmpdir:
        audio_path = Path(tmpdir) / "audio.wav"
        extract_audio(video_path, audio_path)

        # Transcribe
        whisper_result = transcribe_audio(audio_path, model_name, device)

    # Format and write output
    output = format_output(whisper_result, video_path.name, folder, duration)

    output_dir.mkdir(parents=True, exist_ok=True)
    _write_json_atomic(out_path, output)

    segment_count = len(output["segments"])
    logger.info("Wrote %s (%d segments)", out_path, segment_count)
    return out_path


def find_videos(input_path: Path) -> list[Path]:
    """Find all supported video files in a directory (non-recursive).

    Matching is by file extension, case-insensitively; results are sorted.
    """
    return sorted(
        p
        for p in input_path.iterdir()
        if p.is_file() and p.suffix.lower() in SUPPORTED_EXTENSIONS
    )


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line argument parser for the transcription tool."""
    parser = argparse.ArgumentParser(
        prog="transcribe",
        description=(
            "Chrysopedia Whisper Transcription — extract timestamped transcripts "
            "from video files using OpenAI's Whisper model."
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Examples:\n"
            "  # Single file\n"
            "  python transcribe.py --input video.mp4 --output-dir ./transcripts\n"
            "\n"
            "  # Batch mode (all videos in directory)\n"
            "  python transcribe.py --input ./videos/ --output-dir ./transcripts\n"
            "\n"
            "  # Use a smaller model on CPU\n"
            "  python transcribe.py --input video.mp4 --model base --device cpu\n"
        ),
    )
    parser.add_argument(
        "--input",
        required=True,
        type=str,
        help="Path to a video file or directory of video files",
    )
    parser.add_argument(
        "--output-dir",
        required=True,
        type=str,
        help="Directory to write transcript JSON files",
    )
    parser.add_argument(
        "--model",
        default=DEFAULT_MODEL,
        type=str,
        help=f"Whisper model name (default: {DEFAULT_MODEL})",
    )
    parser.add_argument(
        "--device",
        default=DEFAULT_DEVICE,
        type=str,
        help=f"Compute device: cuda, cpu (default: {DEFAULT_DEVICE})",
    )
    parser.add_argument(
        "--creator",
        default=None,
        type=str,
        help="Override creator folder name (default: inferred from parent directory)",
    )
    parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Enable debug logging",
    )
    return parser


def main(argv: list[str] | None = None) -> int:
    """CLI entry point.

    Returns the process exit code: 0 on success (including "nothing to do"),
    1 on invalid input, missing ffmpeg, or when any file in a batch failed.
    """
    parser = build_parser()
    args = parser.parse_args(argv)

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Validate ffmpeg availability
    if not check_ffmpeg():
        logger.error(
            "ffmpeg is not installed or not on PATH. "
            "Install it with: sudo apt install ffmpeg (or equivalent)"
        )
        return 1

    input_path = Path(args.input).resolve()
    output_dir = Path(args.output_dir).resolve()

    if not input_path.exists():
        logger.error("Input path does not exist: %s", input_path)
        return 1

    # Single file mode
    if input_path.is_file():
        if input_path.suffix.lower() not in SUPPORTED_EXTENSIONS:
            logger.error(
                "Unsupported file type '%s'. Supported: %s",
                input_path.suffix,
                ", ".join(sorted(SUPPORTED_EXTENSIONS)),
            )
            return 1
        result = process_single(
            input_path, output_dir, args.model, args.device, args.creator
        )
        if result is None:
            logger.info("Nothing to do (output already exists).")
        return 0

    # Batch mode (directory)
    if input_path.is_dir():
        videos = find_videos(input_path)
        if not videos:
            logger.warning("No supported video files found in %s", input_path)
            return 0

        logger.info("Found %d video(s) in %s", len(videos), input_path)
        processed = 0
        skipped = 0
        failed = 0

        for i, video in enumerate(videos, 1):
            logger.info("--- [%d/%d] %s ---", i, len(videos), video.name)
            try:
                result = process_single(
                    video, output_dir, args.model, args.device, args.creator
                )
                if result is not None:
                    processed += 1
                else:
                    skipped += 1
            except Exception:
                # Top-level batch boundary: one bad file must not abort the
                # remaining videos; the traceback is logged and counted.
                logger.exception("FAILED: %s", video.name)
                failed += 1

        logger.info(
            "Batch complete: %d processed, %d skipped, %d failed",
            processed, skipped, failed,
        )
        return 1 if failed > 0 else 0

    logger.error("Input is neither a file nor a directory: %s", input_path)
    return 1


if __name__ == "__main__":
    sys.exit(main())