"""MCP API Key management router.""" import secrets from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select import bcrypt from app.database import get_db from app.models import User, ApiKey from app.schemas import ApiKeyCreate, ApiKeyPublic, ApiKeyCreated from app.middleware.auth import get_current_user, require_tier router = APIRouter() def generate_api_key() -> tuple[str, str, str]: """Generate an API key. Returns (full_key, prefix, hash).""" raw = secrets.token_bytes(32) import base64 encoded = base64.b32encode(raw).decode().rstrip("=").lower() full_key = f"ff_key_{encoded}" prefix = full_key[:16] key_hash = bcrypt.hashpw(full_key.encode("utf-8"), bcrypt.gensalt(rounds=10)).decode("utf-8") return full_key, prefix, key_hash @router.get("", response_model=list[ApiKeyPublic]) async def list_api_keys( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): result = await db.execute( select(ApiKey).where(ApiKey.user_id == user.id, ApiKey.revoked_at == None) ) return result.scalars().all() @router.post("", response_model=ApiKeyCreated, status_code=status.HTTP_201_CREATED) async def create_api_key( body: ApiKeyCreate, db: AsyncSession = Depends(get_db), user: User = Depends(require_tier("pro", "studio")), ): full_key, prefix, key_hash = generate_api_key() api_key = ApiKey( user_id=user.id, key_hash=key_hash, key_prefix=prefix, name=body.name, ) db.add(api_key) await db.flush() return ApiKeyCreated( id=api_key.id, key_prefix=prefix, name=body.name, trust_tier=api_key.trust_tier, rate_limit_per_hour=api_key.rate_limit_per_hour, last_used_at=None, created_at=api_key.created_at, full_key=full_key, ) @router.delete("/{key_id}", status_code=status.HTTP_204_NO_CONTENT) async def revoke_api_key( key_id: UUID, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): result = await db.execute( select(ApiKey).where(ApiKey.id == key_id, ApiKey.user_id == user.id) ) api_key = result.scalar_one_or_none() if not api_key: raise HTTPException(status_code=404, detail="API key not found") from datetime import datetime, timezone api_key.revoked_at = datetime.now(timezone.utc)