449 lines
16 KiB
Python
449 lines
16 KiB
Python
"""Integration tests for consent endpoints.

Covers auth/authz, ownership gating, audit trail, idempotency,
partial updates, admin access, pagination, and 404 handling.
"""
|
|
|
|
import uuid
|
|
|
|
import pytest
|
|
|
|
# URL prefix shared by every consent endpoint requested by the tests below.
BASE = "/api/v1/consent"
|
|
|
|
|
|
# ── Auth tests ───────────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_unauthenticated_get_returns_401(client):
    """Listing consent videos without any auth headers is rejected with 401."""
    listing_url = f"{BASE}/videos"
    response = await client.get(listing_url)
    assert response.status_code == 401
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_unauthenticated_put_returns_401(client):
    """Updating a video's consent without any auth headers is rejected with 401."""
    # A freshly generated id is used; the request carries no credentials.
    target_url = f"{BASE}/videos/{uuid.uuid4()}"
    payload = {"kb_inclusion": True}
    response = await client.put(target_url, json=payload)
    assert response.status_code == 401
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_user_without_creator_id_list_returns_403(client, auth_headers):
    """Listing consents as a user with no creator_id is forbidden (403)."""
    listing_url = f"{BASE}/videos"
    response = await client.get(listing_url, headers=auth_headers)
    assert response.status_code == 403
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_user_without_creator_id_get_single_returns_403(
    client, auth_headers, creator_with_videos
):
    """Reading one video's consent as a user with no creator_id is forbidden (403)."""
    # Target the first video supplied by the creator_with_videos fixture.
    first_video_id = str(creator_with_videos["video_ids"][0])
    video_url = f"{BASE}/videos/{first_video_id}"
    response = await client.get(video_url, headers=auth_headers)
    assert response.status_code == 403
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_user_without_creator_id_put_returns_403(
    client, auth_headers, creator_with_videos
):
    """Updating one video's consent as a user with no creator_id is forbidden (403)."""
    # Target the first video supplied by the creator_with_videos fixture.
    first_video_id = str(creator_with_videos["video_ids"][0])
    video_url = f"{BASE}/videos/{first_video_id}"
    payload = {"kb_inclusion": True}
    response = await client.put(video_url, json=payload, headers=auth_headers)
    assert response.status_code == 403
|
|
|
|
|
|
# ── Ownership tests ──────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_creator_cannot_read_other_creators_video(
    client, db_engine, creator_user_auth
):
    """Reading consent for a video owned by a different creator yields 403.

    A second creator and one video for it are inserted directly through
    ``db_engine``; the authenticated creator then requests that video's
    consent.
    """
    from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
    from models import Creator, SourceVideo, ContentType, ProcessingStatus

    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as db:
        foreign_creator = Creator(
            name="OtherCreator", slug="othercreator", folder_name="Other"
        )
        db.add(foreign_creator)
        # Flush before reading foreign_creator.id for the video row below.
        await db.flush()

        foreign_video = SourceVideo(
            creator_id=foreign_creator.id,
            filename="other.mp4",
            file_path="Other/other.mp4",
            duration_seconds=100,
            content_type=ContentType.tutorial,
            processing_status=ProcessingStatus.not_started,
        )
        db.add(foreign_video)
        await db.flush()
        # Capture the id as a string while the session is still open.
        foreign_video_id = str(foreign_video.id)
        await db.commit()

    video_url = f"{BASE}/videos/{foreign_video_id}"
    response = await client.get(video_url, headers=creator_user_auth["headers"])
    assert response.status_code == 403
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_creator_cannot_update_other_creators_video(
    client, db_engine, creator_user_auth
):
    """Updating consent for a video owned by a different creator yields 403.

    A second creator and one video for it are inserted directly through
    ``db_engine``; the authenticated creator then tries to PUT consent on
    that video.
    """
    from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
    from models import Creator, SourceVideo, ContentType, ProcessingStatus

    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as db:
        foreign_creator = Creator(
            name="OtherCreator2", slug="othercreator2", folder_name="Other2"
        )
        db.add(foreign_creator)
        # Flush before reading foreign_creator.id for the video row below.
        await db.flush()

        foreign_video = SourceVideo(
            creator_id=foreign_creator.id,
            filename="other2.mp4",
            file_path="Other2/other2.mp4",
            duration_seconds=100,
            content_type=ContentType.tutorial,
            processing_status=ProcessingStatus.not_started,
        )
        db.add(foreign_video)
        await db.flush()
        # Capture the id as a string while the session is still open.
        foreign_video_id = str(foreign_video.id)
        await db.commit()

    video_url = f"{BASE}/videos/{foreign_video_id}"
    payload = {"kb_inclusion": True}
    response = await client.put(
        video_url, json=payload, headers=creator_user_auth["headers"]
    )
    assert response.status_code == 403
|
|
|
|
|
|
# ── Happy path — PUT ─────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_put_creates_consent_and_audit_entries(client, creator_user_auth):
    """First PUT returns the stored consent flags and records audit entries.

    Sends kb_inclusion and training_usage as True for the creator's first
    video, checks the flags echoed in the response, then reads the history
    endpoint to confirm an audit entry exists for each field that was sent.
    """
    vid = str(creator_user_auth["video_ids"][0])
    headers = creator_user_auth["headers"]
    video_url = f"{BASE}/videos/{vid}"

    resp = await client.put(
        video_url,
        json={"kb_inclusion": True, "training_usage": True},
        headers=headers,
    )
    assert resp.status_code == 200
    data = resp.json()
    assert data["kb_inclusion"] is True
    assert data["training_usage"] is True
    # public_display was not sent; the response is expected to report True.
    assert data["public_display"] is True
    assert data["source_video_id"] == vid

    # The test name promises audit entries, so verify them through the
    # history endpoint rather than only inspecting the PUT response.
    history = await client.get(f"{video_url}/history", headers=headers)
    assert history.status_code == 200
    entries = history.json()
    assert {e["field_name"] for e in entries} == {"kb_inclusion", "training_usage"}
    assert all(e["new_value"] is True for e in entries)
|
|
|
|
|
|
# ── Happy path — GET list ────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_list_returns_creator_videos(client, creator_user_auth):
    """The consent listing contains a video the creator has set consent on."""
    headers = creator_user_auth["headers"]
    vid = str(creator_user_auth["video_ids"][0])

    # Seed a consent record so the listing has something to return.
    await client.put(
        f"{BASE}/videos/{vid}", json={"kb_inclusion": True}, headers=headers
    )

    listing = await client.get(f"{BASE}/videos", headers=headers)
    assert listing.status_code == 200
    body = listing.json()
    assert body["total"] >= 1
    assert len(body["items"]) >= 1
    # The seeded video must be among the returned items.
    returned_ids = {item["source_video_id"] for item in body["items"]}
    assert vid in returned_ids
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_list_pagination(client, creator_user_auth):
    """The consent listing honours the limit and offset query parameters."""
    headers = creator_user_auth["headers"]

    # Give every fixture video a consent record so there is data to page over.
    for video in creator_user_auth["video_ids"]:
        video_id = str(video)
        await client.put(
            f"{BASE}/videos/{video_id}",
            json={"kb_inclusion": True},
            headers=headers,
        )

    # First page: one item, while the total still reports both videos.
    first_resp = await client.get(f"{BASE}/videos?limit=1&offset=0", headers=headers)
    assert first_resp.status_code == 200
    first_page = first_resp.json()
    assert first_page["total"] == 2
    assert len(first_page["items"]) == 1

    # Second page: one item again, and it must not repeat the first page's video.
    second_resp = await client.get(f"{BASE}/videos?limit=1&offset=1", headers=headers)
    assert second_resp.status_code == 200
    second_page = second_resp.json()
    assert len(second_page["items"]) == 1
    first_id = first_page["items"][0]["source_video_id"]
    second_id = second_page["items"][0]["source_video_id"]
    assert second_id != first_id
|
|
|
|
|
|
# ── Happy path — GET single ──────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_single_returns_defaults_when_no_consent(client, creator_user_auth):
    """GET on a video with no prior PUT in this test reports the default flags."""
    vid = str(creator_user_auth["video_ids"][0])
    video_url = f"{BASE}/videos/{vid}"
    response = await client.get(video_url, headers=creator_user_auth["headers"])
    assert response.status_code == 200

    consent = response.json()
    # Expected defaults: both opt-in flags off, public display on.
    assert consent["kb_inclusion"] is False
    assert consent["training_usage"] is False
    assert consent["public_display"] is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_single_returns_updated_consent(client, creator_user_auth):
    """A GET issued after a PUT reflects the values that were written."""
    headers = creator_user_auth["headers"]
    vid = str(creator_user_auth["video_ids"][0])
    video_url = f"{BASE}/videos/{vid}"

    # Write two of the three flags; training_usage is left out on purpose.
    update = {"kb_inclusion": True, "public_display": False}
    await client.put(video_url, json=update, headers=headers)

    response = await client.get(video_url, headers=headers)
    assert response.status_code == 200
    consent = response.json()
    assert consent["kb_inclusion"] is True
    assert consent["training_usage"] is False
    assert consent["public_display"] is False
|
|
|
|
|
|
# ── Audit trail ──────────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_audit_trail_records_field_changes(client, creator_user_auth):
    """A PUT that sets two fields yields two audit entries, versions 1 and 2."""
    headers = creator_user_auth["headers"]
    vid = str(creator_user_auth["video_ids"][0])
    video_url = f"{BASE}/videos/{vid}"

    update = {"kb_inclusion": True, "training_usage": True}
    await client.put(video_url, json=update, headers=headers)

    history_resp = await client.get(f"{BASE}/videos/{vid}/history", headers=headers)
    assert history_resp.status_code == 200
    history = history_resp.json()
    assert len(history) == 2

    # Versions are expected to count up from 1 in the returned order.
    assert [item["version"] for item in history] == [1, 2]
    # Every entry records the flag being switched on.
    assert all(item["new_value"] is True for item in history)
    # Between them the entries cover exactly the two fields that were sent.
    changed_fields = {item["field_name"] for item in history}
    assert changed_fields == {"kb_inclusion", "training_usage"}
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_audit_trail_ordered_by_version(client, creator_user_auth):
    """Successive PUTs create audit entries in version order.

    kb_inclusion is switched on and then off again; the history endpoint is
    expected to list the two changes in ascending version order, with the
    second entry recording the previous value.
    """
    vid = str(creator_user_auth["video_ids"][0])
    headers = creator_user_auth["headers"]
    video_url = f"{BASE}/videos/{vid}"

    # First change: turn the flag on.
    await client.put(video_url, json={"kb_inclusion": True}, headers=headers)
    # Second change: turn it back off.
    await client.put(video_url, json={"kb_inclusion": False}, headers=headers)

    resp = await client.get(f"{video_url}/history", headers=headers)
    # Check the status before indexing into the body so that an error
    # response fails here with a clear message instead of a confusing
    # KeyError or length mismatch further down.
    assert resp.status_code == 200
    entries = resp.json()
    assert len(entries) == 2
    assert entries[0]["version"] < entries[1]["version"]
    assert entries[0]["new_value"] is True
    assert entries[1]["new_value"] is False
    assert entries[1]["old_value"] is True
|
|
|
|
|
|
# ── Idempotency ──────────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_idempotent_put_creates_no_audit_entries(client, creator_user_auth):
    """Repeating a PUT with unchanged values adds no further audit entries.

    The same payload is sent twice; the history endpoint must then contain
    only the single entry produced by the first request.
    """
    vid = str(creator_user_auth["video_ids"][0])
    headers = creator_user_auth["headers"]
    video_url = f"{BASE}/videos/{vid}"
    payload = {"kb_inclusion": True}

    # Set the initial value.
    first = await client.put(video_url, json=payload, headers=headers)
    assert first.status_code == 200

    # Send the identical request again. A rejected request would also leave
    # the history untouched, so confirm it actually succeeded.
    repeat = await client.put(video_url, json=payload, headers=headers)
    assert repeat.status_code == 200

    resp = await client.get(f"{video_url}/history", headers=headers)
    # Without this check a one-key JSON error body would satisfy the
    # length assertion below and the test would pass for the wrong reason.
    assert resp.status_code == 200
    entries = resp.json()
    # Only 1 audit entry from the first PUT, none from the duplicate.
    assert len(entries) == 1
