feat: Chrysopedia MCP server — 25 tools for pipeline, infra, content, observability, embeddings, prompts

Runs as chrysopedia-mcp container in Docker Compose with direct DB, Redis,
Docker socket, and API access. Streamable HTTP transport on port 8097.
Clients connect via http://ub01:8097/mcp
This commit is contained in:
jlightner 2026-04-03 02:57:27 +00:00
parent bbea843235
commit 567f69a480
5 changed files with 894 additions and 0 deletions

7
.mcp.json Normal file
View file

@ -0,0 +1,7 @@
{
"mcpServers": {
"chrysopedia": {
"url": "http://ub01:8097/mcp"
}
}
}

View file

@ -198,6 +198,41 @@ services:
start_period: 10s
stop_grace_period: 15s
  # ── MCP Server (management tools for AI agents) ──
  chrysopedia-mcp:
    build:
      context: .
      dockerfile: docker/Dockerfile.mcp
    container_name: chrysopedia-mcp
    restart: unless-stopped
    environment:
      # Service-name URLs resolve on the "chrysopedia" compose network below.
      DATABASE_URL: postgresql://${POSTGRES_USER:-chrysopedia}:${POSTGRES_PASSWORD:-changeme}@chrysopedia-db:5432/${POSTGRES_DB:-chrysopedia}
      REDIS_URL: redis://chrysopedia-redis:6379/0
      QDRANT_URL: http://chrysopedia-qdrant:6333
      API_URL: http://chrysopedia-api:8000
      PROMPTS_PATH: /prompts
    ports:
      # MCP streamable-HTTP endpoint; clients connect to http://host:8097/mcp
      - "0.0.0.0:8097:8097"
    volumes:
      # Read-only Docker socket powers the container inspect/restart/log tools.
      # NOTE(review): even a :ro socket mount is effectively root-equivalent on
      # the host — confirm this trust boundary is intended.
      - /var/run/docker.sock:/var/run/docker.sock:ro
      - ./prompts:/prompts:ro
    depends_on:
      chrysopedia-db:
        condition: service_healthy
      chrysopedia-redis:
        condition: service_healthy
      chrysopedia-api:
        condition: service_healthy
    networks:
      - chrysopedia
    healthcheck:
      # NOTE(review): a plain GET on /mcp may return 4xx on streamable-HTTP
      # servers (missing Accept/session headers), which would make curl -f
      # fail and the container report unhealthy forever — verify the endpoint
      # answers GET with a non-error status.
      test: ["CMD-SHELL", "curl -sf http://localhost:8097/mcp -o /dev/null || exit 1"]
      interval: 30s
      timeout: 5s
      retries: 3
      start_period: 15s
    stop_grace_period: 15s
networks:
  chrysopedia:
    driver: bridge

19
docker/Dockerfile.mcp Normal file
View file

