"""Transcript folder watcher — monitors a directory for new JSON files and POSTs them to the ingest API. Designed to run as a standalone service (or Docker container) that watches for Whisper transcript JSON files dropped via SCP/rsync and automatically ingests them into Chrysopedia. Uses PollingObserver (not inotify) for reliability on ZFS and network-mounted filesystems. Environment variables: WATCH_FOLDER — directory to monitor (default: /watch) WATCHER_API_URL — ingest endpoint URL (default: http://chrysopedia-api:8000/api/v1/ingest) WATCHER_STABILITY_SECONDS — seconds file size must be stable before processing (default: 2) WATCHER_POLL_INTERVAL — seconds between filesystem polls (default: 5) """ import json import logging import os import shutil import sys import time from pathlib import Path import httpx from watchdog.events import FileSystemEventHandler from watchdog.observers.polling import PollingObserver # ── Configuration ──────────────────────────────────────────────────────────── WATCH_FOLDER = os.environ.get("WATCH_FOLDER", "/watch") API_URL = os.environ.get("WATCHER_API_URL", "http://chrysopedia-api:8000/api/v1/ingest") STABILITY_SECONDS = float(os.environ.get("WATCHER_STABILITY_SECONDS", "2")) POLL_INTERVAL = float(os.environ.get("WATCHER_POLL_INTERVAL", "5")) REQUIRED_KEYS = {"source_file", "creator_folder", "duration_seconds", "segments"} logger = logging.getLogger("watcher") # ── Helpers ────────────────────────────────────────────────────────────────── def wait_for_stability(path: str, seconds: float = STABILITY_SECONDS) -> bool: """Wait until a file's size is unchanged for *seconds* consecutive seconds. Returns True when stable, False if the file disappears during the wait. Polls every 0.5 s. """ prev_size = -1 stable_since: float | None = None while True: try: current_size = os.path.getsize(path) except OSError: logger.warning("File disappeared during stability wait: %s", path) return False if current_size == prev_size: if stable_since is None: stable_since = time.monotonic() elif time.monotonic() - stable_since >= seconds: return True else: prev_size = current_size stable_since = None time.sleep(0.5) def validate_transcript(data: dict) -> str | None: """Validate that *data* has the required transcript keys. Returns None on success, or a human-readable error string on failure. """ if not isinstance(data, dict): return "Top-level JSON value is not an object" missing = REQUIRED_KEYS - data.keys() if missing: return f"Missing required keys: {', '.join(sorted(missing))}" if not isinstance(data.get("segments"), list): return "'segments' must be an array" return None def post_to_ingest(filepath: str) -> tuple[bool, str]: """Read *filepath*, validate its JSON, and POST it to the ingest API. Returns (success: bool, detail: str). """ # Read and parse try: raw_bytes = Path(filepath).read_bytes() except OSError as exc: return False, f"File read error: {exc}" if not raw_bytes: return False, "File is empty" try: data = json.loads(raw_bytes) except json.JSONDecodeError as exc: return False, f"JSON parse error: {exc}" # Validate structure error = validate_transcript(data) if error: return False, f"Validation error: {error}" # POST as multipart upload (field name: "file") filename = os.path.basename(filepath) try: with httpx.Client(timeout=30.0) as client: response = client.post( API_URL, files={"file": (filename, raw_bytes, "application/json")}, ) except httpx.TimeoutException: return False, f"HTTP timeout after 30s contacting {API_URL}" except httpx.ConnectError as exc: return False, f"Connection error: {exc}" except httpx.HTTPError as exc: return False, f"HTTP error: {exc}" if response.status_code >= 400: body = response.text[:500] # cap sidecar size return False, f"HTTP {response.status_code}: {body}" return True, f"HTTP {response.status_code} — ingested successfully" def move_to_processed(filepath: str, watch_folder: str) -> None: """Move *filepath* into the ``processed/`` subfolder of *watch_folder*.""" dest_dir = os.path.join(watch_folder, "processed") os.makedirs(dest_dir, exist_ok=True) dest = os.path.join(dest_dir, os.path.basename(filepath)) shutil.move(filepath, dest) logger.info("Moved to processed: %s", dest) def move_to_failed(filepath: str, error: str, watch_folder: str) -> None: """Move *filepath* into the ``failed/`` subfolder and write a ``.error`` sidecar.""" dest_dir = os.path.join(watch_folder, "failed") os.makedirs(dest_dir, exist_ok=True) dest = os.path.join(dest_dir, os.path.basename(filepath)) shutil.move(filepath, dest) sidecar = dest + ".error" try: Path(sidecar).write_text(error, encoding="utf-8") except OSError as exc: logger.error("Failed to write error sidecar %s: %s", sidecar, exc) logger.warning("Moved to failed: %s — %s", dest, error) # ── Watchdog handler ───────────────────────────────────────────────────────── class TranscriptHandler(FileSystemEventHandler): """Handle new ``.json`` files in the watch folder.""" def __init__(self, watch_folder: str) -> None: super().__init__() self.watch_folder = watch_folder def on_created(self, event): # noqa: ANN001 if event.is_directory: return filepath = event.src_path # Only process .json files if not filepath.lower().endswith(".json"): return # Ignore files inside processed/ or failed/ subdirs rel = os.path.relpath(filepath, self.watch_folder) parts = Path(rel).parts if parts and parts[0] in ("processed", "failed"): return logger.info("Detected new file: %s", filepath) # Wait for file to finish writing (SCP/rsync) if not wait_for_stability(filepath): logger.warning("File vanished before stable: %s", filepath) return logger.info("File stable, processing: %s", filepath) # Attempt ingest success, detail = post_to_ingest(filepath) if success: logger.info("Ingest succeeded for %s: %s", filepath, detail) try: move_to_processed(filepath, self.watch_folder) except OSError as exc: logger.error("Failed to move %s to processed/: %s", filepath, exc) else: logger.error("Ingest failed for %s: %s", filepath, detail) try: move_to_failed(filepath, detail, self.watch_folder) except OSError as exc: logger.error("Failed to move %s to failed/: %s", filepath, exc) # ── Main ───────────────────────────────────────────────────────────────────── def main() -> None: """Entry point — start the polling observer and block until interrupted.""" logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s", stream=sys.stdout, ) watch_path = os.path.abspath(WATCH_FOLDER) logger.info("Starting watcher on %s (poll every %.1fs, stability %.1fs)", watch_path, POLL_INTERVAL, STABILITY_SECONDS) # Ensure base dirs exist os.makedirs(watch_path, exist_ok=True) os.makedirs(os.path.join(watch_path, "processed"), exist_ok=True) os.makedirs(os.path.join(watch_path, "failed"), exist_ok=True) handler = TranscriptHandler(watch_path) observer = PollingObserver(timeout=POLL_INTERVAL) observer.schedule(handler, watch_path, recursive=True) observer.start() logger.info("Watcher running. Press Ctrl+C to stop.") try: while observer.is_alive(): observer.join(timeout=1) except KeyboardInterrupt: logger.info("Shutting down watcher.") finally: observer.stop() observer.join() logger.info("Watcher stopped.") if __name__ == "__main__": main()