chrysopedia/backend/routers/posts.py
jlightner cc60852ac9 feat: Built post CRUD and file upload/download API routers with auth, o…
- "backend/routers/posts.py"
- "backend/routers/files.py"
- "backend/minio_client.py"
- "backend/auth.py"
- "backend/main.py"

GSD-Task: S01/T02
2026-04-04 09:07:35 +00:00

222 lines
7.1 KiB
Python

"""Post CRUD endpoints for Chrysopedia creator posts.
Creators can write rich text posts with optional file attachments.
Posts are visible on creator profile pages. Auth required for write ops;
public read access for published posts.
"""
from __future__ import annotations
import logging
import uuid
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from auth import get_current_user, get_optional_user
from database import get_session
from minio_client import delete_file, generate_download_url
from models import Creator, Post, PostAttachment, User
from schemas import PostCreate, PostListResponse, PostRead, PostUpdate
# Module-level logger; records from this router are emitted under the
# "chrysopedia.posts" logger name.
logger = logging.getLogger("chrysopedia.posts")
# All routes declared below are registered under the "/posts" path prefix
# and grouped under the "posts" tag.
router = APIRouter(prefix="/posts", tags=["posts"])
def _resolve_creator_id(user: User) -> uuid.UUID:
    """Return the creator id linked to ``user``.

    Args:
        user: The account whose ``creator_id`` attribute is inspected.

    Returns:
        The value of ``user.creator_id`` when it is not ``None``.

    Raises:
        HTTPException: 403 Forbidden when the account has no creator
            profile linked (``creator_id`` is ``None``).
    """
    linked_id = user.creator_id
    if linked_id is not None:
        return linked_id
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="No creator profile linked to this account",
    )
def _attach_download_urls(post: Post) -> PostRead:
    """Convert a Post ORM instance to PostRead with download URLs on attachments.

    Each attachment is serialised to a dict and given a ``download_url``
    produced by ``generate_download_url`` from its ``object_key``.

    URL generation is best-effort: if it raises for an attachment, that
    attachment is still returned with ``download_url`` set to ``None`` and
    the failure is logged (with traceback) instead of failing the request.

    Args:
        post: The post to serialise. ``post.attachments`` is iterated here,
            so callers are expected to have loaded it already.

    Returns:
        A ``PostRead`` carrying the post's fields and its attachments.
    """
    attachment_reads = []
    for att in post.attachments:
        try:
            url = generate_download_url(att.object_key)
        except Exception:
            # Deliberately broad: one broken attachment must not take the
            # whole post down. exc_info keeps the underlying cause in the
            # log so the failure can actually be diagnosed.
            logger.warning(
                "Failed to generate download URL for attachment %s",
                att.id,
                exc_info=True,
            )
            url = None
        attachment_reads.append(
            {
                "id": att.id,
                "filename": att.filename,
                "content_type": att.content_type,
                "size_bytes": att.size_bytes,
                "download_url": url,
                "created_at": att.created_at,
            }
        )
    return PostRead(
        id=post.id,
        creator_id=post.creator_id,
        title=post.title,
        body_json=post.body_json,
        is_published=post.is_published,
        created_at=post.created_at,
        updated_at=post.updated_at,
        attachments=attachment_reads,
    )
@router.post("", response_model=PostRead, status_code=status.HTTP_201_CREATED)
async def create_post(
    payload: PostCreate,
    current_user: Annotated[User, Depends(get_current_user)],
    db: AsyncSession = Depends(get_session),
) -> PostRead:
    """Create a post owned by the authenticated user's creator profile.

    Args:
        payload: Title, body and publication flag for the new post.
        current_user: The requesting user, supplied by the
            ``get_current_user`` dependency.
        db: Async database session.

    Returns:
        The stored post as ``PostRead`` (HTTP 201).

    Raises:
        HTTPException: 403 when the user has no linked creator profile.
    """
    new_post = Post(
        creator_id=_resolve_creator_id(current_user),
        title=payload.title,
        body_json=payload.body_json,
        is_published=payload.is_published,
    )
    db.add(new_post)
    await db.commit()
    # Load the attachments relationship explicitly so the serialiser below
    # can iterate it.
    await db.refresh(new_post, attribute_names=["attachments"])
    return _attach_download_urls(new_post)
@router.get("", response_model=PostListResponse)
async def list_posts(
    creator_id: Annotated[uuid.UUID, Query(description="Filter by creator")],
    page: Annotated[int, Query(ge=1)] = 1,
    limit: Annotated[int, Query(ge=1, le=100)] = 20,
    current_user: User | None = Depends(get_optional_user),
    db: AsyncSession = Depends(get_session),
) -> PostListResponse:
    """List posts by a creator, newest first. Non-owners see only published posts.

    Args:
        creator_id: Creator whose posts are listed (required query parameter).
        page: 1-based page number.
        limit: Page size, between 1 and 100.
        current_user: The requesting user, or ``None``, as supplied by the
            ``get_optional_user`` dependency.
        db: Async database session.

    Returns:
        ``PostListResponse`` with the requested page in ``items`` and the
        number of posts matching the visibility filter in ``total``.
    """
    # The requester is the owner when their linked creator profile is the
    # one being listed; only owners may see unpublished posts.
    is_owner = (
        current_user is not None
        and current_user.creator_id == creator_id
    )
    base_filter = Post.creator_id == creator_id
    if not is_owner:
        base_filter = base_filter & (Post.is_published == True)  # noqa: E712

    # Count with the same filter as the page query so ``total`` matches
    # what this requester is allowed to see.
    count_q = select(func.count()).select_from(Post).where(base_filter)
    total = (await db.execute(count_q)).scalar_one()

    offset = (page - 1) * limit
    q = (
        select(Post)
        .where(base_filter)
        .options(selectinload(Post.attachments))
        # created_at alone is not guaranteed unique: rows sharing a
        # timestamp have no defined relative order, so OFFSET/LIMIT pages
        # could repeat or skip posts. The id makes the order total.
        .order_by(Post.created_at.desc(), Post.id.desc())
        .offset(offset)
        .limit(limit)
    )
    result = await db.execute(q)
    posts = result.scalars().all()
    return PostListResponse(
        items=[_attach_download_urls(p) for p in posts],
        total=total,
    )
