From 160adc24bf4c0a451657b497521863e72470a8fb Mon Sep 17 00:00:00 2001 From: jlightner Date: Sat, 4 Apr 2026 14:33:29 +0000 Subject: [PATCH] =?UTF-8?q?test:=20Created=20standalone=20async=20load=20t?= =?UTF-8?q?est=20script=20that=20fires=20concurrent=20c=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - "scripts/load_test_chat.py" GSD-Task: S08/T02 --- .gsd/milestones/M025/slices/S08/S08-PLAN.md | 2 +- .../M025/slices/S08/tasks/T01-VERIFY.json | 24 ++ .../M025/slices/S08/tasks/T02-SUMMARY.md | 76 ++++ scripts/load_test_chat.py | 366 ++++++++++++++++++ 4 files changed, 467 insertions(+), 1 deletion(-) create mode 100644 .gsd/milestones/M025/slices/S08/tasks/T01-VERIFY.json create mode 100644 .gsd/milestones/M025/slices/S08/tasks/T02-SUMMARY.md create mode 100644 scripts/load_test_chat.py diff --git a/.gsd/milestones/M025/slices/S08/S08-PLAN.md b/.gsd/milestones/M025/slices/S08/S08-PLAN.md index b1c6bb9..7c9b0c9 100644 --- a/.gsd/milestones/M025/slices/S08/S08-PLAN.md +++ b/.gsd/milestones/M025/slices/S08/S08-PLAN.md @@ -20,7 +20,7 @@ Steps: - Estimate: 45m - Files: backend/chat_service.py, backend/tests/test_chat.py, docker-compose.yml - Verify: cd backend && python -m pytest tests/test_chat.py -v -k fallback -- [ ] **T02: Write async load test script for 10 concurrent chat sessions** — Create a standalone Python script that fires 10 concurrent chat requests to the SSE endpoint, parses streaming events to measure time-to-first-token (TTFT) and total response time, and reports p50/p95/max latency statistics. Uses httpx (already a project dependency) + asyncio. No external load testing tools needed. +- [x] **T02: Created standalone async load test script that fires concurrent chat requests, parses SSE streams for TTFT and total latency, and reports min/p50/p95/max statistics** — Create a standalone Python script that fires 10 concurrent chat requests to the SSE endpoint, parses streaming events to measure time-to-first-token (TTFT) and total response time, and reports p50/p95/max latency statistics. Uses httpx (already a project dependency) + asyncio. No external load testing tools needed. Steps: 1. Create `scripts/load_test_chat.py` with argparse accepting `--url` (default http://localhost:8096), `--concurrency` (default 10), `--query` (default 'What are common compression techniques?'). diff --git a/.gsd/milestones/M025/slices/S08/tasks/T01-VERIFY.json b/.gsd/milestones/M025/slices/S08/tasks/T01-VERIFY.json new file mode 100644 index 0000000..c301c8b --- /dev/null +++ b/.gsd/milestones/M025/slices/S08/tasks/T01-VERIFY.json @@ -0,0 +1,24 @@ +{ + "schemaVersion": 1, + "taskId": "T01", + "unitId": "M025/S08/T01", + "timestamp": 1775313088485, + "passed": false, + "discoverySource": "task-plan", + "checks": [ + { + "command": "cd backend", + "exitCode": 0, + "durationMs": 11, + "verdict": "pass" + }, + { + "command": "python -m pytest tests/test_chat.py -v -k fallback", + "exitCode": 4, + "durationMs": 226, + "verdict": "fail" + } + ], + "retryAttempt": 1, + "maxRetries": 2 +} diff --git a/.gsd/milestones/M025/slices/S08/tasks/T02-SUMMARY.md b/.gsd/milestones/M025/slices/S08/tasks/T02-SUMMARY.md new file mode 100644 index 0000000..ea4e8b7 --- /dev/null +++ b/.gsd/milestones/M025/slices/S08/tasks/T02-SUMMARY.md @@ -0,0 +1,76 @@ +--- +id: T02 +parent: S08 +milestone: M025 +provides: [] +requires: [] +affects: [] +key_files: ["scripts/load_test_chat.py"] +key_decisions: ["Used httpx streaming + asyncio.gather for concurrent SSE load testing — no external tools needed"] +patterns_established: [] +drill_down_paths: [] +observability_surfaces: [] +duration: "" +verification_result: "python scripts/load_test_chat.py --help exits 0. --dry-run parses mock SSE correctly (3 tokens, 1 success). --output writes valid JSON with summary stats. cd backend && python -m pytest tests/test_chat.py -v -k fallback — 5 passed." +completed_at: 2026-04-04T14:33:26.086Z +blocker_discovered: false +--- + +# T02: Created standalone async load test script that fires concurrent chat requests, parses SSE streams for TTFT and total latency, and reports min/p50/p95/max statistics + +> Created standalone async load test script that fires concurrent chat requests, parses SSE streams for TTFT and total latency, and reports min/p50/p95/max statistics + +## What Happened +--- +id: T02 +parent: S08 +milestone: M025 +key_files: + - scripts/load_test_chat.py +key_decisions: + - Used httpx streaming + asyncio.gather for concurrent SSE load testing — no external tools needed +duration: "" +verification_result: passed +completed_at: 2026-04-04T14:33:26.087Z +blocker_discovered: false +--- + +# T02: Created standalone async load test script that fires concurrent chat requests, parses SSE streams for TTFT and total latency, and reports min/p50/p95/max statistics + +**Created standalone async load test script that fires concurrent chat requests, parses SSE streams for TTFT and total latency, and reports min/p50/p95/max statistics** + +## What Happened + +Created scripts/load_test_chat.py with argparse accepting --url, --concurrency, --query, --auth-token, --output, and --dry-run flags. Uses httpx.AsyncClient.stream() to POST to /api/v1/chat and parse SSE events, recording TTFT on first token event and total time at stream end. asyncio.gather fires N concurrent requests. Reports min/p50/p95/max for both TTFT and total response time, token counts, fallback detection, and per-request summary table. --dry-run parses canned SSE for offline verification. --output writes structured JSON for CI. + +## Verification + +python scripts/load_test_chat.py --help exits 0. --dry-run parses mock SSE correctly (3 tokens, 1 success). --output writes valid JSON with summary stats. cd backend && python -m pytest tests/test_chat.py -v -k fallback — 5 passed. + +## Verification Evidence + +| # | Command | Exit Code | Verdict | Duration | +|---|---------|-----------|---------|----------| +| 1 | `python scripts/load_test_chat.py --help` | 0 | ✅ pass | 200ms | +| 2 | `python scripts/load_test_chat.py --dry-run` | 0 | ✅ pass | 300ms | +| 3 | `cd backend && python -m pytest tests/test_chat.py -v -k fallback` | 0 | ✅ pass | 440ms | + + +## Deviations + +None. + +## Known Issues + +None. + +## Files Created/Modified + +- `scripts/load_test_chat.py` + + +## Deviations +None. + +## Known Issues +None. diff --git a/scripts/load_test_chat.py b/scripts/load_test_chat.py new file mode 100644 index 0000000..434876e --- /dev/null +++ b/scripts/load_test_chat.py @@ -0,0 +1,366 @@ +#!/usr/bin/env python3 +"""Load test for Chrysopedia chat SSE endpoint. + +Fires N concurrent chat requests, parses the SSE stream to measure +time-to-first-token (TTFT) and total response time, and reports +min / p50 / p95 / max latency statistics. + +Requirements: + pip install httpx (already a project dependency) + +Rate-limit note: + The default anonymous rate limit is 10 requests/hour per IP. + Running 10 concurrent requests from one IP will saturate that quota. + Use --auth-token to authenticate (per-user limit is higher), or + temporarily raise the rate limit in the API config. + +Examples: + # Quick smoke test (1 request) + python scripts/load_test_chat.py --concurrency 1 + + # Full load test with auth token and JSON output + python scripts/load_test_chat.py --concurrency 10 \\ + --auth-token eyJ... --output results.json + + # Dry-run to verify SSE parsing without a live server + python scripts/load_test_chat.py --dry-run +""" + +from __future__ import annotations + +import argparse +import asyncio +import json +import statistics +import sys +import time +from dataclasses import asdict, dataclass, field +from typing import Any + + +@dataclass +class ChatResult: + """Metrics from a single chat request.""" + + request_id: int = 0 + ttft_ms: float | None = None + total_ms: float = 0.0 + token_count: int = 0 + error: str | None = None + status_code: int | None = None + fallback_used: bool | None = None + + +# --------------------------------------------------------------------------- +# SSE parsing +# --------------------------------------------------------------------------- + +def parse_sse_lines(raw_lines: list[str]): + """Yield (event_type, data_str) tuples from raw SSE lines.""" + current_event = "" + data_buf: list[str] = [] + for line in raw_lines: + if line.startswith("event: "): + current_event = line[7:].strip() + elif line.startswith("data: "): + data_buf.append(line[6:]) + elif line.strip() == "" and (current_event or data_buf): + yield current_event, "\n".join(data_buf) + current_event = "" + data_buf = [] + # Flush any remaining partial event + if current_event or data_buf: + yield current_event, "\n".join(data_buf) + + +# --------------------------------------------------------------------------- +# Single request runner +# --------------------------------------------------------------------------- + +async def run_single_chat( + client: Any, # httpx.AsyncClient + url: str, + query: str, + request_id: int, +) -> ChatResult: + """POST to the chat endpoint and parse the SSE stream.""" + result = ChatResult(request_id=request_id) + t0 = time.monotonic() + + try: + async with client.stream( + "POST", + f"{url}/api/v1/chat", + json={"query": query}, + timeout=60.0, + ) as resp: + result.status_code = resp.status_code + if resp.status_code != 200: + body = await resp.aread() + result.error = f"HTTP {resp.status_code}: {body.decode(errors='replace')[:200]}" + result.total_ms = (time.monotonic() - t0) * 1000 + return result + + raw_lines: list[str] = [] + async for line in resp.aiter_lines(): + raw_lines.append(line) + + # Detect first token for TTFT + if result.ttft_ms is None and line.startswith("event: token"): + result.ttft_ms = (time.monotonic() - t0) * 1000 + + # Parse collected SSE events + for event_type, data_str in parse_sse_lines(raw_lines): + if event_type == "token": + result.token_count += 1 + elif event_type == "done": + try: + done = json.loads(data_str) + result.fallback_used = done.get("fallback_used") + except json.JSONDecodeError: + pass + elif event_type == "error": + result.error = data_str + + except Exception as exc: + result.error = f"{type(exc).__name__}: {exc}" + + result.total_ms = (time.monotonic() - t0) * 1000 + return result + + +# --------------------------------------------------------------------------- +# Dry-run: mock SSE stream for offline testing +# --------------------------------------------------------------------------- + +_MOCK_SSE = """\ +event: sources +data: [{"title":"Test","url":"/t/test"}] + +event: token +data: Hello + +event: token +data: world + +event: token +data: ! + +event: done +data: {"cascade_tier":"global","conversation_id":"test-123","fallback_used":false} + +""" + + +async def run_dry_run() -> list[ChatResult]: + """Parse a canned SSE response to verify the parsing logic works.""" + result = ChatResult(request_id=0, status_code=200) + t0 = time.monotonic() + + raw_lines = _MOCK_SSE.strip().splitlines() + + for line in raw_lines: + if result.ttft_ms is None and line.startswith("event: token"): + result.ttft_ms = (time.monotonic() - t0) * 1000 + + for event_type, data_str in parse_sse_lines(raw_lines): + if event_type == "token": + result.token_count += 1 + elif event_type == "done": + try: + done = json.loads(data_str) + result.fallback_used = done.get("fallback_used") + except json.JSONDecodeError: + pass + elif event_type == "error": + result.error = data_str + + result.total_ms = (time.monotonic() - t0) * 1000 + return [result] + + +# --------------------------------------------------------------------------- +# Load test orchestrator +# --------------------------------------------------------------------------- + +async def run_load_test( + url: str, + concurrency: int, + query: str, + auth_token: str | None = None, +) -> list[ChatResult]: + """Fire concurrent chat requests and collect results.""" + import httpx + + headers: dict[str, str] = {} + if auth_token: + headers["Authorization"] = f"Bearer {auth_token}" + + async with httpx.AsyncClient(headers=headers) as client: + tasks = [ + run_single_chat(client, url, query, i) + for i in range(concurrency) + ] + results = await asyncio.gather(*tasks) + + return list(results) + + +# --------------------------------------------------------------------------- +# Statistics & reporting +# --------------------------------------------------------------------------- + +def percentile(values: list[float], p: float) -> float: + """Return the p-th percentile of a sorted list (0–100 scale).""" + if not values: + return 0.0 + k = (len(values) - 1) * (p / 100) + f = int(k) + c = f + 1 if f + 1 < len(values) else f + d = k - f + return values[f] + d * (values[c] - values[f]) + + +def print_stats(results: list[ChatResult]) -> None: + """Print summary statistics and per-request table.""" + successes = [r for r in results if r.error is None] + errors = [r for r in results if r.error is not None] + + print(f"\n{'='*60}") + print(f" Chat Load Test Results ({len(results)} requests)") + print(f"{'='*60}") + print(f" Successes: {len(successes)} | Errors: {len(errors)}") + + if successes: + totals = sorted(r.total_ms for r in successes) + ttfts = sorted(r.ttft_ms for r in successes if r.ttft_ms is not None) + tokens = [r.token_count for r in successes] + + print(f"\n Total Response Time (ms):") + print(f" min={totals[0]:.0f} p50={percentile(totals, 50):.0f}" + f" p95={percentile(totals, 95):.0f} max={totals[-1]:.0f}") + + if ttfts: + print(f" Time to First Token (ms):") + print(f" min={ttfts[0]:.0f} p50={percentile(ttfts, 50):.0f}" + f" p95={percentile(ttfts, 95):.0f} max={ttfts[-1]:.0f}") + + print(f" Tokens per response:") + print(f" min={min(tokens)} avg={statistics.mean(tokens):.1f}" + f" max={max(tokens)}") + + fallback_count = sum(1 for r in successes if r.fallback_used) + if fallback_count: + print(f" Fallback used: {fallback_count}/{len(successes)}") + + # Per-request table + print(f"\n {'#':>3} {'Status':>6} {'TTFT':>8} {'Total':>8} {'Tokens':>6} Error") + print(f" {'-'*3} {'-'*6} {'-'*8} {'-'*8} {'-'*6} {'-'*20}") + for r in results: + status = str(r.status_code or "---") + ttft = f"{r.ttft_ms:.0f}ms" if r.ttft_ms is not None else "---" + total = f"{r.total_ms:.0f}ms" + err = (r.error or "")[:40] + print(f" {r.request_id:>3} {status:>6} {ttft:>8} {total:>8} {r.token_count:>6} {err}") + + print(f"{'='*60}\n") + + +def write_json_output(results: list[ChatResult], path: str) -> None: + """Write results to a JSON file.""" + data = { + "results": [asdict(r) for r in results], + "summary": {}, + } + successes = [r for r in results if r.error is None] + if successes: + totals = sorted(r.total_ms for r in successes) + ttfts = sorted(r.ttft_ms for r in successes if r.ttft_ms is not None) + data["summary"] = { + "total_requests": len(results), + "successes": len(successes), + "errors": len(results) - len(successes), + "total_ms": { + "min": totals[0], + "p50": percentile(totals, 50), + "p95": percentile(totals, 95), + "max": totals[-1], + }, + } + if ttfts: + data["summary"]["ttft_ms"] = { + "min": ttfts[0], + "p50": percentile(ttfts, 50), + "p95": percentile(ttfts, 95), + "max": ttfts[-1], + } + with open(path, "w") as f: + json.dump(data, f, indent=2) + print(f"Results written to {path}") + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Load test the Chrysopedia chat SSE endpoint.", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + parser.add_argument( + "--url", + default="http://localhost:8096", + help="Base URL of the Chrysopedia API (default: http://localhost:8096)", + ) + parser.add_argument( + "--concurrency", "-c", + type=int, + default=10, + help="Number of concurrent chat requests (default: 10)", + ) + parser.add_argument( + "--query", "-q", + default="What are common compression techniques?", + help="Chat query to send", + ) + parser.add_argument( + "--auth-token", + default=None, + help="Bearer token for authenticated requests (avoids IP rate limit)", + ) + parser.add_argument( + "--output", "-o", + default=None, + help="Write results as JSON to this file", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Parse a mock SSE response without making network requests", + ) + return parser + + +def main() -> None: + parser = build_parser() + args = parser.parse_args() + + if args.dry_run: + print("Dry-run mode: parsing mock SSE response...") + results = asyncio.run(run_dry_run()) + else: + print(f"Running load test: {args.concurrency} concurrent requests → {args.url}") + results = asyncio.run( + run_load_test(args.url, args.concurrency, args.query, args.auth_token) + ) + + print_stats(results) + + if args.output: + write_json_output(results, args.output) + + +if __name__ == "__main__": + main()