"""Tests for the GDPR-style data export endpoint. Standalone ASGI test — mocks the DB session to return canned model instances. Verifies the endpoint returns a valid ZIP containing all expected JSON files with correct structure. """ from __future__ import annotations import io import json import uuid import zipfile from datetime import datetime, timezone from typing import Any from unittest.mock import AsyncMock, MagicMock, PropertyMock import pytest import pytest_asyncio from httpx import ASGITransport, AsyncClient # Ensure backend/ is on sys.path import pathlib import sys sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent.parent)) from auth import get_current_user # noqa: E402 from database import get_session # noqa: E402 from main import app # noqa: E402 from models import UserRole # noqa: E402 # ── Fixtures ───────────────────────────────────────────────────────────────── CREATOR_ID = uuid.uuid4() USER_ID = uuid.uuid4() VIDEO_ID = uuid.uuid4() MOMENT_ID = uuid.uuid4() PAGE_ID = uuid.uuid4() VERSION_ID = uuid.uuid4() LINK_ID = uuid.uuid4() CONSENT_ID = uuid.uuid4() AUDIT_ID = uuid.uuid4() POST_ID = uuid.uuid4() ATTACHMENT_ID = uuid.uuid4() HIGHLIGHT_ID = uuid.uuid4() SHORT_ID = uuid.uuid4() def _make_mock_user(*, has_creator: bool = True) -> MagicMock: """Build a mock User with optional creator link.""" user = MagicMock() user.id = USER_ID user.email = "test@example.com" user.creator_id = CREATOR_ID if has_creator else None user.role = UserRole.creator return user def _make_model_row(table_name: str, id_val: uuid.UUID, extra: dict[str, Any] | None = None) -> MagicMock: """Build a mock SQLAlchemy model row with a __table__.columns interface.""" row = MagicMock() row.id = id_val # Base columns every entity has base = { "id": id_val, "created_at": datetime(2025, 1, 1, tzinfo=timezone.utc), } if extra: base.update(extra) # Build mock __table__.columns columns = [] for key, val in base.items(): col = MagicMock() col.key = key columns.append(col) setattr(row, key, val) row.__table__ = MagicMock() row.__table__.columns = columns return row def _make_creator_row(): return _make_model_row("creators", CREATOR_ID, { "name": "Test Creator", "slug": "test-creator", "folder_name": "test_creator", }) def _make_video_row(): return _make_model_row("source_videos", VIDEO_ID, { "creator_id": CREATOR_ID, "filename": "test.mp4", "processing_status": "complete", }) def _make_moment_row(): return _make_model_row("key_moments", MOMENT_ID, { "source_video_id": VIDEO_ID, "title": "Test Moment", "summary": "A test moment", }) def _make_page_row(): return _make_model_row("technique_pages", PAGE_ID, { "creator_id": CREATOR_ID, "title": "Test Page", "slug": "test-page", }) def _make_version_row(): return _make_model_row("technique_page_versions", VERSION_ID, { "technique_page_id": PAGE_ID, "version_number": 1, "content_snapshot": {"title": "v1"}, }) def _make_link_row(): return _make_model_row("related_technique_links", LINK_ID, { "source_page_id": PAGE_ID, "target_page_id": uuid.uuid4(), "relationship": "general_cross_reference", }) def _make_consent_row(): return _make_model_row("video_consents", CONSENT_ID, { "source_video_id": VIDEO_ID, "creator_id": CREATOR_ID, "kb_inclusion": True, }) def _make_audit_row(): return _make_model_row("consent_audit_log", AUDIT_ID, { "video_consent_id": CONSENT_ID, "version": 1, "field_name": "kb_inclusion", "old_value": False, "new_value": True, }) def _make_post_row(): return _make_model_row("posts", POST_ID, { "creator_id": CREATOR_ID, "title": "Test Post", "body_json": {"blocks": []}, }) def _make_attachment_row(): return _make_model_row("post_attachments", ATTACHMENT_ID, { "post_id": POST_ID, "filename": "file.pdf", "object_key": "posts/file.pdf", "content_type": "application/pdf", "size_bytes": 1024, }) def _make_highlight_row(): return _make_model_row("highlight_candidates", HIGHLIGHT_ID, { "key_moment_id": MOMENT_ID, "source_video_id": VIDEO_ID, "score": 0.85, "duration_secs": 30.0, "status": "candidate", }) def _make_short_row(): return _make_model_row("generated_shorts", SHORT_ID, { "highlight_candidate_id": HIGHLIGHT_ID, "format_preset": "vertical", "width": 1080, "height": 1920, "status": "complete", }) def _setup_db_responses(mock_session: AsyncMock) -> None: """Configure the mock DB session to return canned data for each query.""" creator_row = _make_creator_row() video_row = _make_video_row() moment_row = _make_moment_row() page_row = _make_page_row() version_row = _make_version_row() link_row = _make_link_row() consent_row = _make_consent_row() audit_row = _make_audit_row() post_row = _make_post_row() attachment_row = _make_attachment_row() highlight_row = _make_highlight_row() short_row = _make_short_row() call_count = 0 def _make_execute_result(scalar_one=None, scalars_all=None): result = MagicMock() if scalar_one is not None: result.scalar_one_or_none.return_value = scalar_one if scalars_all is not None: result.scalars.return_value.all.return_value = scalars_all return result # The export endpoint issues queries in order: # 1. Creator (scalar_one_or_none) # 2. SourceVideo (scalars.all) # 3. KeyMoment (scalars.all) # 4. TechniquePage (scalars.all) # 5. TechniquePageVersion (scalars.all) # 6. RelatedTechniqueLink (scalars.all) # 7. VideoConsent (scalars.all) # 8. ConsentAuditLog (scalars.all) # 9. Post (scalars.all) # 10. PostAttachment (scalars.all) # 11. HighlightCandidate (scalars.all) # 12. GeneratedShort (scalars.all) responses = [ _make_execute_result(scalar_one=creator_row), # Creator _make_execute_result(scalars_all=[video_row]), # SourceVideo _make_execute_result(scalars_all=[moment_row]), # KeyMoment _make_execute_result(scalars_all=[page_row]), # TechniquePage _make_execute_result(scalars_all=[version_row]), # TechniquePageVersion _make_execute_result(scalars_all=[link_row]), # RelatedTechniqueLink _make_execute_result(scalars_all=[consent_row]), # VideoConsent _make_execute_result(scalars_all=[audit_row]), # ConsentAuditLog _make_execute_result(scalars_all=[post_row]), # Post _make_execute_result(scalars_all=[attachment_row]), # PostAttachment _make_execute_result(scalars_all=[highlight_row]), # HighlightCandidate _make_execute_result(scalars_all=[short_row]), # GeneratedShort ] async def _execute_side_effect(*args, **kwargs): nonlocal call_count idx = min(call_count, len(responses) - 1) call_count += 1 return responses[idx] mock_session.execute = AsyncMock(side_effect=_execute_side_effect) @pytest_asyncio.fixture() async def export_client(): """Async HTTP test client with mocked auth and DB session.""" mock_user = _make_mock_user(has_creator=True) mock_session = AsyncMock() _setup_db_responses(mock_session) async def _mock_get_session(): yield mock_session app.dependency_overrides[get_session] = _mock_get_session app.dependency_overrides[get_current_user] = lambda: mock_user transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://testserver/api/v1") as ac: yield ac app.dependency_overrides.pop(get_session, None) app.dependency_overrides.pop(get_current_user, None) @pytest_asyncio.fixture() async def no_creator_client(): """Client where the user has no linked creator profile.""" mock_user = _make_mock_user(has_creator=False) async def _mock_get_session(): yield AsyncMock() app.dependency_overrides[get_session] = _mock_get_session app.dependency_overrides[get_current_user] = lambda: mock_user transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://testserver/api/v1") as ac: yield ac app.dependency_overrides.pop(get_session, None) app.dependency_overrides.pop(get_current_user, None) # ── Tests ──────────────────────────────────────────────────────────────────── EXPECTED_JSON_FILES = { "export_metadata.json", "creators.json", "source_videos.json", "key_moments.json", "technique_pages.json", "technique_page_versions.json", "related_technique_links.json", "video_consents.json", "consent_audit_log.json", "posts.json", "post_attachments.json", "highlight_candidates.json", "generated_shorts.json", } @pytest.mark.asyncio async def test_export_returns_valid_zip(export_client: AsyncClient): """Endpoint returns a ZIP containing all expected JSON files.""" resp = await export_client.get("/creator/export") assert resp.status_code == 200 assert resp.headers["content-type"] == "application/zip" assert "content-disposition" in resp.headers assert "chrysopedia-export-" in resp.headers["content-disposition"] zf = zipfile.ZipFile(io.BytesIO(resp.content)) names = set(zf.namelist()) assert names == EXPECTED_JSON_FILES @pytest.mark.asyncio async def test_export_json_files_are_valid(export_client: AsyncClient): """Each JSON file in the ZIP is valid JSON with a list at the top level.""" resp = await export_client.get("/creator/export") zf = zipfile.ZipFile(io.BytesIO(resp.content)) for name in zf.namelist(): data = json.loads(zf.read(name)) if name == "export_metadata.json": # Metadata is a dict, not a list assert isinstance(data, dict) assert "export_timestamp" in data assert "creator_id" in data assert data["creator_id"] == str(CREATOR_ID) else: assert isinstance(data, list), f"{name} should be a list" assert len(data) >= 1, f"{name} should have at least one entry" @pytest.mark.asyncio async def test_export_creators_json_content(export_client: AsyncClient): """Creators JSON file contains the expected creator data.""" resp = await export_client.get("/creator/export") zf = zipfile.ZipFile(io.BytesIO(resp.content)) creators = json.loads(zf.read("creators.json")) assert len(creators) == 1 assert creators[0]["name"] == "Test Creator" assert creators[0]["slug"] == "test-creator" @pytest.mark.asyncio async def test_export_uuids_serialize_as_strings(export_client: AsyncClient): """UUIDs in the JSON output are serialized as strings, not crashing.""" resp = await export_client.get("/creator/export") zf = zipfile.ZipFile(io.BytesIO(resp.content)) creators = json.loads(zf.read("creators.json")) # ID should be a string representation of UUID creator_id_str = creators[0]["id"] assert isinstance(creator_id_str, str) uuid.UUID(creator_id_str) # Should not raise @pytest.mark.asyncio async def test_export_datetimes_serialize(export_client: AsyncClient): """Datetimes serialize correctly as ISO strings.""" resp = await export_client.get("/creator/export") zf = zipfile.ZipFile(io.BytesIO(resp.content)) creators = json.loads(zf.read("creators.json")) created_at = creators[0]["created_at"] assert isinstance(created_at, str) assert "2025" in created_at @pytest.mark.asyncio async def test_export_related_links_include_cross_references(export_client: AsyncClient): """Related technique links file includes cross-creator references.""" resp = await export_client.get("/creator/export") zf = zipfile.ZipFile(io.BytesIO(resp.content)) links = json.loads(zf.read("related_technique_links.json")) assert len(links) >= 1 link = links[0] assert "source_page_id" in link assert "target_page_id" in link assert "relationship" in link @pytest.mark.asyncio async def test_export_metadata_has_required_fields(export_client: AsyncClient): """export_metadata.json has timestamp, creator_id, and note.""" resp = await export_client.get("/creator/export") zf = zipfile.ZipFile(io.BytesIO(resp.content)) meta = json.loads(zf.read("export_metadata.json")) assert "export_timestamp" in meta assert "creator_id" in meta assert "note" in meta assert "file_count" in meta assert meta["file_count"] == 12 # 12 data files @pytest.mark.asyncio async def test_export_requires_creator_link(no_creator_client: AsyncClient): """404 when the user has no linked creator profile.""" resp = await no_creator_client.get("/creator/export") assert resp.status_code == 404 assert "No creator profile" in resp.json()["detail"] @pytest.mark.asyncio async def test_export_requires_auth(): """401 when no auth token is provided (default dependency, no override).""" # Use a fresh app without dependency overrides transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://testserver") as ac: resp = await ac.get("/api/v1/creator/export") assert resp.status_code in (401, 403)