test: Add 22 integration tests for consent endpoints covering auth, own…
- "backend/tests/test_consent.py" - "backend/tests/conftest.py" GSD-Task: S03/T03
This commit is contained in:
parent
ab3c723533
commit
ea8ddf74f0
2 changed files with 589 additions and 0 deletions
|
|
@ -40,6 +40,7 @@ from models import ( # noqa: E402
|
|||
TranscriptSegment,
|
||||
User,
|
||||
UserRole,
|
||||
VideoConsent,
|
||||
)
|
||||
|
||||
TEST_DATABASE_URL = os.getenv(
|
||||
|
|
@ -237,3 +238,142 @@ async def auth_headers(client, registered_user):
|
|||
token = resp.json()["access_token"]
|
||||
return {"Authorization": f"Bearer {token}"}
|
||||
|
||||
|
||||
# ── Consent test fixtures ────────────────────────────────────────────────────
|
||||
|
||||
_CONSENT_INVITE = "CONSENT-INV-2026"
|
||||
_CREATOR_EMAIL = "creator@chrysopedia.com"
|
||||
_CREATOR_PASSWORD = "creatorpass123"
|
||||
_ADMIN_EMAIL = "admin@chrysopedia.com"
|
||||
_ADMIN_PASSWORD = "adminpass123"
|
||||
|
||||
|
||||
@pytest_asyncio.fixture()
|
||||
async def creator_with_videos(db_engine):
|
||||
"""Create a Creator with 2 SourceVideos. Returns dict with IDs."""
|
||||
factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
|
||||
async with factory() as session:
|
||||
creator = Creator(
|
||||
name="TestCreator",
|
||||
slug="testcreator",
|
||||
folder_name="TestCreator",
|
||||
)
|
||||
session.add(creator)
|
||||
await session.flush()
|
||||
|
||||
video1 = SourceVideo(
|
||||
creator_id=creator.id,
|
||||
filename="video-one.mp4",
|
||||
file_path="TestCreator/video-one.mp4",
|
||||
duration_seconds=600,
|
||||
content_type=ContentType.tutorial,
|
||||
processing_status=ProcessingStatus.not_started,
|
||||
)
|
||||
video2 = SourceVideo(
|
||||
creator_id=creator.id,
|
||||
filename="video-two.mp4",
|
||||
file_path="TestCreator/video-two.mp4",
|
||||
duration_seconds=900,
|
||||
content_type=ContentType.tutorial,
|
||||
processing_status=ProcessingStatus.not_started,
|
||||
)
|
||||
session.add_all([video1, video2])
|
||||
await session.flush()
|
||||
|
||||
result = {
|
||||
"creator_id": creator.id,
|
||||
"video_ids": [video1.id, video2.id],
|
||||
}
|
||||
await session.commit()
|
||||
return result
|
||||
|
||||
|
||||
@pytest_asyncio.fixture()
|
||||
async def creator_user_auth(client, db_engine, creator_with_videos):
|
||||
"""Register a user linked to the test creator, return auth headers."""
|
||||
factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
|
||||
|
||||
# Create invite code for this fixture
|
||||
async with factory() as session:
|
||||
code = InviteCode(code=_CONSENT_INVITE, uses_remaining=10)
|
||||
session.add(code)
|
||||
await session.commit()
|
||||
|
||||
# Register user
|
||||
resp = await client.post("/api/v1/auth/register", json={
|
||||
"email": _CREATOR_EMAIL,
|
||||
"password": _CREATOR_PASSWORD,
|
||||
"display_name": "Creator User",
|
||||
"invite_code": _CONSENT_INVITE,
|
||||
})
|
||||
assert resp.status_code == 201
|
||||
user_data = resp.json()
|
||||
|
||||
# Link user to creator via direct DB update
|
||||
async with factory() as session:
|
||||
from sqlalchemy import update
|
||||
await session.execute(
|
||||
update(User)
|
||||
.where(User.id == uuid.UUID(user_data["id"]))
|
||||
.values(creator_id=creator_with_videos["creator_id"])
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
# Login to get token
|
||||
resp = await client.post("/api/v1/auth/login", json={
|
||||
"email": _CREATOR_EMAIL,
|
||||
"password": _CREATOR_PASSWORD,
|
||||
})
|
||||
assert resp.status_code == 200
|
||||
token = resp.json()["access_token"]
|
||||
return {
|
||||
"headers": {"Authorization": f"Bearer {token}"},
|
||||
"user_id": user_data["id"],
|
||||
"creator_id": creator_with_videos["creator_id"],
|
||||
"video_ids": creator_with_videos["video_ids"],
|
||||
}
|
||||
|
||||
|
||||
@pytest_asyncio.fixture()
|
||||
async def admin_auth(client, db_engine):
|
||||
"""Register an admin-role user, return auth headers."""
|
||||
factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
|
||||
|
||||
# Create invite code
|
||||
async with factory() as session:
|
||||
code = InviteCode(code="ADMIN-INV-2026", uses_remaining=10)
|
||||
session.add(code)
|
||||
await session.commit()
|
||||
|
||||
# Register user
|
||||
resp = await client.post("/api/v1/auth/register", json={
|
||||
"email": _ADMIN_EMAIL,
|
||||
"password": _ADMIN_PASSWORD,
|
||||
"display_name": "Admin User",
|
||||
"invite_code": "ADMIN-INV-2026",
|
||||
})
|
||||
assert resp.status_code == 201
|
||||
user_data = resp.json()
|
||||
|
||||
# Promote to admin via direct DB update
|
||||
async with factory() as session:
|
||||
from sqlalchemy import update
|
||||
await session.execute(
|
||||
update(User)
|
||||
.where(User.id == uuid.UUID(user_data["id"]))
|
||||
.values(role=UserRole.admin)
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
# Login to get token (token reflects old role, but DB has admin)
|
||||
resp = await client.post("/api/v1/auth/login", json={
|
||||
"email": _ADMIN_EMAIL,
|
||||
"password": _ADMIN_PASSWORD,
|
||||
})
|
||||
assert resp.status_code == 200
|
||||
token = resp.json()["access_token"]
|
||||
return {
|
||||
"headers": {"Authorization": f"Bearer {token}"},
|
||||
"user_id": user_data["id"],
|
||||
}
|
||||
|
||||
|
|
|
|||
449
backend/tests/test_consent.py
Normal file
449
backend/tests/test_consent.py
Normal file
|
|
@ -0,0 +1,449 @@
|
|||
"""Integration tests for consent endpoints.
|
||||
|
||||
Covers auth/authz, ownership gating, audit trail, idempotency,
|
||||
partial updates, admin access, pagination, and 404 handling.
|
||||
"""
|
||||
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
|
||||
BASE = "/api/v1/consent"
|
||||
|
||||
|
||||
# ── Auth tests ───────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_unauthenticated_get_returns_401(client):
|
||||
"""GET /consent/videos without token → 401."""
|
||||
resp = await client.get(f"{BASE}/videos")
|
||||
assert resp.status_code == 401
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_unauthenticated_put_returns_401(client):
|
||||
"""PUT /consent/videos/{id} without token → 401."""
|
||||
resp = await client.put(
|
||||
f"{BASE}/videos/{uuid.uuid4()}", json={"kb_inclusion": True}
|
||||
)
|
||||
assert resp.status_code == 401
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_user_without_creator_id_list_returns_403(client, auth_headers):
|
||||
"""User with no creator_id → 403 on GET /consent/videos."""
|
||||
resp = await client.get(f"{BASE}/videos", headers=auth_headers)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_user_without_creator_id_get_single_returns_403(
|
||||
client, auth_headers, creator_with_videos
|
||||
):
|
||||
"""User with no creator_id → 403 on GET /consent/videos/{id}."""
|
||||
vid = str(creator_with_videos["video_ids"][0])
|
||||
resp = await client.get(f"{BASE}/videos/{vid}", headers=auth_headers)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_user_without_creator_id_put_returns_403(
|
||||
client, auth_headers, creator_with_videos
|
||||
):
|
||||
"""User with no creator_id → 403 on PUT /consent/videos/{id}."""
|
||||
vid = str(creator_with_videos["video_ids"][0])
|
||||
resp = await client.put(
|
||||
f"{BASE}/videos/{vid}", json={"kb_inclusion": True}, headers=auth_headers
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
# ── Ownership tests ──────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_creator_cannot_read_other_creators_video(
|
||||
client, db_engine, creator_user_auth
|
||||
):
|
||||
"""Creator A can't GET consent for Creator B's video → 403."""
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
||||
from models import Creator, SourceVideo, ContentType, ProcessingStatus
|
||||
|
||||
factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
|
||||
async with factory() as session:
|
||||
other_creator = Creator(
|
||||
name="OtherCreator", slug="othercreator", folder_name="Other"
|
||||
)
|
||||
session.add(other_creator)
|
||||
await session.flush()
|
||||
other_video = SourceVideo(
|
||||
creator_id=other_creator.id,
|
||||
filename="other.mp4",
|
||||
file_path="Other/other.mp4",
|
||||
duration_seconds=100,
|
||||
content_type=ContentType.tutorial,
|
||||
processing_status=ProcessingStatus.not_started,
|
||||
)
|
||||
session.add(other_video)
|
||||
await session.flush()
|
||||
other_vid = str(other_video.id)
|
||||
await session.commit()
|
||||
|
||||
resp = await client.get(
|
||||
f"{BASE}/videos/{other_vid}", headers=creator_user_auth["headers"]
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_creator_cannot_update_other_creators_video(
|
||||
client, db_engine, creator_user_auth
|
||||
):
|
||||
"""Creator A can't PUT consent for Creator B's video → 403."""
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
||||
from models import Creator, SourceVideo, ContentType, ProcessingStatus
|
||||
|
||||
factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
|
||||
async with factory() as session:
|
||||
other_creator = Creator(
|
||||
name="OtherCreator2", slug="othercreator2", folder_name="Other2"
|
||||
)
|
||||
session.add(other_creator)
|
||||
await session.flush()
|
||||
other_video = SourceVideo(
|
||||
creator_id=other_creator.id,
|
||||
filename="other2.mp4",
|
||||
file_path="Other2/other2.mp4",
|
||||
duration_seconds=100,
|
||||
content_type=ContentType.tutorial,
|
||||
processing_status=ProcessingStatus.not_started,
|
||||
)
|
||||
session.add(other_video)
|
||||
await session.flush()
|
||||
other_vid = str(other_video.id)
|
||||
await session.commit()
|
||||
|
||||
resp = await client.put(
|
||||
f"{BASE}/videos/{other_vid}",
|
||||
json={"kb_inclusion": True},
|
||||
headers=creator_user_auth["headers"],
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
# ── Happy path — PUT ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_put_creates_consent_and_audit_entries(client, creator_user_auth):
|
||||
"""First PUT creates VideoConsent + audit entries with correct flags."""
|
||||
vid = str(creator_user_auth["video_ids"][0])
|
||||
resp = await client.put(
|
||||
f"{BASE}/videos/{vid}",
|
||||
json={"kb_inclusion": True, "training_usage": True},
|
||||
headers=creator_user_auth["headers"],
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["kb_inclusion"] is True
|
||||
assert data["training_usage"] is True
|
||||
# public_display defaults to True
|
||||
assert data["public_display"] is True
|
||||
assert data["source_video_id"] == vid
|
||||
|
||||
|
||||
# ── Happy path — GET list ────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_list_returns_creator_videos(client, creator_user_auth):
|
||||
"""GET /consent/videos returns consents for the creator's videos."""
|
||||
# Create consent for one video first
|
||||
vid = str(creator_user_auth["video_ids"][0])
|
||||
await client.put(
|
||||
f"{BASE}/videos/{vid}",
|
||||
json={"kb_inclusion": True},
|
||||
headers=creator_user_auth["headers"],
|
||||
)
|
||||
|
||||
resp = await client.get(
|
||||
f"{BASE}/videos", headers=creator_user_auth["headers"]
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["total"] >= 1
|
||||
assert len(data["items"]) >= 1
|
||||
# Verify our video is in the results
|
||||
video_ids_in_results = [item["source_video_id"] for item in data["items"]]
|
||||
assert vid in video_ids_in_results
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_list_pagination(client, creator_user_auth):
|
||||
"""GET /consent/videos respects offset/limit pagination."""
|
||||
# Create consent for both videos
|
||||
for vid in creator_user_auth["video_ids"]:
|
||||
await client.put(
|
||||
f"{BASE}/videos/{str(vid)}",
|
||||
json={"kb_inclusion": True},
|
||||
headers=creator_user_auth["headers"],
|
||||
)
|
||||
|
||||
# Fetch with limit=1
|
||||
resp = await client.get(
|
||||
f"{BASE}/videos?limit=1&offset=0",
|
||||
headers=creator_user_auth["headers"],
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["total"] == 2
|
||||
assert len(data["items"]) == 1
|
||||
|
||||
# Fetch page 2
|
||||
resp2 = await client.get(
|
||||
f"{BASE}/videos?limit=1&offset=1",
|
||||
headers=creator_user_auth["headers"],
|
||||
)
|
||||
assert resp2.status_code == 200
|
||||
data2 = resp2.json()
|
||||
assert len(data2["items"]) == 1
|
||||
# Different video than page 1
|
||||
assert data2["items"][0]["source_video_id"] != data["items"][0]["source_video_id"]
|
||||
|
||||
|
||||
# ── Happy path — GET single ──────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_single_returns_defaults_when_no_consent(client, creator_user_auth):
|
||||
"""GET /consent/videos/{id} returns defaults when no consent exists."""
|
||||
vid = str(creator_user_auth["video_ids"][0])
|
||||
resp = await client.get(
|
||||
f"{BASE}/videos/{vid}", headers=creator_user_auth["headers"]
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["kb_inclusion"] is False
|
||||
assert data["training_usage"] is False
|
||||
assert data["public_display"] is True
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_single_returns_updated_consent(client, creator_user_auth):
|
||||
"""GET returns the consent values after a PUT."""
|
||||
vid = str(creator_user_auth["video_ids"][0])
|
||||
await client.put(
|
||||
f"{BASE}/videos/{vid}",
|
||||
json={"kb_inclusion": True, "public_display": False},
|
||||
headers=creator_user_auth["headers"],
|
||||
)
|
||||
resp = await client.get(
|
||||
f"{BASE}/videos/{vid}", headers=creator_user_auth["headers"]
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["kb_inclusion"] is True
|
||||
assert data["training_usage"] is False
|
||||
assert data["public_display"] is False
|
||||
|
||||
|
||||
# ── Audit trail ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_audit_trail_records_field_changes(client, creator_user_auth):
|
||||
"""PUT changing 2 fields creates 2 audit entries with correct versions."""
|
||||
vid = str(creator_user_auth["video_ids"][0])
|
||||
await client.put(
|
||||
f"{BASE}/videos/{vid}",
|
||||
json={"kb_inclusion": True, "training_usage": True},
|
||||
headers=creator_user_auth["headers"],
|
||||
)
|
||||
|
||||
resp = await client.get(
|
||||
f"{BASE}/videos/{vid}/history", headers=creator_user_auth["headers"]
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
entries = resp.json()
|
||||
assert len(entries) == 2
|
||||
# Versions should be incrementing
|
||||
versions = [e["version"] for e in entries]
|
||||
assert versions == [1, 2]
|
||||
# Both entries should have new_value=True
|
||||
for entry in entries:
|
||||
assert entry["new_value"] is True
|
||||
# Fields should cover both changed fields
|
||||
field_names = {e["field_name"] for e in entries}
|
||||
assert field_names == {"kb_inclusion", "training_usage"}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_audit_trail_ordered_by_version(client, creator_user_auth):
|
||||
"""Successive PUTs create audit entries in version order."""
|
||||
vid = str(creator_user_auth["video_ids"][0])
|
||||
# First change
|
||||
await client.put(
|
||||
f"{BASE}/videos/{vid}",
|
||||
json={"kb_inclusion": True},
|
||||
headers=creator_user_auth["headers"],
|
||||
)
|
||||
# Second change
|
||||
await client.put(
|
||||
f"{BASE}/videos/{vid}",
|
||||
json={"kb_inclusion": False},
|
||||
headers=creator_user_auth["headers"],
|
||||
)
|
||||
|
||||
resp = await client.get(
|
||||
f"{BASE}/videos/{vid}/history", headers=creator_user_auth["headers"]
|
||||
)
|
||||
entries = resp.json()
|
||||
assert len(entries) == 2
|
||||
assert entries[0]["version"] < entries[1]["version"]
|
||||
assert entries[0]["new_value"] is True
|
||||
assert entries[1]["new_value"] is False
|
||||
assert entries[1]["old_value"] is True
|
||||
|
||||
|
||||
# ── Idempotency ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_idempotent_put_creates_no_audit_entries(client, creator_user_auth):
|
||||
"""PUT with same values as current state → no new audit entries."""
|
||||
vid = str(creator_user_auth["video_ids"][0])
|
||||
# Set initial values
|
||||
await client.put(
|
||||
f"{BASE}/videos/{vid}",
|
||||
json={"kb_inclusion": True},
|
||||
headers=creator_user_auth["headers"],
|
||||
)
|
||||
# Same PUT again
|
||||
await client.put(
|
||||
f"{BASE}/videos/{vid}",
|
||||
json={"kb_inclusion": True},
|
||||
headers=creator_user_auth["headers"],
|
||||
)
|
||||
|
||||
resp = await client.get(
|
||||
f"{BASE}/videos/{vid}/history", headers=creator_user_auth["headers"]
|
||||
)
|
||||
entries = resp.json()
|
||||
# Only 1 audit entry from the first PUT, none from the duplicate
|
||||
assert len(entries) == 1
|
||||
|
||||
|
||||
# ── Partial update ───────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_partial_update_changes_only_specified_field(client, creator_user_auth):
|
||||
"""PUT with only kb_inclusion changes only that field, leaves others."""
|
||||
vid = str(creator_user_auth["video_ids"][0])
|
||||
# Set all three fields
|
||||
await client.put(
|
||||
f"{BASE}/videos/{vid}",
|
||||
json={"kb_inclusion": True, "training_usage": True, "public_display": False},
|
||||
headers=creator_user_auth["headers"],
|
||||
)
|
||||
# Now change only kb_inclusion
|
||||
resp = await client.put(
|
||||
f"{BASE}/videos/{vid}",
|
||||
json={"kb_inclusion": False},
|
||||
headers=creator_user_auth["headers"],
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["kb_inclusion"] is False
|
||||
assert data["training_usage"] is True # unchanged
|
||||
assert data["public_display"] is False # unchanged
|
||||
|
||||
|
||||
# ── Admin access ─────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_admin_can_read_any_creators_consent(
|
||||
client, creator_user_auth, admin_auth
|
||||
):
|
||||
"""Admin can GET consent for any creator's video."""
|
||||
vid = str(creator_user_auth["video_ids"][0])
|
||||
# Creator sets consent
|
||||
await client.put(
|
||||
f"{BASE}/videos/{vid}",
|
||||
json={"kb_inclusion": True},
|
||||
headers=creator_user_auth["headers"],
|
||||
)
|
||||
# Admin reads it
|
||||
resp = await client.get(
|
||||
f"{BASE}/videos/{vid}", headers=admin_auth["headers"]
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["kb_inclusion"] is True
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_admin_summary_returns_correct_counts(
|
||||
client, creator_user_auth, admin_auth
|
||||
):
|
||||
"""Admin summary endpoint returns aggregate consent counts."""
|
||||
# Set consent on both videos
|
||||
for vid in creator_user_auth["video_ids"]:
|
||||
await client.put(
|
||||
f"{BASE}/videos/{str(vid)}",
|
||||
json={"kb_inclusion": True, "training_usage": False, "public_display": True},
|
||||
headers=creator_user_auth["headers"],
|
||||
)
|
||||
|
||||
resp = await client.get(
|
||||
f"{BASE}/admin/summary", headers=admin_auth["headers"]
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["total_videos"] == 2
|
||||
assert data["kb_inclusion_granted"] == 2
|
||||
assert data["training_usage_granted"] == 0
|
||||
assert data["public_display_granted"] == 2
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_non_admin_cannot_access_summary(client, creator_user_auth):
|
||||
"""Non-admin user → 403 on admin summary endpoint."""
|
||||
resp = await client.get(
|
||||
f"{BASE}/admin/summary", headers=creator_user_auth["headers"]
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
# ── Nonexistent video ────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_nonexistent_video_returns_404(client, creator_user_auth):
|
||||
"""GET /consent/videos/{random_uuid} → 404."""
|
||||
resp = await client.get(
|
||||
f"{BASE}/videos/{uuid.uuid4()}", headers=creator_user_auth["headers"]
|
||||
)
|
||||
assert resp.status_code == 404
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_put_nonexistent_video_returns_404(client, creator_user_auth):
|
||||
"""PUT /consent/videos/{random_uuid} → 404."""
|
||||
resp = await client.put(
|
||||
f"{BASE}/videos/{uuid.uuid4()}",
|
||||
json={"kb_inclusion": True},
|
||||
headers=creator_user_auth["headers"],
|
||||
)
|
||||
assert resp.status_code == 404
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_history_nonexistent_video_returns_404(client, creator_user_auth):
|
||||
"""GET /consent/videos/{random_uuid}/history → 404."""
|
||||
resp = await client.get(
|
||||
f"{BASE}/videos/{uuid.uuid4()}/history",
|
||||
headers=creator_user_auth["headers"],
|
||||
)
|
||||
assert resp.status_code == 404
|
||||
Loading…
Add table
Reference in a new issue