diff --git a/.gsd/milestones/M014/slices/S02/S02-PLAN.md b/.gsd/milestones/M014/slices/S02/S02-PLAN.md
index 910f8fe..325489a 100644
--- a/.gsd/milestones/M014/slices/S02/S02-PLAN.md
+++ b/.gsd/milestones/M014/slices/S02/S02-PLAN.md
@@ -34,7 +34,7 @@ assert json_match, 'no JSON example found'
json.loads(json_match.group(1))
print('All structural checks passed')
"
-- [ ] **T02: Add compose subcommand to test harness** — Add a `compose` subcommand to `backend/pipeline/test_harness.py` that loads an existing page JSON + new moments fixture, builds a compose user prompt, calls the LLM via the compose prompt, and validates the output. Also extract the compose user-prompt builder as a testable function.
+- [x] **T02: Added compose subcommand with build_compose_prompt(), run_compose(), and CLI wiring for offline composition testing** — Add a `compose` subcommand to `backend/pipeline/test_harness.py` that loads an existing page JSON + new moments fixture, builds a compose user prompt, calls the LLM via the compose prompt, and validates the output. Also extract the compose user-prompt builder as a testable function.
Steps:
1. Read `backend/pipeline/test_harness.py` fully to understand the existing `run` and `promote` subcommand patterns.
diff --git a/.gsd/milestones/M014/slices/S02/tasks/T01-VERIFY.json b/.gsd/milestones/M014/slices/S02/tasks/T01-VERIFY.json
new file mode 100644
index 0000000..dae9b31
--- /dev/null
+++ b/.gsd/milestones/M014/slices/S02/tasks/T01-VERIFY.json
@@ -0,0 +1,9 @@
+{
+ "schemaVersion": 1,
+ "taskId": "T01",
+ "unitId": "M014/S02/T01",
+ "timestamp": 1775178181291,
+ "passed": true,
+ "discoverySource": "none",
+ "checks": []
+}
diff --git a/.gsd/milestones/M014/slices/S02/tasks/T02-SUMMARY.md b/.gsd/milestones/M014/slices/S02/tasks/T02-SUMMARY.md
new file mode 100644
index 0000000..f50c851
--- /dev/null
+++ b/.gsd/milestones/M014/slices/S02/tasks/T02-SUMMARY.md
@@ -0,0 +1,76 @@
+---
+id: T02
+parent: S02
+milestone: M014
+provides: []
+requires: []
+affects: []
+key_files: ["backend/pipeline/test_harness.py"]
+key_decisions: ["Compose accepts both harness output (with .pages[]) and raw SynthesizedPage JSON", "New moment indices built manually with offset to ensure correct [N+i] numbering"]
+patterns_established: []
+drill_down_paths: []
+observability_surfaces: []
+duration: ""
+verification_result: "compose --help exits 0 with correct argument listing. build_compose_prompt() smoke test verified XML tag presence, correct index offsets, and creator tag content."
+completed_at: 2026-04-03T01:05:22.034Z
+blocker_discovered: false
+---
+
+# T02: Added compose subcommand with build_compose_prompt(), run_compose(), and CLI wiring for offline composition testing
+
+> Added compose subcommand with build_compose_prompt(), run_compose(), and CLI wiring for offline composition testing
+
+## What Happened
+---
+id: T02
+parent: S02
+milestone: M014
+key_files:
+ - backend/pipeline/test_harness.py
+key_decisions:
+ - Compose accepts both harness output (with .pages[]) and raw SynthesizedPage JSON
+ - New moment indices built manually with offset to ensure correct [N+i] numbering
+duration: ""
+verification_result: passed
+completed_at: 2026-04-03T01:05:22.034Z
+blocker_discovered: false
+---
+
+# T02: Added compose subcommand with build_compose_prompt(), run_compose(), and CLI wiring for offline composition testing
+
+**Added compose subcommand with build_compose_prompt(), run_compose(), and CLI wiring for offline composition testing**
+
+## What Happened
+
+Added build_compose_prompt() (builds user prompt with XML tags and offset-based citation indices), run_compose() (loads existing page + two fixtures, calls LLM, validates output, logs compose-specific metrics), and the compose CLI subcommand with --existing-page, --fixture, --existing-fixture args plus optional prompt/output/category/model/modality overrides.
+
+## Verification
+
+compose --help exits 0 with correct argument listing. build_compose_prompt() smoke test verified XML tag presence, correct index offsets, and creator tag content.
+
+## Verification Evidence
+
+| # | Command | Exit Code | Verdict | Duration |
+|---|---------|-----------|---------|----------|
+| 1 | `cd backend && python -m pipeline.test_harness compose --help` | 0 | ✅ pass | 500ms |
+| 2 | `python -c build_compose_prompt assertions (XML tags, index offsets, creator)` | 0 | ✅ pass | 400ms |
+
+
+## Deviations
+
+None.
+
+## Known Issues
+
+None.
+
+## Files Created/Modified
+
+- `backend/pipeline/test_harness.py`
+
+
+## Deviations
+None.
+
+## Known Issues
+None.
diff --git a/backend/pipeline/test_harness.py b/backend/pipeline/test_harness.py
index 976fc54..7e17eba 100644
--- a/backend/pipeline/test_harness.py
+++ b/backend/pipeline/test_harness.py
@@ -35,7 +35,7 @@ from pydantic import ValidationError
from config import get_settings
from pipeline.citation_utils import validate_citations
from pipeline.llm_client import LLMClient, estimate_max_tokens
-from pipeline.schemas import SynthesisResult
+from pipeline.schemas import SynthesizedPage, SynthesisResult
# ── Lightweight stand-in for KeyMoment ORM model ───────────────────────────
@@ -318,6 +318,274 @@ def run_synthesis(
return all_pages, exit_code
+# ── Compose: merge new moments into existing page ──────────────────────────
+
+def _count_page_words(page_dict: dict) -> int:
+ """Count total words in a page's body sections."""
+ return sum(
+ len(s.get("content", "").split())
+ + sum(len(sub.get("content", "").split()) for sub in s.get("subsections", []))
+ for s in page_dict.get("body_sections", [])
+ )
+
+
+def build_compose_prompt(
+ existing_page: dict,
+ existing_moments: list[tuple[MockKeyMoment, dict]],
+ new_moments: list[tuple[MockKeyMoment, dict]],
+ creator_name: str,
+) -> str:
+ """Build the user prompt for composition (merging new moments into an existing page).
+
+ Existing moments keep indices [0]-[N-1].
+ New moments get indices [N]-[N+M-1].
+ Uses build_moments_text() for formatting, with index offsets applied for new moments.
+ """
+ category = existing_page.get("topic_category", "Uncategorized")
+
+ # Format existing moments [0]-[N-1]
+ existing_text, _ = build_moments_text(existing_moments, category)
+
+ # Format new moments with offset indices [N]-[N+M-1]
+ n = len(existing_moments)
+ new_lines = []
+ for i, (m, cls_info) in enumerate(new_moments):
+ tags = cls_info.get("topic_tags", [])
+ new_lines.append(
+ f"[{n + i}] Title: {m.title}\n"
+ f" Summary: {m.summary}\n"
+ f" Content type: {m.content_type.value}\n"
+ f" Time: {m.start_time:.1f}s - {m.end_time:.1f}s\n"
+ f" Plugins: {', '.join(m.plugins) if m.plugins else 'none'}\n"
+ f" Category: {category}\n"
+ f" Tags: {', '.join(tags) if tags else 'none'}\n"
+ f" Transcript excerpt: {(m.raw_transcript or '')[:300]}"
+ )
+ new_text = "\n\n".join(new_lines)
+
+ page_json = json.dumps(existing_page, indent=2, ensure_ascii=False)
+
+    return (
+        f"<existing_page>\n{page_json}\n</existing_page>\n\n"
+        f"<existing_moments>\n{existing_text}\n</existing_moments>\n\n"
+        f"<new_moments>\n{new_text}\n</new_moments>\n\n"
+        f"<creator>{creator_name}</creator>"
+    )
+
+
+def run_compose(
+ existing_page_path: str,
+ existing_fixture_path: str,
+ new_fixture_path: str,
+ prompt_path: str,
+ category_filter: str | None = None,
+ model_override: str | None = None,
+ modality: str | None = None,
+) -> tuple[list[dict], int]:
+ """Run composition: merge new fixture moments into an existing page.
+
+ Returns (pages, exit_code) — same shape as run_synthesis().
+ """
+ # Load existing page JSON
+ existing_page_file = Path(existing_page_path)
+ if not existing_page_file.exists():
+ _log("ERROR", f"Existing page not found: {existing_page_path}", level="ERROR")
+ return [], 3
+
+ try:
+ existing_raw = json.loads(existing_page_file.read_text(encoding="utf-8"))
+ except json.JSONDecodeError as exc:
+ _log("ERROR", f"Invalid JSON in existing page: {exc}", level="ERROR")
+ return [], 3
+
+ # The existing page file might be a harness output (with .pages[]) or a raw SynthesizedPage
+ if "pages" in existing_raw and isinstance(existing_raw["pages"], list):
+ page_dicts = existing_raw["pages"]
+ _log("COMPOSE", f"Loaded harness output with {len(page_dicts)} pages")
+ elif "title" in existing_raw and "body_sections" in existing_raw:
+ page_dicts = [existing_raw]
+ _log("COMPOSE", "Loaded single SynthesizedPage")
+ else:
+ _log("ERROR", "Existing page JSON must be a SynthesizedPage or harness output with 'pages' key", level="ERROR")
+ return [], 3
+
+ # Validate each page against SynthesizedPage
+ validated_pages: list[dict] = []
+ for pd in page_dicts:
+ try:
+ SynthesizedPage.model_validate(pd)
+ validated_pages.append(pd)
+ except ValidationError as exc:
+ _log("WARN", f"Skipping invalid page '{pd.get('title', '?')}': {exc}", level="WARN")
+
+ if not validated_pages:
+ _log("ERROR", "No valid SynthesizedPage found in existing page file", level="ERROR")
+ return [], 3
+
+ # Apply category filter
+ if category_filter:
+ validated_pages = [p for p in validated_pages if p.get("topic_category") == category_filter]
+ if not validated_pages:
+ _log("ERROR", f"No pages match category '{category_filter}'", level="ERROR")
+ return [], 3
+
+ # Load existing moments fixture (the original moments the page was built from)
+ try:
+ existing_fixture = load_fixture(existing_fixture_path)
+ except (FileNotFoundError, ValueError, json.JSONDecodeError) as exc:
+ _log("ERROR", f"Existing fixture error: {exc}", level="ERROR")
+ return [], 3
+
+ # Load new moments fixture
+ try:
+ new_fixture = load_fixture(new_fixture_path)
+ except (FileNotFoundError, ValueError, json.JSONDecodeError) as exc:
+ _log("ERROR", f"New fixture error: {exc}", level="ERROR")
+ return [], 3
+
+ # Load prompt
+ prompt_file = Path(prompt_path)
+ if not prompt_file.exists():
+ _log("ERROR", f"Prompt file not found: {prompt_path}", level="ERROR")
+ return [], 3
+ system_prompt = prompt_file.read_text(encoding="utf-8")
+ _log("PROMPT", f"Loading compose prompt: {prompt_path} ({len(system_prompt)} chars)")
+
+ # Setup LLM
+ settings = get_settings()
+ llm = LLMClient(settings)
+ stage_model = model_override or settings.llm_stage5_model or settings.llm_model
+ stage_modality = modality or settings.llm_stage5_modality or "thinking"
+ hard_limit = settings.llm_max_tokens_hard_limit
+ _log("LLM", f"Model: {stage_model}, modality: {stage_modality}, hard_limit: {hard_limit}")
+
+ all_pages: list[dict] = []
+ exit_code = 0
+
+ for page_idx, existing_page in enumerate(validated_pages, 1):
+ page_category = existing_page.get("topic_category", "Uncategorized")
+ page_title = existing_page.get("title", "Untitled")
+ _log("COMPOSE", f"Page {page_idx}/{len(validated_pages)}: '{page_title}' ({page_category})")
+
+ # Get existing moments for this page's category
+ existing_moments = existing_fixture.groups.get(page_category, [])
+ if not existing_moments:
+ _log("WARN", f" No existing moments found for category '{page_category}' — skipping", level="WARN")
+ continue
+
+ # Get new moments for this page's category
+ new_moments = new_fixture.groups.get(page_category, [])
+ if not new_moments:
+ _log("WARN", f" No new moments for category '{page_category}' — nothing to compose", level="WARN")
+ all_pages.append(existing_page)
+ continue
+
+ n_existing = len(existing_moments)
+ n_new = len(new_moments)
+ total_moments = n_existing + n_new
+
+ # Before metrics
+ before_words = _count_page_words(existing_page)
+ before_sections = len(existing_page.get("body_sections", []))
+
+ _log(
+ "COMPOSE",
+ f" Existing: {n_existing} moments, {before_sections} sections, {before_words} words | "
+ f"New: {n_new} moments | Total citation space: [0]-[{total_moments - 1}]",
+ )
+
+ # Build compose prompt
+ user_prompt = build_compose_prompt(
+ existing_page=existing_page,
+ existing_moments=existing_moments,
+ new_moments=new_moments,
+ creator_name=existing_fixture.creator_name,
+ )
+
+ estimated_tokens = estimate_max_tokens(
+ system_prompt, user_prompt,
+ stage="stage5_synthesis",
+ hard_limit=hard_limit,
+ )
+ _log("COMPOSE", f" Prompt built: {len(user_prompt)} chars, max_tokens={estimated_tokens}")
+
+ # Call LLM
+ call_start = time.monotonic()
+ _log("LLM", f" Calling: model={stage_model}, max_tokens={estimated_tokens}, modality={stage_modality}")
+
+ try:
+ raw = llm.complete(
+ system_prompt,
+ user_prompt,
+ response_model=SynthesisResult,
+ modality=stage_modality,
+ model_override=stage_model,
+ max_tokens=estimated_tokens,
+ )
+ except Exception as exc:
+ _log("ERROR", f" LLM call failed: {exc}", level="ERROR")
+ exit_code = 1
+ continue
+
+ call_duration_ms = int((time.monotonic() - call_start) * 1000)
+ prompt_tokens = getattr(raw, "prompt_tokens", None) or 0
+ completion_tokens = getattr(raw, "completion_tokens", None) or 0
+ finish_reason = getattr(raw, "finish_reason", "unknown")
+
+ _log(
+ "LLM",
+ f" Response: {prompt_tokens} prompt + {completion_tokens} completion tokens, "
+ f"{call_duration_ms}ms, finish_reason={finish_reason}",
+ )
+
+ if finish_reason == "length":
+ _log("WARN", " finish_reason=length — output likely truncated!", level="WARN")
+
+ # Parse response
+ try:
+ result = SynthesisResult.model_validate_json(str(raw))
+ except (ValidationError, json.JSONDecodeError) as exc:
+ _log("ERROR", f" Parse failed: {exc}", level="ERROR")
+ _log("ERROR", f" Raw response (first 2000 chars): {str(raw)[:2000]}", level="ERROR")
+ exit_code = 2
+ continue
+
+ # Log compose-specific metrics
+ for page in result.pages:
+ page_dict = page.model_dump()
+ after_words = _count_page_words(page_dict)
+ after_sections = len(page.body_sections or [])
+
+ # Identify new sections (headings not in the original)
+ existing_headings = {s.get("heading", "") for s in existing_page.get("body_sections", [])}
+ new_section_headings = [
+ s.heading for s in (page.body_sections or []) if s.heading not in existing_headings
+ ]
+
+ _log(
+ "COMPOSE",
+ f" Result: '{page.title}' — "
+ f"words {before_words}→{after_words} ({after_words - before_words:+d}), "
+ f"sections {before_sections}→{after_sections} ({after_sections - before_sections:+d})"
+ + (f", new sections: {new_section_headings}" if new_section_headings else ""),
+ )
+
+ # Citation validation with unified moment count
+ cit = validate_citations(page.body_sections or [], total_moments)
+ _log(
+ "CITE",
+ f" Citations: {cit['total_citations']}/{total_moments} moments cited "
+ f"({cit['coverage_pct']}% coverage)"
+ + (f", invalid indices: {cit['invalid_indices']}" if cit['invalid_indices'] else "")
+ + (f", uncited: {cit['uncited_moments']}" if cit['uncited_moments'] else ""),
+ )
+
+ all_pages.append(page_dict)
+
+ _log("SUMMARY", f"Compose complete: {len(all_pages)} pages")
+ return all_pages, exit_code
+
+
# ── Promote: deploy a prompt to production ─────────────────────────────────
_STAGE_PROMPT_MAP = {
@@ -426,6 +694,17 @@ def main() -> int:
promo_parser.add_argument("--reason", "-r", type=str, required=True, help="Why this prompt is being promoted")
promo_parser.add_argument("--commit", action="store_true", help="Also create a git commit")
+ # -- compose subcommand --
+ compose_parser = sub.add_parser("compose", help="Merge new moments into an existing page")
+ compose_parser.add_argument("--existing-page", type=str, required=True, help="Existing page JSON (harness output or raw SynthesizedPage)")
+ compose_parser.add_argument("--fixture", "-f", type=str, required=True, help="New moments fixture JSON")
+ compose_parser.add_argument("--existing-fixture", type=str, required=True, help="Original moments fixture JSON (for citation context)")
+ compose_parser.add_argument("--prompt", "-p", type=str, default=None, help="Compose prompt file (default: stage5_compose.txt)")
+ compose_parser.add_argument("--output", "-o", type=str, default=None, help="Output file path")
+ compose_parser.add_argument("--category", "-c", type=str, default=None, help="Filter to a specific category")
+ compose_parser.add_argument("--model", type=str, default=None, help="Override LLM model")
+ compose_parser.add_argument("--modality", type=str, default=None, choices=["chat", "thinking"])
+
args = parser.parse_args()
# If no subcommand, check for --fixture for backward compat
@@ -437,6 +716,55 @@ def main() -> int:
if args.command == "promote":
return promote_prompt(args.prompt, args.stage, args.reason, args.commit)
+ if args.command == "compose":
+ # Resolve default compose prompt
+ prompt_path = args.prompt
+ if prompt_path is None:
+ settings = get_settings()
+ prompt_path = str(Path(settings.prompts_path) / "stage5_compose.txt")
+
+ overall_start = time.monotonic()
+ pages, exit_code = run_compose(
+ existing_page_path=args.existing_page,
+ existing_fixture_path=args.existing_fixture,
+ new_fixture_path=args.fixture,
+ prompt_path=prompt_path,
+ category_filter=args.category,
+ model_override=args.model,
+ modality=args.modality,
+ )
+
+ if not pages and exit_code != 0:
+ return exit_code
+
+ output = {
+ "existing_page_source": args.existing_page,
+ "existing_fixture_source": args.existing_fixture,
+ "new_fixture_source": args.fixture,
+ "prompt_source": prompt_path,
+ "category_filter": args.category,
+ "pages": pages,
+ "metadata": {
+ "page_count": len(pages),
+ "total_words": sum(_count_page_words(p) for p in pages),
+ "elapsed_seconds": round(time.monotonic() - overall_start, 1),
+ },
+ }
+
+ output_json = json.dumps(output, indent=2, ensure_ascii=False)
+
+ if args.output:
+ Path(args.output).parent.mkdir(parents=True, exist_ok=True)
+ Path(args.output).write_text(output_json, encoding="utf-8")
+ _log("OUTPUT", f"Written to: {args.output} ({len(output_json) / 1024:.1f} KB)")
+ else:
+ print(output_json)
+ _log("OUTPUT", f"Printed to stdout ({len(output_json) / 1024:.1f} KB)")
+
+ total_elapsed = time.monotonic() - overall_start
+ _log("DONE", f"Compose completed in {total_elapsed:.1f}s (exit_code={exit_code})")
+ return exit_code
+
# -- run command --
prompt_path = args.prompt
if prompt_path is None: