chore: Added GET /creator/export endpoint that returns a ZIP archive co…
- "backend/routers/creator_dashboard.py" - "backend/tests/test_export.py" GSD-Task: S07/T01
This commit is contained in:
parent
3d16a5d9e8
commit
0b8dcf2ccf
2 changed files with 650 additions and 1 deletions
|
|
@ -2,26 +2,39 @@
|
|||
|
||||
Returns aggregate counts (videos, technique pages, key moments, search
|
||||
impressions) and content lists for the logged-in creator's dashboard.
|
||||
Includes a GDPR-style data export endpoint.
|
||||
"""
|
||||
|
||||
import io
|
||||
import json
|
||||
import logging
|
||||
import zipfile
|
||||
from datetime import datetime, timezone
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
from sqlalchemy import func, select
|
||||
from fastapi.responses import StreamingResponse
|
||||
from sqlalchemy import func, or_, select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
from auth import get_current_user
|
||||
from database import get_session
|
||||
from models import (
|
||||
ConsentAuditLog,
|
||||
Creator,
|
||||
GeneratedShort,
|
||||
HighlightCandidate,
|
||||
KeyMoment,
|
||||
Post,
|
||||
PostAttachment,
|
||||
RelatedTechniqueLink,
|
||||
SearchLog,
|
||||
SourceVideo,
|
||||
TechniquePage,
|
||||
TechniquePageVersion,
|
||||
User,
|
||||
VideoConsent,
|
||||
)
|
||||
from schemas import (
|
||||
CreatorDashboardResponse,
|
||||
|
|
@ -318,3 +331,213 @@ async def get_creator_transparency(
|
|||
source_videos=source_videos,
|
||||
tags=sorted(all_tags),
|
||||
)
|
||||
|
||||
|
||||
# ── Helpers for data export ──────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _row_to_dict(row) -> dict:
|
||||
"""Convert a SQLAlchemy model instance to a JSON-serialisable dict.
|
||||
|
||||
Handles UUIDs and datetimes via default=str on the final JSON dump.
|
||||
Skips internal SQLAlchemy state attributes.
|
||||
"""
|
||||
d = {}
|
||||
for col in row.__table__.columns:
|
||||
val = getattr(row, col.key, None)
|
||||
d[col.key] = val
|
||||
return d
|
||||
|
||||
|
||||
# ── Data Export (GDPR-style) ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
@router.get("/export")
|
||||
async def export_creator_data(
|
||||
current_user: Annotated[User, Depends(get_current_user)],
|
||||
db: AsyncSession = Depends(get_session),
|
||||
) -> StreamingResponse:
|
||||
"""Export all data derived from the authenticated creator's content.
|
||||
|
||||
Returns a ZIP archive containing one JSON file per table, plus an
|
||||
export_metadata.json. Binary attachments (videos, files) are not
|
||||
included — only metadata and derived content.
|
||||
"""
|
||||
if current_user.creator_id is None:
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail="No creator profile linked to this account",
|
||||
)
|
||||
|
||||
creator_id = current_user.creator_id
|
||||
|
||||
# Verify creator exists
|
||||
creator = (await db.execute(
|
||||
select(Creator).where(Creator.id == creator_id)
|
||||
)).scalar_one_or_none()
|
||||
if creator is None:
|
||||
logger.error(
|
||||
"Export: user %s has creator_id %s but creator row missing",
|
||||
current_user.id, creator_id,
|
||||
)
|
||||
raise HTTPException(status_code=404, detail="Linked creator profile not found")
|
||||
|
||||
logger.info("Data export started for creator %s", creator_id)
|
||||
|
||||
# ── Query all creator-owned tables ───────────────────────────────────
|
||||
|
||||
# 1. Creator profile
|
||||
creators_data = [_row_to_dict(creator)]
|
||||
|
||||
# 2. Source videos
|
||||
videos = (await db.execute(
|
||||
select(SourceVideo).where(SourceVideo.creator_id == creator_id)
|
||||
)).scalars().all()
|
||||
videos_data = [_row_to_dict(v) for v in videos]
|
||||
video_ids = [v.id for v in videos]
|
||||
|
||||
# 3. Key moments (via source videos)
|
||||
if video_ids:
|
||||
moments = (await db.execute(
|
||||
select(KeyMoment).where(KeyMoment.source_video_id.in_(video_ids))
|
||||
)).scalars().all()
|
||||
else:
|
||||
moments = []
|
||||
moments_data = [_row_to_dict(m) for m in moments]
|
||||
moment_ids = [m.id for m in moments]
|
||||
|
||||
# 4. Technique pages
|
||||
pages = (await db.execute(
|
||||
select(TechniquePage).where(TechniquePage.creator_id == creator_id)
|
||||
)).scalars().all()
|
||||
pages_data = [_row_to_dict(p) for p in pages]
|
||||
page_ids = [p.id for p in pages]
|
||||
|
||||
# 5. Technique page versions
|
||||
if page_ids:
|
||||
versions = (await db.execute(
|
||||
select(TechniquePageVersion).where(
|
||||
TechniquePageVersion.technique_page_id.in_(page_ids)
|
||||
)
|
||||
)).scalars().all()
|
||||
else:
|
||||
versions = []
|
||||
versions_data = [_row_to_dict(v) for v in versions]
|
||||
|
||||
# 6. Related technique links (both directions)
|
||||
if page_ids:
|
||||
links = (await db.execute(
|
||||
select(RelatedTechniqueLink).where(
|
||||
or_(
|
||||
RelatedTechniqueLink.source_page_id.in_(page_ids),
|
||||
RelatedTechniqueLink.target_page_id.in_(page_ids),
|
||||
)
|
||||
)
|
||||
)).scalars().all()
|
||||
else:
|
||||
links = []
|
||||
links_data = [_row_to_dict(lnk) for lnk in links]
|
||||
|
||||
# 7. Video consents + audit log
|
||||
consents = (await db.execute(
|
||||
select(VideoConsent).where(VideoConsent.creator_id == creator_id)
|
||||
)).scalars().all()
|
||||
consents_data = [_row_to_dict(c) for c in consents]
|
||||
consent_ids = [c.id for c in consents]
|
||||
|
||||
if consent_ids:
|
||||
audit_entries = (await db.execute(
|
||||
select(ConsentAuditLog).where(
|
||||
ConsentAuditLog.video_consent_id.in_(consent_ids)
|
||||
)
|
||||
)).scalars().all()
|
||||
else:
|
||||
audit_entries = []
|
||||
audit_data = [_row_to_dict(a) for a in audit_entries]
|
||||
|
||||
# 8. Posts + post attachments (metadata only)
|
||||
posts = (await db.execute(
|
||||
select(Post).where(Post.creator_id == creator_id)
|
||||
)).scalars().all()
|
||||
posts_data = [_row_to_dict(p) for p in posts]
|
||||
post_ids = [p.id for p in posts]
|
||||
|
||||
if post_ids:
|
||||
attachments = (await db.execute(
|
||||
select(PostAttachment).where(PostAttachment.post_id.in_(post_ids))
|
||||
)).scalars().all()
|
||||
else:
|
||||
attachments = []
|
||||
attachments_data = [_row_to_dict(a) for a in attachments]
|
||||
|
||||
# 9. Highlight candidates (via key moments)
|
||||
if moment_ids:
|
||||
highlights = (await db.execute(
|
||||
select(HighlightCandidate).where(
|
||||
HighlightCandidate.key_moment_id.in_(moment_ids)
|
||||
)
|
||||
)).scalars().all()
|
||||
else:
|
||||
highlights = []
|
||||
highlights_data = [_row_to_dict(h) for h in highlights]
|
||||
highlight_ids = [h.id for h in highlights]
|
||||
|
||||
# 10. Generated shorts (via highlight candidates)
|
||||
if highlight_ids:
|
||||
shorts = (await db.execute(
|
||||
select(GeneratedShort).where(
|
||||
GeneratedShort.highlight_candidate_id.in_(highlight_ids)
|
||||
)
|
||||
)).scalars().all()
|
||||
else:
|
||||
shorts = []
|
||||
shorts_data = [_row_to_dict(s) for s in shorts]
|
||||
|
||||
# ── Build ZIP archive ────────────────────────────────────────────────
|
||||
|
||||
files_map = {
|
||||
"creators.json": creators_data,
|
||||
"source_videos.json": videos_data,
|
||||
"key_moments.json": moments_data,
|
||||
"technique_pages.json": pages_data,
|
||||
"technique_page_versions.json": versions_data,
|
||||
"related_technique_links.json": links_data,
|
||||
"video_consents.json": consents_data,
|
||||
"consent_audit_log.json": audit_data,
|
||||
"posts.json": posts_data,
|
||||
"post_attachments.json": attachments_data,
|
||||
"highlight_candidates.json": highlights_data,
|
||||
"generated_shorts.json": shorts_data,
|
||||
}
|
||||
|
||||
export_metadata = {
|
||||
"export_timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"creator_id": str(creator_id),
|
||||
"file_count": len(files_map),
|
||||
"note": "Binary attachments (video files, uploaded files) are not included. "
|
||||
"This archive contains metadata and derived content only.",
|
||||
}
|
||||
|
||||
buf = io.BytesIO()
|
||||
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
|
||||
zf.writestr(
|
||||
"export_metadata.json",
|
||||
json.dumps(export_metadata, indent=2, default=str),
|
||||
)
|
||||
for filename, data in files_map.items():
|
||||
zf.writestr(filename, json.dumps(data, indent=2, default=str))
|
||||
|
||||
zip_bytes = buf.getvalue()
|
||||
|
||||
logger.info(
|
||||
"Data export complete for creator %s: %d files, %d bytes",
|
||||
creator_id, len(files_map) + 1, len(zip_bytes),
|
||||
)
|
||||
|
||||
return StreamingResponse(
|
||||
io.BytesIO(zip_bytes),
|
||||
media_type="application/zip",
|
||||
headers={
|
||||
"Content-Disposition": f'attachment; filename="chrysopedia-export-{creator_id}.zip"',
|
||||
},
|
||||
)
|
||||
|
|
|
|||
426
backend/tests/test_export.py
Normal file
426
backend/tests/test_export.py
Normal file
|
|
@ -0,0 +1,426 @@
|
|||
"""Tests for the GDPR-style data export endpoint.
|
||||
|
||||
Standalone ASGI test — mocks the DB session to return canned model
|
||||
instances. Verifies the endpoint returns a valid ZIP containing all
|
||||
expected JSON files with correct structure.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import json
|
||||
import uuid
|
||||
import zipfile
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
from unittest.mock import AsyncMock, MagicMock, PropertyMock
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
|
||||
# Ensure backend/ is on sys.path
|
||||
import pathlib
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent.parent))
|
||||
|
||||
from auth import get_current_user # noqa: E402
|
||||
from database import get_session # noqa: E402
|
||||
from main import app # noqa: E402
|
||||
from models import UserRole # noqa: E402
|
||||
|
||||
|
||||
# ── Fixtures ─────────────────────────────────────────────────────────────────
|
||||
|
||||
CREATOR_ID = uuid.uuid4()
|
||||
USER_ID = uuid.uuid4()
|
||||
VIDEO_ID = uuid.uuid4()
|
||||
MOMENT_ID = uuid.uuid4()
|
||||
PAGE_ID = uuid.uuid4()
|
||||
VERSION_ID = uuid.uuid4()
|
||||
LINK_ID = uuid.uuid4()
|
||||
CONSENT_ID = uuid.uuid4()
|
||||
AUDIT_ID = uuid.uuid4()
|
||||
POST_ID = uuid.uuid4()
|
||||
ATTACHMENT_ID = uuid.uuid4()
|
||||
HIGHLIGHT_ID = uuid.uuid4()
|
||||
SHORT_ID = uuid.uuid4()
|
||||
|
||||
|
||||
def _make_mock_user(*, has_creator: bool = True) -> MagicMock:
|
||||
"""Build a mock User with optional creator link."""
|
||||
user = MagicMock()
|
||||
user.id = USER_ID
|
||||
user.email = "test@example.com"
|
||||
user.creator_id = CREATOR_ID if has_creator else None
|
||||
user.role = UserRole.creator
|
||||
return user
|
||||
|
||||
|
||||
def _make_model_row(table_name: str, id_val: uuid.UUID, extra: dict[str, Any] | None = None) -> MagicMock:
|
||||
"""Build a mock SQLAlchemy model row with a __table__.columns interface."""
|
||||
row = MagicMock()
|
||||
row.id = id_val
|
||||
|
||||
# Base columns every entity has
|
||||
base = {
|
||||
"id": id_val,
|
||||
"created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
|
||||
}
|
||||
if extra:
|
||||
base.update(extra)
|
||||
|
||||
# Build mock __table__.columns
|
||||
columns = []
|
||||
for key, val in base.items():
|
||||
col = MagicMock()
|
||||
col.key = key
|
||||
columns.append(col)
|
||||
setattr(row, key, val)
|
||||
|
||||
row.__table__ = MagicMock()
|
||||
row.__table__.columns = columns
|
||||
|
||||
return row
|
||||
|
||||
|
||||
def _make_creator_row():
|
||||
return _make_model_row("creators", CREATOR_ID, {
|
||||
"name": "Test Creator",
|
||||
"slug": "test-creator",
|
||||
"folder_name": "test_creator",
|
||||
})
|
||||
|
||||
|
||||
def _make_video_row():
|
||||
return _make_model_row("source_videos", VIDEO_ID, {
|
||||
"creator_id": CREATOR_ID,
|
||||
"filename": "test.mp4",
|
||||
"processing_status": "complete",
|
||||
})
|
||||
|
||||
|
||||
def _make_moment_row():
|
||||
return _make_model_row("key_moments", MOMENT_ID, {
|
||||
"source_video_id": VIDEO_ID,
|
||||
"title": "Test Moment",
|
||||
"summary": "A test moment",
|
||||
})
|
||||
|
||||
|
||||
def _make_page_row():
|
||||
return _make_model_row("technique_pages", PAGE_ID, {
|
||||
"creator_id": CREATOR_ID,
|
||||
"title": "Test Page",
|
||||
"slug": "test-page",
|
||||
})
|
||||
|
||||
|
||||
def _make_version_row():
|
||||
return _make_model_row("technique_page_versions", VERSION_ID, {
|
||||
"technique_page_id": PAGE_ID,
|
||||
"version_number": 1,
|
||||
"content_snapshot": {"title": "v1"},
|
||||
})
|
||||
|
||||
|
||||
def _make_link_row():
|
||||
return _make_model_row("related_technique_links", LINK_ID, {
|
||||
"source_page_id": PAGE_ID,
|
||||
"target_page_id": uuid.uuid4(),
|
||||
"relationship": "general_cross_reference",
|
||||
})
|
||||
|
||||
|
||||
def _make_consent_row():
|
||||
return _make_model_row("video_consents", CONSENT_ID, {
|
||||
"source_video_id": VIDEO_ID,
|
||||
"creator_id": CREATOR_ID,
|
||||
"kb_inclusion": True,
|
||||
})
|
||||
|
||||
|
||||
def _make_audit_row():
|
||||
return _make_model_row("consent_audit_log", AUDIT_ID, {
|
||||
"video_consent_id": CONSENT_ID,
|
||||
"version": 1,
|
||||
"field_name": "kb_inclusion",
|
||||
"old_value": False,
|
||||
"new_value": True,
|
||||
})
|
||||
|
||||
|
||||
def _make_post_row():
|
||||
return _make_model_row("posts", POST_ID, {
|
||||
"creator_id": CREATOR_ID,
|
||||
"title": "Test Post",
|
||||
"body_json": {"blocks": []},
|
||||
})
|
||||
|
||||
|
||||
def _make_attachment_row():
|
||||
return _make_model_row("post_attachments", ATTACHMENT_ID, {
|
||||
"post_id": POST_ID,
|
||||
"filename": "file.pdf",
|
||||
"object_key": "posts/file.pdf",
|
||||
"content_type": "application/pdf",
|
||||
"size_bytes": 1024,
|
||||
})
|
||||
|
||||
|
||||
def _make_highlight_row():
|
||||
return _make_model_row("highlight_candidates", HIGHLIGHT_ID, {
|
||||
"key_moment_id": MOMENT_ID,
|
||||
"source_video_id": VIDEO_ID,
|
||||
"score": 0.85,
|
||||
"duration_secs": 30.0,
|
||||
"status": "candidate",
|
||||
})
|
||||
|
||||
|
||||
def _make_short_row():
|
||||
return _make_model_row("generated_shorts", SHORT_ID, {
|
||||
"highlight_candidate_id": HIGHLIGHT_ID,
|
||||
"format_preset": "vertical",
|
||||
"width": 1080,
|
||||
"height": 1920,
|
||||
"status": "complete",
|
||||
})
|
||||
|
||||
|
||||
def _setup_db_responses(mock_session: AsyncMock) -> None:
|
||||
"""Configure the mock DB session to return canned data for each query."""
|
||||
creator_row = _make_creator_row()
|
||||
video_row = _make_video_row()
|
||||
moment_row = _make_moment_row()
|
||||
page_row = _make_page_row()
|
||||
version_row = _make_version_row()
|
||||
link_row = _make_link_row()
|
||||
consent_row = _make_consent_row()
|
||||
audit_row = _make_audit_row()
|
||||
post_row = _make_post_row()
|
||||
attachment_row = _make_attachment_row()
|
||||
highlight_row = _make_highlight_row()
|
||||
short_row = _make_short_row()
|
||||
|
||||
call_count = 0
|
||||
|
||||
def _make_execute_result(scalar_one=None, scalars_all=None):
|
||||
result = MagicMock()
|
||||
if scalar_one is not None:
|
||||
result.scalar_one_or_none.return_value = scalar_one
|
||||
if scalars_all is not None:
|
||||
result.scalars.return_value.all.return_value = scalars_all
|
||||
return result
|
||||
|
||||
# The export endpoint issues queries in order:
|
||||
# 1. Creator (scalar_one_or_none)
|
||||
# 2. SourceVideo (scalars.all)
|
||||
# 3. KeyMoment (scalars.all)
|
||||
# 4. TechniquePage (scalars.all)
|
||||
# 5. TechniquePageVersion (scalars.all)
|
||||
# 6. RelatedTechniqueLink (scalars.all)
|
||||
# 7. VideoConsent (scalars.all)
|
||||
# 8. ConsentAuditLog (scalars.all)
|
||||
# 9. Post (scalars.all)
|
||||
# 10. PostAttachment (scalars.all)
|
||||
# 11. HighlightCandidate (scalars.all)
|
||||
# 12. GeneratedShort (scalars.all)
|
||||
|
||||
responses = [
|
||||
_make_execute_result(scalar_one=creator_row), # Creator
|
||||
_make_execute_result(scalars_all=[video_row]), # SourceVideo
|
||||
_make_execute_result(scalars_all=[moment_row]), # KeyMoment
|
||||
_make_execute_result(scalars_all=[page_row]), # TechniquePage
|
||||
_make_execute_result(scalars_all=[version_row]), # TechniquePageVersion
|
||||
_make_execute_result(scalars_all=[link_row]), # RelatedTechniqueLink
|
||||
_make_execute_result(scalars_all=[consent_row]), # VideoConsent
|
||||
_make_execute_result(scalars_all=[audit_row]), # ConsentAuditLog
|
||||
_make_execute_result(scalars_all=[post_row]), # Post
|
||||
_make_execute_result(scalars_all=[attachment_row]), # PostAttachment
|
||||
_make_execute_result(scalars_all=[highlight_row]), # HighlightCandidate
|
||||
_make_execute_result(scalars_all=[short_row]), # GeneratedShort
|
||||
]
|
||||
|
||||
async def _execute_side_effect(*args, **kwargs):
|
||||
nonlocal call_count
|
||||
idx = min(call_count, len(responses) - 1)
|
||||
call_count += 1
|
||||
return responses[idx]
|
||||
|
||||
mock_session.execute = AsyncMock(side_effect=_execute_side_effect)
|
||||
|
||||
|
||||
@pytest_asyncio.fixture()
|
||||
async def export_client():
|
||||
"""Async HTTP test client with mocked auth and DB session."""
|
||||
mock_user = _make_mock_user(has_creator=True)
|
||||
mock_session = AsyncMock()
|
||||
_setup_db_responses(mock_session)
|
||||
|
||||
async def _mock_get_session():
|
||||
yield mock_session
|
||||
|
||||
app.dependency_overrides[get_session] = _mock_get_session
|
||||
app.dependency_overrides[get_current_user] = lambda: mock_user
|
||||
|
||||
transport = ASGITransport(app=app)
|
||||
async with AsyncClient(transport=transport, base_url="http://testserver/api/v1") as ac:
|
||||
yield ac
|
||||
|
||||
app.dependency_overrides.pop(get_session, None)
|
||||
app.dependency_overrides.pop(get_current_user, None)
|
||||
|
||||
|
||||
@pytest_asyncio.fixture()
|
||||
async def no_creator_client():
|
||||
"""Client where the user has no linked creator profile."""
|
||||
mock_user = _make_mock_user(has_creator=False)
|
||||
|
||||
async def _mock_get_session():
|
||||
yield AsyncMock()
|
||||
|
||||
app.dependency_overrides[get_session] = _mock_get_session
|
||||
app.dependency_overrides[get_current_user] = lambda: mock_user
|
||||
|
||||
transport = ASGITransport(app=app)
|
||||
async with AsyncClient(transport=transport, base_url="http://testserver/api/v1") as ac:
|
||||
yield ac
|
||||
|
||||
app.dependency_overrides.pop(get_session, None)
|
||||
app.dependency_overrides.pop(get_current_user, None)
|
||||
|
||||
|
||||
# ── Tests ────────────────────────────────────────────────────────────────────
|
||||
|
||||
EXPECTED_JSON_FILES = {
|
||||
"export_metadata.json",
|
||||
"creators.json",
|
||||
"source_videos.json",
|
||||
"key_moments.json",
|
||||
"technique_pages.json",
|
||||
"technique_page_versions.json",
|
||||
"related_technique_links.json",
|
||||
"video_consents.json",
|
||||
"consent_audit_log.json",
|
||||
"posts.json",
|
||||
"post_attachments.json",
|
||||
"highlight_candidates.json",
|
||||
"generated_shorts.json",
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_export_returns_valid_zip(export_client: AsyncClient):
|
||||
"""Endpoint returns a ZIP containing all expected JSON files."""
|
||||
resp = await export_client.get("/creator/export")
|
||||
assert resp.status_code == 200
|
||||
assert resp.headers["content-type"] == "application/zip"
|
||||
assert "content-disposition" in resp.headers
|
||||
assert "chrysopedia-export-" in resp.headers["content-disposition"]
|
||||
|
||||
zf = zipfile.ZipFile(io.BytesIO(resp.content))
|
||||
names = set(zf.namelist())
|
||||
assert names == EXPECTED_JSON_FILES
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_export_json_files_are_valid(export_client: AsyncClient):
|
||||
"""Each JSON file in the ZIP is valid JSON with a list at the top level."""
|
||||
resp = await export_client.get("/creator/export")
|
||||
zf = zipfile.ZipFile(io.BytesIO(resp.content))
|
||||
|
||||
for name in zf.namelist():
|
||||
data = json.loads(zf.read(name))
|
||||
if name == "export_metadata.json":
|
||||
# Metadata is a dict, not a list
|
||||
assert isinstance(data, dict)
|
||||
assert "export_timestamp" in data
|
||||
assert "creator_id" in data
|
||||
assert data["creator_id"] == str(CREATOR_ID)
|
||||
else:
|
||||
assert isinstance(data, list), f"{name} should be a list"
|
||||
assert len(data) >= 1, f"{name} should have at least one entry"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_export_creators_json_content(export_client: AsyncClient):
|
||||
"""Creators JSON file contains the expected creator data."""
|
||||
resp = await export_client.get("/creator/export")
|
||||
zf = zipfile.ZipFile(io.BytesIO(resp.content))
|
||||
creators = json.loads(zf.read("creators.json"))
|
||||
assert len(creators) == 1
|
||||
assert creators[0]["name"] == "Test Creator"
|
||||
assert creators[0]["slug"] == "test-creator"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_export_uuids_serialize_as_strings(export_client: AsyncClient):
|
||||
"""UUIDs in the JSON output are serialized as strings, not crashing."""
|
||||
resp = await export_client.get("/creator/export")
|
||||
zf = zipfile.ZipFile(io.BytesIO(resp.content))
|
||||
|
||||
creators = json.loads(zf.read("creators.json"))
|
||||
# ID should be a string representation of UUID
|
||||
creator_id_str = creators[0]["id"]
|
||||
assert isinstance(creator_id_str, str)
|
||||
uuid.UUID(creator_id_str) # Should not raise
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_export_datetimes_serialize(export_client: AsyncClient):
|
||||
"""Datetimes serialize correctly as ISO strings."""
|
||||
resp = await export_client.get("/creator/export")
|
||||
zf = zipfile.ZipFile(io.BytesIO(resp.content))
|
||||
|
||||
creators = json.loads(zf.read("creators.json"))
|
||||
created_at = creators[0]["created_at"]
|
||||
assert isinstance(created_at, str)
|
||||
assert "2025" in created_at
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_export_related_links_include_cross_references(export_client: AsyncClient):
|
||||
"""Related technique links file includes cross-creator references."""
|
||||
resp = await export_client.get("/creator/export")
|
||||
zf = zipfile.ZipFile(io.BytesIO(resp.content))
|
||||
|
||||
links = json.loads(zf.read("related_technique_links.json"))
|
||||
assert len(links) >= 1
|
||||
link = links[0]
|
||||
assert "source_page_id" in link
|
||||
assert "target_page_id" in link
|
||||
assert "relationship" in link
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_export_metadata_has_required_fields(export_client: AsyncClient):
|
||||
"""export_metadata.json has timestamp, creator_id, and note."""
|
||||
resp = await export_client.get("/creator/export")
|
||||
zf = zipfile.ZipFile(io.BytesIO(resp.content))
|
||||
|
||||
meta = json.loads(zf.read("export_metadata.json"))
|
||||
assert "export_timestamp" in meta
|
||||
assert "creator_id" in meta
|
||||
assert "note" in meta
|
||||
assert "file_count" in meta
|
||||
assert meta["file_count"] == 12 # 12 data files
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_export_requires_creator_link(no_creator_client: AsyncClient):
|
||||
"""404 when the user has no linked creator profile."""
|
||||
resp = await no_creator_client.get("/creator/export")
|
||||
assert resp.status_code == 404
|
||||
assert "No creator profile" in resp.json()["detail"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_export_requires_auth():
|
||||
"""401 when no auth token is provided (default dependency, no override)."""
|
||||
# Use a fresh app without dependency overrides
|
||||
transport = ASGITransport(app=app)
|
||||
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
|
||||
resp = await ac.get("/api/v1/creator/export")
|
||||
assert resp.status_code in (401, 403)
|
||||
Loading…
Add table
Reference in a new issue