diff --git a/.mcp.json b/.mcp.json
new file mode 100644
index 0000000..cde04ad
--- /dev/null
+++ b/.mcp.json
@@ -0,0 +1,7 @@
+{
+  "mcpServers": {
+    "chrysopedia": {
+      "url": "http://ub01:8097/mcp"
+    }
+  }
+}
diff --git a/docker-compose.yml b/docker-compose.yml
index 8600b6e..a3dd709 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -198,6 +198,41 @@ services:
       start_period: 10s
     stop_grace_period: 15s
 
+  # ── MCP Server (management tools for AI agents) ──
+  chrysopedia-mcp:
+    build:
+      context: .
+      dockerfile: docker/Dockerfile.mcp
+    container_name: chrysopedia-mcp
+    restart: unless-stopped
+    environment:
+      DATABASE_URL: postgresql://${POSTGRES_USER:-chrysopedia}:${POSTGRES_PASSWORD:-changeme}@chrysopedia-db:5432/${POSTGRES_DB:-chrysopedia}
+      REDIS_URL: redis://chrysopedia-redis:6379/0
+      QDRANT_URL: http://chrysopedia-qdrant:6333
+      API_URL: http://chrysopedia-api:8000
+      PROMPTS_PATH: /prompts
+    ports:
+      - "0.0.0.0:8097:8097"
+    volumes:
+      - /var/run/docker.sock:/var/run/docker.sock:ro
+      - ./prompts:/prompts:ro
+    depends_on:
+      chrysopedia-db:
+        condition: service_healthy
+      chrysopedia-redis:
+        condition: service_healthy
+      chrysopedia-api:
+        condition: service_healthy
+    networks:
+      - chrysopedia
+    healthcheck:
+      # GET /mcp returns 405 on a stateless streamable-HTTP server, so no -f:
+      # any HTTP response (vs. connection refused) means the server is alive.
+      test: ["CMD-SHELL", "curl -s http://localhost:8097/mcp -o /dev/null || exit 1"]
+      interval: 30s
+      timeout: 5s
+      retries: 3
+      start_period: 15s
+    stop_grace_period: 15s
+
 networks:
   chrysopedia:
     driver: bridge
