diff --git a/backend/config.py b/backend/config.py index 6064212..17d5bc9 100644 --- a/backend/config.py +++ b/backend/config.py @@ -47,6 +47,9 @@ class Settings(BaseSettings): llm_max_tokens_hard_limit: int = 32768 # Hard ceiling for dynamic estimator llm_max_tokens: int = 65536 # Fallback when no estimate is provided + # Stage 5 synthesis chunking — max moments per LLM call before splitting + synthesis_chunk_size: int = 30 + # Embedding endpoint embedding_api_url: str = "http://localhost:11434/v1" embedding_model: str = "nomic-embed-text" diff --git a/backend/pipeline/schemas.py b/backend/pipeline/schemas.py index e96709e..98a5114 100644 --- a/backend/pipeline/schemas.py +++ b/backend/pipeline/schemas.py @@ -91,6 +91,10 @@ class SynthesizedPage(BaseModel): default="mixed", description="One of: structured, mixed, unstructured", ) + moment_indices: list[int] = Field( + default_factory=list, + description="Indices of source moments (from the input list) that this page covers", + ) class SynthesisResult(BaseModel): diff --git a/backend/pipeline/stages.py b/backend/pipeline/stages.py index c623767..e8085cd 100644 --- a/backend/pipeline/stages.py +++ b/backend/pipeline/stages.py @@ -643,7 +643,7 @@ def stage4_classification(self, video_id: str, run_id: str | None = None) -> str classification_data.append({ "moment_id": str(moment.id), - "topic_category": cls.topic_category, + "topic_category": cls.topic_category.strip().title(), "topic_tags": cls.topic_tags, }) @@ -784,6 +784,196 @@ def _capture_pipeline_metadata() -> dict: # ── Stage 5: Synthesis ─────────────────────────────────────────────────────── + +def _compute_page_tags( + moment_indices: list[int], + moment_group: list[tuple], + all_tags: set[str], +) -> list[str] | None: + """Compute tags for a specific page from its linked moment indices. + + If moment_indices are available, collects tags only from those moments. + Falls back to all_tags for the category group if no indices provided. 
+ """ + if not moment_indices: + return list(all_tags) if all_tags else None + + page_tags: set[str] = set() + for idx in moment_indices: + if 0 <= idx < len(moment_group): + _, cls_info = moment_group[idx] + page_tags.update(cls_info.get("topic_tags", [])) + + return list(page_tags) if page_tags else None + + +def _build_moments_text( + moment_group: list[tuple[KeyMoment, dict]], + category: str, +) -> tuple[str, set[str]]: + """Build the moments prompt text and collect all tags for a group of moments. + + Returns (moments_text, all_tags). + """ + moments_lines = [] + all_tags: set[str] = set() + for i, (m, cls_info) in enumerate(moment_group): + tags = cls_info.get("topic_tags", []) + all_tags.update(tags) + moments_lines.append( + f"[{i}] Title: {m.title}\n" + f" Summary: {m.summary}\n" + f" Content type: {m.content_type.value}\n" + f" Time: {m.start_time:.1f}s - {m.end_time:.1f}s\n" + f" Plugins: {', '.join(m.plugins) if m.plugins else 'none'}\n" + f" Category: {category}\n" + f" Tags: {', '.join(tags) if tags else 'none'}\n" + f" Transcript excerpt: {(m.raw_transcript or '')[:300]}" + ) + return "\n\n".join(moments_lines), all_tags + + +def _synthesize_chunk( + chunk: list[tuple[KeyMoment, dict]], + category: str, + creator_name: str, + system_prompt: str, + llm: LLMClient, + model_override: str | None, + modality: str, + hard_limit: int, + video_id: str, + run_id: str | None, + chunk_label: str, +) -> SynthesisResult: + """Run a single synthesis LLM call for a chunk of moments. + + Returns the parsed SynthesisResult. 
+ """ + moments_text, _ = _build_moments_text(chunk, category) + user_prompt = f"{creator_name}\n\n{moments_text}\n" + + estimated_input = estimate_max_tokens(system_prompt, user_prompt, stage="stage5_synthesis", hard_limit=hard_limit) + logger.info( + "Stage 5: Synthesizing %s — %d moments, max_tokens=%d", + chunk_label, len(chunk), estimated_input, + ) + + raw = llm.complete( + system_prompt, user_prompt, response_model=SynthesisResult, + on_complete=_make_llm_callback( + video_id, "stage5_synthesis", + system_prompt=system_prompt, user_prompt=user_prompt, + run_id=run_id, context_label=chunk_label, + ), + modality=modality, model_override=model_override, max_tokens=estimated_input, + ) + return _safe_parse_llm_response( + raw, SynthesisResult, llm, system_prompt, user_prompt, + modality=modality, model_override=model_override, max_tokens=estimated_input, + ) + + +def _slug_base(slug: str) -> str: + """Extract the slug prefix before the creator name suffix for merge grouping. + + E.g. 'wavetable-sound-design-copycatt' → 'wavetable-sound-design' + Also normalizes casing. + """ + return slug.lower().strip() + + +def _merge_pages_by_slug( + all_pages: list, + creator_name: str, + llm: LLMClient, + model_override: str | None, + modality: str, + hard_limit: int, + video_id: str, + run_id: str | None, +) -> list: + """Detect pages with the same slug across chunks and merge them via LLM. + + Pages with unique slugs pass through unchanged. Pages sharing a slug + get sent to a merge prompt that combines them into one cohesive page. + + Returns the final list of SynthesizedPage objects. 
+ """ + from pipeline.schemas import SynthesizedPage + + # Group pages by slug + by_slug: dict[str, list] = defaultdict(list) + for page in all_pages: + by_slug[_slug_base(page.slug)].append(page) + + final_pages = [] + for slug, pages_group in by_slug.items(): + if len(pages_group) == 1: + # Unique slug — no merge needed + final_pages.append(pages_group[0]) + continue + + # Multiple pages share this slug — merge via LLM + logger.info( + "Stage 5: Merging %d partial pages with slug '%s' for video_id=%s", + len(pages_group), slug, video_id, + ) + + # Serialize partial pages to JSON for the merge prompt + pages_json = json.dumps( + [p.model_dump() for p in pages_group], + indent=2, ensure_ascii=False, + ) + + merge_system_prompt = _load_prompt("stage5_merge.txt") + merge_user_prompt = f"{creator_name}\n\n{pages_json}\n" + + max_tokens = estimate_max_tokens( + merge_system_prompt, merge_user_prompt, + stage="stage5_synthesis", hard_limit=hard_limit, + ) + logger.info( + "Stage 5: Merge call for slug '%s' — %d partial pages, max_tokens=%d", + slug, len(pages_group), max_tokens, + ) + + raw = llm.complete( + merge_system_prompt, merge_user_prompt, + response_model=SynthesisResult, + on_complete=_make_llm_callback( + video_id, "stage5_synthesis", + system_prompt=merge_system_prompt, + user_prompt=merge_user_prompt, + run_id=run_id, context_label=f"merge:{slug}", + ), + modality=modality, model_override=model_override, + max_tokens=max_tokens, + ) + merge_result = _safe_parse_llm_response( + raw, SynthesisResult, llm, + merge_system_prompt, merge_user_prompt, + modality=modality, model_override=model_override, + max_tokens=max_tokens, + ) + + if merge_result.pages: + final_pages.extend(merge_result.pages) + logger.info( + "Stage 5: Merge produced %d page(s) for slug '%s'", + len(merge_result.pages), slug, + ) + else: + # Merge returned nothing — fall back to keeping the partials + logger.warning( + "Stage 5: Merge returned 0 pages for slug '%s', keeping %d partials", + 
slug, len(pages_group), + ) + final_pages.extend(pages_group) + + return final_pages + + @celery_app.task(bind=True, max_retries=3, default_retry_delay=30) def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str: """Synthesize technique pages from classified key moments. @@ -792,7 +982,11 @@ def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str: each group into a TechniquePage, creates/updates page rows, and links KeyMoments to their TechniquePage. - Sets processing_status to 'published'. + For large category groups (exceeding synthesis_chunk_size), moments are + split into chronological chunks, synthesized independently, then pages + with matching slugs are merged via a dedicated merge LLM call. + + Sets processing_status to 'complete'. Returns the video_id for chain compatibility. """ @@ -801,6 +995,7 @@ def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str: _emit_event(video_id, "stage5_synthesis", "start", run_id=run_id) settings = get_settings() + chunk_size = settings.synthesis_chunk_size session = _get_sync_session() try: # Load video and moments @@ -833,77 +1028,115 @@ def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str: cls_by_moment_id = {c["moment_id"]: c for c in classification_data} # Group moments by topic_category (from classification) + # Normalize category casing to prevent near-duplicate groups + # (e.g., "Sound design" vs "Sound Design") groups: dict[str, list[tuple[KeyMoment, dict]]] = defaultdict(list) for moment in moments: cls_info = cls_by_moment_id.get(str(moment.id), {}) - category = cls_info.get("topic_category", "Uncategorized") + category = cls_info.get("topic_category", "Uncategorized").strip().title() groups[category].append((moment, cls_info)) system_prompt = _load_prompt("stage5_synthesis.txt") llm = _get_llm_client() model_override, modality = _get_stage_config(5) - hard_limit = get_settings().llm_max_tokens_hard_limit + hard_limit = 
settings.llm_max_tokens_hard_limit logger.info("Stage 5 using model=%s, modality=%s", model_override or "default", modality) pages_created = 0 for category, moment_group in groups.items(): - # Build moments text for the LLM - moments_lines = [] + # Collect all tags across the full group (used for DB writes later) all_tags: set[str] = set() - for i, (m, cls_info) in enumerate(moment_group): - tags = cls_info.get("topic_tags", []) - all_tags.update(tags) - moments_lines.append( - f"[{i}] Title: {m.title}\n" - f" Summary: {m.summary}\n" - f" Content type: {m.content_type.value}\n" - f" Time: {m.start_time:.1f}s - {m.end_time:.1f}s\n" - f" Plugins: {', '.join(m.plugins) if m.plugins else 'none'}\n" - f" Category: {category}\n" - f" Tags: {', '.join(tags) if tags else 'none'}\n" - f" Transcript excerpt: {(m.raw_transcript or '')[:300]}" + for _, cls_info in moment_group: + all_tags.update(cls_info.get("topic_tags", [])) + + # ── Chunked synthesis ──────────────────────────────────── + if len(moment_group) <= chunk_size: + # Small group — single LLM call (original behavior) + result = _synthesize_chunk( + moment_group, category, creator_name, + system_prompt, llm, model_override, modality, hard_limit, + video_id, run_id, f"category:{category}", + ) + synthesized_pages = list(result.pages) + logger.info( + "Stage 5: category '%s' — %d moments, %d page(s) from single call", + category, len(moment_group), len(synthesized_pages), + ) + else: + # Large group — split into chunks, synthesize each, then merge + num_chunks = (len(moment_group) + chunk_size - 1) // chunk_size + logger.info( + "Stage 5: category '%s' has %d moments — splitting into %d chunks of ≤%d", + category, len(moment_group), num_chunks, chunk_size, ) - moments_text = "\n\n".join(moments_lines) - user_prompt = f"{creator_name}\n\n{moments_text}\n" + chunk_pages = [] + for chunk_idx in range(num_chunks): + chunk_start = chunk_idx * chunk_size + chunk_end = min(chunk_start + chunk_size, len(moment_group)) + 
chunk = moment_group[chunk_start:chunk_end]
+                    chunk_label = f"category:{category} chunk:{chunk_idx + 1}/{num_chunks}"
 
-            max_tokens = estimate_max_tokens(system_prompt, user_prompt, stage="stage5_synthesis", hard_limit=hard_limit)
-            raw = llm.complete(system_prompt, user_prompt, response_model=SynthesisResult, on_complete=_make_llm_callback(video_id, "stage5_synthesis", system_prompt=system_prompt, user_prompt=user_prompt, run_id=run_id, context_label=category),
-                               modality=modality, model_override=model_override, max_tokens=max_tokens)
-            result = _safe_parse_llm_response(raw, SynthesisResult, llm, system_prompt, user_prompt,
-                                              modality=modality, model_override=model_override, max_tokens=max_tokens)
+                    result = _synthesize_chunk(
+                        chunk, category, creator_name,
+                        system_prompt, llm, model_override, modality, hard_limit,
+                        video_id, run_id, chunk_label,
+                    )
+                    chunk_pages.extend(p.model_copy(update={"moment_indices": [chunk_start + i for i in p.moment_indices]}) for p in result.pages)  # remap chunk-local moment_indices to group-global so downstream tag/link code indexes moment_group correctly
+                    logger.info(
+                        "Stage 5: %s produced %d page(s)",
+                        chunk_label, len(result.pages),
+                    )
+                # Merge pages with matching slugs across chunks
+                logger.info(
+                    "Stage 5: category '%s' — %d total pages from %d chunks, checking for merges",
+                    category, len(chunk_pages), num_chunks,
+                )
+                synthesized_pages = _merge_pages_by_slug(
+                    chunk_pages, creator_name,
+                    llm, model_override, modality, hard_limit,
+                    video_id, run_id,
+                )
+                logger.info(
+                    "Stage 5: category '%s' — %d final page(s) after merge",
+                    category, len(synthesized_pages),
+                )
+
+            # ── Persist pages to DB ──────────────────────────────────
+
             # Load prior pages from this video (snapshot taken before pipeline reset)
             prior_page_ids = _load_prior_pages(video_id)
 
-            # Create/update TechniquePage rows
-            for page_data in result.pages:
+            for page_data in synthesized_pages:
                 existing = None
-                # First: check prior pages from this video by creator + category
-                if prior_page_ids:
+                # First: check by slug (most specific match)
+                if existing is None:
+                    existing = session.execute(
+                        select(TechniquePage).where(TechniquePage.slug == 
page_data.slug)
+                    ).scalar_one_or_none()
+
+                # Fallback: check prior pages from this video by creator + category
+                # Use .first() since multiple pages may share a category
+                if existing is None and prior_page_ids:
                     existing = session.execute(
                         select(TechniquePage).where(
                             TechniquePage.id.in_(prior_page_ids),
                             TechniquePage.creator_id == video.creator_id,
-                            TechniquePage.topic_category == (page_data.topic_category or category),
+                            func.lower(TechniquePage.topic_category) == func.lower(page_data.topic_category or category),
                         )
-                    ).scalar_one_or_none()
+                    ).scalars().first()
                     if existing:
                         logger.info(
                             "Stage 5: Matched prior page '%s' (id=%s) by creator+category for video_id=%s",
                             existing.slug, existing.id, video_id,
                         )
-                # Fallback: check by slug (handles cross-video dedup)
-                if existing is None:
-                    existing = session.execute(
-                        select(TechniquePage).where(TechniquePage.slug == page_data.slug)
-                    ).scalar_one_or_none()
-
                 if existing:
                     # Snapshot existing content before overwriting
                     try:
+                        sq = existing.source_quality
+                        sq_value = sq.value if hasattr(sq, 'value') else sq
                         snapshot = {
                             "title": existing.title,
                             "slug": existing.slug,
@@ -913,7 +1146,7 @@ def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str:
                             "body_sections": existing.body_sections,
                             "signal_chains": existing.signal_chains,
                             "plugins": existing.plugins,
-                            "source_quality": existing.source_quality.value if existing.source_quality else None,
+                            "source_quality": sq_value,
                         }
                         version_count = session.execute(
                             select(func.count()).where(
@@ -946,7 +1179,8 @@ def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str:
                     existing.body_sections = page_data.body_sections
                     existing.signal_chains = page_data.signal_chains
                     existing.plugins = page_data.plugins if page_data.plugins else None
-                    existing.topic_tags = list(all_tags) if all_tags else None
+                    page_tags = _compute_page_tags(getattr(page_data, "moment_indices", None) or [], moment_group, all_tags)
+                    existing.topic_tags = page_tags
                     existing.source_quality = page_data.source_quality
                     page = existing
                 else:
@@ -955,7 +1189,7 @@ def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str:
                         title=page_data.title,
                         slug=page_data.slug,
                         topic_category=page_data.topic_category or category,
-                        topic_tags=list(all_tags) if all_tags else None,
+                        topic_tags=_compute_page_tags(getattr(page_data, "moment_indices", None) or [], moment_group, all_tags),
                         summary=page_data.summary,
                         body_sections=page_data.body_sections,
                         signal_chains=page_data.signal_chains,
@@ -967,9 +1201,25 @@ def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str:
                 pages_created += 1
 
-                # Link moments to the technique page
-                for m, _ in moment_group:
-                    m.technique_page_id = page.id
+                # Link moments to the technique page using moment_indices
+                page_moment_indices = getattr(page_data, "moment_indices", None) or []
+
+                if page_moment_indices:
+                    # LLM specified which moments belong to this page
+                    for idx in page_moment_indices:
+                        if 0 <= idx < len(moment_group):
+                            moment_group[idx][0].technique_page_id = page.id
+                elif len(synthesized_pages) == 1:
+                    # Single page — link all moments (safe fallback)
+                    for m, _ in moment_group:
+                        m.technique_page_id = page.id
+                else:
+                    # Multiple pages but no moment_indices — log warning
+                    logger.warning(
+                        "Stage 5: page '%s' has no moment_indices and is one of %d pages "
+                        "for category '%s'. Moments will not be linked to this page.",
+                        page_data.slug, len(synthesized_pages), category,
+                    )
 
         # Update processing_status
         video.processing_status = ProcessingStatus.complete
diff --git a/prompts/stage5_synthesis.txt b/prompts/stage5_synthesis.txt
index 66659ce..44e59d0 100644
--- a/prompts/stage5_synthesis.txt
+++ b/prompts/stage5_synthesis.txt
@@ -73,7 +73,7 @@ The creator name is provided in a tag. Key moments are provided inside
 
 ## Output format
 
-Return a JSON object with a single key "pages" containing a list of synthesized pages. 
Most inputs produce a single page, but if the moments clearly cover two distinctly separate techniques (e.g., moments about both "kick design" and "hi-hat design" that happen to share a topic_category), split them into separate pages. +Return a JSON object with a single key "pages" containing a list of synthesized pages. Most inputs produce a single page, but if the moments clearly cover two distinctly separate techniques (e.g., moments about both "kick design" and "hi-hat design" that happen to share a topic_category), split them into separate pages. When splitting, you MUST assign each moment to exactly one page via the moment_indices field — every input moment index must appear in exactly one page's moment_indices array. ```json { @@ -100,7 +100,8 @@ Return a JSON object with a single key "pages" containing a list of synthesized } ], "plugins": ["Vital", "Kilohearts Transient Shaper", "FabFilter Pro-Q 3", "iZotope Trash 2"], - "source_quality": "structured" + "source_quality": "structured", + "moment_indices": [0, 1, 2, 3, 4] } ] } @@ -117,6 +118,7 @@ Return a JSON object with a single key "pages" containing a list of synthesized - **signal_chains**: Array of signal chain objects. Each has a "name" (what this chain is for) and "steps" (ordered list of stages with plugin names, settings, and roles). Only include when explicitly demonstrated by the creator. Empty array if not applicable. - **plugins**: Deduplicated array of all plugins, instruments, and specific tools mentioned across the moments. Use " " format consistently (e.g., "FabFilter Pro-Q 3" not "Pro-Q", "Xfer Serum" not just "Serum", "Valhalla VintageVerb" not "Valhalla reverb", "Kilohearts Disperser" not "Disperser"). Always include the manufacturer name for disambiguation. - **source_quality**: One of "structured", "mixed", "unstructured". +- **moment_indices**: Array of integer indices from the input moments list that this page covers. Every moment index must appear in exactly one page. 
If you produce a single page, include all indices. If you split into multiple pages, partition the indices so each moment is assigned to the page it most closely relates to. This field is required. ## Critical rules