@ -0,0 +1,19 @@
# Runtime image for the Chrysopedia MCP server (single-file Python app).
FROM python:3.12-slim
WORKDIR /app
# curl is required by the HEALTHCHECK below.
# NOTE(review): libpq-dev/gcc are build deps for source-built psycopg2, but
# requirements.txt pins psycopg2-binary — these two packages may be removable.
RUN apt-get update && apt-get install -y --no-install-recommends \
    curl libpq-dev gcc \
    && rm -rf /var/lib/apt/lists/*
COPY mcp_server/requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
COPY mcp_server/server.py /app/server.py
EXPOSE 8097
# NOTE(review): GET /mcp may not return a success status on streamable-HTTP
# servers — confirm, or the healthcheck will never pass (see compose file).
HEALTHCHECK --interval=30s --timeout=5s --retries=3 --start-period=10s \
    CMD curl -sf http://localhost:8097/mcp -o /dev/null || exit 1
CMD ["python", "server.py"]

View file

@ -0,0 +1,5 @@
mcp[cli]>=1.9.0,<2.0
httpx>=0.27,<1.0  # imported directly by server.py; don't rely on mcp's transitive dep
psycopg2-binary>=2.9,<3.0
redis>=5.0,<6.0
docker>=7.0,<8.0
uvicorn[standard]>=0.32.0,<1.0

828
mcp_server/server.py Normal file
View file

@ -0,0 +1,828 @@
"""
Chrysopedia MCP Server — Docker-native management tools.
Runs inside the Docker Compose stack with direct access to PostgreSQL,
Redis (Celery broker), Docker socket, and the filesystem.
25 tools across 6 domains: pipeline execution, infrastructure health,
content queries, pipeline observability, embeddings, and prompts.
"""
from __future__ import annotations

import contextlib
import hashlib
import json
import os
import subprocess
from datetime import datetime, timezone

import docker
import psycopg2
import psycopg2.extras
import redis

from mcp.server.fastmcp import FastMCP
# ── Configuration ─────────────────────────────────────────────────────────────
# All endpoints default to Docker Compose service names; override via env vars.
DB_URL = os.getenv(
    "DATABASE_URL",
    "postgresql://chrysopedia:changeme@chrysopedia-db:5432/chrysopedia",
)
REDIS_URL = os.getenv("REDIS_URL", "redis://chrysopedia-redis:6379/0")
QDRANT_URL = os.getenv("QDRANT_URL", "http://chrysopedia-qdrant:6333")
API_URL = os.getenv("API_URL", "http://chrysopedia-api:8000")
PROMPTS_PATH = os.getenv("PROMPTS_PATH", "/prompts")
# NOTE(review): COMPOSE_PROJECT is not referenced anywhere in this file —
# confirm it is needed before removing.
COMPOSE_PROJECT = "xpltd_chrysopedia"

# Stateless streamable-HTTP MCP server on 0.0.0.0:8097 (endpoint path /mcp).
mcp = FastMCP(
    "Chrysopedia",
    stateless_http=True,
    json_response=True,
    host="0.0.0.0",
    port=8097,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _db():
    """Open a fresh, short-lived PostgreSQL connection that yields dict rows."""
    return psycopg2.connect(
        DB_URL,
        cursor_factory=psycopg2.extras.RealDictCursor,
    )
def _redis():
    """Build a Redis client; responses are decoded to str automatically."""
    return redis.from_url(
        REDIS_URL,
        decode_responses=True,
    )
def _docker():
    """Docker client talking to the socket mounted into this container."""
    return docker.from_env()
def _query(sql: str, params: tuple = ()) -> list[dict]:
    """Execute a read query and return all rows as plain dicts.

    FIX: psycopg2's ``with conn:`` block only commits/rolls back the
    transaction — it does NOT close the connection, so the previous version
    leaked one server-side connection per call. Wrapping the connection in
    ``contextlib.closing`` guarantees it is closed on exit.
    """
    with contextlib.closing(_db()) as conn:
        with conn:  # commit on success, rollback on exception
            with conn.cursor() as cur:
                cur.execute(sql, params)
                return [dict(r) for r in cur.fetchall()]
def _query_one(sql: str, params: tuple = ()) -> dict | None:
    """Run a read query and return only the first row, or None when empty."""
    result = _query(sql, params)
    if not result:
        return None
    return result[0]
def _execute(sql: str, params: tuple = ()) -> int:
    """Execute a write query, commit, and return the affected rowcount.

    FIX: like ``_query``, the previous version never closed the connection
    (``with conn`` alone does not close it); ``contextlib.closing`` plugs the
    connection leak while keeping the explicit commit.
    """
    with contextlib.closing(_db()) as conn:
        with conn.cursor() as cur:
            cur.execute(sql, params)
            conn.commit()
            return cur.rowcount
def _json_default(obj):
"""JSON serializer for types not handled by default."""
if isinstance(obj, datetime):
return obj.isoformat()
if hasattr(obj, "__str__"):
return str(obj)
raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
def _json(data) -> str:
    """Serialize *data* to a 2-space-indented JSON string for tool output."""
    return json.dumps(
        data,
        indent=2,
        default=_json_default,
    )
# ═══════════════════════════════════════════════════════════════════════════════
# DOMAIN 1: Pipeline Execution
# ═══════════════════════════════════════════════════════════════════════════════
@mcp.tool()
def pipeline_status() -> str:
    """Show currently processing, queued, and backlogged videos with counts."""
    # One snapshot per processing_status bucket, newest first.
    processing = _query(
        "SELECT id, filename, processing_status, updated_at "
        "FROM source_videos WHERE processing_status = 'processing' "
        "ORDER BY updated_at DESC"
    )
    queued = _query(
        "SELECT id, filename, processing_status, updated_at "
        "FROM source_videos WHERE processing_status = 'queued' "
        "ORDER BY updated_at DESC"
    )
    not_started = _query(
        "SELECT id, filename, updated_at "
        "FROM source_videos WHERE processing_status = 'not_started' "
        "ORDER BY updated_at DESC"
    )
    errored = _query(
        "SELECT id, filename, updated_at "
        "FROM source_videos WHERE processing_status = 'error' "
        "ORDER BY updated_at DESC"
    )
    # Check Celery queue: LLEN of the default "celery" list = pending tasks.
    r = _redis()
    celery_queue_len = r.llen("celery")
    return _json({
        "processing": processing,
        "queued": queued,
        "not_started": not_started,
        "errored": errored,
        "celery_queue_depth": celery_queue_len,
        # Compact per-bucket counts for agents that only need totals.
        "summary": {
            "processing": len(processing),
            "queued": len(queued),
            "not_started": len(not_started),
            "errored": len(errored),
            "celery_pending": celery_queue_len,
        },
    })
@mcp.tool()
def trigger_pipeline(video_id: str = "", filename: str = "") -> str:
    """Trigger pipeline for a video by ID or filename substring. Provide one.

    Resolves the video (by ID, or by case-insensitive filename substring),
    POSTs the trigger to the admin API, and returns a JSON summary with the
    resolved video, HTTP status, and response body.
    """
    if not video_id and not filename:
        return "Error: provide either video_id or filename"
    if not video_id:
        # BUG FIX: the ILIKE pattern and the not-found message previously used
        # the literal string "(unknown)" instead of interpolating the
        # caller-supplied `filename`, so filename lookups never worked.
        row = _query_one(
            "SELECT id, filename FROM source_videos "
            "WHERE filename ILIKE %s LIMIT 1",
            (f"%{filename}%",),
        )
        if not row:
            return f"No video matching '{filename}'"
        video_id = str(row["id"])
        filename = row["filename"]
    else:
        # ID given: look up the filename purely for the response payload.
        row = _query_one("SELECT filename FROM source_videos WHERE id = %s", (video_id,))
        filename = row["filename"] if row else "unknown"
    import httpx  # local import keeps startup light; only needed when triggering
    resp = httpx.post(
        f"{API_URL}/api/v1/admin/pipeline/trigger",
        json={"video_id": video_id},
        timeout=10,
    )
    return _json({
        "video_id": video_id,
        "filename": filename,
        "status_code": resp.status_code,
        "response": resp.json() if resp.status_code < 400 else resp.text,
    })
@mcp.tool()
def trigger_batch(
    creator: str = "",
    status: str = "not_started",
    limit: int = 5,
) -> str:
    """Trigger pipeline for multiple videos. Filter by creator slug and/or status.
    Default: up to 5 not_started videos."""
    # Build the SELECT incrementally; all values go through %s parameters.
    sql = "SELECT sv.id, sv.filename FROM source_videos sv"
    conditions = []
    params: list = []
    if status:
        conditions.append("sv.processing_status = %s")
        params.append(status)
    if creator:
        # JOIN creators only when filtering by slug, keeping the default cheap.
        sql += " JOIN creators c ON sv.creator_id = c.id"
        conditions.append("c.slug = %s")
        params.append(creator)
    if conditions:
        sql += " WHERE " + " AND ".join(conditions)
    sql += " ORDER BY sv.updated_at DESC LIMIT %s"
    params.append(limit)
    videos = _query(sql, tuple(params))
    if not videos:
        return "No videos match the filter."
    import httpx
    results = []
    # One trigger POST per video; a failure on one does not stop the batch.
    for v in videos:
        try:
            resp = httpx.post(
                f"{API_URL}/api/v1/admin/pipeline/trigger",
                json={"video_id": str(v["id"])},
                timeout=10,
            )
            results.append({
                "video_id": str(v["id"]),
                "filename": v["filename"],
                "status": resp.status_code,
            })
        except Exception as e:
            # Record the failure per video instead of aborting the batch.
            results.append({
                "video_id": str(v["id"]),
                "filename": v["filename"],
                "error": str(e),
            })
    return _json({"triggered": len(results), "results": results})
@mcp.tool()
def revoke_pipeline(video_id: str) -> str:
    """Revoke/cancel a pipeline run for a video."""
    import httpx
    response = httpx.post(
        f"{API_URL}/api/v1/admin/pipeline/revoke",
        json={"video_id": video_id},
        timeout=10,
    )
    # Only successful responses are guaranteed to carry a JSON body.
    if response.status_code < 400:
        body = response.json()
    else:
        body = response.text
    return _json({
        "video_id": video_id,
        "status_code": response.status_code,
        "response": body,
    })
@mcp.tool()
def rerun_stage(video_id: str, stage: int) -> str:
    """Re-run a specific pipeline stage (2-6) for a video."""
    # Stage 1 (ingest) is not re-runnable through this endpoint.
    if not 2 <= stage <= 6:
        return "Error: stage must be 2-6"
    import httpx
    response = httpx.post(
        f"{API_URL}/api/v1/admin/pipeline/rerun-stage",
        json={"video_id": video_id, "stage": stage},
        timeout=10,
    )
    payload = {
        "video_id": video_id,
        "stage": stage,
        "status_code": response.status_code,
        "response": response.json() if response.status_code < 400 else response.text,
    }
    return _json(payload)
@mcp.tool()
def queue_depth() -> str:
    """Check Celery queue depth and active/reserved/scheduled tasks."""
    r = _redis()
    queue_len = r.llen("celery")  # pending tasks in the default Celery list
    # Inspect via celery CLI inside the worker container
    client = _docker()
    try:
        worker = client.containers.get("chrysopedia-worker")
        active_result = worker.exec_run(
            "celery -A worker inspect active --json --timeout=5"
        )
        # Values below are raw CLI output text, not parsed JSON.
        active = active_result.output.decode() if active_result.exit_code == 0 else "error"
        reserved_result = worker.exec_run(
            "celery -A worker inspect reserved --json --timeout=5"
        )
        reserved = reserved_result.output.decode() if reserved_result.exit_code == 0 else "error"
    except Exception as e:
        # Covers container-not-found and Docker-socket failures alike.
        active = f"error: {e}"
        reserved = f"error: {e}"
    return _json({
        "celery_queue_length": queue_len,
        "active_tasks": active,
        "reserved_tasks": reserved,
    })
# ═══════════════════════════════════════════════════════════════════════════════
# DOMAIN 2: Infrastructure Health
# ═══════════════════════════════════════════════════════════════════════════════
@mcp.tool()
def worker_health() -> str:
    """Check Celery worker status: online, pool size, active task."""
    client = _docker()
    try:
        worker = client.containers.get("chrysopedia-worker")
        # Ping the Celery app from inside the worker container; results are
        # the raw CLI text (or a sentinel string on non-zero exit).
        result = worker.exec_run(
            "celery -A worker inspect ping --json --timeout=5"
        )
        ping = result.output.decode() if result.exit_code == 0 else "no response"
        stats_result = worker.exec_run(
            "celery -A worker inspect stats --json --timeout=5"
        )
        stats = stats_result.output.decode() if stats_result.exit_code == 0 else "error"
    except docker.errors.NotFound:
        # Worker container missing entirely — report instead of raising.
        return _json({"status": "container not found", "container": "chrysopedia-worker"})
    except Exception as e:
        return _json({"status": "error", "error": str(e)})
    return _json({"ping": ping, "stats": stats})
@mcp.tool()
def container_status() -> str:
    """Show status of all Chrysopedia Docker containers."""
    client = _docker()
    # The name filter is a substring match, so every chrysopedia-* container
    # (running or not, thanks to all=True) is included.
    containers = client.containers.list(all=True, filters={"name": "chrysopedia"})
    result = []
    for c in containers:
        # Containers without a HEALTHCHECK have no State.Health key.
        health = "unknown"
        if c.attrs.get("State", {}).get("Health"):
            health = c.attrs["State"]["Health"].get("Status", "unknown")
        result.append({
            "name": c.name,
            "status": c.status,
            "health": health,
            "image": c.image.tags[0] if c.image.tags else "untagged",
            "started_at": c.attrs.get("State", {}).get("StartedAt", ""),
        })
    return _json(sorted(result, key=lambda x: x["name"]))
@mcp.tool()
def service_health() -> str:
    """Combined health check: DB, Redis, Qdrant, Ollama, API, Worker."""
    # Each check is independent; a failure records the error string instead of
    # aborting, so one dead service still yields a full report.
    checks = {}
    # DB
    try:
        with _db() as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT 1")
        checks["postgresql"] = "healthy"
    except Exception as e:
        checks["postgresql"] = f"error: {e}"
    # Redis
    try:
        r = _redis()
        r.ping()
        checks["redis"] = "healthy"
    except Exception as e:
        checks["redis"] = f"error: {e}"
    # Qdrant
    try:
        import httpx
        resp = httpx.get(f"{QDRANT_URL}/healthz", timeout=5)
        checks["qdrant"] = "healthy" if resp.status_code == 200 else f"status {resp.status_code}"
    except Exception as e:
        checks["qdrant"] = f"error: {e}"
    # API
    try:
        import httpx
        resp = httpx.get(f"{API_URL}/health", timeout=5)
        checks["api"] = "healthy" if resp.status_code == 200 else f"status {resp.status_code}"
    except Exception as e:
        checks["api"] = f"error: {e}"
    # Worker (via container health)
    try:
        client = _docker()
        worker = client.containers.get("chrysopedia-worker")
        health = worker.attrs.get("State", {}).get("Health", {}).get("Status", "unknown")
        checks["worker"] = health
    except Exception as e:
        checks["worker"] = f"error: {e}"
    # "degraded" if any check is not exactly "healthy" (including "unknown").
    all_healthy = all(v == "healthy" for v in checks.values())
    return _json({"overall": "healthy" if all_healthy else "degraded", "services": checks})
@mcp.tool()
def restart_service(service: str) -> str:
    """Restart a Chrysopedia Docker container by short name.
    Valid: api, worker, watcher, web, db, redis, qdrant, ollama"""
    # Short alias → real container name.
    name_map = {
        "api": "chrysopedia-api",
        "worker": "chrysopedia-worker",
        "watcher": "chrysopedia-watcher",
        "web": "chrysopedia-web-8096",
        "db": "chrysopedia-db",
        "redis": "chrysopedia-redis",
        "qdrant": "chrysopedia-qdrant",
        "ollama": "chrysopedia-ollama",
    }
    target = name_map.get(service)
    if target is None:
        return f"Unknown service '{service}'. Valid: {', '.join(name_map.keys())}"
    try:
        # 30s grace period before Docker force-kills the container.
        _docker().containers.get(target).restart(timeout=30)
    except Exception as e:
        return _json({"error": str(e), "container": target})
    return _json({"restarted": target, "status": "ok"})
@mcp.tool()
def tail_logs(service: str = "worker", lines: int = 50) -> str:
    """Get recent log lines from a Chrysopedia container.
    service: api, worker, watcher, web. Default: worker."""
    name_map = {
        "api": "chrysopedia-api",
        "worker": "chrysopedia-worker",
        "watcher": "chrysopedia-watcher",
        "web": "chrysopedia-web-8096",
    }
    target = name_map.get(service)
    if target is None:
        return f"Unknown service '{service}'. Valid: {', '.join(name_map.keys())}"
    try:
        container = _docker().containers.get(target)
        raw = container.logs(tail=lines, timestamps=True)
        text = raw.decode("utf-8", errors="replace")
        # Truncate to 40KB to stay within MCP limits
        if len(text) > 40000:
            text = text[-40000:]
        return text
    except Exception as e:
        return f"Error: {e}"
# ═══════════════════════════════════════════════════════════════════════════════
# DOMAIN 3: Content & Data
# ═══════════════════════════════════════════════════════════════════════════════
@mcp.tool()
def video_stats() -> str:
    """Counts of technique pages, source videos, creators, with per-creator breakdown."""
    # Single-row scalar subquery fan-out for the global totals.
    totals = _query("""
        SELECT
            (SELECT count(*) FROM technique_pages) as technique_pages,
            (SELECT count(*) FROM source_videos) as source_videos,
            (SELECT count(*) FROM creators) as creators,
            (SELECT count(*) FROM key_moments) as key_moments
    """)
    # DISTINCT counts are required because the two LEFT JOINs multiply rows.
    by_creator = _query("""
        SELECT c.name, c.slug,
               count(DISTINCT tp.id) as pages,
               count(DISTINCT sv.id) as videos
        FROM creators c
        LEFT JOIN technique_pages tp ON tp.creator_id = c.id
        LEFT JOIN source_videos sv ON sv.creator_id = c.id
        GROUP BY c.id, c.name, c.slug
        ORDER BY pages DESC, videos DESC
    """)
    # ::text cast keeps the enum JSON-serializable as a plain string.
    by_status = _query("""
        SELECT processing_status::text as status, count(*) as count
        FROM source_videos
        GROUP BY processing_status
        ORDER BY count DESC
    """)
    return _json({
        "totals": totals[0] if totals else {},
        "by_creator": by_creator,
        "by_processing_status": by_status,
    })
@mcp.tool()
def list_videos(
    status: str = "",
    creator: str = "",
    limit: int = 50,
) -> str:
    """List source videos, optionally filtered by processing status and/or creator slug."""
    sql = """
        SELECT sv.id, sv.filename, sv.processing_status::text as status,
               c.name as creator, c.slug as creator_slug, sv.updated_at
        FROM source_videos sv
        LEFT JOIN creators c ON sv.creator_id = c.id
    """
    # Optional filters appended as parameterized WHERE clauses.
    conditions = []
    params: list = []
    if status:
        conditions.append("sv.processing_status = %s")
        params.append(status)
    if creator:
        conditions.append("c.slug = %s")
        params.append(creator)
    if conditions:
        sql += " WHERE " + " AND ".join(conditions)
    sql += " ORDER BY sv.updated_at DESC LIMIT %s"
    params.append(limit)
    rows = _query(sql, tuple(params))
    return _json({"count": len(rows), "videos": rows})
@mcp.tool()
def list_technique_pages(
    creator: str = "",
    format: str = "",  # NOTE(review): shadows the builtin, but renaming would change the tool's parameter name
    limit: int = 50,
) -> str:
    """List technique pages, optionally filtered by creator slug and/or format (v1/v2)."""
    sql = """
        SELECT tp.id, tp.title, tp.slug, tp.topic_category,
               tp.body_sections_format, c.name as creator, c.slug as creator_slug,
               tp.updated_at
        FROM technique_pages tp
        LEFT JOIN creators c ON tp.creator_id = c.id
    """
    # Optional filters appended as parameterized WHERE clauses.
    conditions = []
    params: list = []
    if creator:
        conditions.append("c.slug = %s")
        params.append(creator)
    if format:
        conditions.append("tp.body_sections_format = %s")
        params.append(format)
    if conditions:
        sql += " WHERE " + " AND ".join(conditions)
    sql += " ORDER BY tp.updated_at DESC LIMIT %s"
    params.append(limit)
    rows = _query(sql, tuple(params))
    return _json({"count": len(rows), "pages": rows})
@mcp.tool()
def list_creators() -> str:
    """List all creators with video and technique page counts."""
    # DISTINCT counts compensate for row multiplication from the double LEFT JOIN.
    rows = _query("""
        SELECT c.id, c.name, c.slug, c.genre_tags,
               count(DISTINCT sv.id) as video_count,
               count(DISTINCT tp.id) as page_count
        FROM creators c
        LEFT JOIN source_videos sv ON sv.creator_id = c.id
        LEFT JOIN technique_pages tp ON tp.creator_id = c.id
        GROUP BY c.id, c.name, c.slug, c.genre_tags
        ORDER BY page_count DESC, video_count DESC
    """)
    return _json({"count": len(rows), "creators": rows})
@mcp.tool()
def search_content(query: str, scope: str = "all", limit: int = 10) -> str:
    """Run a search query against the Chrysopedia API. scope: all, topics, creators."""
    import httpx
    response = httpx.get(
        f"{API_URL}/api/v1/search",
        params={"q": query, "scope": scope, "limit": limit},
        timeout=15,
    )
    if response.status_code == 200:
        return _json(response.json())
    return f"Search failed: {response.status_code} {response.text}"
# ═══════════════════════════════════════════════════════════════════════════════
# DOMAIN 4: Pipeline Observability
# ═══════════════════════════════════════════════════════════════════════════════
@mcp.tool()
def recent_events(limit: int = 20, video_id: str = "", stage: str = "") -> str:
    """Last N pipeline events. Optionally filter by video_id or stage name."""
    # Events join through pipeline_runs so they can be tied back to a filename.
    sql = """
        SELECT pe.id, pe.stage, pe.event_type, pe.model,
               pe.prompt_tokens, pe.completion_tokens, pe.duration_ms,
               pe.created_at, sv.filename
        FROM pipeline_events pe
        LEFT JOIN pipeline_runs pr ON pe.pipeline_run_id = pr.id
        LEFT JOIN source_videos sv ON pr.source_video_id = sv.id
    """
    # Optional filters appended as parameterized WHERE clauses.
    conditions = []
    params: list = []
    if video_id:
        conditions.append("pr.source_video_id = %s")
        params.append(video_id)
    if stage:
        conditions.append("pe.stage = %s")
        params.append(stage)
    if conditions:
        sql += " WHERE " + " AND ".join(conditions)
    sql += " ORDER BY pe.created_at DESC LIMIT %s"
    params.append(limit)
    rows = _query(sql, tuple(params))
    return _json({"count": len(rows), "events": rows})
@mcp.tool()
def token_usage(video_id: str = "", stage: str = "") -> str:
    """Token consumption summary. Optionally filter by video_id or stage."""
    # Only llm_call events carry token counts; NULLs are coalesced to 0.
    sql = """
        SELECT pe.stage,
               count(*) as calls,
               coalesce(sum(pe.prompt_tokens), 0) as total_prompt_tokens,
               coalesce(sum(pe.completion_tokens), 0) as total_completion_tokens,
               coalesce(sum(pe.prompt_tokens), 0) + coalesce(sum(pe.completion_tokens), 0) as total_tokens
        FROM pipeline_events pe
        LEFT JOIN pipeline_runs pr ON pe.pipeline_run_id = pr.id
        WHERE pe.event_type = 'llm_call'
    """
    params: list = []
    # The base query already has WHERE, so filters use AND directly.
    if video_id:
        sql += " AND pr.source_video_id = %s"
        params.append(video_id)
    if stage:
        sql += " AND pe.stage = %s"
        params.append(stage)
    sql += " GROUP BY pe.stage ORDER BY total_tokens DESC"
    rows = _query(sql, tuple(params))
    # Grand total is summed client-side over the per-stage totals.
    grand_total = sum(r.get("total_tokens", 0) for r in rows)
    return _json({"by_stage": rows, "grand_total_tokens": grand_total})
@mcp.tool()
def processing_times(limit: int = 10) -> str:
    """Processing duration for recent pipeline runs (start to last event)."""
    # Duration is approximated as the span between a run's first and last
    # event timestamps; runs with no events are excluded by the inner JOIN.
    rows = _query("""
        SELECT pr.id, sv.filename, c.name as creator,
               min(pe.created_at) as started_at,
               max(pe.created_at) as finished_at,
               EXTRACT(EPOCH FROM (max(pe.created_at) - min(pe.created_at)))::int as duration_seconds,
               count(pe.id) as event_count
        FROM pipeline_runs pr
        JOIN source_videos sv ON pr.source_video_id = sv.id
        LEFT JOIN creators c ON sv.creator_id = c.id
        JOIN pipeline_events pe ON pe.pipeline_run_id = pr.id
        GROUP BY pr.id, sv.filename, c.name
        ORDER BY max(pe.created_at) DESC
        LIMIT %s
    """, (limit,))
    return _json({"count": len(rows), "runs": rows})
@mcp.tool()
def debug_mode_toggle(enable: bool | None = None) -> str:
    """Get or set pipeline debug mode. Pass enable=true/false to change, omit to check."""
    key = "chrysopedia:debug_mode"
    client = _redis()
    if enable is None:
        # Read-only path: a stored "1" means enabled; missing key means off.
        stored = client.get(key)
        return _json({"debug_mode": stored == "1" if stored else False})
    client.set(key, "1" if enable else "0")
    return _json({"debug_mode": enable, "set": True})
# ═══════════════════════════════════════════════════════════════════════════════
# DOMAIN 5: Qdrant / Embeddings
# ═══════════════════════════════════════════════════════════════════════════════
@mcp.tool()
def qdrant_stats() -> str:
    """Qdrant collection point counts and index health."""
    import httpx
    try:
        # List collections, then fetch per-collection detail one at a time.
        resp = httpx.get(f"{QDRANT_URL}/collections", timeout=10)
        collections_data = resp.json()
        result = []
        for col in collections_data.get("result", {}).get("collections", []):
            name = col["name"]
            detail_resp = httpx.get(f"{QDRANT_URL}/collections/{name}", timeout=10)
            detail = detail_resp.json().get("result", {})
            result.append({
                "name": name,
                "points_count": detail.get("points_count", 0),
                "vectors_count": detail.get("vectors_count", 0),
                "status": detail.get("status", "unknown"),
                "segments_count": detail.get("segments_count", 0),
            })
        return _json({"collections": result})
    except Exception as e:
        # Any HTTP/JSON failure is reported as an error payload, never raised.
        return _json({"error": str(e)})
@mcp.tool()
def reindex_all() -> str:
    """Trigger full re-embedding of all technique pages and key moments via the API."""
    import httpx
    # Generous timeout: the API kicks off a long-running job synchronously.
    response = httpx.post(f"{API_URL}/api/v1/admin/pipeline/reindex-all", timeout=30)
    if response.status_code < 400:
        body = response.json()
    else:
        body = response.text
    return _json({"status_code": response.status_code, "response": body})
@mcp.tool()
def reindex_video(video_id: str) -> str:
    """Re-embed content for a specific video (re-runs stage 6)."""
    # Embedding lives in pipeline stage 6, so simply delegate to rerun_stage.
    return rerun_stage(stage=6, video_id=video_id)
# ═══════════════════════════════════════════════════════════════════════════════
# DOMAIN 6: Prompt Management
# ═══════════════════════════════════════════════════════════════════════════════
@mcp.tool()
def list_prompts() -> str:
    """List current prompt files with sizes and SHA-256 hashes."""
    base = PROMPTS_PATH
    if not os.path.isdir(base):
        return _json({"error": f"Prompts directory not found: {base}"})
    entries = []
    for name in sorted(os.listdir(base)):
        path = os.path.join(base, name)
        if not os.path.isfile(path):
            continue  # skip subdirectories and anything that is not a regular file
    # Hash the raw bytes; the short (16-hex-char) digest is enough to spot edits.
        with open(path, "rb") as fh:
            blob = fh.read()
        entries.append({
            "filename": name,
            "size_bytes": len(blob),
            "sha256": hashlib.sha256(blob).hexdigest()[:16],
            "lines": blob.count(b"\n"),
        })
    return _json({"count": len(entries), "prompts": entries})
@mcp.tool()
def prompt_diff(filename: str) -> str:
    """Compare a current prompt file to its most recent .bak version.
    Returns a summary of changes (not full diff to save tokens)."""
    prompts_dir = PROMPTS_PATH
    current_path = os.path.join(prompts_dir, filename)
    if not os.path.isfile(current_path):
        # BUG FIX: the message previously contained the literal "(unknown)"
        # instead of interpolating the requested filename.
        return f"Prompt file not found: {filename}"
    # Find most recent .bak. Reverse lexicographic sort assumes backup names
    # embed a sortable timestamp suffix — TODO confirm the backup naming scheme.
    bak_files = sorted(
        [f for f in os.listdir(prompts_dir) if f.startswith(filename) and f.endswith(".bak")],
        reverse=True,
    )
    if not bak_files:
        return _json({"filename": filename, "backup_found": False, "message": "No .bak file found"})
    bak_path = os.path.join(prompts_dir, bak_files[0])
    with open(current_path) as f:
        current = f.readlines()
    with open(bak_path) as f:
        backup = f.readlines()
    import difflib
    # Minimal context (n=1) keeps the diff small; we mostly want the counts.
    diff = list(difflib.unified_diff(backup, current, fromfile=bak_files[0], tofile=filename, n=1))
    added = sum(1 for l in diff if l.startswith("+") and not l.startswith("+++"))
    removed = sum(1 for l in diff if l.startswith("-") and not l.startswith("---"))
    # Truncate diff output to the first 200 diff lines to save tokens.
    diff_text = "".join(diff[:200])
    if len(diff) > 200:
        diff_text += f"\n... ({len(diff) - 200} more lines)"
    return _json({
        "filename": filename,
        "backup": bak_files[0],
        "current_lines": len(current),
        "backup_lines": len(backup),
        "lines_added": added,
        "lines_removed": removed,
        "diff": diff_text,
    })
# ═══════════════════════════════════════════════════════════════════════════════
# Entry point
# ═══════════════════════════════════════════════════════════════════════════════
if __name__ == "__main__":
    # Serve over streamable HTTP; host/port were set on the FastMCP instance.
    mcp.run(transport="streamable-http")