feat: Added _build_compose_user_prompt(), _compose_into_existing(), and…

- "backend/pipeline/stages.py"

GSD-Task: S04/T01
This commit is contained in:
jlightner 2026-04-03 01:29:21 +00:00
parent 66b02dd94e
commit 943a5102fe

View file

@ -24,6 +24,8 @@ from sqlalchemy import create_engine, func, select
from sqlalchemy.orm import Session, sessionmaker
from config import get_settings
from sqlalchemy.dialects.postgresql import insert as pg_insert
from models import (
Creator,
KeyMoment,
@ -33,6 +35,7 @@ from models import (
SourceVideo,
TechniquePage,
TechniquePageVersion,
TechniquePageVideo,
TranscriptSegment,
)
from pipeline.embedding_client import EmbeddingClient
@ -980,6 +983,117 @@ def _build_moments_text(
return "\n\n".join(moments_lines), all_tags
def _build_compose_user_prompt(
existing_page: TechniquePage,
existing_moments: list[KeyMoment],
new_moments: list[tuple[KeyMoment, dict]],
creator_name: str,
) -> str:
"""Build the user prompt for composing new moments into an existing page.
Existing moments keep indices [0]-[N-1].
New moments get indices [N]-[N+M-1].
XML-tagged prompt structure matches test_harness.py build_compose_prompt().
"""
category = existing_page.topic_category or "Uncategorized"
# Serialize existing page to dict matching SynthesizedPage shape
sq = existing_page.source_quality
sq_value = sq.value if hasattr(sq, "value") else sq
page_dict = {
"title": existing_page.title,
"slug": existing_page.slug,
"topic_category": existing_page.topic_category,
"summary": existing_page.summary,
"body_sections": existing_page.body_sections,
"signal_chains": existing_page.signal_chains,
"plugins": existing_page.plugins,
"source_quality": sq_value,
}
# Format existing moments [0]-[N-1] using _build_moments_text pattern
# Existing moments don't have classification data — use empty dict
existing_as_tuples = [(m, {}) for m in existing_moments]
existing_text, _ = _build_moments_text(existing_as_tuples, category)
# Format new moments [N]-[N+M-1] with offset indices
n = len(existing_moments)
new_lines = []
for i, (m, cls_info) in enumerate(new_moments):
tags = cls_info.get("topic_tags", [])
new_lines.append(
f"[{n + i}] Title: {m.title}\n"
f" Summary: {m.summary}\n"
f" Content type: {m.content_type.value}\n"
f" Time: {m.start_time:.1f}s - {m.end_time:.1f}s\n"
f" Plugins: {', '.join(m.plugins) if m.plugins else 'none'}\n"
f" Category: {category}\n"
f" Tags: {', '.join(tags) if tags else 'none'}\n"
f" Transcript excerpt: {(m.raw_transcript or '')[:300]}"
)
new_text = "\n\n".join(new_lines)
page_json = json.dumps(page_dict, indent=2, ensure_ascii=False, default=str)
return (
f"<existing_page>\n{page_json}\n</existing_page>\n"
f"<existing_moments>\n{existing_text}\n</existing_moments>\n"
f"<new_moments>\n{new_text}\n</new_moments>\n"
f"<creator>{creator_name}</creator>"
)
def _compose_into_existing(
existing_page: TechniquePage,
existing_moments: list[KeyMoment],
new_moment_group: list[tuple[KeyMoment, dict]],
category: str,
creator_name: str,
system_prompt: str,
llm: LLMClient,
model_override: str | None,
modality: str,
hard_limit: int,
video_id: str,
run_id: str | None,
) -> SynthesisResult:
"""Compose new moments into an existing technique page via LLM.
Loads the compose system prompt, builds the compose user prompt, and
calls the LLM with the same retry/parse pattern as _synthesize_chunk().
"""
compose_prompt = _load_prompt("stage5_compose.txt", video_id=video_id)
user_prompt = _build_compose_user_prompt(
existing_page, existing_moments, new_moment_group, creator_name,
)
estimated_input = estimate_max_tokens(
compose_prompt, user_prompt,
stage="stage5_synthesis", hard_limit=hard_limit,
)
logger.info(
"Stage 5: Composing into '%s'%d existing + %d new moments, max_tokens=%d",
existing_page.slug, len(existing_moments), len(new_moment_group), estimated_input,
)
raw = llm.complete(
compose_prompt, user_prompt, response_model=SynthesisResult,
on_complete=_make_llm_callback(
video_id, "stage5_synthesis",
system_prompt=compose_prompt, user_prompt=user_prompt,
run_id=run_id, context_label=f"compose:{category}",
request_params=_build_request_params(
estimated_input, model_override, modality, "SynthesisResult", hard_limit,
),
),
modality=modality, model_override=model_override, max_tokens=estimated_input,
)
return _safe_parse_llm_response(
raw, SynthesisResult, llm, compose_prompt, user_prompt,
modality=modality, model_override=model_override, max_tokens=estimated_input,
)
def _synthesize_chunk(
chunk: list[tuple[KeyMoment, dict]],
category: str,
@ -1198,8 +1312,52 @@ def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str:
for _, cls_info in moment_group:
all_tags.update(cls_info.get("topic_tags", []))
# ── Compose-or-create detection ────────────────────────
# Check if an existing technique page already covers this
# creator + category combination (from a prior video run).
compose_matches = session.execute(
select(TechniquePage).where(
TechniquePage.creator_id == video.creator_id,
func.lower(TechniquePage.topic_category) == func.lower(category),
)
).scalars().all()
if len(compose_matches) > 1:
logger.warning(
"Stage 5: Multiple existing pages (%d) match creator=%s category='%s'. "
"Using first match '%s'.",
len(compose_matches), video.creator_id, category,
compose_matches[0].slug,
)
compose_target = compose_matches[0] if compose_matches else None
if compose_target is not None:
# Load existing moments linked to this page
existing_moments = session.execute(
select(KeyMoment)
.where(KeyMoment.technique_page_id == compose_target.id)
.order_by(KeyMoment.start_time)
).scalars().all()
logger.info(
"Stage 5: Composing into existing page '%s' "
"(%d existing moments + %d new moments)",
compose_target.slug,
len(existing_moments),
len(moment_group),
)
compose_result = _compose_into_existing(
compose_target, existing_moments, moment_group,
category, creator_name, system_prompt,
llm, model_override, modality, hard_limit,
video_id, run_id,
)
synthesized_pages = list(compose_result.pages)
# ── Chunked synthesis with truncation recovery ─────────
if len(moment_group) <= chunk_size:
elif len(moment_group) <= chunk_size:
# Small group — try single LLM call first
try:
result = _synthesize_chunk(
@ -1379,6 +1537,16 @@ def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str:
pages_created += 1
# Set body_sections_format on every page (new or updated)
page.body_sections_format = "v2"
# Track contributing video via TechniquePageVideo
stmt = pg_insert(TechniquePageVideo.__table__).values(
technique_page_id=page.id,
source_video_id=video.id,
).on_conflict_do_nothing()
session.execute(stmt)
# Link moments to the technique page using moment_indices
if page_moment_indices: