mirror of
https://github.com/xpltdco/media-rip.git
synced 2026-04-03 10:54:00 -06:00
Fix SSE keepalive: yield explicit ping event, enforce test timeout
- event_generator now yields {event: 'ping', data: ''} on KEEPALIVE_TIMEOUT
instead of silently looping. Gives SSE clients stream-level liveness signal.
- _collect_events helper now enforces its timeout parameter via asyncio.wait_for,
preventing tests from hanging indefinitely if generator never yields.
This commit is contained in:
parent
43ddf43951
commit
507a87ad3d
2 changed files with 11 additions and 6 deletions
|
|
@ -70,8 +70,9 @@ async def event_generator(
|
|||
"data": json.dumps(event.model_dump()),
|
||||
}
|
||||
except asyncio.TimeoutError:
|
||||
# No event in KEEPALIVE_TIMEOUT — loop back and wait again.
|
||||
# sse-starlette's built-in ping handles the actual keepalive.
|
||||
# Yield an explicit ping so SSE clients see stream liveness
|
||||
# (in addition to sse-starlette's built-in TCP keepalive).
|
||||
yield {"event": "ping", "data": ""}
|
||||
continue
|
||||
finally:
|
||||
broker.unsubscribe(session_id, queue)
|
||||
|
|
|
|||
|
|
@ -37,10 +37,14 @@ def _make_job(session_id: str, *, status: str = "queued", **overrides) -> Job:
|
|||
async def _collect_events(gen, *, count: int = 1, timeout: float = 5.0):
|
||||
"""Consume *count* events from an async generator with a safety timeout."""
|
||||
events = []
|
||||
|
||||
async def _drain():
|
||||
async for event in gen:
|
||||
events.append(event)
|
||||
if len(events) >= count:
|
||||
break
|
||||
|
||||
await asyncio.wait_for(_drain(), timeout=timeout)
|
||||
return events
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue