def _build_compose_user_prompt(
    existing_page: TechniquePage,
    existing_moments: list[KeyMoment],
    new_moments: list[tuple[KeyMoment, dict]],
    creator_name: str,
) -> str:
    """Build the user prompt for composing new moments into an existing page.

    The prompt consists of four XML-tagged sections, in this order:

    1. the existing page serialized as JSON,
    2. the moments already linked to that page,
    3. the new moments to merge in,
    4. the creator name.

    New moments are numbered starting at ``len(existing_moments)`` so their
    indices continue after the existing ones ([N]..[N+M-1]). The existing
    moments are rendered by ``_build_moments_text``; they are expected to
    occupy [0]..[N-1] (that helper is not defined in this block, so confirm
    its numbering there).

    Args:
        existing_page: Page being extended; its fields are embedded as JSON.
        existing_moments: Moments already linked to ``existing_page``.
        new_moments: ``(moment, classification_info)`` pairs to add. Only the
            ``"topic_tags"`` key of the classification dict is read here.
        creator_name: Creator name placed in the final section.

    Returns:
        The assembled user prompt string.
    """
    category = existing_page.topic_category or "Uncategorized"

    # source_quality may be an enum member or an already-plain value.
    quality = existing_page.source_quality
    page_dict = {
        "title": existing_page.title,
        "slug": existing_page.slug,
        "topic_category": existing_page.topic_category,
        "summary": existing_page.summary,
        "body_sections": existing_page.body_sections,
        "signal_chains": existing_page.signal_chains,
        "plugins": existing_page.plugins,
        "source_quality": quality.value if hasattr(quality, "value") else quality,
    }
    page_json = json.dumps(page_dict, indent=2, ensure_ascii=False, default=str)

    # Existing moments carry no classification data at this point, so each
    # one is paired with an empty dict to fit _build_moments_text's input.
    existing_text, _ = _build_moments_text(
        [(moment, {}) for moment in existing_moments], category,
    )

    # Number the new moments after the existing ones.
    offset = len(existing_moments)
    new_blocks = []
    for index, (moment, cls_info) in enumerate(new_moments, start=offset):
        tags = cls_info.get("topic_tags", [])
        new_blocks.append(
            f"[{index}] Title: {moment.title}\n"
            f" Summary: {moment.summary}\n"
            f" Content type: {moment.content_type.value}\n"
            f" Time: {moment.start_time:.1f}s - {moment.end_time:.1f}s\n"
            f" Plugins: {', '.join(moment.plugins) if moment.plugins else 'none'}\n"
            f" Category: {category}\n"
            f" Tags: {', '.join(tags) if tags else 'none'}\n"
            f" Transcript excerpt: {(moment.raw_transcript or '')[:300]}"
        )
    new_text = "\n\n".join(new_blocks)

    # Each section is wrapped in its own tag so the model can tell the page
    # JSON, the two moment lists and the creator apart; without delimiters
    # the sections run together as undifferentiated text.
    # NOTE(review): tag names were chosen to mirror the variable names —
    # confirm they match what stage5_compose.txt and
    # test_harness.build_compose_prompt() expect.
    return (
        f"<existing_page>\n{page_json}\n</existing_page>\n\n"
        f"<existing_moments>\n{existing_text}\n</existing_moments>\n\n"
        f"<new_moments>\n{new_text}\n</new_moments>\n\n"
        f"<creator>{creator_name}</creator>"
    )


