media-rip/backend/app/core/sse_broker.py
xpltd efc2ead796 M001: media.rip() v1.0 — complete application
Full-featured self-hosted yt-dlp web frontend:
- Python 3.12+ / FastAPI backend with async SQLite, SSE transport, session isolation
- Vue 3 / TypeScript / Pinia frontend with real-time progress, theme picker
- 3 built-in themes (cyberpunk/dark/light) + drop-in custom theme system
- Admin auth (bcrypt), purge system, cookie upload, file serving
- Docker multi-stage build, GitHub Actions CI/CD
- 179 backend tests, 29 frontend tests (208 total)

Slices: S01 (Foundation), S02 (SSE+Sessions), S03 (Frontend),
        S04 (Admin+Auth), S05 (Themes), S06 (Docker+CI)
2026-03-18 20:00:17 -05:00

76 lines
2.9 KiB
Python

"""Server-Sent Events broker for per-session event distribution.
The broker holds one list of ``asyncio.Queue`` per session. Download
workers running on a :pymod:`concurrent.futures` thread call
:meth:`publish` which uses ``loop.call_soon_threadsafe`` to marshal the
event onto the asyncio event loop — making it safe to call from any thread.
"""
from __future__ import annotations
import asyncio
import logging
logger = logging.getLogger("mediarip.sse")
class SSEBroker:
"""Thread-safe pub/sub for SSE events, keyed by session ID."""
def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
self._loop = loop
self._subscribers: dict[str, list[asyncio.Queue]] = {}
# ------------------------------------------------------------------
# Subscription management (called from the asyncio thread)
# ------------------------------------------------------------------
def subscribe(self, session_id: str) -> asyncio.Queue:
"""Create and return a new queue for *session_id*."""
queue: asyncio.Queue = asyncio.Queue()
self._subscribers.setdefault(session_id, []).append(queue)
logger.debug("Subscriber added for session %s (total: %d)",
session_id, len(self._subscribers[session_id]))
return queue
def unsubscribe(self, session_id: str, queue: asyncio.Queue) -> None:
"""Remove *queue* from *session_id*'s subscriber list."""
queues = self._subscribers.get(session_id)
if queues is None:
return
try:
queues.remove(queue)
except ValueError:
pass
if not queues:
del self._subscribers[session_id]
logger.debug("Subscriber removed for session %s", session_id)
# ------------------------------------------------------------------
# Publishing (safe to call from ANY thread)
# ------------------------------------------------------------------
def publish(self, session_id: str, event: object) -> None:
"""Schedule event delivery on the event loop — thread-safe.
This is the primary entry point for download worker threads.
"""
self._loop.call_soon_threadsafe(self._publish_sync, session_id, event)
def _publish_sync(self, session_id: str, event: object) -> None:
"""Deliver *event* to all queues for *session_id*.
Runs on the event loop thread (scheduled via ``call_soon_threadsafe``).
Silently skips sessions with no subscribers so yt-dlp workers can
fire-and-forget without checking subscription state.
"""
queues = self._subscribers.get(session_id)
if not queues:
return
for queue in queues:
try:
queue.put_nowait(event)
except asyncio.QueueFull:
logger.warning(
"Queue full for session %s — dropping event", session_id
)