# backend/tests/test_export.py — GSD-Task: S07/T01
"""Tests for the GDPR-style data export endpoint.

Standalone ASGI test — mocks the DB session to return canned model
instances. Verifies the endpoint returns a valid ZIP containing all
expected JSON files with correct structure.
"""
from __future__ import annotations
|
|
|
|
import io
|
|
import json
|
|
import uuid
|
|
import zipfile
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
from unittest.mock import AsyncMock, MagicMock, PropertyMock
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
from httpx import ASGITransport, AsyncClient
|
|
|
|
# Ensure backend/ is on sys.path
|
|
import pathlib
|
|
import sys
|
|
|
|
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent.parent))
|
|
|
|
from auth import get_current_user # noqa: E402
|
|
from database import get_session # noqa: E402
|
|
from main import app # noqa: E402
|
|
from models import UserRole # noqa: E402
|
|
|
|
|
|
# ── Fixtures ─────────────────────────────────────────────────────────────────

# Stable module-level IDs shared across the mock rows so foreign-key fields
# line up (e.g. the moment row points at VIDEO_ID, the short at HIGHLIGHT_ID).
CREATOR_ID = uuid.uuid4()
USER_ID = uuid.uuid4()
VIDEO_ID = uuid.uuid4()
MOMENT_ID = uuid.uuid4()
PAGE_ID = uuid.uuid4()
VERSION_ID = uuid.uuid4()
LINK_ID = uuid.uuid4()
CONSENT_ID = uuid.uuid4()
AUDIT_ID = uuid.uuid4()
POST_ID = uuid.uuid4()
ATTACHMENT_ID = uuid.uuid4()
HIGHLIGHT_ID = uuid.uuid4()
SHORT_ID = uuid.uuid4()
def _make_mock_user(*, has_creator: bool = True) -> MagicMock:
    """Build a mock User with an optional linked creator profile.

    Args:
        has_creator: When True the user is linked to ``CREATOR_ID``;
            otherwise ``creator_id`` is None, simulating an account
            without a creator profile.
    """
    user = MagicMock()
    user.id = USER_ID
    user.email = "test@example.com"
    user.creator_id = CREATOR_ID if has_creator else None
    user.role = UserRole.creator
    return user
def _make_model_row(table_name: str, id_val: uuid.UUID, extra: dict[str, Any] | None = None) -> MagicMock:
|
|
"""Build a mock SQLAlchemy model row with a __table__.columns interface."""
|
|
row = MagicMock()
|
|
row.id = id_val
|
|
|
|
# Base columns every entity has
|
|
base = {
|
|
"id": id_val,
|
|
"created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
|
|
}
|
|
if extra:
|
|
base.update(extra)
|
|
|
|
# Build mock __table__.columns
|
|
columns = []
|
|
for key, val in base.items():
|
|
col = MagicMock()
|
|
col.key = key
|
|
columns.append(col)
|
|
setattr(row, key, val)
|
|
|
|
row.__table__ = MagicMock()
|
|
row.__table__.columns = columns
|
|
|
|
return row
|
|
|
|
|
|
def _make_creator_row():
    """Canned ``creators`` row for the export's creator profile dump."""
    return _make_model_row("creators", CREATOR_ID, {
        "name": "Test Creator",
        "slug": "test-creator",
        "folder_name": "test_creator",
    })
def _make_video_row():
    """Canned ``source_videos`` row owned by the test creator."""
    return _make_model_row("source_videos", VIDEO_ID, {
        "creator_id": CREATOR_ID,
        "filename": "test.mp4",
        "processing_status": "complete",
    })
def _make_moment_row():
    """Canned ``key_moments`` row linked to the test video."""
    return _make_model_row("key_moments", MOMENT_ID, {
        "source_video_id": VIDEO_ID,
        "title": "Test Moment",
        "summary": "A test moment",
    })
def _make_page_row():
    """Canned ``technique_pages`` row owned by the test creator."""
    return _make_model_row("technique_pages", PAGE_ID, {
        "creator_id": CREATOR_ID,
        "title": "Test Page",
        "slug": "test-page",
    })
def _make_version_row():
    """Canned ``technique_page_versions`` row for the test page."""
    return _make_model_row("technique_page_versions", VERSION_ID, {
        "technique_page_id": PAGE_ID,
        "version_number": 1,
        "content_snapshot": {"title": "v1"},
    })
def _make_link_row():
    """Canned ``related_technique_links`` row; target page is a fresh UUID."""
    return _make_model_row("related_technique_links", LINK_ID, {
        "source_page_id": PAGE_ID,
        "target_page_id": uuid.uuid4(),
        "relationship": "general_cross_reference",
    })
def _make_consent_row():
    """Canned ``video_consents`` row granting KB inclusion for the test video."""
    return _make_model_row("video_consents", CONSENT_ID, {
        "source_video_id": VIDEO_ID,
        "creator_id": CREATOR_ID,
        "kb_inclusion": True,
    })
def _make_audit_row():
    """Canned ``consent_audit_log`` row recording a kb_inclusion flip."""
    return _make_model_row("consent_audit_log", AUDIT_ID, {
        "video_consent_id": CONSENT_ID,
        "version": 1,
        "field_name": "kb_inclusion",
        "old_value": False,
        "new_value": True,
    })
def _make_post_row():
    """Canned ``posts`` row owned by the test creator."""
    return _make_model_row("posts", POST_ID, {
        "creator_id": CREATOR_ID,
        "title": "Test Post",
        "body_json": {"blocks": []},
    })
def _make_attachment_row():
    """Canned ``post_attachments`` row linked to the test post."""
    return _make_model_row("post_attachments", ATTACHMENT_ID, {
        "post_id": POST_ID,
        "filename": "file.pdf",
        "object_key": "posts/file.pdf",
        "content_type": "application/pdf",
        "size_bytes": 1024,
    })
def _make_highlight_row():
    """Canned ``highlight_candidates`` row linked to the test moment/video."""
    return _make_model_row("highlight_candidates", HIGHLIGHT_ID, {
        "key_moment_id": MOMENT_ID,
        "source_video_id": VIDEO_ID,
        "score": 0.85,
        "duration_secs": 30.0,
        "status": "candidate",
    })
def _make_short_row():
    """Canned ``generated_shorts`` row derived from the highlight candidate."""
    return _make_model_row("generated_shorts", SHORT_ID, {
        "highlight_candidate_id": HIGHLIGHT_ID,
        "format_preset": "vertical",
        "width": 1080,
        "height": 1920,
        "status": "complete",
    })
def _setup_db_responses(mock_session: AsyncMock) -> None:
    """Configure the mock DB session to return canned data for each query.

    The export endpoint issues its queries in a fixed order — the Creator
    lookup first (``scalar_one_or_none``), then one ``scalars().all()`` query
    per exported table. ``session.execute`` is stubbed to hand back the
    matching canned result for each successive call, clamping to the last
    response if more queries than expected arrive.
    """

    def _make_execute_result(scalar_one=None, scalars_all=None):
        # Mimic only the parts of a SQLAlchemy Result the endpoint touches.
        result = MagicMock()
        if scalar_one is not None:
            result.scalar_one_or_none.return_value = scalar_one
        if scalars_all is not None:
            result.scalars.return_value.all.return_value = scalars_all
        return result

    # Query order: 1. Creator, 2. SourceVideo, 3. KeyMoment, 4. TechniquePage,
    # 5. TechniquePageVersion, 6. RelatedTechniqueLink, 7. VideoConsent,
    # 8. ConsentAuditLog, 9. Post, 10. PostAttachment, 11. HighlightCandidate,
    # 12. GeneratedShort.
    responses = [
        _make_execute_result(scalar_one=_make_creator_row()),
        _make_execute_result(scalars_all=[_make_video_row()]),
        _make_execute_result(scalars_all=[_make_moment_row()]),
        _make_execute_result(scalars_all=[_make_page_row()]),
        _make_execute_result(scalars_all=[_make_version_row()]),
        _make_execute_result(scalars_all=[_make_link_row()]),
        _make_execute_result(scalars_all=[_make_consent_row()]),
        _make_execute_result(scalars_all=[_make_audit_row()]),
        _make_execute_result(scalars_all=[_make_post_row()]),
        _make_execute_result(scalars_all=[_make_attachment_row()]),
        _make_execute_result(scalars_all=[_make_highlight_row()]),
        _make_execute_result(scalars_all=[_make_short_row()]),
    ]

    call_count = 0

    async def _execute_side_effect(*args, **kwargs):
        nonlocal call_count
        # Clamp so unexpected extra queries reuse the final canned result
        # instead of raising IndexError.
        idx = min(call_count, len(responses) - 1)
        call_count += 1
        return responses[idx]

    mock_session.execute = AsyncMock(side_effect=_execute_side_effect)
@pytest_asyncio.fixture()
async def export_client():
    """Async HTTP test client with mocked auth and DB session.

    Overrides the app's ``get_session`` and ``get_current_user`` dependencies
    for the duration of the test, then removes them in a ``finally`` block so
    a failing test cannot leak overrides into later tests.
    """
    mock_user = _make_mock_user(has_creator=True)
    mock_session = AsyncMock()
    _setup_db_responses(mock_session)

    async def _mock_get_session():
        yield mock_session

    app.dependency_overrides[get_session] = _mock_get_session
    app.dependency_overrides[get_current_user] = lambda: mock_user

    try:
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://testserver/api/v1") as ac:
            yield ac
    finally:
        app.dependency_overrides.pop(get_session, None)
        app.dependency_overrides.pop(get_current_user, None)
@pytest_asyncio.fixture()
async def no_creator_client():
    """Client where the user has no linked creator profile.

    Same override pattern as ``export_client``; cleanup runs in ``finally``
    so overrides never leak past a failing test.
    """
    mock_user = _make_mock_user(has_creator=False)

    async def _mock_get_session():
        yield AsyncMock()

    app.dependency_overrides[get_session] = _mock_get_session
    app.dependency_overrides[get_current_user] = lambda: mock_user

    try:
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://testserver/api/v1") as ac:
            yield ac
    finally:
        app.dependency_overrides.pop(get_session, None)
        app.dependency_overrides.pop(get_current_user, None)
# ── Tests ────────────────────────────────────────────────────────────────────

# Every file the export ZIP must contain: one metadata file plus one JSON
# dump per exported table (12 data files + export_metadata.json = 13).
EXPECTED_JSON_FILES = {
    "export_metadata.json",
    "creators.json",
    "source_videos.json",
    "key_moments.json",
    "technique_pages.json",
    "technique_page_versions.json",
    "related_technique_links.json",
    "video_consents.json",
    "consent_audit_log.json",
    "posts.json",
    "post_attachments.json",
    "highlight_candidates.json",
    "generated_shorts.json",
}
@pytest.mark.asyncio
async def test_export_returns_valid_zip(export_client: AsyncClient):
    """Endpoint returns a ZIP containing exactly the expected JSON files."""
    resp = await export_client.get("/creator/export")
    assert resp.status_code == 200
    assert resp.headers["content-type"] == "application/zip"
    assert "content-disposition" in resp.headers
    assert "chrysopedia-export-" in resp.headers["content-disposition"]

    # ZipFile raises BadZipFile if the payload is not a valid archive.
    zf = zipfile.ZipFile(io.BytesIO(resp.content))
    names = set(zf.namelist())
    assert names == EXPECTED_JSON_FILES