def _compose_into_existing(
    existing_page: TechniquePage,
    existing_moments: list[KeyMoment],
    new_moment_group: list[tuple[KeyMoment, dict]],
    category: str,
    creator_name: str,
    system_prompt: str,
    llm: LLMClient,
    model_override: str | None,
    modality: str,
    hard_limit: int,
    video_id: str,
    run_id: str | None,
) -> SynthesisResult:
    """Compose new moments into an existing technique page via the LLM.

    Loads the dedicated compose system prompt (``stage5_compose.txt``),
    builds the compose user prompt, sizes the token budget, calls
    ``llm.complete`` and hands the raw response to
    ``_safe_parse_llm_response``.

    Args:
        existing_page: Page the new moments are merged into.
        existing_moments: Moments already linked to ``existing_page``.
        new_moment_group: ``(moment, classification_info)`` pairs to merge.
        category: Topic category; used only in the callback's context label.
        creator_name: Creator name forwarded to the user prompt.
        system_prompt: Accepted for signature parity with the caller but not
            used here; the compose flow loads its own system prompt.
        llm: Client used for the completion and passed on to the parser.
        model_override: Forwarded to the LLM call and the request params.
        modality: Forwarded to the LLM call and the request params.
        hard_limit: Upper bound passed to ``estimate_max_tokens`` and the
            request params.
        video_id: Used to load the prompt and to tag the LLM callback.
        run_id: Pipeline run identifier forwarded to the LLM callback.

    Returns:
        The parsed ``SynthesisResult``.
    """
    compose_prompt = _load_prompt("stage5_compose.txt", video_id=video_id)
    user_prompt = _build_compose_user_prompt(
        existing_page, existing_moments, new_moment_group, creator_name,
    )

    # This value is used as the completion's max_tokens below.
    max_tokens = estimate_max_tokens(
        compose_prompt, user_prompt,
        stage="stage5_synthesis", hard_limit=hard_limit,
    )
    logger.info(
        "Stage 5: Composing into '%s' — %d existing + %d new moments, max_tokens=%d",
        existing_page.slug, len(existing_moments), len(new_moment_group), max_tokens,
    )

    request_params = _build_request_params(
        max_tokens, model_override, modality, "SynthesisResult", hard_limit,
    )
    on_complete = _make_llm_callback(
        video_id, "stage5_synthesis",
        system_prompt=compose_prompt, user_prompt=user_prompt,
        run_id=run_id, context_label=f"compose:{category}",
        request_params=request_params,
    )

    raw = llm.complete(
        compose_prompt, user_prompt,
        response_model=SynthesisResult,
        on_complete=on_complete,
        modality=modality,
        model_override=model_override,
        max_tokens=max_tokens,
    )
    return _safe_parse_llm_response(
        raw, SynthesisResult, llm, compose_prompt, user_prompt,
        modality=modality, model_override=model_override, max_tokens=max_tokens,
    )
+ compose_matches = session.execute( + select(TechniquePage).where( + TechniquePage.creator_id == video.creator_id, + func.lower(TechniquePage.topic_category) == func.lower(category), + ) + ).scalars().all() + + if len(compose_matches) > 1: + logger.warning( + "Stage 5: Multiple existing pages (%d) match creator=%s category='%s'. " + "Using first match '%s'.", + len(compose_matches), video.creator_id, category, + compose_matches[0].slug, + ) + + compose_target = compose_matches[0] if compose_matches else None + + if compose_target is not None: + # Load existing moments linked to this page + existing_moments = session.execute( + select(KeyMoment) + .where(KeyMoment.technique_page_id == compose_target.id) + .order_by(KeyMoment.start_time) + ).scalars().all() + + logger.info( + "Stage 5: Composing into existing page '%s' " + "(%d existing moments + %d new moments)", + compose_target.slug, + len(existing_moments), + len(moment_group), + ) + + compose_result = _compose_into_existing( + compose_target, existing_moments, moment_group, + category, creator_name, system_prompt, + llm, model_override, modality, hard_limit, + video_id, run_id, + ) + synthesized_pages = list(compose_result.pages) + # ── Chunked synthesis with truncation recovery ───────── - if len(moment_group) <= chunk_size: + elif len(moment_group) <= chunk_size: # Small group — try single LLM call first try: result = _synthesize_chunk( @@ -1379,6 +1537,16 @@ def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str: pages_created += 1 + # Set body_sections_format on every page (new or updated) + page.body_sections_format = "v2" + + # Track contributing video via TechniquePageVideo + stmt = pg_insert(TechniquePageVideo.__table__).values( + technique_page_id=page.id, + source_video_id=video.id, + ).on_conflict_do_nothing() + session.execute(stmt) + # Link moments to the technique page using moment_indices if page_moment_indices: