mirror of
https://github.com/xpltdco/media-rip.git
synced 2026-04-03 10:54:00 -06:00
Full-featured self-hosted yt-dlp web frontend:
- Python 3.12+ / FastAPI backend with async SQLite, SSE transport, session isolation
- Vue 3 / TypeScript / Pinia frontend with real-time progress, theme picker
- 3 built-in themes (cyberpunk/dark/light) + drop-in custom theme system
- Admin auth (bcrypt), purge system, cookie upload, file serving
- Docker multi-stage build, GitHub Actions CI/CD
- 179 backend tests, 29 frontend tests (208 total)
Slices: S01 (Foundation), S02 (SSE+Sessions), S03 (Frontend),
S04 (Admin+Auth), S05 (Themes), S06 (Docker+CI)
76 lines
2.9 KiB
Python
"""Server-Sent Events broker for per-session event distribution.
|
|
|
|
The broker holds one list of ``asyncio.Queue`` per session. Download
|
|
workers running on a :pymod:`concurrent.futures` thread call
|
|
:meth:`publish` which uses ``loop.call_soon_threadsafe`` to marshal the
|
|
event onto the asyncio event loop — making it safe to call from any thread.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
|
|
# Dedicated logger for the SSE broker; uses an explicit hierarchical name
# ("mediarip.sse") rather than __name__ so handlers/filters configured for
# the "mediarip" namespace pick it up consistently — TODO confirm against
# the project's logging setup.
logger = logging.getLogger("mediarip.sse")
|
|
|
|
|
|
class SSEBroker:
    """Thread-safe publish/subscribe hub for SSE events, keyed by session ID.

    Subscription bookkeeping (:meth:`subscribe` / :meth:`unsubscribe`) runs on
    the asyncio thread; :meth:`publish` may be invoked from any thread, since
    it only schedules work onto the owning event loop.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        # The event loop that owns every queue operation.
        self._loop = loop
        # Maps session ID -> queues, one queue per connected SSE client.
        self._subscribers: dict[str, list[asyncio.Queue]] = {}

    # ------------------------------------------------------------------
    # Subscription management (called from the asyncio thread)
    # ------------------------------------------------------------------

    def subscribe(self, session_id: str) -> asyncio.Queue:
        """Register a fresh queue under *session_id* and hand it back."""
        fresh: asyncio.Queue = asyncio.Queue()
        bucket = self._subscribers.setdefault(session_id, [])
        bucket.append(fresh)
        logger.debug("Subscriber added for session %s (total: %d)",
                     session_id, len(bucket))
        return fresh

    def unsubscribe(self, session_id: str, queue: asyncio.Queue) -> None:
        """Drop *queue* from *session_id*'s list; no-op when unknown."""
        bucket = self._subscribers.get(session_id)
        if bucket is None:
            # Session was never subscribed (or already fully cleaned up).
            return
        if queue in bucket:
            bucket.remove(queue)
        if not bucket:
            # Last subscriber gone — forget the session entirely.
            del self._subscribers[session_id]
            logger.debug("Subscriber removed for session %s", session_id)

    # ------------------------------------------------------------------
    # Publishing (safe to call from ANY thread)
    # ------------------------------------------------------------------

    def publish(self, session_id: str, event: object) -> None:
        """Hand *event* to the event loop for delivery — thread-safe.

        This is the primary entry point for download worker threads.
        """
        self._loop.call_soon_threadsafe(self._publish_sync, session_id, event)

    def _publish_sync(self, session_id: str, event: object) -> None:
        """Fan *event* out to every queue registered for *session_id*.

        Runs on the event loop thread (scheduled via ``call_soon_threadsafe``).
        Sessions without subscribers are skipped silently so yt-dlp workers
        can fire-and-forget without checking subscription state.
        """
        for target in self._subscribers.get(session_id, []):
            try:
                target.put_nowait(event)
            except asyncio.QueueFull:
                logger.warning(
                    "Queue full for session %s — dropping event", session_id
                )
|