From 6cb3828b92ed3ff37c2d1a11dcb426ff26b1706b Mon Sep 17 00:00:00 2001 From: xpltd Date: Sat, 21 Mar 2026 20:57:50 -0500 Subject: [PATCH] Fix SSE keepalive: yield explicit ping event, enforce test timeout - event_generator now yields {event: 'ping', data: ''} on KEEPALIVE_TIMEOUT instead of silently looping. Gives SSE clients stream-level liveness signal. - _collect_events helper now enforces its timeout parameter via asyncio.wait_for, preventing tests from hanging indefinitely if generator never yields. --- backend/app/routers/sse.py | 5 +++-- backend/tests/test_sse.py | 12 ++++++++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/backend/app/routers/sse.py b/backend/app/routers/sse.py index c1b0235..7b17a16 100644 --- a/backend/app/routers/sse.py +++ b/backend/app/routers/sse.py @@ -70,8 +70,9 @@ async def event_generator( "data": json.dumps(event.model_dump()), } except asyncio.TimeoutError: - # No event in KEEPALIVE_TIMEOUT — loop back and wait again. - # sse-starlette's built-in ping handles the actual keepalive. + # Yield an explicit ping so SSE clients see stream liveness + # (in addition to sse-starlette's built-in TCP keepalive). + yield {"event": "ping", "data": ""} continue finally: broker.unsubscribe(session_id, queue) diff --git a/backend/tests/test_sse.py b/backend/tests/test_sse.py index dec757f..738abd7 100644 --- a/backend/tests/test_sse.py +++ b/backend/tests/test_sse.py @@ -37,10 +37,14 @@ def _make_job(session_id: str, *, status: str = "queued", **overrides) -> Job: async def _collect_events(gen, *, count: int = 1, timeout: float = 5.0): """Consume *count* events from an async generator with a safety timeout.""" events = [] - async for event in gen: - events.append(event) - if len(events) >= count: - break + + async def _drain(): + async for event in gen: + events.append(event) + if len(events) >= count: + break + + await asyncio.wait_for(_drain(), timeout=timeout) return events