diff --git a/backend/requirements.txt b/backend/requirements.txt new file mode 100644 index 0000000..39e28d4 --- /dev/null +++ b/backend/requirements.txt @@ -0,0 +1,20 @@ +fastapi>=0.115.0,<1.0 +uvicorn[standard]>=0.32.0,<1.0 +sqlalchemy[asyncio]>=2.0,<3.0 +asyncpg>=0.30.0,<1.0 +alembic>=1.14.0,<2.0 +pydantic>=2.0,<3.0 +pydantic-settings>=2.0,<3.0 +celery[redis]>=5.4.0,<6.0 +redis>=5.0,<6.0 +python-dotenv>=1.0,<2.0 +python-multipart>=0.0.9,<1.0 +httpx>=0.27.0,<1.0 +openai>=1.0,<2.0 +qdrant-client>=1.9,<2.0 +pyyaml>=6.0,<7.0 +psycopg2-binary>=2.9,<3.0 +watchdog>=4.0,<5.0 +# Test dependencies +pytest>=8.0,<10.0 +pytest-asyncio>=0.24,<1.0 diff --git a/backend/watcher.py b/backend/watcher.py new file mode 100644 index 0000000..43c8de3 --- /dev/null +++ b/backend/watcher.py @@ -0,0 +1,244 @@ +"""Transcript folder watcher — monitors a directory for new JSON files and POSTs them to the ingest API. + +Designed to run as a standalone service (or Docker container) that watches for Whisper +transcript JSON files dropped via SCP/rsync and automatically ingests them into Chrysopedia. + +Uses PollingObserver (not inotify) for reliability on ZFS and network-mounted filesystems. + +Environment variables: + WATCH_FOLDER — directory to monitor (default: /watch) + WATCHER_API_URL — ingest endpoint URL (default: http://chrysopedia-api:8000/api/v1/ingest) + WATCHER_STABILITY_SECONDS — seconds file size must be stable before processing (default: 2) + WATCHER_POLL_INTERVAL — seconds between filesystem polls (default: 5) +""" + +import json +import logging +import os +import shutil +import sys +import time +from pathlib import Path + +import httpx +from watchdog.events import FileSystemEventHandler +from watchdog.observers.polling import PollingObserver + +# ── Configuration ──────────────────────────────────────────────────────────── + +WATCH_FOLDER = os.environ.get("WATCH_FOLDER", "/watch") +API_URL = os.environ.get("WATCHER_API_URL", "http://chrysopedia-api:8000/api/v1/ingest") +STABILITY_SECONDS = float(os.environ.get("WATCHER_STABILITY_SECONDS", "2")) +POLL_INTERVAL = float(os.environ.get("WATCHER_POLL_INTERVAL", "5")) + +REQUIRED_KEYS = {"source_file", "creator_folder", "duration_seconds", "segments"} + +logger = logging.getLogger("watcher") + + +# ── Helpers ────────────────────────────────────────────────────────────────── + +def wait_for_stability(path: str, seconds: float = STABILITY_SECONDS) -> bool: + """Wait until a file's size is unchanged for *seconds* consecutive seconds. + + Returns True when stable, False if the file disappears during the wait. + Polls every 0.5 s. + """ + prev_size = -1 + stable_since: float | None = None + + while True: + try: + current_size = os.path.getsize(path) + except OSError: + logger.warning("File disappeared during stability wait: %s", path) + return False + + if current_size == prev_size: + if stable_since is None: + stable_since = time.monotonic() + elif time.monotonic() - stable_since >= seconds: + return True + else: + prev_size = current_size + stable_since = None + + time.sleep(0.5) + + +def validate_transcript(data: dict) -> str | None: + """Validate that *data* has the required transcript keys. + + Returns None on success, or a human-readable error string on failure. + """ + if not isinstance(data, dict): + return "Top-level JSON value is not an object" + missing = REQUIRED_KEYS - data.keys() + if missing: + return f"Missing required keys: {', '.join(sorted(missing))}" + if not isinstance(data.get("segments"), list): + return "'segments' must be an array" + return None + + +def post_to_ingest(filepath: str) -> tuple[bool, str]: + """Read *filepath*, validate its JSON, and POST it to the ingest API. + + Returns (success: bool, detail: str). + """ + # Read and parse + try: + raw_bytes = Path(filepath).read_bytes() + except OSError as exc: + return False, f"File read error: {exc}" + + if not raw_bytes: + return False, "File is empty" + + try: + data = json.loads(raw_bytes) + except json.JSONDecodeError as exc: + return False, f"JSON parse error: {exc}" + + # Validate structure + error = validate_transcript(data) + if error: + return False, f"Validation error: {error}" + + # POST as multipart upload (field name: "file") + filename = os.path.basename(filepath) + try: + with httpx.Client(timeout=30.0) as client: + response = client.post( + API_URL, + files={"file": (filename, raw_bytes, "application/json")}, + ) + except httpx.TimeoutException: + return False, f"HTTP timeout after 30s contacting {API_URL}" + except httpx.ConnectError as exc: + return False, f"Connection error: {exc}" + except httpx.HTTPError as exc: + return False, f"HTTP error: {exc}" + + if response.status_code >= 400: + body = response.text[:500] # cap sidecar size + return False, f"HTTP {response.status_code}: {body}" + + return True, f"HTTP {response.status_code} — ingested successfully" + + +def move_to_processed(filepath: str, watch_folder: str) -> None: + """Move *filepath* into the ``processed/`` subfolder of *watch_folder*.""" + dest_dir = os.path.join(watch_folder, "processed") + os.makedirs(dest_dir, exist_ok=True) + dest = os.path.join(dest_dir, os.path.basename(filepath)) + shutil.move(filepath, dest) + logger.info("Moved to processed: %s", dest) + + +def move_to_failed(filepath: str, error: str, watch_folder: str) -> None: + """Move *filepath* into the ``failed/`` subfolder and write a ``.error`` sidecar.""" + dest_dir = os.path.join(watch_folder, "failed") + os.makedirs(dest_dir, exist_ok=True) + dest = os.path.join(dest_dir, os.path.basename(filepath)) + shutil.move(filepath, dest) + + sidecar = dest + ".error" + try: + Path(sidecar).write_text(error, encoding="utf-8") + except OSError as exc: + logger.error("Failed to write error sidecar %s: %s", sidecar, exc) + + logger.warning("Moved to failed: %s — %s", dest, error) + + +# ── Watchdog handler ───────────────────────────────────────────────────────── + +class TranscriptHandler(FileSystemEventHandler): + """Handle new ``.json`` files in the watch folder.""" + + def __init__(self, watch_folder: str) -> None: + super().__init__() + self.watch_folder = watch_folder + + def on_created(self, event): # noqa: ANN001 + if event.is_directory: + return + + filepath = event.src_path + + # Only process .json files + if not filepath.lower().endswith(".json"): + return + + # Ignore files inside processed/ or failed/ subdirs + rel = os.path.relpath(filepath, self.watch_folder) + parts = Path(rel).parts + if parts and parts[0] in ("processed", "failed"): + return + + logger.info("Detected new file: %s", filepath) + + # Wait for file to finish writing (SCP/rsync) + if not wait_for_stability(filepath): + logger.warning("File vanished before stable: %s", filepath) + return + + logger.info("File stable, processing: %s", filepath) + + # Attempt ingest + success, detail = post_to_ingest(filepath) + + if success: + logger.info("Ingest succeeded for %s: %s", filepath, detail) + try: + move_to_processed(filepath, self.watch_folder) + except OSError as exc: + logger.error("Failed to move %s to processed/: %s", filepath, exc) + else: + logger.error("Ingest failed for %s: %s", filepath, detail) + try: + move_to_failed(filepath, detail, self.watch_folder) + except OSError as exc: + logger.error("Failed to move %s to failed/: %s", filepath, exc) + + +# ── Main ───────────────────────────────────────────────────────────────────── + +def main() -> None: + """Entry point — start the polling observer and block until interrupted.""" + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(name)s] %(levelname)s %(message)s", + stream=sys.stdout, + ) + + watch_path = os.path.abspath(WATCH_FOLDER) + logger.info("Starting watcher on %s (poll every %.1fs, stability %.1fs)", + watch_path, POLL_INTERVAL, STABILITY_SECONDS) + + # Ensure base dirs exist + os.makedirs(watch_path, exist_ok=True) + os.makedirs(os.path.join(watch_path, "processed"), exist_ok=True) + os.makedirs(os.path.join(watch_path, "failed"), exist_ok=True) + + handler = TranscriptHandler(watch_path) + observer = PollingObserver(timeout=POLL_INTERVAL) + observer.schedule(handler, watch_path, recursive=True) + observer.start() + + logger.info("Watcher running. Press Ctrl+C to stop.") + try: + while observer.is_alive(): + observer.join(timeout=1) + except KeyboardInterrupt: + logger.info("Shutting down watcher.") + finally: + observer.stop() + observer.join() + + logger.info("Watcher stopped.") + + +if __name__ == "__main__": + main()