diff --git a/backend/pipeline/stages.py b/backend/pipeline/stages.py
index 2e6fe5d..eee7b00 100644
--- a/backend/pipeline/stages.py
+++ b/backend/pipeline/stages.py
@@ -24,6 +24,8 @@ from sqlalchemy import create_engine, func, select
from sqlalchemy.orm import Session, sessionmaker
from config import get_settings
+from sqlalchemy.dialects.postgresql import insert as pg_insert
+
from models import (
Creator,
KeyMoment,
@@ -33,6 +35,7 @@ from models import (
SourceVideo,
TechniquePage,
TechniquePageVersion,
+ TechniquePageVideo,
TranscriptSegment,
)
from pipeline.embedding_client import EmbeddingClient
@@ -980,6 +983,117 @@ def _build_moments_text(
return "\n\n".join(moments_lines), all_tags
+def _build_compose_user_prompt(
+ existing_page: TechniquePage,
+ existing_moments: list[KeyMoment],
+ new_moments: list[tuple[KeyMoment, dict]],
+ creator_name: str,
+) -> str:
+ """Build the user prompt for composing new moments into an existing page.
+
+ Existing moments keep indices [0]-[N-1].
+ New moments get indices [N]-[N+M-1].
+ XML-tagged prompt structure matches test_harness.py build_compose_prompt().
+ """
+ category = existing_page.topic_category or "Uncategorized"
+
+ # Serialize existing page to dict matching SynthesizedPage shape
+ sq = existing_page.source_quality
+ sq_value = sq.value if hasattr(sq, "value") else sq
+ page_dict = {
+ "title": existing_page.title,
+ "slug": existing_page.slug,
+ "topic_category": existing_page.topic_category,
+ "summary": existing_page.summary,
+ "body_sections": existing_page.body_sections,
+ "signal_chains": existing_page.signal_chains,
+ "plugins": existing_page.plugins,
+ "source_quality": sq_value,
+ }
+
+ # Format existing moments [0]-[N-1] using _build_moments_text pattern
+ # Existing moments don't have classification data — use empty dict
+ existing_as_tuples = [(m, {}) for m in existing_moments]
+ existing_text, _ = _build_moments_text(existing_as_tuples, category)
+
+ # Format new moments [N]-[N+M-1] with offset indices
+ n = len(existing_moments)
+ new_lines = []
+ for i, (m, cls_info) in enumerate(new_moments):
+ tags = cls_info.get("topic_tags", [])
+ new_lines.append(
+ f"[{n + i}] Title: {m.title}\n"
+ f" Summary: {m.summary}\n"
+ f" Content type: {m.content_type.value}\n"
+ f" Time: {m.start_time:.1f}s - {m.end_time:.1f}s\n"
+ f" Plugins: {', '.join(m.plugins) if m.plugins else 'none'}\n"
+ f" Category: {category}\n"
+ f" Tags: {', '.join(tags) if tags else 'none'}\n"
+ f" Transcript excerpt: {(m.raw_transcript or '')[:300]}"
+ )
+ new_text = "\n\n".join(new_lines)
+
+ page_json = json.dumps(page_dict, indent=2, ensure_ascii=False, default=str)
+
+ return (
+ f"\n{page_json}\n\n"
+ f"\n{existing_text}\n\n"
+ f"\n{new_text}\n\n"
+ f"{creator_name}"
+ )
+
+
+def _compose_into_existing(
+ existing_page: TechniquePage,
+ existing_moments: list[KeyMoment],
+ new_moment_group: list[tuple[KeyMoment, dict]],
+ category: str,
+ creator_name: str,
+ system_prompt: str,
+ llm: LLMClient,
+ model_override: str | None,
+ modality: str,
+ hard_limit: int,
+ video_id: str,
+ run_id: str | None,
+) -> SynthesisResult:
+ """Compose new moments into an existing technique page via LLM.
+
+ Loads the compose system prompt, builds the compose user prompt, and
+ calls the LLM with the same retry/parse pattern as _synthesize_chunk().
+ """
+ compose_prompt = _load_prompt("stage5_compose.txt", video_id=video_id)
+ user_prompt = _build_compose_user_prompt(
+ existing_page, existing_moments, new_moment_group, creator_name,
+ )
+
+ estimated_input = estimate_max_tokens(
+ compose_prompt, user_prompt,
+ stage="stage5_synthesis", hard_limit=hard_limit,
+ )
+ logger.info(
+ "Stage 5: Composing into '%s' — %d existing + %d new moments, max_tokens=%d",
+ existing_page.slug, len(existing_moments), len(new_moment_group), estimated_input,
+ )
+
+ raw = llm.complete(
+ compose_prompt, user_prompt, response_model=SynthesisResult,
+ on_complete=_make_llm_callback(
+ video_id, "stage5_synthesis",
+ system_prompt=compose_prompt, user_prompt=user_prompt,
+ run_id=run_id, context_label=f"compose:{category}",
+ request_params=_build_request_params(
+ estimated_input, model_override, modality, "SynthesisResult", hard_limit,
+ ),
+ ),
+ modality=modality, model_override=model_override, max_tokens=estimated_input,
+ )
+ return _safe_parse_llm_response(
+ raw, SynthesisResult, llm, compose_prompt, user_prompt,
+ modality=modality, model_override=model_override, max_tokens=estimated_input,
+ )
+
+
def _synthesize_chunk(
chunk: list[tuple[KeyMoment, dict]],
category: str,
@@ -1198,8 +1312,52 @@ def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str:
for _, cls_info in moment_group:
all_tags.update(cls_info.get("topic_tags", []))
+ # ── Compose-or-create detection ────────────────────────
+ # Check if an existing technique page already covers this
+ # creator + category combination (from a prior video run).
+ compose_matches = session.execute(
+ select(TechniquePage).where(
+ TechniquePage.creator_id == video.creator_id,
+ func.lower(TechniquePage.topic_category) == func.lower(category),
+ )
+ ).scalars().all()
+
+ if len(compose_matches) > 1:
+ logger.warning(
+ "Stage 5: Multiple existing pages (%d) match creator=%s category='%s'. "
+ "Using first match '%s'.",
+ len(compose_matches), video.creator_id, category,
+ compose_matches[0].slug,
+ )
+
+ compose_target = compose_matches[0] if compose_matches else None
+
+ if compose_target is not None:
+ # Load existing moments linked to this page
+ existing_moments = session.execute(
+ select(KeyMoment)
+ .where(KeyMoment.technique_page_id == compose_target.id)
+ .order_by(KeyMoment.start_time)
+ ).scalars().all()
+
+ logger.info(
+ "Stage 5: Composing into existing page '%s' "
+ "(%d existing moments + %d new moments)",
+ compose_target.slug,
+ len(existing_moments),
+ len(moment_group),
+ )
+
+ compose_result = _compose_into_existing(
+ compose_target, existing_moments, moment_group,
+ category, creator_name, system_prompt,
+ llm, model_override, modality, hard_limit,
+ video_id, run_id,
+ )
+ synthesized_pages = list(compose_result.pages)
+
# ── Chunked synthesis with truncation recovery ─────────
- if len(moment_group) <= chunk_size:
+ elif len(moment_group) <= chunk_size:
# Small group — try single LLM call first
try:
result = _synthesize_chunk(
@@ -1379,6 +1537,16 @@ def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str:
pages_created += 1
+ # Set body_sections_format on every page (new or updated)
+ page.body_sections_format = "v2"
+
+ # Track contributing video via TechniquePageVideo
+ stmt = pg_insert(TechniquePageVideo.__table__).values(
+ technique_page_id=page.id,
+ source_video_id=video.id,
+ ).on_conflict_do_nothing()
+ session.execute(stmt)
+
# Link moments to the technique page using moment_indices
if page_moment_indices: