feat: Built backend/watcher.py with PollingObserver-based folder watchi…
- "backend/watcher.py" - "backend/requirements.txt" GSD-Task: S03/T01
This commit is contained in:
parent
ff1b24867f
commit
d218a85e4e
2 changed files with 264 additions and 0 deletions
20
backend/requirements.txt
Normal file
20
backend/requirements.txt
Normal file
|
|
@ -0,0 +1,20 @@
|
||||||
|
fastapi>=0.115.0,<1.0
|
||||||
|
uvicorn[standard]>=0.32.0,<1.0
|
||||||
|
sqlalchemy[asyncio]>=2.0,<3.0
|
||||||
|
asyncpg>=0.30.0,<1.0
|
||||||
|
alembic>=1.14.0,<2.0
|
||||||
|
pydantic>=2.0,<3.0
|
||||||
|
pydantic-settings>=2.0,<3.0
|
||||||
|
celery[redis]>=5.4.0,<6.0
|
||||||
|
redis>=5.0,<6.0
|
||||||
|
python-dotenv>=1.0,<2.0
|
||||||
|
python-multipart>=0.0.9,<1.0
|
||||||
|
httpx>=0.27.0,<1.0
|
||||||
|
openai>=1.0,<2.0
|
||||||
|
qdrant-client>=1.9,<2.0
|
||||||
|
pyyaml>=6.0,<7.0
|
||||||
|
psycopg2-binary>=2.9,<3.0
|
||||||
|
watchdog>=4.0,<5.0
|
||||||
|
# Test dependencies
|
||||||
|
pytest>=8.0,<10.0
|
||||||
|
pytest-asyncio>=0.24,<1.0
|
||||||
244
backend/watcher.py
Normal file
244
backend/watcher.py
Normal file
|
|
@ -0,0 +1,244 @@
|
||||||
|
"""Transcript folder watcher — monitors a directory for new JSON files and POSTs them to the ingest API.
|
||||||
|
|
||||||
|
Designed to run as a standalone service (or Docker container) that watches for Whisper
|
||||||
|
transcript JSON files dropped via SCP/rsync and automatically ingests them into Chrysopedia.
|
||||||
|
|
||||||
|
Uses PollingObserver (not inotify) for reliability on ZFS and network-mounted filesystems.
|
||||||
|
|
||||||
|
Environment variables:
|
||||||
|
WATCH_FOLDER — directory to monitor (default: /watch)
|
||||||
|
WATCHER_API_URL — ingest endpoint URL (default: http://chrysopedia-api:8000/api/v1/ingest)
|
||||||
|
WATCHER_STABILITY_SECONDS — seconds file size must be stable before processing (default: 2)
|
||||||
|
WATCHER_POLL_INTERVAL — seconds between filesystem polls (default: 5)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
from watchdog.events import FileSystemEventHandler
|
||||||
|
from watchdog.observers.polling import PollingObserver
|
||||||
|
|
||||||
|
# ── Configuration ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
# Runtime settings; each one can be overridden through the environment.
WATCH_FOLDER = os.getenv("WATCH_FOLDER", "/watch")
API_URL = os.getenv(
    "WATCHER_API_URL",
    "http://chrysopedia-api:8000/api/v1/ingest",
)
STABILITY_SECONDS = float(os.getenv("WATCHER_STABILITY_SECONDS", "2"))
POLL_INTERVAL = float(os.getenv("WATCHER_POLL_INTERVAL", "5"))

# Keys a transcript object must contain to pass validate_transcript().
REQUIRED_KEYS = {
    "segments",
    "duration_seconds",
    "creator_folder",
    "source_file",
}

logger = logging.getLogger("watcher")
|
||||||
|
|
||||||
|
|
||||||
|
# ── Helpers ──────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def wait_for_stability(path: str, seconds: float = STABILITY_SECONDS) -> bool:
    """Block until the size of *path* has stopped changing.

    The size is sampled every 0.5 s.  The stability clock starts at the first
    sample that equals the previous one and is cleared whenever the size
    changes; once the clock has run for at least *seconds*, the file is
    considered stable.

    Returns:
        True once the file is stable, False if its size could not be read
        (for example because the file was removed during the wait).
    """
    last_size = -1  # sentinel that can never equal a real file size
    unchanged_since: float | None = None

    while True:
        try:
            size_now = os.path.getsize(path)
        except OSError:
            logger.warning("File disappeared during stability wait: %s", path)
            return False

        if size_now != last_size:
            # First sample, or the writer is still active: restart the clock.
            last_size = size_now
            unchanged_since = None
        elif unchanged_since is None:
            unchanged_since = time.monotonic()
        elif time.monotonic() - unchanged_since >= seconds:
            return True

        time.sleep(0.5)
|
||||||
|
|
||||||
|
|
||||||
|
def validate_transcript(data: dict) -> str | None:
|
||||||
|
"""Validate that *data* has the required transcript keys.
|
||||||
|
|
||||||
|
Returns None on success, or a human-readable error string on failure.
|
||||||
|
"""
|
||||||
|
if not isinstance(data, dict):
|
||||||
|
return "Top-level JSON value is not an object"
|
||||||
|
missing = REQUIRED_KEYS - data.keys()
|
||||||
|
if missing:
|
||||||
|
return f"Missing required keys: {', '.join(sorted(missing))}"
|
||||||
|
if not isinstance(data.get("segments"), list):
|
||||||
|
return "'segments' must be an array"
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def post_to_ingest(filepath: str) -> tuple[bool, str]:
    """Read *filepath*, validate its JSON, and POST it to the ingest API.

    The file content is uploaded unchanged as a multipart form field named
    ``file``.  Read, parse, validation and HTTP failures are reported through
    the return value rather than raised, so the caller can file the
    transcript under ``failed/`` with a meaningful message.

    Returns:
        ``(success, detail)`` where *detail* is a short human-readable
        description of the outcome.
    """
    # Read and parse
    try:
        raw_bytes = Path(filepath).read_bytes()
    except OSError as exc:
        return False, f"File read error: {exc}"

    if not raw_bytes:
        return False, "File is empty"

    try:
        data = json.loads(raw_bytes)
    except (ValueError, RecursionError) as exc:
        # ValueError covers json.JSONDecodeError *and* the UnicodeDecodeError
        # that json.loads raises for bytes that are not valid UTF-8/16/32;
        # RecursionError covers pathologically nested documents.
        return False, f"JSON parse error: {exc}"

    # Validate structure
    error = validate_transcript(data)
    if error:
        return False, f"Validation error: {error}"

    # POST as multipart upload (field name: "file")
    filename = os.path.basename(filepath)
    timeout_seconds = 30.0
    try:
        with httpx.Client(timeout=timeout_seconds) as client:
            response = client.post(
                API_URL,
                files={"file": (filename, raw_bytes, "application/json")},
            )
    except httpx.TimeoutException:
        return False, f"HTTP timeout after {timeout_seconds:g}s contacting {API_URL}"
    except httpx.ConnectError as exc:
        return False, f"Connection error: {exc}"
    except httpx.HTTPError as exc:
        return False, f"HTTP error: {exc}"

    # Only 2xx means the transcript was accepted.  httpx does not follow
    # redirects by default, so a 3xx reply means nothing was ingested and
    # must not be reported as success.
    if not response.is_success:
        body = response.text[:500]  # cap sidecar size
        return False, f"HTTP {response.status_code}: {body}"

    return True, f"HTTP {response.status_code} — ingested successfully"
|
||||||
|
|
||||||
|
|
||||||
|
def move_to_processed(filepath: str, watch_folder: str) -> None:
    """Relocate *filepath* to ``<watch_folder>/processed/``, keeping its name.

    The ``processed`` directory is created if it does not exist yet.
    """
    name = os.path.basename(filepath)
    processed_dir = os.path.join(watch_folder, "processed")
    os.makedirs(processed_dir, exist_ok=True)

    target = os.path.join(processed_dir, name)
    shutil.move(filepath, target)
    logger.info("Moved to processed: %s", target)
|
||||||
|
|
||||||
|
|
||||||
|
def move_to_failed(filepath: str, error: str, watch_folder: str) -> None:
    """Relocate *filepath* to ``<watch_folder>/failed/`` and record why.

    The ``failed`` directory is created if needed.  After the move, *error*
    is written next to the file as ``<name>.error``; a failure to write that
    sidecar is logged and otherwise ignored.
    """
    name = os.path.basename(filepath)
    failed_dir = os.path.join(watch_folder, "failed")
    os.makedirs(failed_dir, exist_ok=True)

    target = os.path.join(failed_dir, name)
    shutil.move(filepath, target)

    note_path = f"{target}.error"
    try:
        with open(note_path, "w", encoding="utf-8") as note:
            note.write(error)
    except OSError as exc:
        logger.error("Failed to write error sidecar %s: %s", note_path, exc)

    logger.warning("Moved to failed: %s — %s", target, error)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Watchdog handler ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TranscriptHandler(FileSystemEventHandler):
    """Ingest ``.json`` files that appear in the watch folder.

    A file can "appear" in two ways: it is created under its final name, or
    it is renamed to that name.  Both are handled, because upload tools that
    write to a temporary name and rename on completion (rsync's default
    behaviour) surface the final ``.json`` name through a move event rather
    than a create event.
    """

    def __init__(self, watch_folder: str) -> None:
        super().__init__()
        self.watch_folder = watch_folder

    def on_created(self, event):  # noqa: ANN001
        """Process a newly created file reported by the observer."""
        if event.is_directory:
            return
        self._process(event.src_path)

    def on_moved(self, event):  # noqa: ANN001
        """Process the destination of a rename as if it were a new file."""
        if event.is_directory:
            return
        self._process(event.dest_path)

    def _is_candidate(self, filepath: str) -> bool:
        """Return True for ``.json`` paths outside ``processed/`` and ``failed/``."""
        if not filepath.lower().endswith(".json"):
            return False

        # Our own moves into processed/ and failed/ also produce events;
        # those must never be picked up again.
        rel = os.path.relpath(filepath, self.watch_folder)
        parts = Path(rel).parts
        return not (parts and parts[0] in ("processed", "failed"))

    def _process(self, filepath: str) -> None:
        """Wait for *filepath* to settle, ingest it, then file it away."""
        if not self._is_candidate(filepath):
            return

        logger.info("Detected new file: %s", filepath)

        # Wait for file to finish writing (SCP/rsync)
        if not wait_for_stability(filepath):
            logger.warning("File vanished before stable: %s", filepath)
            return

        logger.info("File stable, processing: %s", filepath)

        # This callback runs on the observer's thread; an exception escaping
        # from here would propagate into that thread.  Treat anything
        # unexpected as a failed ingest so the file still ends up in failed/.
        try:
            success, detail = post_to_ingest(filepath)
        except Exception as exc:  # noqa: BLE001 - thread boundary
            logger.exception("Unexpected error while ingesting %s", filepath)
            success, detail = False, f"Unexpected error: {exc!r}"

        if success:
            logger.info("Ingest succeeded for %s: %s", filepath, detail)
            try:
                move_to_processed(filepath, self.watch_folder)
            except OSError as exc:
                logger.error("Failed to move %s to processed/: %s", filepath, exc)
        else:
            logger.error("Ingest failed for %s: %s", filepath, detail)
            try:
                move_to_failed(filepath, detail, self.watch_folder)
            except OSError as exc:
                logger.error("Failed to move %s to failed/: %s", filepath, exc)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Main ─────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def main() -> None:
    """Entry point — start the polling observer and block until stopped.

    Configures logging to stdout, makes sure the watch folder and its
    ``processed/`` and ``failed/`` subfolders exist, then runs a recursive
    ``PollingObserver`` on the watch folder until the observer thread dies or
    the process receives Ctrl+C (SIGINT) or SIGTERM.
    """
    import signal  # local import: only the entry point needs it

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
        stream=sys.stdout,
    )

    watch_path = os.path.abspath(WATCH_FOLDER)
    logger.info("Starting watcher on %s (poll every %.1fs, stability %.1fs)",
                watch_path, POLL_INTERVAL, STABILITY_SECONDS)

    # Ensure base dirs exist
    os.makedirs(watch_path, exist_ok=True)
    os.makedirs(os.path.join(watch_path, "processed"), exist_ok=True)
    os.makedirs(os.path.join(watch_path, "failed"), exist_ok=True)

    handler = TranscriptHandler(watch_path)
    observer = PollingObserver(timeout=POLL_INTERVAL)
    observer.schedule(handler, watch_path, recursive=True)

    # `docker stop` and most service managers send SIGTERM, not SIGINT.
    # Route it through the KeyboardInterrupt path so the cleanup in the
    # `finally` block below runs for both signals.
    try:
        signal.signal(signal.SIGTERM, signal.default_int_handler)
    except ValueError:
        # signal.signal() only works on the main thread of the interpreter.
        logger.debug("Not on the main thread; SIGTERM handler not installed.")

    observer.start()

    logger.info("Watcher running. Press Ctrl+C to stop.")
    try:
        # Join with a timeout so the main thread stays responsive to signals.
        while observer.is_alive():
            observer.join(timeout=1)
    except KeyboardInterrupt:
        logger.info("Shutting down watcher.")
    finally:
        observer.stop()
        observer.join()

    logger.info("Watcher stopped.")


if __name__ == "__main__":
    main()
|
||||||
Loading…
Add table
Reference in a new issue