- "backend/auth.py" - "backend/models.py" - "backend/routers/admin.py" - "backend/tests/test_impersonation.py" GSD-Task: S07/T01
174 lines
5.6 KiB
Python
174 lines
5.6 KiB
Python
"""Integration tests for impersonation write-mode and audit log."""
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
from httpx import AsyncClient
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
|
|
|
from models import InviteCode, User, UserRole
|
|
|
|
# Re-use fixtures from conftest: db_engine, client, admin_auth
|
|
|
|
|
|
# Credentials of the regular account that the target_user fixture registers
# and that the tests in this module impersonate or log in as.
_TARGET_EMAIL = "impersonate-target@chrysopedia.com"
_TARGET_PASSWORD = "targetpass123"
# Invite code the target_user fixture inserts before calling the register
# endpoint with it.
_TARGET_INVITE = "IMP-TARGET-INV"
|
|
|
|
|
|
@pytest_asyncio.fixture()
async def target_user(client: AsyncClient, db_engine):
    """Create the regular account that the tests impersonate.

    An invite code is inserted through a database session bound to
    ``db_engine``; the account is then registered through the public
    register endpoint using that code.

    Returns:
        The decoded JSON body of the registration response.
    """
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as db:
        db.add(InviteCode(code=_TARGET_INVITE, uses_remaining=10))
        await db.commit()

    registration = {
        "email": _TARGET_EMAIL,
        "password": _TARGET_PASSWORD,
        "display_name": "Target User",
        "invite_code": _TARGET_INVITE,
    }
    response = await client.post("/api/v1/auth/register", json=registration)
    assert response.status_code == 201
    return response.json()
|
|
|
|
|
|
# ── Write-mode tests ────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_impersonation_without_write_mode_blocks_writes(
    client: AsyncClient, admin_auth, target_user,
):
    """Impersonation started without write_mode must get 403 on PUT /auth/me."""
    # No request body is sent, so write_mode is not requested.
    impersonate_url = f"/api/v1/admin/impersonate/{target_user['id']}"
    start = await client.post(impersonate_url, headers=admin_auth["headers"])
    assert start.status_code == 200
    token = start.json()["access_token"]
    bearer = {"Authorization": f"Bearer {token}"}

    # A profile update made with the impersonation token has to be refused,
    # and the error detail has to mention impersonation.
    update = await client.put(
        "/api/v1/auth/me",
        headers=bearer,
        json={"display_name": "Hacked Name"},
    )
    assert update.status_code == 403
    assert "impersonation" in update.json()["detail"].lower()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_impersonation_with_write_mode_allows_writes(
    client: AsyncClient, admin_auth, target_user,
):
    """Impersonation started with write_mode may update the profile via PUT /auth/me."""
    # Explicitly request write_mode when starting the session.
    impersonate_url = f"/api/v1/admin/impersonate/{target_user['id']}"
    start = await client.post(
        impersonate_url,
        headers=admin_auth["headers"],
        json={"write_mode": True},
    )
    assert start.status_code == 200
    token = start.json()["access_token"]
    bearer = {"Authorization": f"Bearer {token}"}

    update = await client.put(
        "/api/v1/auth/me",
        headers=bearer,
        json={"display_name": "Updated Via WriteMode"},
    )
    # Checked separately first so a failure distinguishes "write was blocked"
    # from any other non-200 outcome.
    assert update.status_code != 403
    # The write must succeed and the response must reflect the new name.
    assert update.status_code == 200
    assert update.json()["display_name"] == "Updated Via WriteMode"
|
|
|
|
|
|
# ── Audit log endpoint tests ────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_impersonation_log_returns_entries(
    client: AsyncClient, admin_auth, target_user,
):
    """GET /admin/impersonation-log lists entries carrying admin and target names."""
    admin_headers = admin_auth["headers"]

    # Starting an impersonation session is what produces a log entry.
    start = await client.post(
        f"/api/v1/admin/impersonate/{target_user['id']}",
        headers=admin_headers,
        json={"write_mode": True},
    )
    assert start.status_code == 200

    listing = await client.get(
        "/api/v1/admin/impersonation-log",
        headers=admin_headers,
    )
    assert listing.status_code == 200
    entries = listing.json()
    assert len(entries) >= 1

    # Only the first returned entry is inspected.
    first = entries[0]
    expected_fields = {
        "admin_name": "Admin User",
        "target_name": "Target User",
        "action": "start",
    }
    for field, expected in expected_fields.items():
        assert first[field] == expected
    assert first["write_mode"] is True
    for required_key in ("id", "created_at"):
        assert required_key in first
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_impersonation_log_non_admin_forbidden(
    client: AsyncClient, target_user,
):
    """A regular (non-admin) account gets 403 from the impersonation log."""
    # Authenticate as the regular account created by the target_user fixture.
    credentials = {"email": _TARGET_EMAIL, "password": _TARGET_PASSWORD}
    login = await client.post("/api/v1/auth/login", json=credentials)
    assert login.status_code == 200
    bearer = {"Authorization": f"Bearer {login.json()['access_token']}"}

    listing = await client.get(
        "/api/v1/admin/impersonation-log",
        headers=bearer,
    )
    assert listing.status_code == 403
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_impersonation_log_pagination(
    client: AsyncClient, admin_auth, target_user,
):
    """Verify that page/page_size params slice the impersonation log.

    Two impersonation sessions are started so at least two log entries
    exist. With ``page_size=1``, page 1 and page 2 must each return exactly
    one entry, and those two entries must be different ones.
    """
    # Create two entries; fail right here if a session cannot be started,
    # instead of surfacing later as a confusing length mismatch.
    for _ in range(2):
        resp = await client.post(
            f"/api/v1/admin/impersonate/{target_user['id']}",
            headers=admin_auth["headers"],
        )
        assert resp.status_code == 200

    # Fetch page 1, page_size=1
    resp = await client.get(
        "/api/v1/admin/impersonation-log",
        headers=admin_auth["headers"],
        params={"page": 1, "page_size": 1},
    )
    assert resp.status_code == 200
    first_page = resp.json()
    assert len(first_page) == 1

    # Fetch page 2
    resp = await client.get(
        "/api/v1/admin/impersonation-log",
        headers=admin_auth["headers"],
        params={"page": 2, "page_size": 1},
    )
    assert resp.status_code == 200
    second_page = resp.json()
    assert len(second_page) == 1

    # An endpoint that ignores ``page`` would hand back the same entry twice
    # and still satisfy the length checks above, so compare the entry ids.
    assert first_page[0]["id"] != second_page[0]["id"]
|