chrysopedia/backend/tests/test_impersonation.py
jlightner ab9dd2aa1b feat: Added write_mode support to impersonation tokens with conditional…
- "backend/auth.py"
- "backend/models.py"
- "backend/routers/admin.py"
- "backend/tests/test_impersonation.py"

GSD-Task: S07/T01
2026-04-04 06:24:04 +00:00

174 lines
5.6 KiB
Python

"""Integration tests for impersonation write-mode and audit log."""
import pytest
import pytest_asyncio
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from models import InviteCode, User, UserRole
# Re-use fixtures from conftest: db_engine, client, admin_auth
# Credentials of the regular (non-admin) account that the tests impersonate.
# They are sent to the register endpoint by the target_user fixture and reused
# to log in as that user in the non-admin access test.
_TARGET_EMAIL = "impersonate-target@chrysopedia.com"
_TARGET_PASSWORD = "targetpass123"
# Invite code that the target_user fixture inserts into the database and then
# passes as "invite_code" when registering the target account.
_TARGET_INVITE = "IMP-TARGET-INV"
@pytest_asyncio.fixture()
async def target_user(client: AsyncClient, db_engine):
    """Seed an invite code, register a user with it, and return the response body.

    The invite code row is written directly through a session bound to
    ``db_engine``; the account itself is created through the public
    ``/api/v1/auth/register`` endpoint. Dependent tests receive the parsed
    JSON of the 201 response (they read at least its ``id`` key).
    """
    session_factory = async_sessionmaker(
        db_engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )
    async with session_factory() as db:
        db.add(InviteCode(code=_TARGET_INVITE, uses_remaining=10))
        await db.commit()

    registration = {
        "email": _TARGET_EMAIL,
        "password": _TARGET_PASSWORD,
        "display_name": "Target User",
        "invite_code": _TARGET_INVITE,
    }
    register_resp = await client.post("/api/v1/auth/register", json=registration)
    assert register_resp.status_code == 201
    return register_resp.json()
# ── Write-mode tests ────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_impersonation_without_write_mode_blocks_writes(
    client: AsyncClient, admin_auth, target_user,
):
    """An impersonation token issued without write_mode gets 403 on PUT /auth/me."""
    # No request body is sent, so write_mode is left at whatever the endpoint
    # defaults to (expected: read-only).
    start_url = f"/api/v1/admin/impersonate/{target_user['id']}"
    start_resp = await client.post(start_url, headers=admin_auth["headers"])
    assert start_resp.status_code == 200

    token = start_resp.json()["access_token"]

    # A profile update made with the impersonation token must be rejected.
    write_resp = await client.put(
        "/api/v1/auth/me",
        headers={"Authorization": f"Bearer {token}"},
        json={"display_name": "Hacked Name"},
    )
    assert write_resp.status_code == 403

    # The error detail should name impersonation as the reason for the refusal.
    detail = write_resp.json()["detail"]
    assert "impersonation" in detail.lower()
@pytest.mark.asyncio
async def test_impersonation_with_write_mode_allows_writes(
    client: AsyncClient, admin_auth, target_user,
):
    """An impersonation token issued with write_mode=True can update /auth/me."""
    # Explicitly opt in to write access when starting the impersonation.
    start_url = f"/api/v1/admin/impersonate/{target_user['id']}"
    start_resp = await client.post(
        start_url,
        headers=admin_auth["headers"],
        json={"write_mode": True},
    )
    assert start_resp.status_code == 200

    token = start_resp.json()["access_token"]
    new_name = "Updated Via WriteMode"

    write_resp = await client.put(
        "/api/v1/auth/me",
        headers={"Authorization": f"Bearer {token}"},
        json={"display_name": new_name},
    )

    # Checked separately from the 200 below so that a rejection by the
    # impersonation guard is distinguishable from any other failure.
    assert write_resp.status_code != 403

    # The write must succeed and the response must reflect the new value.
    assert write_resp.status_code == 200
    assert write_resp.json()["display_name"] == new_name
# ── Audit log endpoint tests ────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_impersonation_log_returns_entries(
    client: AsyncClient, admin_auth, target_user,
):
    """GET /admin/impersonation-log lists entries including admin/target names."""
    admin_headers = admin_auth["headers"]

    # Starting an impersonation is what produces the log entry under test.
    start_resp = await client.post(
        f"/api/v1/admin/impersonate/{target_user['id']}",
        headers=admin_headers,
        json={"write_mode": True},
    )
    assert start_resp.status_code == 200

    log_resp = await client.get(
        "/api/v1/admin/impersonation-log",
        headers=admin_headers,
    )
    assert log_resp.status_code == 200

    entries = log_resp.json()
    assert len(entries) >= 1

    # Inspect the first returned entry; it is expected to be the one just created.
    first = entries[0]
    assert first["admin_name"] == "Admin User"
    assert first["target_name"] == "Target User"
    assert first["action"] == "start"
    assert first["write_mode"] is True
    for required_key in ("id", "created_at"):
        assert required_key in first
@pytest.mark.asyncio
async def test_impersonation_log_non_admin_forbidden(
    client: AsyncClient, target_user,
):
    """A regular (non-admin) user receives 403 from the impersonation log."""
    # Authenticate as the account registered by the target_user fixture.
    credentials = {
        "email": _TARGET_EMAIL,
        "password": _TARGET_PASSWORD,
    }
    login_resp = await client.post("/api/v1/auth/login", json=credentials)
    assert login_resp.status_code == 200

    access_token = login_resp.json()["access_token"]

    log_resp = await client.get(
        "/api/v1/admin/impersonation-log",
        headers={"Authorization": f"Bearer {access_token}"},
    )
    assert log_resp.status_code == 403
@pytest.mark.asyncio
async def test_impersonation_log_pagination(
    client: AsyncClient, admin_auth, target_user,
):
    """Verify that ``page`` / ``page_size`` actually paginate the impersonation log.

    Two impersonation sessions are started so that at least two log entries
    exist. With ``page_size=1`` each of the first two pages must contain
    exactly one entry, and the two pages must contain *different* entries;
    otherwise an endpoint that ignores ``page`` would pass unnoticed.
    """
    admin_headers = admin_auth["headers"]
    impersonate_url = f"/api/v1/admin/impersonate/{target_user['id']}"

    # Create two log entries. Check each call so a setup failure is reported
    # here instead of surfacing later as a confusing page-length mismatch.
    for _ in range(2):
        start_resp = await client.post(impersonate_url, headers=admin_headers)
        assert start_resp.status_code == 200

    page_entries = []
    for page in (1, 2):
        resp = await client.get(
            "/api/v1/admin/impersonation-log",
            headers=admin_headers,
            params={"page": page, "page_size": 1},
        )
        assert resp.status_code == 200
        entries = resp.json()
        assert len(entries) == 1
        page_entries.append(entries[0])

    # Consecutive pages must not repeat the same row.
    assert page_entries[0]["id"] != page_entries[1]["id"]