test: Added compose subcommand with build_compose_prompt(), run_compose…

- "backend/pipeline/test_harness.py"

GSD-Task: S02/T02
This commit is contained in:
jlightner 2026-04-03 01:05:25 +00:00
parent 709d14802c
commit efe6d7197c
4 changed files with 415 additions and 2 deletions


@@ -34,7 +34,7 @@ assert json_match, 'no JSON example found'
json.loads(json_match.group(1))
print('All structural checks passed')
"
- [ ] **T02: Add compose subcommand to test harness** — Add a `compose` subcommand to `backend/pipeline/test_harness.py` that loads an existing page JSON + new moments fixture, builds a compose user prompt, calls the LLM via the compose prompt, and validates the output. Also extract the compose user-prompt builder as a testable function.
- [x] **T02: Added compose subcommand with build_compose_prompt(), run_compose(), and CLI wiring for offline composition testing** — Add a `compose` subcommand to `backend/pipeline/test_harness.py` that loads an existing page JSON + new moments fixture, builds a compose user prompt, calls the LLM via the compose prompt, and validates the output. Also extract the compose user-prompt builder as a testable function.
Steps:
1. Read `backend/pipeline/test_harness.py` fully to understand the existing `run` and `promote` subcommand patterns.


@@ -0,0 +1,9 @@
{
"schemaVersion": 1,
"taskId": "T01",
"unitId": "M014/S02/T01",
"timestamp": 1775178181291,
"passed": true,
"discoverySource": "none",
"checks": []
}


@@ -0,0 +1,76 @@
---
id: T02
parent: S02
milestone: M014
provides: []
requires: []
affects: []
key_files: ["backend/pipeline/test_harness.py"]
key_decisions: ["Compose accepts both harness output (with .pages[]) and raw SynthesizedPage JSON", "New moment indices built manually with offset to ensure correct [N+i] numbering"]
patterns_established: []
drill_down_paths: []
observability_surfaces: []
duration: ""
verification_result: "compose --help exits 0 with correct argument listing. build_compose_prompt() smoke test verified XML tag presence, correct index offsets, and creator tag content."
completed_at: 2026-04-03T01:05:22.034Z
blocker_discovered: false
---
# T02: Added compose subcommand with build_compose_prompt(), run_compose(), and CLI wiring for offline composition testing
> Added compose subcommand with build_compose_prompt(), run_compose(), and CLI wiring for offline composition testing
## What Happened
Added build_compose_prompt() (builds user prompt with XML tags and offset-based citation indices), run_compose() (loads existing page + two fixtures, calls LLM, validates output, logs compose-specific metrics), and the compose CLI subcommand with --existing-page, --fixture, --existing-fixture args plus optional prompt/output/category/model/modality overrides.
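The offset-based citation indexing described above can be sketched in isolation. This is a minimal stand-in, not the harness code (the real `build_compose_prompt()` formats full moment metadata), showing only how existing moments keep indices `[0]`-`[N-1]` while new moments continue at `[N]`:

```python
# Minimal sketch of the [N+i] offset numbering applied during composition:
# existing moments keep indices [0]..[N-1]; new moments continue at [N],
# so citation indices in the merged page never collide.
def numbered_moment_lines(existing_titles: list[str], new_titles: list[str]) -> list[str]:
    n = len(existing_titles)
    lines = [f"[{i}] {title}" for i, title in enumerate(existing_titles)]
    lines += [f"[{n + i}] {title}" for i, title in enumerate(new_titles)]
    return lines
```

With two existing moments and one new moment, the new moment is cited as `[2]`, matching the `[N+i]` numbering noted in the key decisions.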
## Verification
compose --help exits 0 with correct argument listing. build_compose_prompt() smoke test verified XML tag presence, correct index offsets, and creator tag content.
## Verification Evidence
| # | Command | Exit Code | Verdict | Duration |
|---|---------|-----------|---------|----------|
| 1 | `cd backend && python -m pipeline.test_harness compose --help` | 0 | ✅ pass | 500ms |
| 2 | `python -c build_compose_prompt assertions (XML tags, index offsets, creator)` | 0 | ✅ pass | 400ms |
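The structural smoke test in row 2 can be approximated as follows. This uses a stand-in builder that mirrors only the XML tag layout of the prompt (the real `build_compose_prompt()` lives in `backend/pipeline/test_harness.py` and takes moment objects, not strings):

```python
# Stand-in for the compose prompt's output shape, used only to show the kind
# of structural assertions the smoke test makes (tag presence, creator value).
def build_prompt_stub(page_json: str, existing_text: str, new_text: str, creator: str) -> str:
    return (
        f"<existing_page>\n{page_json}\n</existing_page>\n"
        f"<existing_moments>\n{existing_text}\n</existing_moments>\n"
        f"<new_moments>\n{new_text}\n</new_moments>\n"
        f"<creator>{creator}</creator>"
    )

prompt = build_prompt_stub("{}", "[0] Intro", "[1] Outro", "jdoe")
for tag in ("existing_page", "existing_moments", "new_moments", "creator"):
    assert f"<{tag}>" in prompt and f"</{tag}>" in prompt
assert "<creator>jdoe</creator>" in prompt
```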
## Deviations
None.
## Known Issues
None.
## Files Created/Modified
- `backend/pipeline/test_harness.py`


@@ -35,7 +35,7 @@ from pydantic import ValidationError
from config import get_settings
from pipeline.citation_utils import validate_citations
from pipeline.llm_client import LLMClient, estimate_max_tokens
from pipeline.schemas import SynthesisResult
from pipeline.schemas import SynthesizedPage, SynthesisResult
# ── Lightweight stand-in for KeyMoment ORM model ───────────────────────────
@@ -318,6 +318,274 @@ def run_synthesis(
return all_pages, exit_code
# ── Compose: merge new moments into existing page ──────────────────────────
def _count_page_words(page_dict: dict) -> int:
"""Count total words in a page's body sections."""
return sum(
len(s.get("content", "").split())
+ sum(len(sub.get("content", "").split()) for sub in s.get("subsections", []))
for s in page_dict.get("body_sections", [])
)
def build_compose_prompt(
existing_page: dict,
existing_moments: list[tuple[MockKeyMoment, dict]],
new_moments: list[tuple[MockKeyMoment, dict]],
creator_name: str,
) -> str:
"""Build the user prompt for composition (merging new moments into an existing page).
Existing moments keep indices [0]-[N-1].
New moments get indices [N]-[N+M-1].
Uses build_moments_text() for formatting, with index offsets applied for new moments.
"""
category = existing_page.get("topic_category", "Uncategorized")
# Format existing moments [0]-[N-1]
existing_text, _ = build_moments_text(existing_moments, category)
# Format new moments with offset indices [N]-[N+M-1]
n = len(existing_moments)
new_lines = []
for i, (m, cls_info) in enumerate(new_moments):
tags = cls_info.get("topic_tags", [])
new_lines.append(
f"[{n + i}] Title: {m.title}\n"
f" Summary: {m.summary}\n"
f" Content type: {m.content_type.value}\n"
f" Time: {m.start_time:.1f}s - {m.end_time:.1f}s\n"
f" Plugins: {', '.join(m.plugins) if m.plugins else 'none'}\n"
f" Category: {category}\n"
f" Tags: {', '.join(tags) if tags else 'none'}\n"
f" Transcript excerpt: {(m.raw_transcript or '')[:300]}"
)
new_text = "\n\n".join(new_lines)
page_json = json.dumps(existing_page, indent=2, ensure_ascii=False)
return (
f"<existing_page>\n{page_json}\n</existing_page>\n"
f"<existing_moments>\n{existing_text}\n</existing_moments>\n"
f"<new_moments>\n{new_text}\n</new_moments>\n"
f"<creator>{creator_name}</creator>"
)
def run_compose(
existing_page_path: str,
existing_fixture_path: str,
new_fixture_path: str,
prompt_path: str,
category_filter: str | None = None,
model_override: str | None = None,
modality: str | None = None,
) -> tuple[list[dict], int]:
"""Run composition: merge new fixture moments into an existing page.
Returns (pages, exit_code) same shape as run_synthesis().
"""
# Load existing page JSON
existing_page_file = Path(existing_page_path)
if not existing_page_file.exists():
_log("ERROR", f"Existing page not found: {existing_page_path}", level="ERROR")
return [], 3
try:
existing_raw = json.loads(existing_page_file.read_text(encoding="utf-8"))
except json.JSONDecodeError as exc:
_log("ERROR", f"Invalid JSON in existing page: {exc}", level="ERROR")
return [], 3
# The existing page file might be a harness output (with .pages[]) or a raw SynthesizedPage
if "pages" in existing_raw and isinstance(existing_raw["pages"], list):
page_dicts = existing_raw["pages"]
_log("COMPOSE", f"Loaded harness output with {len(page_dicts)} pages")
elif "title" in existing_raw and "body_sections" in existing_raw:
page_dicts = [existing_raw]
_log("COMPOSE", "Loaded single SynthesizedPage")
else:
_log("ERROR", "Existing page JSON must be a SynthesizedPage or harness output with 'pages' key", level="ERROR")
return [], 3
# Validate each page against SynthesizedPage
validated_pages: list[dict] = []
for pd in page_dicts:
try:
SynthesizedPage.model_validate(pd)
validated_pages.append(pd)
except ValidationError as exc:
_log("WARN", f"Skipping invalid page '{pd.get('title', '?')}': {exc}", level="WARN")
if not validated_pages:
_log("ERROR", "No valid SynthesizedPage found in existing page file", level="ERROR")
return [], 3
# Apply category filter
if category_filter:
validated_pages = [p for p in validated_pages if p.get("topic_category") == category_filter]
if not validated_pages:
_log("ERROR", f"No pages match category '{category_filter}'", level="ERROR")
return [], 3
# Load existing moments fixture (the original moments the page was built from)
try:
existing_fixture = load_fixture(existing_fixture_path)
except (FileNotFoundError, ValueError, json.JSONDecodeError) as exc:
_log("ERROR", f"Existing fixture error: {exc}", level="ERROR")
return [], 3
# Load new moments fixture
try:
new_fixture = load_fixture(new_fixture_path)
except (FileNotFoundError, ValueError, json.JSONDecodeError) as exc:
_log("ERROR", f"New fixture error: {exc}", level="ERROR")
return [], 3
# Load prompt
prompt_file = Path(prompt_path)
if not prompt_file.exists():
_log("ERROR", f"Prompt file not found: {prompt_path}", level="ERROR")
return [], 3
system_prompt = prompt_file.read_text(encoding="utf-8")
_log("PROMPT", f"Loading compose prompt: {prompt_path} ({len(system_prompt)} chars)")
# Setup LLM
settings = get_settings()
llm = LLMClient(settings)
stage_model = model_override or settings.llm_stage5_model or settings.llm_model
stage_modality = modality or settings.llm_stage5_modality or "thinking"
hard_limit = settings.llm_max_tokens_hard_limit
_log("LLM", f"Model: {stage_model}, modality: {stage_modality}, hard_limit: {hard_limit}")
all_pages: list[dict] = []
exit_code = 0
for page_idx, existing_page in enumerate(validated_pages, 1):
page_category = existing_page.get("topic_category", "Uncategorized")
page_title = existing_page.get("title", "Untitled")
_log("COMPOSE", f"Page {page_idx}/{len(validated_pages)}: '{page_title}' ({page_category})")
# Get existing moments for this page's category
existing_moments = existing_fixture.groups.get(page_category, [])
if not existing_moments:
_log("WARN", f" No existing moments found for category '{page_category}' — skipping", level="WARN")
continue
# Get new moments for this page's category
new_moments = new_fixture.groups.get(page_category, [])
if not new_moments:
_log("WARN", f" No new moments for category '{page_category}' — nothing to compose", level="WARN")
all_pages.append(existing_page)
continue
n_existing = len(existing_moments)
n_new = len(new_moments)
total_moments = n_existing + n_new
# Before metrics
before_words = _count_page_words(existing_page)
before_sections = len(existing_page.get("body_sections", []))
_log(
"COMPOSE",
f" Existing: {n_existing} moments, {before_sections} sections, {before_words} words | "
f"New: {n_new} moments | Total citation space: [0]-[{total_moments - 1}]",
)
# Build compose prompt
user_prompt = build_compose_prompt(
existing_page=existing_page,
existing_moments=existing_moments,
new_moments=new_moments,
creator_name=existing_fixture.creator_name,
)
estimated_tokens = estimate_max_tokens(
system_prompt, user_prompt,
stage="stage5_synthesis",
hard_limit=hard_limit,
)
_log("COMPOSE", f" Prompt built: {len(user_prompt)} chars, max_tokens={estimated_tokens}")
# Call LLM
call_start = time.monotonic()
_log("LLM", f" Calling: model={stage_model}, max_tokens={estimated_tokens}, modality={stage_modality}")
try:
raw = llm.complete(
system_prompt,
user_prompt,
response_model=SynthesisResult,
modality=stage_modality,
model_override=stage_model,
max_tokens=estimated_tokens,
)
except Exception as exc:
_log("ERROR", f" LLM call failed: {exc}", level="ERROR")
exit_code = 1
continue
call_duration_ms = int((time.monotonic() - call_start) * 1000)
prompt_tokens = getattr(raw, "prompt_tokens", None) or 0
completion_tokens = getattr(raw, "completion_tokens", None) or 0
finish_reason = getattr(raw, "finish_reason", "unknown")
_log(
"LLM",
f" Response: {prompt_tokens} prompt + {completion_tokens} completion tokens, "
f"{call_duration_ms}ms, finish_reason={finish_reason}",
)
if finish_reason == "length":
_log("WARN", " finish_reason=length — output likely truncated!", level="WARN")
# Parse response
try:
result = SynthesisResult.model_validate_json(str(raw))
except (ValidationError, json.JSONDecodeError) as exc:
_log("ERROR", f" Parse failed: {exc}", level="ERROR")
_log("ERROR", f" Raw response (first 2000 chars): {str(raw)[:2000]}", level="ERROR")
exit_code = 2
continue
# Log compose-specific metrics
for page in result.pages:
page_dict = page.model_dump()
after_words = _count_page_words(page_dict)
after_sections = len(page.body_sections or [])
# Identify new sections (headings not in the original)
existing_headings = {s.get("heading", "") for s in existing_page.get("body_sections", [])}
new_section_headings = [
s.heading for s in (page.body_sections or []) if s.heading not in existing_headings
]
_log(
"COMPOSE",
f" Result: '{page.title}': "
f"words {before_words}→{after_words} ({after_words - before_words:+d}), "
f"sections {before_sections}{after_sections} ({after_sections - before_sections:+d})"
+ (f", new sections: {new_section_headings}" if new_section_headings else ""),
)
# Citation validation with unified moment count
cit = validate_citations(page.body_sections or [], total_moments)
_log(
"CITE",
f" Citations: {cit['total_citations']}/{total_moments} moments cited "
f"({cit['coverage_pct']}% coverage)"
+ (f", invalid indices: {cit['invalid_indices']}" if cit['invalid_indices'] else "")
+ (f", uncited: {cit['uncited_moments']}" if cit['uncited_moments'] else ""),
)
all_pages.append(page_dict)
_log("SUMMARY", f"Compose complete: {len(all_pages)} pages")
return all_pages, exit_code
# ── Promote: deploy a prompt to production ─────────────────────────────────
_STAGE_PROMPT_MAP = {
@@ -426,6 +694,17 @@ def main() -> int:
promo_parser.add_argument("--reason", "-r", type=str, required=True, help="Why this prompt is being promoted")
promo_parser.add_argument("--commit", action="store_true", help="Also create a git commit")
# -- compose subcommand --
compose_parser = sub.add_parser("compose", help="Merge new moments into an existing page")
compose_parser.add_argument("--existing-page", type=str, required=True, help="Existing page JSON (harness output or raw SynthesizedPage)")
compose_parser.add_argument("--fixture", "-f", type=str, required=True, help="New moments fixture JSON")
compose_parser.add_argument("--existing-fixture", type=str, required=True, help="Original moments fixture JSON (for citation context)")
compose_parser.add_argument("--prompt", "-p", type=str, default=None, help="Compose prompt file (default: stage5_compose.txt)")
compose_parser.add_argument("--output", "-o", type=str, default=None, help="Output file path")
compose_parser.add_argument("--category", "-c", type=str, default=None, help="Filter to a specific category")
compose_parser.add_argument("--model", type=str, default=None, help="Override LLM model")
compose_parser.add_argument("--modality", type=str, default=None, choices=["chat", "thinking"])
args = parser.parse_args()
# If no subcommand, check for --fixture for backward compat
@@ -437,6 +716,55 @@ def main() -> int:
if args.command == "promote":
return promote_prompt(args.prompt, args.stage, args.reason, args.commit)
if args.command == "compose":
# Resolve default compose prompt
prompt_path = args.prompt
if prompt_path is None:
settings = get_settings()
prompt_path = str(Path(settings.prompts_path) / "stage5_compose.txt")
overall_start = time.monotonic()
pages, exit_code = run_compose(
existing_page_path=args.existing_page,
existing_fixture_path=args.existing_fixture,
new_fixture_path=args.fixture,
prompt_path=prompt_path,
category_filter=args.category,
model_override=args.model,
modality=args.modality,
)
if not pages and exit_code != 0:
return exit_code
output = {
"existing_page_source": args.existing_page,
"existing_fixture_source": args.existing_fixture,
"new_fixture_source": args.fixture,
"prompt_source": prompt_path,
"category_filter": args.category,
"pages": pages,
"metadata": {
"page_count": len(pages),
"total_words": sum(_count_page_words(p) for p in pages),
"elapsed_seconds": round(time.monotonic() - overall_start, 1),
},
}
output_json = json.dumps(output, indent=2, ensure_ascii=False)
if args.output:
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
Path(args.output).write_text(output_json, encoding="utf-8")
_log("OUTPUT", f"Written to: {args.output} ({len(output_json) / 1024:.1f} KB)")
else:
print(output_json)
_log("OUTPUT", f"Printed to stdout ({len(output_json) / 1024:.1f} KB)")
total_elapsed = time.monotonic() - overall_start
_log("DONE", f"Compose completed in {total_elapsed:.1f}s (exit_code={exit_code})")
return exit_code
# -- run command --
prompt_path = args.prompt
if prompt_path is None: