Fix SSE keepalive: yield explicit ping event, enforce test timeout

- event_generator now yields {event: 'ping', data: ''} on KEEPALIVE_TIMEOUT
  instead of silently looping. Gives SSE clients stream-level liveness signal.
- _collect_events helper now enforces its timeout parameter via asyncio.wait_for,
  preventing tests from hanging indefinitely if generator never yields.
This commit is contained in:
xpltd 2026-03-21 20:57:50 -05:00
parent 43ddf43951
commit 6cb3828b92
2 changed files with 11 additions and 6 deletions

View file

@ -70,8 +70,9 @@ async def event_generator(
"data": json.dumps(event.model_dump()), "data": json.dumps(event.model_dump()),
} }
except asyncio.TimeoutError: except asyncio.TimeoutError:
# No event in KEEPALIVE_TIMEOUT — loop back and wait again. # Yield an explicit ping so SSE clients see stream liveness
# sse-starlette's built-in ping handles the actual keepalive. # (in addition to sse-starlette's built-in TCP keepalive).
yield {"event": "ping", "data": ""}
continue continue
finally: finally:
broker.unsubscribe(session_id, queue) broker.unsubscribe(session_id, queue)

View file

@ -37,10 +37,14 @@ def _make_job(session_id: str, *, status: str = "queued", **overrides) -> Job:
async def _collect_events(gen, *, count: int = 1, timeout: float = 5.0): async def _collect_events(gen, *, count: int = 1, timeout: float = 5.0):
"""Consume *count* events from an async generator with a safety timeout.""" """Consume *count* events from an async generator with a safety timeout."""
events = [] events = []
async for event in gen:
events.append(event) async def _drain():
if len(events) >= count: async for event in gen:
break events.append(event)
if len(events) >= count:
break
await asyncio.wait_for(_drain(), timeout=timeout)
return events return events