chrysopedia/backend/routers/creator_dashboard.py
jlightner 8b2876906c chore: Added GET /creator/export endpoint that returns a ZIP archive co…
- "backend/routers/creator_dashboard.py"
- "backend/tests/test_export.py"

GSD-Task: S07/T01
2026-04-04 14:16:56 +00:00

543 lines
19 KiB
Python

"""Creator dashboard endpoint — authenticated analytics for a linked creator.
Returns aggregate counts (videos, technique pages, key moments, search
impressions) and content lists for the logged-in creator's dashboard.
Includes a GDPR-style data export endpoint.
"""
import io
import json
import logging
import zipfile
from datetime import datetime, timezone
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
from sqlalchemy import func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from auth import get_current_user
from database import get_session
from models import (
ConsentAuditLog,
Creator,
GeneratedShort,
HighlightCandidate,
KeyMoment,
Post,
PostAttachment,
RelatedTechniqueLink,
SearchLog,
SourceVideo,
TechniquePage,
TechniquePageVersion,
User,
VideoConsent,
)
from schemas import (
CreatorDashboardResponse,
CreatorDashboardTechnique,
CreatorDashboardVideo,
CreatorTransparencyResponse,
TransparencyKeyMoment,
TransparencyRelationship,
TransparencySourceVideo,
TransparencyTechnique,
)
# Module-level logger, named under the "chrysopedia" logging namespace.
logger = logging.getLogger("chrysopedia.creator_dashboard")
# Every route declared in this module is served beneath the /creator prefix.
router = APIRouter(prefix="/creator", tags=["creator-dashboard"])
@router.get("/dashboard", response_model=CreatorDashboardResponse)
async def get_creator_dashboard(
    current_user: Annotated[User, Depends(get_current_user)],
    db: AsyncSession = Depends(get_session),
) -> CreatorDashboardResponse:
    """Build the analytics dashboard payload for the logged-in creator.

    The response carries four aggregate counters (videos, technique pages,
    key moments, search impressions) plus the creator's technique pages and
    source videos, each ordered newest first.

    Raises:
        HTTPException: 404 when the account has no linked ``creator_id`` or
            when the linked creator row does not exist.
    """
    creator_id = current_user.creator_id
    if creator_id is None:
        raise HTTPException(
            status_code=404,
            detail="No creator profile linked to this account",
        )

    # Defensive lookup: the FK should already guarantee the row exists.
    creator_lookup = await db.execute(
        select(Creator).where(Creator.id == creator_id)
    )
    if creator_lookup.scalar_one_or_none() is None:
        logger.error("User %s has creator_id %s but creator row missing", current_user.id, creator_id)
        raise HTTPException(
            status_code=404,
            detail="Linked creator profile not found",
        )

    # ── Aggregate counts ─────────────────────────────────────────────────
    video_count_stmt = (
        select(func.count())
        .select_from(SourceVideo)
        .where(SourceVideo.creator_id == creator_id)
    )
    video_count = (await db.execute(video_count_stmt)).scalar() or 0

    technique_count_stmt = (
        select(func.count())
        .select_from(TechniquePage)
        .where(TechniquePage.creator_id == creator_id)
    )
    technique_count = (await db.execute(technique_count_stmt)).scalar() or 0

    # Key moments are attributed to the creator through their source video.
    moment_count_stmt = (
        select(func.count())
        .select_from(KeyMoment)
        .join(SourceVideo, KeyMoment.source_video_id == SourceVideo.id)
        .where(SourceVideo.creator_id == creator_id)
    )
    key_moment_count = (await db.execute(moment_count_stmt)).scalar() or 0

    # A search counts as an impression when its query equals
    # (case-insensitively) the title of one of this creator's technique pages.
    title_match_exists = (
        select(TechniquePage.id)
        .where(
            TechniquePage.creator_id == creator_id,
            func.lower(SearchLog.query) == func.lower(TechniquePage.title),
        )
        .correlate(SearchLog)
        .exists()
    )
    impressions_stmt = select(
        func.count(func.distinct(SearchLog.id))
    ).where(title_match_exists)
    search_impressions = (await db.execute(impressions_stmt)).scalar() or 0

    # ── Content lists ────────────────────────────────────────────────────
    # Correlated scalar subquery: number of key moments on each page.
    moments_per_page = (
        select(func.count(KeyMoment.id))
        .where(KeyMoment.technique_page_id == TechniquePage.id)
        .correlate(TechniquePage)
        .scalar_subquery()
        .label("key_moment_count")
    )
    technique_stmt = (
        select(
            TechniquePage.title,
            TechniquePage.slug,
            TechniquePage.topic_category,
            TechniquePage.created_at,
            moments_per_page,
        )
        .where(TechniquePage.creator_id == creator_id)
        .order_by(TechniquePage.created_at.desc())
    )
    technique_rows = (await db.execute(technique_stmt)).all()
    techniques = [
        CreatorDashboardTechnique(
            title=row.title,
            slug=row.slug,
            topic_category=row.topic_category,
            created_at=row.created_at,
            key_moment_count=row.key_moment_count or 0,
        )
        for row in technique_rows
    ]

    video_stmt = (
        select(
            SourceVideo.filename,
            SourceVideo.processing_status,
            SourceVideo.created_at,
        )
        .where(SourceVideo.creator_id == creator_id)
        .order_by(SourceVideo.created_at.desc())
    )
    video_rows = (await db.execute(video_stmt)).all()
    videos = []
    for row in video_rows:
        status = row.processing_status
        # Enum members expose .value; anything else is stringified.
        status_text = status.value if hasattr(status, "value") else str(status)
        videos.append(
            CreatorDashboardVideo(
                filename=row.filename,
                processing_status=status_text,
                created_at=row.created_at,
            )
        )

    logger.info(
        "Dashboard loaded for creator %s: %d videos, %d techniques, %d moments, %d impressions",
        creator_id, video_count, technique_count, key_moment_count, search_impressions,
    )
    return CreatorDashboardResponse(
        video_count=video_count,
        technique_count=technique_count,
        key_moment_count=key_moment_count,
        search_impressions=search_impressions,
        techniques=techniques,
        videos=videos,
    )
def _enum_text(value) -> str:
    """Return ``value.value`` for enum-like objects, otherwise ``str(value)``."""
    return value.value if hasattr(value, "value") else str(value)


def _transparency_moment(km, technique_page_title) -> TransparencyKeyMoment:
    """Build the transparency entry for one key moment.

    ``technique_page_title`` is ``None`` for moments that are not attached
    to any technique page.
    """
    return TransparencyKeyMoment(
        title=km.title,
        summary=km.summary,
        content_type=_enum_text(km.content_type),
        start_time=km.start_time,
        end_time=km.end_time,
        source_video_filename=km.source_video.filename if km.source_video else "",
        technique_page_title=technique_page_title,
    )


@router.get("/transparency", response_model=CreatorTransparencyResponse)
async def get_creator_transparency(
    current_user: Annotated[User, Depends(get_current_user)],
    db: AsyncSession = Depends(get_session),
) -> CreatorTransparencyResponse:
    """Return all entities derived from the authenticated creator's content.

    Shows technique pages, key moments, cross-reference relationships,
    source videos, and aggregated topic tags — everything the AI pipeline
    produced from this creator's uploads.

    Each relationship is reported once. A link whose source and target pages
    both belong to this creator is reachable both as an outgoing link of the
    source page and as an incoming link of the target page; it is
    de-duplicated on (source_page_id, target_page_id, relationship).

    Raises:
        HTTPException: 404 when the account has no linked ``creator_id`` or
            when the linked creator row does not exist.
    """
    if current_user.creator_id is None:
        raise HTTPException(
            status_code=404,
            detail="No creator profile linked to this account",
        )
    creator_id = current_user.creator_id

    # Verify creator exists
    creator = (await db.execute(
        select(Creator).where(Creator.id == creator_id)
    )).scalar_one_or_none()
    if creator is None:
        logger.error("User %s has creator_id %s but creator row missing", current_user.id, creator_id)
        raise HTTPException(
            status_code=404,
            detail="Linked creator profile not found",
        )

    # ── Technique pages with key moments and links eagerly loaded ────────
    technique_pages = (await db.execute(
        select(TechniquePage)
        .where(TechniquePage.creator_id == creator_id)
        .options(
            selectinload(TechniquePage.key_moments).selectinload(KeyMoment.source_video),
            selectinload(TechniquePage.outgoing_links).selectinload(RelatedTechniqueLink.target_page),
            selectinload(TechniquePage.incoming_links).selectinload(RelatedTechniqueLink.source_page),
        )
        .order_by(TechniquePage.created_at.desc())
    )).scalars().all()

    techniques = []
    all_key_moments: list[TransparencyKeyMoment] = []
    all_relationships: list[TransparencyRelationship] = []
    all_tags: set[str] = set()
    # Links already emitted, so creator-internal links are not listed twice.
    seen_links: set[tuple] = set()

    def _record_link(link, source_page, target_page) -> None:
        """Append ``link`` to the relationship list unless already recorded."""
        link_key = (link.source_page_id, link.target_page_id, link.relationship)
        if link_key in seen_links:
            return
        seen_links.add(link_key)
        all_relationships.append(TransparencyRelationship(
            relationship_type=_enum_text(link.relationship),
            source_page_title=source_page.title if source_page else "",
            source_page_slug=source_page.slug if source_page else "",
            target_page_title=target_page.title if target_page else "",
            target_page_slug=target_page.slug if target_page else "",
        ))

    for tp in technique_pages:
        techniques.append(TransparencyTechnique(
            title=tp.title,
            slug=tp.slug,
            topic_category=tp.topic_category,
            topic_tags=tp.topic_tags or [],
            summary=(tp.summary or "")[:200],  # summary is capped at 200 characters
            created_at=tp.created_at,
            key_moment_count=len(tp.key_moments),
        ))

        # Collect tags
        if tp.topic_tags:
            all_tags.update(tp.topic_tags)

        # Key moments from this technique page
        all_key_moments.extend(
            _transparency_moment(km, tp.title) for km in tp.key_moments
        )

        # Outgoing relationships (this page is the source)
        for link in tp.outgoing_links:
            _record_link(link, tp, link.target_page)

        # Incoming relationships (this page is the target)
        for link in tp.incoming_links:
            _record_link(link, link.source_page, tp)

    # ── Key moments not linked to a technique page ───────────────────────
    # (moments from creator's videos that haven't been assigned to a page)
    unlinked_moments = (await db.execute(
        select(KeyMoment)
        .join(SourceVideo, KeyMoment.source_video_id == SourceVideo.id)
        .where(
            SourceVideo.creator_id == creator_id,
            KeyMoment.technique_page_id.is_(None),
        )
        .options(selectinload(KeyMoment.source_video))
    )).scalars().all()
    all_key_moments.extend(
        _transparency_moment(km, None) for km in unlinked_moments
    )

    # ── Source videos ────────────────────────────────────────────────────
    video_rows = (await db.execute(
        select(SourceVideo)
        .where(SourceVideo.creator_id == creator_id)
        .order_by(SourceVideo.created_at.desc())
    )).scalars().all()
    source_videos = [
        TransparencySourceVideo(
            filename=v.filename,
            processing_status=_enum_text(v.processing_status),
            created_at=v.created_at,
        )
        for v in video_rows
    ]

    logger.info(
        "Transparency loaded for creator %s: %d techniques, %d moments, %d relationships, %d videos, %d tags",
        creator_id, len(techniques), len(all_key_moments),
        len(all_relationships), len(source_videos), len(all_tags),
    )
    return CreatorTransparencyResponse(
        techniques=techniques,
        key_moments=all_key_moments,
        relationships=all_relationships,
        source_videos=source_videos,
        tags=sorted(all_tags),
    )
