"""Integration tests for impersonation write-mode and audit log.""" import pytest import pytest_asyncio from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker from models import InviteCode, User, UserRole # Re-use fixtures from conftest: db_engine, client, admin_auth _TARGET_EMAIL = "impersonate-target@chrysopedia.com" _TARGET_PASSWORD = "targetpass123" _TARGET_INVITE = "IMP-TARGET-INV" @pytest_asyncio.fixture() async def target_user(client: AsyncClient, db_engine): """Register a regular user to be the impersonation target.""" factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False) async with factory() as session: code = InviteCode(code=_TARGET_INVITE, uses_remaining=10) session.add(code) await session.commit() resp = await client.post("/api/v1/auth/register", json={ "email": _TARGET_EMAIL, "password": _TARGET_PASSWORD, "display_name": "Target User", "invite_code": _TARGET_INVITE, }) assert resp.status_code == 201 return resp.json() # ── Write-mode tests ──────────────────────────────────────────────────────── @pytest.mark.asyncio async def test_impersonation_without_write_mode_blocks_writes( client: AsyncClient, admin_auth, target_user, ): """Read-only impersonation (default) should 403 on PUT /auth/me.""" # Start impersonation without write_mode resp = await client.post( f"/api/v1/admin/impersonate/{target_user['id']}", headers=admin_auth["headers"], ) assert resp.status_code == 200 imp_token = resp.json()["access_token"] imp_headers = {"Authorization": f"Bearer {imp_token}"} # Attempt a write operation — should be blocked resp = await client.put( "/api/v1/auth/me", headers=imp_headers, json={"display_name": "Hacked Name"}, ) assert resp.status_code == 403 assert "impersonation" in resp.json()["detail"].lower() @pytest.mark.asyncio async def test_impersonation_with_write_mode_allows_writes( client: AsyncClient, admin_auth, target_user, ): """Write-mode impersonation should not 403 on PUT /auth/me.""" # Start impersonation WITH write_mode resp = await client.post( f"/api/v1/admin/impersonate/{target_user['id']}", headers=admin_auth["headers"], json={"write_mode": True}, ) assert resp.status_code == 200 imp_token = resp.json()["access_token"] imp_headers = {"Authorization": f"Bearer {imp_token}"} # Attempt a write — should NOT get 403 from reject_impersonation resp = await client.put( "/api/v1/auth/me", headers=imp_headers, json={"display_name": "Updated Via WriteMode"}, ) # Should succeed (200) or at least not be a 403 assert resp.status_code != 403 # Verify the update actually took effect assert resp.status_code == 200 assert resp.json()["display_name"] == "Updated Via WriteMode" # ── Audit log endpoint tests ──────────────────────────────────────────────── @pytest.mark.asyncio async def test_impersonation_log_returns_entries( client: AsyncClient, admin_auth, target_user, ): """GET /admin/impersonation-log returns log entries with names.""" # Create some log entries by starting impersonation resp = await client.post( f"/api/v1/admin/impersonate/{target_user['id']}", headers=admin_auth["headers"], json={"write_mode": True}, ) assert resp.status_code == 200 # Fetch the log resp = await client.get( "/api/v1/admin/impersonation-log", headers=admin_auth["headers"], ) assert resp.status_code == 200 logs = resp.json() assert len(logs) >= 1 entry = logs[0] assert entry["admin_name"] == "Admin User" assert entry["target_name"] == "Target User" assert entry["action"] == "start" assert entry["write_mode"] is True assert "id" in entry assert "created_at" in entry @pytest.mark.asyncio async def test_impersonation_log_non_admin_forbidden( client: AsyncClient, target_user, ): """Non-admin users cannot access the impersonation log.""" # Login as the target (regular) user resp = await client.post("/api/v1/auth/login", json={ "email": _TARGET_EMAIL, "password": _TARGET_PASSWORD, }) assert resp.status_code == 200 user_headers = {"Authorization": f"Bearer {resp.json()['access_token']}"} resp = await client.get( "/api/v1/admin/impersonation-log", headers=user_headers, ) assert resp.status_code == 403 @pytest.mark.asyncio async def test_impersonation_log_pagination( client: AsyncClient, admin_auth, target_user, ): """Verify pagination params work on impersonation-log.""" # Create two entries for _ in range(2): await client.post( f"/api/v1/admin/impersonate/{target_user['id']}", headers=admin_auth["headers"], ) # Fetch page 1, page_size=1 resp = await client.get( "/api/v1/admin/impersonation-log", headers=admin_auth["headers"], params={"page": 1, "page_size": 1}, ) assert resp.status_code == 200 assert len(resp.json()) == 1 # Fetch page 2 resp = await client.get( "/api/v1/admin/impersonation-log", headers=admin_auth["headers"], params={"page": 2, "page_size": 1}, ) assert resp.status_code == 200 assert len(resp.json()) == 1