"""MCP API Key management router."""

import base64
import secrets
from datetime import datetime, timezone
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from passlib.context import CryptContext

from app.database import get_db
from app.models import User, ApiKey
from app.schemas import ApiKeyCreate, ApiKeyPublic, ApiKeyCreated
from app.middleware.auth import get_current_user, require_tier

router = APIRouter()

# Keys are stored only as bcrypt hashes; the plaintext is never persisted.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def generate_api_key() -> tuple[str, str, str]:
    """Generate a new API key.

    Returns:
        A ``(full_key, prefix, key_hash)`` tuple where ``full_key`` is the
        plaintext key (``ff_key_`` followed by the encoded secret),
        ``prefix`` is its first 16 characters, and ``key_hash`` is the
        bcrypt hash of ``full_key`` produced by ``pwd_context``.
    """
    raw = secrets.token_bytes(32)
    # Lowercase base32 with the "=" padding stripped, so the key is purely
    # alphanumeric.
    encoded = base64.b32encode(raw).decode().rstrip("=").lower()
    full_key = f"ff_key_{encoded}"
    # "ff_key_" is 7 characters, so the prefix is that marker plus the first
    # 9 characters of the encoded secret.
    prefix = full_key[:16]
    key_hash = pwd_context.hash(full_key)
    return full_key, prefix, key_hash


@router.get("", response_model=list[ApiKeyPublic])
async def list_api_keys(
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Return the current user's API keys that have not been revoked."""
    result = await db.execute(
        select(ApiKey).where(
            ApiKey.user_id == user.id,
            # ``is_(None)`` renders ``IS NULL`` without relying on ``== None``.
            ApiKey.revoked_at.is_(None),
        )
    )
    return result.scalars().all()


@router.post("", response_model=ApiKeyCreated, status_code=status.HTTP_201_CREATED)
async def create_api_key(
    body: ApiKeyCreate,
    db: AsyncSession = Depends(get_db),
    # NOTE(review): presumably limits creation to "pro"/"studio" users --
    # confirm against require_tier.
    user: User = Depends(require_tier("pro", "studio")),
):
    """Create an API key for the user and return it with its plaintext value.

    Only the hash and prefix are stored, so this response is the only place
    the full key is available to the caller.
    """
    full_key, prefix, key_hash = generate_api_key()
    api_key = ApiKey(
        user_id=user.id,
        key_hash=key_hash,
        key_prefix=prefix,
        name=body.name,
    )
    db.add(api_key)
    # Flush so the INSERT is emitted before the row's fields are read below.
    # NOTE(review): id, trust_tier, rate_limit_per_hour and created_at are
    # read straight after the flush -- confirm the model populates them
    # without a further load.
    await db.flush()
    return ApiKeyCreated(
        id=api_key.id,
        key_prefix=prefix,
        name=body.name,
        trust_tier=api_key.trust_tier,
        rate_limit_per_hour=api_key.rate_limit_per_hour,
        last_used_at=None,
        created_at=api_key.created_at,
        full_key=full_key,
    )


@router.delete("/{key_id}", status_code=status.HTTP_204_NO_CONTENT)
async def revoke_api_key(
    key_id: UUID,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Revoke one of the current user's API keys by setting ``revoked_at``.

    Revoking a key that is already revoked is a no-op, so the original
    revocation time is preserved.

    Raises:
        HTTPException: 404 if no key with ``key_id`` belongs to the user.
    """
    result = await db.execute(
        select(ApiKey).where(ApiKey.id == key_id, ApiKey.user_id == user.id)
    )
    api_key = result.scalar_one_or_none()
    if api_key is None:
        raise HTTPException(status_code=404, detail="API key not found")
    # Only stamp the first revocation; a repeated DELETE must not overwrite
    # the recorded time.
    if api_key.revoked_at is None:
        # NOTE(review): nothing is committed here -- presumably the get_db
        # dependency commits the session; confirm.
        api_key.revoked_at = datetime.now(timezone.utc)