@router.get("/{post_id}", response_model=PostRead)
async def get_post(
    post_id: uuid.UUID,
    db: AsyncSession = Depends(get_session),
    current_user: User | None = Depends(get_optional_user),
) -> PostRead:
    """Get a single post.

    Published posts are readable by anyone. An unpublished post is only
    returned to a requester whose linked creator profile owns it; for
    everyone else it is reported as 404, exactly like a missing post.

    Args:
        post_id: Id of the post to fetch.
        db: Async database session.
        current_user: The requesting user, or ``None``, as supplied by the
            ``get_optional_user`` dependency.

    Returns:
        The post as ``PostRead`` with attachment download URLs.

    Raises:
        HTTPException: 404 when the post does not exist or is an
            unpublished post the requester does not own.
    """
    q = (
        select(Post)
        .where(Post.id == post_id)
        .options(selectinload(Post.attachments))
    )
    result = await db.execute(q)
    post = result.scalar_one_or_none()
    if post is None:
        raise HTTPException(status_code=404, detail="Post not found")

    if not post.is_published:
        is_owner = (
            current_user is not None
            and current_user.creator_id == post.creator_id
        )
        if not is_owner:
            # Use the same status and detail as a missing post so the
            # existence of a draft is not disclosed to other callers.
            raise HTTPException(status_code=404, detail="Post not found")

    return _attach_download_urls(post)
@router.put("/{post_id}", response_model=PostRead)
async def update_post(
    post_id: uuid.UUID,
    payload: PostUpdate,
    current_user: Annotated[User, Depends(get_current_user)],
    db: AsyncSession = Depends(get_session),
) -> PostRead:
    """Apply a partial update to a post owned by the requester.

    Only the fields explicitly set in ``payload`` are written to the post.

    Args:
        post_id: Id of the post to update.
        payload: Fields to change.
        current_user: The requesting user, supplied by the
            ``get_current_user`` dependency.
        db: Async database session.

    Returns:
        The updated post as ``PostRead``.

    Raises:
        HTTPException: 403 when the user has no linked creator profile or
            does not own the post; 404 when the post does not exist.
    """
    owner_id = _resolve_creator_id(current_user)

    stmt = select(Post).where(Post.id == post_id).options(selectinload(Post.attachments))
    post = (await db.execute(stmt)).scalar_one_or_none()
    if post is None:
        raise HTTPException(status_code=404, detail="Post not found")

    if post.creator_id != owner_id:
        logger.warning(
            "Ownership violation: user %s (creator %s) tried to update post %s owned by creator %s",
            current_user.id, owner_id, post_id, post.creator_id,
        )
        raise HTTPException(status_code=403, detail="Not your post")

    # exclude_unset keeps fields the client did not send out of the update.
    changes = payload.model_dump(exclude_unset=True)
    for name in changes:
        setattr(post, name, changes[name])

    await db.commit()
    await db.refresh(post, attribute_names=["attachments"])
    return _attach_download_urls(post)
@router.delete("/{post_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(
    post_id: uuid.UUID,
    current_user: Annotated[User, Depends(get_current_user)],
    db: AsyncSession = Depends(get_session),
) -> None:
    """Delete a post and all its attachments (DB + MinIO).

    The database delete is committed first; stored objects are removed
    afterwards on a best-effort basis. If the commit fails, no file has
    been touched, so the surviving post keeps working attachments. If an
    object removal fails, it is logged and the request still succeeds.

    Args:
        post_id: Id of the post to delete.
        current_user: The requesting user, supplied by the
            ``get_current_user`` dependency.
        db: Async database session.

    Raises:
        HTTPException: 403 when the user has no linked creator profile or
            does not own the post; 404 when the post does not exist.
    """
    creator_id = _resolve_creator_id(current_user)
    q = (
        select(Post)
        .where(Post.id == post_id)
        .options(selectinload(Post.attachments))
    )
    result = await db.execute(q)
    post = result.scalar_one_or_none()
    if post is None:
        raise HTTPException(status_code=404, detail="Post not found")
    if post.creator_id != creator_id:
        logger.warning(
            "Ownership violation: user %s (creator %s) tried to delete post %s owned by creator %s",
            current_user.id, creator_id, post_id, post.creator_id,
        )
        raise HTTPException(status_code=403, detail="Not your post")

    # Snapshot plain values now: once the delete is committed the ORM
    # attachment instances must no longer be read.
    stored_objects = [(att.id, att.object_key) for att in post.attachments]

    await db.delete(post)
    await db.commit()

    # Delete attachment files from MinIO (best-effort), only after the DB
    # change is durable.
    # NOTE(review): delete_file is called without await; if it performs
    # blocking network I/O, consider running it in a threadpool.
    for att_id, object_key in stored_objects:
        try:
            delete_file(object_key)
        except Exception:
            logger.error(
                "Failed to delete MinIO object %s for attachment %s",
                object_key,
                att_id,
                exc_info=True,
            )