|
|
|
|
|
|
# ── Partial update ───────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_partial_update_changes_only_specified_field(client, creator_user_auth):
    """A PUT carrying one field changes that field and leaves the others as set."""
    headers = creator_user_auth["headers"]
    vid = str(creator_user_auth["video_ids"][0])
    video_url = f"{BASE}/videos/{vid}"

    # Establish a known state for all three flags.
    initial = {"kb_inclusion": True, "training_usage": True, "public_display": False}
    await client.put(video_url, json=initial, headers=headers)

    # Send kb_inclusion alone and inspect what comes back.
    response = await client.put(
        video_url, json={"kb_inclusion": False}, headers=headers
    )
    assert response.status_code == 200
    consent = response.json()
    assert consent["kb_inclusion"] is False
    # The two flags that were not sent keep their earlier values.
    assert consent["training_usage"] is True
    assert consent["public_display"] is False
|
|
|
|
|
|
# ── Admin access ─────────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_admin_can_read_any_creators_consent(
    client, creator_user_auth, admin_auth
):
    """A request with the admin headers can read consent a creator has set."""
    vid = str(creator_user_auth["video_ids"][0])
    video_url = f"{BASE}/videos/{vid}"

    # The creator writes the consent using their own credentials.
    await client.put(
        video_url, json={"kb_inclusion": True}, headers=creator_user_auth["headers"]
    )

    # The admin reads the same record back.
    response = await client.get(video_url, headers=admin_auth["headers"])
    assert response.status_code == 200
    consent = response.json()
    assert consent["kb_inclusion"] is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_admin_summary_returns_correct_counts(
    client, creator_user_auth, admin_auth
):
    """The admin summary reports aggregate counts matching the consents set."""
    creator_headers = creator_user_auth["headers"]
    consent = {"kb_inclusion": True, "training_usage": False, "public_display": True}

    # Apply the same consent to every video provided by the fixture.
    for video in creator_user_auth["video_ids"]:
        video_id = str(video)
        await client.put(
            f"{BASE}/videos/{video_id}", json=consent, headers=creator_headers
        )

    response = await client.get(
        f"{BASE}/admin/summary", headers=admin_auth["headers"]
    )
    assert response.status_code == 200
    summary = response.json()

    # Expected tallies for two videos with the consent above.
    expected_counts = {
        "total_videos": 2,
        "kb_inclusion_granted": 2,
        "training_usage_granted": 0,
        "public_display_granted": 2,
    }
    for key, count in expected_counts.items():
        assert summary[key] == count
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_non_admin_cannot_access_summary(client, creator_user_auth):
    """The admin summary endpoint answers 403 to the creator's credentials."""
    summary_url = f"{BASE}/admin/summary"
    response = await client.get(summary_url, headers=creator_user_auth["headers"])
    assert response.status_code == 403
|
|
|
|
|
|
# ── Nonexistent video ────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_nonexistent_video_returns_404(client, creator_user_auth):
    """Reading consent for a freshly generated random video id yields 404."""
    missing_url = f"{BASE}/videos/{uuid.uuid4()}"
    response = await client.get(missing_url, headers=creator_user_auth["headers"])
    assert response.status_code == 404
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_put_nonexistent_video_returns_404(client, creator_user_auth):
    """Updating consent for a freshly generated random video id yields 404."""
    missing_url = f"{BASE}/videos/{uuid.uuid4()}"
    payload = {"kb_inclusion": True}
    response = await client.put(
        missing_url, json=payload, headers=creator_user_auth["headers"]
    )
    assert response.status_code == 404
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_history_nonexistent_video_returns_404(client, creator_user_auth):
    """Requesting history for a freshly generated random video id yields 404."""
    missing_history_url = f"{BASE}/videos/{uuid.uuid4()}/history"
    response = await client.get(
        missing_history_url, headers=creator_user_auth["headers"]
    )
    assert response.status_code == 404
|