@pytest.mark.asyncio
async def test_export_json_files_are_valid(export_client: AsyncClient):
    """Each JSON file in the ZIP parses, and has the right top-level shape."""
    resp = await export_client.get("/creator/export")
    zf = zipfile.ZipFile(io.BytesIO(resp.content))

    for name in zf.namelist():
        data = json.loads(zf.read(name))
        if name == "export_metadata.json":
            # Metadata is a dict, not a list.
            assert isinstance(data, dict)
            assert "export_timestamp" in data
            assert "creator_id" in data
            assert data["creator_id"] == str(CREATOR_ID)
        else:
            assert isinstance(data, list), f"{name} should be a list"
            assert len(data) >= 1, f"{name} should have at least one entry"
@pytest.mark.asyncio
async def test_export_creators_json_content(export_client: AsyncClient):
    """Creators JSON file contains the canned creator row's fields."""
    resp = await export_client.get("/creator/export")
    zf = zipfile.ZipFile(io.BytesIO(resp.content))
    creators = json.loads(zf.read("creators.json"))
    assert len(creators) == 1
    assert creators[0]["name"] == "Test Creator"
    assert creators[0]["slug"] == "test-creator"
@pytest.mark.asyncio
async def test_export_uuids_serialize_as_strings(export_client: AsyncClient):
    """UUIDs in the JSON output are serialized as parseable strings."""
    resp = await export_client.get("/creator/export")
    zf = zipfile.ZipFile(io.BytesIO(resp.content))

    creators = json.loads(zf.read("creators.json"))
    # ID should be a string representation of a UUID.
    creator_id_str = creators[0]["id"]
    assert isinstance(creator_id_str, str)
    uuid.UUID(creator_id_str)  # Should not raise
@pytest.mark.asyncio
async def test_export_datetimes_serialize(export_client: AsyncClient):
    """Datetimes serialize as strings carrying the canned 2025 timestamp."""
    resp = await export_client.get("/creator/export")
    zf = zipfile.ZipFile(io.BytesIO(resp.content))

    creators = json.loads(zf.read("creators.json"))
    created_at = creators[0]["created_at"]
    assert isinstance(created_at, str)
    assert "2025" in created_at
@pytest.mark.asyncio
async def test_export_related_links_include_cross_references(export_client: AsyncClient):
    """Related technique links file includes cross-creator reference fields."""
    resp = await export_client.get("/creator/export")
    zf = zipfile.ZipFile(io.BytesIO(resp.content))

    links = json.loads(zf.read("related_technique_links.json"))
    assert len(links) >= 1
    link = links[0]
    assert "source_page_id" in link
    assert "target_page_id" in link
    assert "relationship" in link
@pytest.mark.asyncio
async def test_export_metadata_has_required_fields(export_client: AsyncClient):
    """export_metadata.json has timestamp, creator_id, note, and file_count."""
    resp = await export_client.get("/creator/export")
    zf = zipfile.ZipFile(io.BytesIO(resp.content))

    meta = json.loads(zf.read("export_metadata.json"))
    assert "export_timestamp" in meta
    assert "creator_id" in meta
    assert "note" in meta
    assert "file_count" in meta
    assert meta["file_count"] == 12  # 12 data files (metadata file excluded)
@pytest.mark.asyncio
async def test_export_requires_creator_link(no_creator_client: AsyncClient):
    """404 when the authenticated user has no linked creator profile."""
    resp = await no_creator_client.get("/creator/export")
    assert resp.status_code == 404
    assert "No creator profile" in resp.json()["detail"]
@pytest.mark.asyncio
async def test_export_requires_auth():
    """401/403 when no auth token is provided (default dependency, no override).

    NOTE(review): this reuses the module-level ``app`` and relies on the
    fixtures above having popped their dependency overrides.
    """
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
        resp = await ac.get("/api/v1/creator/export")
        assert resp.status_code in (401, 403)