# ── Helpers for data export ──────────────────────────────────────────────────
def _row_to_dict(row) -> dict:
    """Convert a SQLAlchemy model instance to a plain dict of its columns.

    Only the keys listed in ``row.__table__.columns`` are read, so internal
    SQLAlchemy state attributes never appear in the result. An attribute
    that is missing on ``row`` is reported as ``None``.

    Enum members are replaced by their ``.value``. Without this, the
    ``default=str`` fallback used for the final JSON dump would write a
    plain ``Enum`` member as ``"ClassName.member"`` rather than its value.
    UUIDs and datetimes are left untouched and rely on ``default=str``.

    Args:
        row: A mapped model instance exposing ``__table__.columns``.

    Returns:
        A dict mapping each column key to the row's value for that column.
    """
    # Function-scope import: the module does not import ``enum`` at file level.
    from enum import Enum

    record = {}
    for col in row.__table__.columns:
        val = getattr(row, col.key, None)
        record[col.key] = val.value if isinstance(val, Enum) else val
    return record
# ── Data Export (GDPR-style) ─────────────────────────────────────────────────
@router.get("/export")
async def export_creator_data(
    current_user: Annotated[User, Depends(get_current_user)],
    db: AsyncSession = Depends(get_session),
) -> StreamingResponse:
    """Export everything derived from the authenticated creator's content.

    The response is a ZIP archive holding ``export_metadata.json`` followed
    by one JSON file per exported table. Only metadata and derived content
    are included; binary attachments (videos, uploaded files) are not.

    Raises:
        HTTPException: 404 when the account has no linked ``creator_id`` or
            when the linked creator row does not exist.
    """
    creator_id = current_user.creator_id
    if creator_id is None:
        raise HTTPException(
            status_code=404,
            detail="No creator profile linked to this account",
        )

    # Verify creator exists
    creator_result = await db.execute(
        select(Creator).where(Creator.id == creator_id)
    )
    creator = creator_result.scalar_one_or_none()
    if creator is None:
        logger.error(
            "Export: user %s has creator_id %s but creator row missing",
            current_user.id, creator_id,
        )
        raise HTTPException(status_code=404, detail="Linked creator profile not found")

    logger.info("Data export started for creator %s", creator_id)

    async def _load(stmt):
        """Execute ``stmt`` and return all ORM instances it yields."""
        return (await db.execute(stmt)).scalars().all()

    async def _load_children(model, fk_column, parent_ids):
        """Load ``model`` rows whose ``fk_column`` is in ``parent_ids``.

        No query is issued when ``parent_ids`` is empty.
        """
        if not parent_ids:
            return []
        return await _load(select(model).where(fk_column.in_(parent_ids)))

    # ── Query all creator-owned tables ───────────────────────────────────
    # 2. Source videos
    videos = await _load(
        select(SourceVideo).where(SourceVideo.creator_id == creator_id)
    )
    # 3. Key moments (via source videos)
    moments = await _load_children(
        KeyMoment, KeyMoment.source_video_id, [v.id for v in videos]
    )
    # 4. Technique pages
    pages = await _load(
        select(TechniquePage).where(TechniquePage.creator_id == creator_id)
    )
    page_ids = [p.id for p in pages]
    # 5. Technique page versions
    versions = await _load_children(
        TechniquePageVersion, TechniquePageVersion.technique_page_id, page_ids
    )
    # 6. Related technique links (both directions)
    if page_ids:
        links = await _load(
            select(RelatedTechniqueLink).where(
                or_(
                    RelatedTechniqueLink.source_page_id.in_(page_ids),
                    RelatedTechniqueLink.target_page_id.in_(page_ids),
                )
            )
        )
    else:
        links = []
    # 7. Video consents + audit log
    consents = await _load(
        select(VideoConsent).where(VideoConsent.creator_id == creator_id)
    )
    audit_entries = await _load_children(
        ConsentAuditLog, ConsentAuditLog.video_consent_id, [c.id for c in consents]
    )
    # 8. Posts + post attachments (metadata only)
    posts = await _load(select(Post).where(Post.creator_id == creator_id))
    attachments = await _load_children(
        PostAttachment, PostAttachment.post_id, [p.id for p in posts]
    )
    # 9. Highlight candidates (via key moments)
    highlights = await _load_children(
        HighlightCandidate, HighlightCandidate.key_moment_id, [m.id for m in moments]
    )
    # 10. Generated shorts (via highlight candidates)
    shorts = await _load_children(
        GeneratedShort, GeneratedShort.highlight_candidate_id, [h.id for h in highlights]
    )

    # ── Build ZIP archive ────────────────────────────────────────────────
    # Insertion order here is the order of the entries inside the archive.
    rows_by_file = {
        "creators.json": [creator],
        "source_videos.json": videos,
        "key_moments.json": moments,
        "technique_pages.json": pages,
        "technique_page_versions.json": versions,
        "related_technique_links.json": links,
        "video_consents.json": consents,
        "consent_audit_log.json": audit_entries,
        "posts.json": posts,
        "post_attachments.json": attachments,
        "highlight_candidates.json": highlights,
        "generated_shorts.json": shorts,
    }
    files_map = {
        name: [_row_to_dict(item) for item in rows]
        for name, rows in rows_by_file.items()
    }

    export_metadata = {
        "export_timestamp": datetime.now(timezone.utc).isoformat(),
        "creator_id": str(creator_id),
        "file_count": len(files_map),
        "note": "Binary attachments (video files, uploaded files) are not included. "
                "This archive contains metadata and derived content only.",
    }

    archive = io.BytesIO()
    with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as zf:
        # default=str stringifies values json cannot encode natively
        # (e.g. UUIDs and datetimes).
        metadata_json = json.dumps(export_metadata, indent=2, default=str)
        zf.writestr("export_metadata.json", metadata_json)
        for filename, data in files_map.items():
            payload = json.dumps(data, indent=2, default=str)
            zf.writestr(filename, payload)
    zip_bytes = archive.getvalue()

    # +1 accounts for export_metadata.json alongside the per-table files.
    logger.info(
        "Data export complete for creator %s: %d files, %d bytes",
        creator_id, len(files_map) + 1, len(zip_bytes),
    )

    disposition = f'attachment; filename="chrysopedia-export-{creator_id}.zip"'
    return StreamingResponse(
        io.BytesIO(zip_bytes),
        media_type="application/zip",
        headers={
            "Content-Disposition": disposition,
        },
    )