- "backend/routers/posts.py" - "backend/routers/files.py" - "backend/minio_client.py" - "backend/auth.py" - "backend/main.py" GSD-Task: S01/T02
222 lines
7.1 KiB
Python
222 lines
7.1 KiB
Python
"""Post CRUD endpoints for Chrysopedia creator posts.
|
|
|
|
Creators can write rich text posts with optional file attachments.
|
|
Posts are visible on creator profile pages. Auth required for write ops;
|
|
public read access for published posts.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import uuid
|
|
from typing import Annotated
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, status
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from auth import get_current_user, get_optional_user
|
|
from database import get_session
|
|
from minio_client import delete_file, generate_download_url
|
|
from models import Creator, Post, PostAttachment, User
|
|
from schemas import PostCreate, PostListResponse, PostRead, PostUpdate
|
|
|
|
logger = logging.getLogger("chrysopedia.posts")
|
|
|
|
router = APIRouter(prefix="/posts", tags=["posts"])
|
|
|
|
|
|
def _resolve_creator_id(user: User) -> uuid.UUID:
|
|
"""Get the user's linked creator_id or raise 403."""
|
|
if user.creator_id is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="No creator profile linked to this account",
|
|
)
|
|
return user.creator_id
|
|
|
|
|
|
def _attach_download_urls(post: Post) -> PostRead:
|
|
"""Convert a Post ORM instance to PostRead with download URLs on attachments."""
|
|
attachment_reads = []
|
|
for att in post.attachments:
|
|
try:
|
|
url = generate_download_url(att.object_key)
|
|
except Exception:
|
|
logger.warning("Failed to generate download URL for attachment %s", att.id)
|
|
url = None
|
|
attachment_reads.append(
|
|
{
|
|
"id": att.id,
|
|
"filename": att.filename,
|
|
"content_type": att.content_type,
|
|
"size_bytes": att.size_bytes,
|
|
"download_url": url,
|
|
"created_at": att.created_at,
|
|
}
|
|
)
|
|
return PostRead(
|
|
id=post.id,
|
|
creator_id=post.creator_id,
|
|
title=post.title,
|
|
body_json=post.body_json,
|
|
is_published=post.is_published,
|
|
created_at=post.created_at,
|
|
updated_at=post.updated_at,
|
|
attachments=attachment_reads,
|
|
)
|
|
|
|
|
|
@router.post("", response_model=PostRead, status_code=status.HTTP_201_CREATED)
|
|
async def create_post(
|
|
payload: PostCreate,
|
|
current_user: Annotated[User, Depends(get_current_user)],
|
|
db: AsyncSession = Depends(get_session),
|
|
) -> PostRead:
|
|
"""Create a new post for the authenticated creator."""
|
|
creator_id = _resolve_creator_id(current_user)
|
|
|
|
post = Post(
|
|
creator_id=creator_id,
|
|
title=payload.title,
|
|
body_json=payload.body_json,
|
|
is_published=payload.is_published,
|
|
)
|
|
db.add(post)
|
|
await db.commit()
|
|
await db.refresh(post, attribute_names=["attachments"])
|
|
return _attach_download_urls(post)
|
|
|
|
|
|
@router.get("", response_model=PostListResponse)
|
|
async def list_posts(
|
|
creator_id: Annotated[uuid.UUID, Query(description="Filter by creator")],
|
|
page: Annotated[int, Query(ge=1)] = 1,
|
|
limit: Annotated[int, Query(ge=1, le=100)] = 20,
|
|
current_user: User | None = Depends(get_optional_user),
|
|
db: AsyncSession = Depends(get_session),
|
|
) -> PostListResponse:
|
|
"""List posts by a creator. Non-owners see only published posts."""
|
|
# Determine if requester owns this creator
|
|
is_owner = (
|
|
current_user is not None
|
|
and current_user.creator_id == creator_id
|
|
)
|
|
|
|
base_filter = Post.creator_id == creator_id
|
|
if not is_owner:
|
|
base_filter = base_filter & (Post.is_published == True) # noqa: E712
|
|
|
|
# Count
|
|
count_q = select(func.count()).select_from(Post).where(base_filter)
|
|
total = (await db.execute(count_q)).scalar_one()
|
|
|
|
# Fetch page
|
|
offset = (page - 1) * limit
|
|
q = (
|
|
select(Post)
|
|
.where(base_filter)
|
|
.options(selectinload(Post.attachments))
|
|
.order_by(Post.created_at.desc())
|
|
.offset(offset)
|
|
.limit(limit)
|
|
)
|
|
result = await db.execute(q)
|
|
posts = result.scalars().all()
|
|
|
|
return PostListResponse(
|
|
items=[_attach_download_urls(p) for p in posts],
|
|
total=total,
|
|
)
|
|
|
|
|
|
@router.get("/{post_id}", response_model=PostRead)
|
|
async def get_post(
|
|
post_id: uuid.UUID,
|
|
db: AsyncSession = Depends(get_session),
|
|
) -> PostRead:
|
|
"""Get a single post. Returns 404 for unpublished posts (unless owner check is added later)."""
|
|
q = (
|
|
select(Post)
|
|
.where(Post.id == post_id)
|
|
.options(selectinload(Post.attachments))
|
|
)
|
|
result = await db.execute(q)
|
|
post = result.scalar_one_or_none()
|
|
if post is None:
|
|
raise HTTPException(status_code=404, detail="Post not found")
|
|
return _attach_download_urls(post)
|
|
|
|
|
|
@router.put("/{post_id}", response_model=PostRead)
|
|
async def update_post(
|
|
post_id: uuid.UUID,
|
|
payload: PostUpdate,
|
|
current_user: Annotated[User, Depends(get_current_user)],
|
|
db: AsyncSession = Depends(get_session),
|
|
) -> PostRead:
|
|
"""Update a post. Only the owning creator can update."""
|
|
creator_id = _resolve_creator_id(current_user)
|
|
|
|
q = (
|
|
select(Post)
|
|
.where(Post.id == post_id)
|
|
.options(selectinload(Post.attachments))
|
|
)
|
|
result = await db.execute(q)
|
|
post = result.scalar_one_or_none()
|
|
|
|
if post is None:
|
|
raise HTTPException(status_code=404, detail="Post not found")
|
|
if post.creator_id != creator_id:
|
|
logger.warning(
|
|
"Ownership violation: user %s (creator %s) tried to update post %s owned by creator %s",
|
|
current_user.id, creator_id, post_id, post.creator_id,
|
|
)
|
|
raise HTTPException(status_code=403, detail="Not your post")
|
|
|
|
update_data = payload.model_dump(exclude_unset=True)
|
|
for field, value in update_data.items():
|
|
setattr(post, field, value)
|
|
|
|
await db.commit()
|
|
await db.refresh(post, attribute_names=["attachments"])
|
|
return _attach_download_urls(post)
|
|
|
|
|
|
@router.delete("/{post_id}", status_code=status.HTTP_204_NO_CONTENT)
|
|
async def delete_post(
|
|
post_id: uuid.UUID,
|
|
current_user: Annotated[User, Depends(get_current_user)],
|
|
db: AsyncSession = Depends(get_session),
|
|
) -> None:
|
|
"""Delete a post and all its attachments (DB + MinIO)."""
|
|
creator_id = _resolve_creator_id(current_user)
|
|
|
|
q = (
|
|
select(Post)
|
|
.where(Post.id == post_id)
|
|
.options(selectinload(Post.attachments))
|
|
)
|
|
result = await db.execute(q)
|
|
post = result.scalar_one_or_none()
|
|
|
|
if post is None:
|
|
raise HTTPException(status_code=404, detail="Post not found")
|
|
if post.creator_id != creator_id:
|
|
logger.warning(
|
|
"Ownership violation: user %s (creator %s) tried to delete post %s owned by creator %s",
|
|
current_user.id, creator_id, post_id, post.creator_id,
|
|
)
|
|
raise HTTPException(status_code=403, detail="Not your post")
|
|
|
|
# Delete attachment files from MinIO (best-effort)
|
|
for att in post.attachments:
|
|
try:
|
|
delete_file(att.object_key)
|
|
except Exception:
|
|
logger.error("Failed to delete MinIO object %s for attachment %s", att.object_key, att.id)
|
|
|
|
await db.delete(post)
|
|
await db.commit()
|