diff --git a/docker/Dockerfile.mcp b/docker/Dockerfile.mcp
new file mode 100644
index 0000000..2cd46ed
--- /dev/null
+++ b/docker/Dockerfile.mcp
@@ -0,0 +1,19 @@
+FROM python:3.12-slim
+
+WORKDIR /app
+
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    curl libpq-dev gcc \
+    && rm -rf /var/lib/apt/lists/*
+
+COPY mcp_server/requirements.txt /app/requirements.txt
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY mcp_server/server.py /app/server.py
+
+EXPOSE 8097
+
+HEALTHCHECK --interval=30s --timeout=5s --retries=3 --start-period=10s \
+    CMD curl -s http://localhost:8097/mcp -o /dev/null || exit 1
+
+CMD ["python", "server.py"]
diff --git a/mcp_server/requirements.txt b/mcp_server/requirements.txt
new file mode 100644
index 0000000..95c4fc1
--- /dev/null
+++ b/mcp_server/requirements.txt
@@ -0,0 +1,6 @@
+mcp[cli]>=1.9.0,<2.0
+httpx>=0.27,<1.0
+psycopg2-binary>=2.9,<3.0
+redis>=5.0,<6.0
+docker>=7.0,<8.0
+uvicorn[standard]>=0.32.0,<1.0
diff --git a/mcp_server/server.py b/mcp_server/server.py
new file mode 100644
index 0000000..96947ab
--- /dev/null
+++ b/mcp_server/server.py
@@ -0,0 +1,828 @@
+"""
+Chrysopedia MCP Server — Docker-native management tools.
+
+Runs inside the Docker Compose stack with direct access to PostgreSQL,
+Redis (Celery broker), Docker socket, and the filesystem.
+
+25 tools across 6 domains: pipeline execution, infrastructure health,
+content queries, pipeline observability, embeddings, and prompts.
+"""
+
+from __future__ import annotations
+
+import hashlib
+import json
+import os
+import subprocess
+from datetime import datetime, timezone
+
+import docker
+import psycopg2
+import psycopg2.extras
+import redis
+from mcp.server.fastmcp import FastMCP
+
+# ── Configuration ─────────────────────────────────────────────────────────────
+
+DB_URL = os.getenv(
+    "DATABASE_URL",
+    "postgresql://chrysopedia:changeme@chrysopedia-db:5432/chrysopedia",
+)
+REDIS_URL = os.getenv("REDIS_URL", "redis://chrysopedia-redis:6379/0")
+QDRANT_URL = os.getenv("QDRANT_URL", "http://chrysopedia-qdrant:6333")
+API_URL = os.getenv("API_URL", "http://chrysopedia-api:8000")
+PROMPTS_PATH = os.getenv("PROMPTS_PATH", "/prompts")
+COMPOSE_PROJECT = "xpltd_chrysopedia"
+
+mcp = FastMCP(
+    "Chrysopedia",
+    stateless_http=True,
+    json_response=True,
+    host="0.0.0.0",
+    port=8097,
+)
+
+# ── Helpers ───────────────────────────────────────────────────────────────────
+
+
+def _db():
+    """Return a new psycopg2 connection (sync, short-lived)."""
+    return psycopg2.connect(DB_URL, cursor_factory=psycopg2.extras.RealDictCursor)
+
+
+def _redis():
+    """Return a Redis client."""
+    return redis.from_url(REDIS_URL, decode_responses=True)
+
+
+def _docker():
+    """Return a Docker client from the mounted socket."""
+    return docker.from_env()
+
+
+def _query(sql: str, params: tuple = ()) -> list[dict]:
+    """Execute a read query and return rows as dicts."""
+    with _db() as conn:
+        with conn.cursor() as cur:
+            cur.execute(sql, params)
+            return [dict(r) for r in cur.fetchall()]
+
+
+def _query_one(sql: str, params: tuple = ()) -> dict | None:
+    rows = _query(sql, params)
+    return rows[0] if rows else None
+
+
+def _execute(sql: str, params: tuple = ()) -> int:
+    """Execute a write query and return rowcount."""
+    with _db() as conn:
+        with conn.cursor() as cur:
+            cur.execute(sql, params)
+            conn.commit()
+            return cur.rowcount
+
+
+def _json_default(obj):
+    """JSON serializer for types not handled by default.
+
+    NOTE(review): every object has __str__, so the hasattr branch always
+    fires and the TypeError below is unreachable — kept as a defensive
+    fallback, but this effectively stringifies anything non-datetime.
+    """
+    if isinstance(obj, datetime):
+        return obj.isoformat()
+    if hasattr(obj, "__str__"):
+        return str(obj)
+    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
+
+
+def _json(data) -> str:
+    return json.dumps(data, default=_json_default, indent=2)
+
+
+# ═══════════════════════════════════════════════════════════════════════════════
+# DOMAIN 1: Pipeline Execution
+# ═══════════════════════════════════════════════════════════════════════════════
+
+
+@mcp.tool()
+def pipeline_status() -> str:
+    """Show currently processing, queued, and backlogged videos with counts."""
+    processing = _query(
+        "SELECT id, filename, processing_status, updated_at "
+        "FROM source_videos WHERE processing_status = 'processing' "
+        "ORDER BY updated_at DESC"
+    )
+    queued = _query(
+        "SELECT id, filename, processing_status, updated_at "
+        "FROM source_videos WHERE processing_status = 'queued' "
+        "ORDER BY updated_at DESC"
+    )
+    not_started = _query(
+        "SELECT id, filename, updated_at "
+        "FROM source_videos WHERE processing_status = 'not_started' "
+        "ORDER BY updated_at DESC"
+    )
+    errored = _query(
+        "SELECT id, filename, updated_at "
+        "FROM source_videos WHERE processing_status = 'error' "
+        "ORDER BY updated_at DESC"
+    )
+
+    # Check Celery queue
+    r = _redis()
+    celery_queue_len = r.llen("celery")
+
+    return _json({
+        "processing": processing,
+        "queued": queued,
+        "not_started": not_started,
+        "errored": errored,
+        "celery_queue_depth": celery_queue_len,
+        "summary": {
+            "processing": len(processing),
+            "queued": len(queued),
+            "not_started": len(not_started),
+            "errored": len(errored),
+            "celery_pending": celery_queue_len,
+        },
+    })
+
+
+@mcp.tool()
+def trigger_pipeline(video_id: str = "", filename: str = "") -> str:
+    """Trigger pipeline for a video by ID or filename substring. Provide one."""
+    if not video_id and not filename:
+        return "Error: provide either video_id or filename"
+
+    if not video_id:
+        # FIX: interpolate the caller's filename into the ILIKE pattern —
+        # previously a literal placeholder string that could never match.
+        row = _query_one(
+            "SELECT id, filename FROM source_videos "
+            "WHERE filename ILIKE %s LIMIT 1",
+            (f"%{filename}%",),
+        )
+        if not row:
+            return f"No video matching '{filename}'"
+        video_id = str(row["id"])
+        filename = row["filename"]
+    else:
+        row = _query_one("SELECT filename FROM source_videos WHERE id = %s", (video_id,))
+        filename = row["filename"] if row else "unknown"
+
+    import httpx
+
+    resp = httpx.post(
+        f"{API_URL}/api/v1/admin/pipeline/trigger",
+        json={"video_id": video_id},
+        timeout=10,
+    )
+    return _json({
+        "video_id": video_id,
+        "filename": filename,
+        "status_code": resp.status_code,
+        "response": resp.json() if resp.status_code < 400 else resp.text,
+    })
+
+
+@mcp.tool()
+def trigger_batch(
+    creator: str = "",
+    status: str = "not_started",
+    limit: int = 5,
+) -> str:
+    """Trigger pipeline for multiple videos. Filter by creator slug and/or status.
+    Default: up to 5 not_started videos."""
+    sql = "SELECT sv.id, sv.filename FROM source_videos sv"
+    conditions = []
+    params: list = []
+
+    if status:
+        conditions.append("sv.processing_status = %s")
+        params.append(status)
+    if creator:
+        sql += " JOIN creators c ON sv.creator_id = c.id"
+        conditions.append("c.slug = %s")
+        params.append(creator)
+
+    if conditions:
+        sql += " WHERE " + " AND ".join(conditions)
+    sql += " ORDER BY sv.updated_at DESC LIMIT %s"
+    params.append(limit)
+
+    videos = _query(sql, tuple(params))
+    if not videos:
+        return "No videos match the filter."
+
+    import httpx
+
+    results = []
+    for v in videos:
+        try:
+            resp = httpx.post(
+                f"{API_URL}/api/v1/admin/pipeline/trigger",
+                json={"video_id": str(v["id"])},
+                timeout=10,
+            )
+            results.append({
+                "video_id": str(v["id"]),
+                "filename": v["filename"],
+                "status": resp.status_code,
+            })
+        except Exception as e:
+            results.append({
+                "video_id": str(v["id"]),
+                "filename": v["filename"],
+                "error": str(e),
+            })
+
+    return _json({"triggered": len(results), "results": results})
+
+
+@mcp.tool()
+def revoke_pipeline(video_id: str) -> str:
+    """Revoke/cancel a pipeline run for a video."""
+    import httpx
+
+    resp = httpx.post(
+        f"{API_URL}/api/v1/admin/pipeline/revoke",
+        json={"video_id": video_id},
+        timeout=10,
+    )
+    return _json({
+        "video_id": video_id,
+        "status_code": resp.status_code,
+        "response": resp.json() if resp.status_code < 400 else resp.text,
+    })
+
+
+@mcp.tool()
+def rerun_stage(video_id: str, stage: int) -> str:
+    """Re-run a specific pipeline stage (2-6) for a video."""
+    if stage < 2 or stage > 6:
+        return "Error: stage must be 2-6"
+
+    import httpx
+
+    resp = httpx.post(
+        f"{API_URL}/api/v1/admin/pipeline/rerun-stage",
+        json={"video_id": video_id, "stage": stage},
+        timeout=10,
+    )
+    return _json({
+        "video_id": video_id,
+        "stage": stage,
+        "status_code": resp.status_code,
+        "response": resp.json() if resp.status_code < 400 else resp.text,
+    })
+
+
+@mcp.tool()
+def queue_depth() -> str:
+    """Check Celery queue depth and active/reserved/scheduled tasks."""
+    r = _redis()
+    queue_len = r.llen("celery")
+
+    # Inspect via celery CLI inside the worker container
+    client = _docker()
+    try:
+        worker = client.containers.get("chrysopedia-worker")
+        active_result = worker.exec_run(
+            "celery -A worker inspect active --json --timeout=5"
+        )
+        active = active_result.output.decode() if active_result.exit_code == 0 else "error"
+
+        reserved_result = worker.exec_run(
+            "celery -A worker inspect reserved --json --timeout=5"
+        )
+        reserved = reserved_result.output.decode() if reserved_result.exit_code == 0 else "error"
+    except Exception as e:
+        active = f"error: {e}"
+        reserved = f"error: {e}"
+
+    return _json({
+        "celery_queue_length": queue_len,
+        "active_tasks": active,
+        "reserved_tasks": reserved,
+    })
+
+
+# ═══════════════════════════════════════════════════════════════════════════════
+# DOMAIN 2: Infrastructure Health
+# ═══════════════════════════════════════════════════════════════════════════════
+
+
+@mcp.tool()
+def worker_health() -> str:
+    """Check Celery worker status: online, pool size, active task."""
+    client = _docker()
+    try:
+        worker = client.containers.get("chrysopedia-worker")
+        result = worker.exec_run(
+            "celery -A worker inspect ping --json --timeout=5"
+        )
+        ping = result.output.decode() if result.exit_code == 0 else "no response"
+
+        stats_result = worker.exec_run(
+            "celery -A worker inspect stats --json --timeout=5"
+        )
+        stats = stats_result.output.decode() if stats_result.exit_code == 0 else "error"
+    except docker.errors.NotFound:
+        return _json({"status": "container not found", "container": "chrysopedia-worker"})
+    except Exception as e:
+        return _json({"status": "error", "error": str(e)})
+
+    return _json({"ping": ping, "stats": stats})
+
+
+@mcp.tool()
+def container_status() -> str:
+    """Show status of all Chrysopedia Docker containers."""
+    client = _docker()
+    containers = client.containers.list(all=True, filters={"name": "chrysopedia"})
+
+    result = []
+    for c in containers:
+        health = "unknown"
+        if c.attrs.get("State", {}).get("Health"):
+            health = c.attrs["State"]["Health"].get("Status", "unknown")
+
+        result.append({
+            "name": c.name,
+            "status": c.status,
+            "health": health,
+            "image": c.image.tags[0] if c.image.tags else "untagged",
+            "started_at": c.attrs.get("State", {}).get("StartedAt", ""),
+        })
+
+    return _json(sorted(result, key=lambda x: x["name"]))
+
+
+@mcp.tool()
+def service_health() -> str:
+    """Combined health check: DB, Redis, Qdrant, Ollama, API, Worker."""
+    checks = {}
+
+    # DB
+    try:
+        with _db() as conn:
+            with conn.cursor() as cur:
+                cur.execute("SELECT 1")
+        checks["postgresql"] = "healthy"
+    except Exception as e:
+        checks["postgresql"] = f"error: {e}"
+
+    # Redis
+    try:
+        r = _redis()
+        r.ping()
+        checks["redis"] = "healthy"
+    except Exception as e:
+        checks["redis"] = f"error: {e}"
+
+    # Qdrant
+    try:
+        import httpx
+
+        resp = httpx.get(f"{QDRANT_URL}/healthz", timeout=5)
+        checks["qdrant"] = "healthy" if resp.status_code == 200 else f"status {resp.status_code}"
+    except Exception as e:
+        checks["qdrant"] = f"error: {e}"
+
+    # API
+    try:
+        import httpx
+
+        resp = httpx.get(f"{API_URL}/health", timeout=5)
+        checks["api"] = "healthy" if resp.status_code == 200 else f"status {resp.status_code}"
+    except Exception as e:
+        checks["api"] = f"error: {e}"
+
+    # Worker (via container health)
+    try:
+        client = _docker()
+        worker = client.containers.get("chrysopedia-worker")
+        health = worker.attrs.get("State", {}).get("Health", {}).get("Status", "unknown")
+        checks["worker"] = health
+    except Exception as e:
+        checks["worker"] = f"error: {e}"
+
+    all_healthy = all(v == "healthy" for v in checks.values())
+    return _json({"overall": "healthy" if all_healthy else "degraded", "services": checks})
+
+
+@mcp.tool()
+def restart_service(service: str) -> str:
+    """Restart a Chrysopedia Docker container by short name.
+    Valid: api, worker, watcher, web, db, redis, qdrant, ollama"""
+    name_map = {
+        "api": "chrysopedia-api",
+        "worker": "chrysopedia-worker",
+        "watcher": "chrysopedia-watcher",
+        "web": "chrysopedia-web-8096",
+        "db": "chrysopedia-db",
+        "redis": "chrysopedia-redis",
+        "qdrant": "chrysopedia-qdrant",
+        "ollama": "chrysopedia-ollama",
+    }
+    container_name = name_map.get(service)
+    if not container_name:
+        return f"Unknown service '{service}'. Valid: {', '.join(name_map.keys())}"
+
+    try:
+        client = _docker()
+        container = client.containers.get(container_name)
+        container.restart(timeout=30)
+        return _json({"restarted": container_name, "status": "ok"})
+    except Exception as e:
+        return _json({"error": str(e), "container": container_name})
+
+
+@mcp.tool()
+def tail_logs(service: str = "worker", lines: int = 50) -> str:
+    """Get recent log lines from a Chrysopedia container.
+    service: api, worker, watcher, web. Default: worker."""
+    name_map = {
+        "api": "chrysopedia-api",
+        "worker": "chrysopedia-worker",
+        "watcher": "chrysopedia-watcher",
+        "web": "chrysopedia-web-8096",
+    }
+    container_name = name_map.get(service)
+    if not container_name:
+        return f"Unknown service '{service}'. Valid: {', '.join(name_map.keys())}"
+
+    try:
+        client = _docker()
+        container = client.containers.get(container_name)
+        logs = container.logs(tail=lines, timestamps=True).decode("utf-8", errors="replace")
+        # Truncate to 40KB to stay within MCP limits
+        if len(logs) > 40000:
+            logs = logs[-40000:]
+        return logs
+    except Exception as e:
+        return f"Error: {e}"
+
+
+# ═══════════════════════════════════════════════════════════════════════════════
+# DOMAIN 3: Content & Data
+# ═══════════════════════════════════════════════════════════════════════════════
+
+
+@mcp.tool()
+def video_stats() -> str:
+    """Counts of technique pages, source videos, creators, with per-creator breakdown."""
+    totals = _query("""
+        SELECT
+          (SELECT count(*) FROM technique_pages) as technique_pages,
+          (SELECT count(*) FROM source_videos) as source_videos,
+          (SELECT count(*) FROM creators) as creators,
+          (SELECT count(*) FROM key_moments) as key_moments
+    """)
+
+    by_creator = _query("""
+        SELECT c.name, c.slug,
+               count(DISTINCT tp.id) as pages,
+               count(DISTINCT sv.id) as videos
+        FROM creators c
+        LEFT JOIN technique_pages tp ON tp.creator_id = c.id
+        LEFT JOIN source_videos sv ON sv.creator_id = c.id
+        GROUP BY c.id, c.name, c.slug
+        ORDER BY pages DESC, videos DESC
+    """)
+
+    by_status = _query("""
+        SELECT processing_status::text as status, count(*) as count
+        FROM source_videos
+        GROUP BY processing_status
+        ORDER BY count DESC
+    """)
+
+    return _json({
+        "totals": totals[0] if totals else {},
+        "by_creator": by_creator,
+        "by_processing_status": by_status,
+    })
+
+
+@mcp.tool()
+def list_videos(
+    status: str = "",
+    creator: str = "",
+    limit: int = 50,
+) -> str:
+    """List source videos, optionally filtered by processing status and/or creator slug."""
+    sql = """
+        SELECT sv.id, sv.filename, sv.processing_status::text as status,
+               c.name as creator, c.slug as creator_slug, sv.updated_at
+        FROM source_videos sv
+        LEFT JOIN creators c ON sv.creator_id = c.id
+    """
+    conditions = []
+    params: list = []
+
+    if status:
+        conditions.append("sv.processing_status = %s")
+        params.append(status)
+    if creator:
+        conditions.append("c.slug = %s")
+        params.append(creator)
+
+    if conditions:
+        sql += " WHERE " + " AND ".join(conditions)
+    sql += " ORDER BY sv.updated_at DESC LIMIT %s"
+    params.append(limit)
+
+    rows = _query(sql, tuple(params))
+    return _json({"count": len(rows), "videos": rows})
+
+
+@mcp.tool()
+def list_technique_pages(
+    creator: str = "",
+    format: str = "",
+    limit: int = 50,
+) -> str:
+    """List technique pages, optionally filtered by creator slug and/or format (v1/v2)."""
+    sql = """
+        SELECT tp.id, tp.title, tp.slug, tp.topic_category,
+               tp.body_sections_format, c.name as creator, c.slug as creator_slug,
+               tp.updated_at
+        FROM technique_pages tp
+        LEFT JOIN creators c ON tp.creator_id = c.id
+    """
+    conditions = []
+    params: list = []
+
+    if creator:
+        conditions.append("c.slug = %s")
+        params.append(creator)
+    if format:
+        conditions.append("tp.body_sections_format = %s")
+        params.append(format)
+
+    if conditions:
+        sql += " WHERE " + " AND ".join(conditions)
+    sql += " ORDER BY tp.updated_at DESC LIMIT %s"
+    params.append(limit)
+
+    rows = _query(sql, tuple(params))
+    return _json({"count": len(rows), "pages": rows})
+
+
+@mcp.tool()
+def list_creators() -> str:
+    """List all creators with video and technique page counts."""
+    rows = _query("""
+        SELECT c.id, c.name, c.slug, c.genre_tags,
+               count(DISTINCT sv.id) as video_count,
+               count(DISTINCT tp.id) as page_count
+        FROM creators c
+        LEFT JOIN source_videos sv ON sv.creator_id = c.id
+        LEFT JOIN technique_pages tp ON tp.creator_id = c.id
+        GROUP BY c.id, c.name, c.slug, c.genre_tags
+        ORDER BY page_count DESC, video_count DESC
+    """)
+    return _json({"count": len(rows), "creators": rows})
+
+
+@mcp.tool()
+def search_content(query: str, scope: str = "all", limit: int = 10) -> str:
+    """Run a search query against the Chrysopedia API.
+    scope: all, topics, creators."""
+    import httpx
+
+    resp = httpx.get(
+        f"{API_URL}/api/v1/search",
+        params={"q": query, "scope": scope, "limit": limit},
+        timeout=15,
+    )
+    if resp.status_code != 200:
+        return f"Search failed: {resp.status_code} {resp.text}"
+    return _json(resp.json())
+
+
+# ═══════════════════════════════════════════════════════════════════════════════
+# DOMAIN 4: Pipeline Observability
+# ═══════════════════════════════════════════════════════════════════════════════
+
+
+@mcp.tool()
+def recent_events(limit: int = 20, video_id: str = "", stage: str = "") -> str:
+    """Last N pipeline events. Optionally filter by video_id or stage name."""
+    sql = """
+        SELECT pe.id, pe.stage, pe.event_type, pe.model,
+               pe.prompt_tokens, pe.completion_tokens, pe.duration_ms,
+               pe.created_at, sv.filename
+        FROM pipeline_events pe
+        LEFT JOIN pipeline_runs pr ON pe.pipeline_run_id = pr.id
+        LEFT JOIN source_videos sv ON pr.source_video_id = sv.id
+    """
+    conditions = []
+    params: list = []
+
+    if video_id:
+        conditions.append("pr.source_video_id = %s")
+        params.append(video_id)
+    if stage:
+        conditions.append("pe.stage = %s")
+        params.append(stage)
+
+    if conditions:
+        sql += " WHERE " + " AND ".join(conditions)
+    sql += " ORDER BY pe.created_at DESC LIMIT %s"
+    params.append(limit)
+
+    rows = _query(sql, tuple(params))
+    return _json({"count": len(rows), "events": rows})
+
+
+@mcp.tool()
+def token_usage(video_id: str = "", stage: str = "") -> str:
+    """Token consumption summary.
+    Optionally filter by video_id or stage."""
+    sql = """
+        SELECT pe.stage,
+               count(*) as calls,
+               coalesce(sum(pe.prompt_tokens), 0) as total_prompt_tokens,
+               coalesce(sum(pe.completion_tokens), 0) as total_completion_tokens,
+               coalesce(sum(pe.prompt_tokens), 0) + coalesce(sum(pe.completion_tokens), 0) as total_tokens
+        FROM pipeline_events pe
+        LEFT JOIN pipeline_runs pr ON pe.pipeline_run_id = pr.id
+        WHERE pe.event_type = 'llm_call'
+    """
+    params: list = []
+    if video_id:
+        sql += " AND pr.source_video_id = %s"
+        params.append(video_id)
+    if stage:
+        sql += " AND pe.stage = %s"
+        params.append(stage)
+
+    sql += " GROUP BY pe.stage ORDER BY total_tokens DESC"
+
+    rows = _query(sql, tuple(params))
+    grand_total = sum(r.get("total_tokens", 0) for r in rows)
+    return _json({"by_stage": rows, "grand_total_tokens": grand_total})
+
+
+@mcp.tool()
+def processing_times(limit: int = 10) -> str:
+    """Processing duration for recent pipeline runs (start to last event)."""
+    rows = _query("""
+        SELECT pr.id, sv.filename, c.name as creator,
+               min(pe.created_at) as started_at,
+               max(pe.created_at) as finished_at,
+               EXTRACT(EPOCH FROM (max(pe.created_at) - min(pe.created_at)))::int as duration_seconds,
+               count(pe.id) as event_count
+        FROM pipeline_runs pr
+        JOIN source_videos sv ON pr.source_video_id = sv.id
+        LEFT JOIN creators c ON sv.creator_id = c.id
+        JOIN pipeline_events pe ON pe.pipeline_run_id = pr.id
+        GROUP BY pr.id, sv.filename, c.name
+        ORDER BY max(pe.created_at) DESC
+        LIMIT %s
+    """, (limit,))
+    return _json({"count": len(rows), "runs": rows})
+
+
+@mcp.tool()
+def debug_mode_toggle(enable: bool | None = None) -> str:
+    """Get or set pipeline debug mode.
+    Pass enable=true/false to change, omit to check."""
+    r = _redis()
+    key = "chrysopedia:debug_mode"
+
+    if enable is None:
+        current = r.get(key)
+        return _json({"debug_mode": current == "1" if current else False})
+
+    r.set(key, "1" if enable else "0")
+    return _json({"debug_mode": enable, "set": True})
+
+
+# ═══════════════════════════════════════════════════════════════════════════════
+# DOMAIN 5: Qdrant / Embeddings
+# ═══════════════════════════════════════════════════════════════════════════════
+
+
+@mcp.tool()
+def qdrant_stats() -> str:
+    """Qdrant collection point counts and index health."""
+    import httpx
+
+    try:
+        resp = httpx.get(f"{QDRANT_URL}/collections", timeout=10)
+        collections_data = resp.json()
+        result = []
+
+        for col in collections_data.get("result", {}).get("collections", []):
+            name = col["name"]
+            detail_resp = httpx.get(f"{QDRANT_URL}/collections/{name}", timeout=10)
+            detail = detail_resp.json().get("result", {})
+            result.append({
+                "name": name,
+                "points_count": detail.get("points_count", 0),
+                "vectors_count": detail.get("vectors_count", 0),
+                "status": detail.get("status", "unknown"),
+                "segments_count": detail.get("segments_count", 0),
+            })
+
+        return _json({"collections": result})
+    except Exception as e:
+        return _json({"error": str(e)})
+
+
+@mcp.tool()
+def reindex_all() -> str:
+    """Trigger full re-embedding of all technique pages and key moments via the API."""
+    import httpx
+
+    resp = httpx.post(f"{API_URL}/api/v1/admin/pipeline/reindex-all", timeout=30)
+    return _json({
+        "status_code": resp.status_code,
+        "response": resp.json() if resp.status_code < 400 else resp.text,
+    })
+
+
+@mcp.tool()
+def reindex_video(video_id: str) -> str:
+    """Re-embed content for a specific video (re-runs stage 6)."""
+    return rerun_stage(video_id=video_id, stage=6)
+
+
+# ═══════════════════════════════════════════════════════════════════════════════
+# DOMAIN 6: Prompt Management
+# ═══════════════════════════════════════════════════════════════════════════════
+
+
+@mcp.tool()
+def list_prompts() -> str:
+    """List current prompt files with sizes and SHA-256 hashes."""
+    prompts_dir = PROMPTS_PATH
+    if not os.path.isdir(prompts_dir):
+        return _json({"error": f"Prompts directory not found: {prompts_dir}"})
+
+    result = []
+    for fname in sorted(os.listdir(prompts_dir)):
+        fpath = os.path.join(prompts_dir, fname)
+        if not os.path.isfile(fpath):
+            continue
+        with open(fpath, "rb") as f:
+            content = f.read()
+        result.append({
+            "filename": fname,
+            "size_bytes": len(content),
+            "sha256": hashlib.sha256(content).hexdigest()[:16],
+            "lines": content.count(b"\n"),
+        })
+
+    return _json({"count": len(result), "prompts": result})
+
+
+@mcp.tool()
+def prompt_diff(filename: str) -> str:
+    """Compare a current prompt file to its most recent .bak version.
+    Returns a summary of changes (not full diff to save tokens)."""
+    prompts_dir = PROMPTS_PATH
+    current_path = os.path.join(prompts_dir, filename)
+
+    if not os.path.isfile(current_path):
+        # FIX: report the actual filename (was a literal placeholder string).
+        return f"Prompt file not found: {filename}"
+
+    # Find most recent .bak
+    bak_files = sorted(
+        [f for f in os.listdir(prompts_dir) if f.startswith(filename) and f.endswith(".bak")],
+        reverse=True,
+    )
+
+    if not bak_files:
+        return _json({"filename": filename, "backup_found": False, "message": "No .bak file found"})
+
+    bak_path = os.path.join(prompts_dir, bak_files[0])
+
+    with open(current_path) as f:
+        current = f.readlines()
+    with open(bak_path) as f:
+        backup = f.readlines()
+
+    import difflib
+
+    diff = list(difflib.unified_diff(backup, current, fromfile=bak_files[0], tofile=filename, n=1))
+
+    added = sum(1 for l in diff if l.startswith("+") and not l.startswith("+++"))
+    removed = sum(1 for l in diff if l.startswith("-") and not l.startswith("---"))
+
+    # Truncate diff output
+    diff_text = "".join(diff[:200])
+    if len(diff) > 200:
+        diff_text += f"\n... ({len(diff) - 200} more lines)"
+
+    return _json({
+        "filename": filename,
+        "backup": bak_files[0],
+        "current_lines": len(current),
+        "backup_lines": len(backup),
+        "lines_added": added,
+        "lines_removed": removed,
+        "diff": diff_text,
+    })
+
+
+# ═══════════════════════════════════════════════════════════════════════════════
+# Entry point
+# ═══════════════════════════════════════════════════════════════════════════════
+
+if __name__ == "__main__":
+    mcp.run(transport="streamable-http")