From 15299232a8d08238a91d0556c7e68dab7ce27fb7 Mon Sep 17 00:00:00 2001
From: jlightner
Date: Sat, 4 Apr 2026 08:28:18 +0000
Subject: [PATCH] =?UTF-8?q?feat:=20Added=20personality=20extraction=20pipe?=
 =?UTF-8?q?line:=20prompt=20template,=203-tier=20tr=E2=80=A6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- "prompts/personality_extraction.txt"
- "backend/pipeline/stages.py"
- "backend/schemas.py"
- "backend/routers/admin.py"

GSD-Task: S06/T02
---
 backend/pipeline/stages.py         | 298 +++++++++++++++++++++++++++++
 backend/routers/admin.py           |  26 +++
 backend/schemas.py                 |  37 ++++
 prompts/personality_extraction.txt |  42 +++++
 4 files changed, 403 insertions(+)
 create mode 100644 prompts/personality_extraction.txt

diff --git a/backend/pipeline/stages.py b/backend/pipeline/stages.py
index 5289679..5f7e895 100644
--- a/backend/pipeline/stages.py
+++ b/backend/pipeline/stages.py
@@ -2592,3 +2592,301 @@ def stage_highlight_detection(self, video_id: str, run_id: str | None = None) ->
         raise self.retry(exc=exc)
     finally:
         session.close()
+
+
+# ── Personality profile extraction ───────────────────────────────────────────
+
+
+def _sample_creator_transcripts(
+    moments: list,
+    creator_id: str,
+    max_chars: int = 40000,
+) -> tuple[str, int]:
+    """Sample transcripts from a creator's key moments, respecting size tiers.
+
+    - Small (<=20K chars total): use all text.
+    - Medium (<=60K): first 300 chars from each moment, up to budget.
+    - Large (>60K): first 600 chars from each moment, up to budget. Moments are
+      interleaved across topic categories when Redis classification data is
+      available, otherwise shuffled with an RNG seeded by creator_id.
+
+    The budget (max_chars) counts excerpt characters only, not the separators
+    joined between excerpts, and is not applied in the small tier.
+
+    Returns (sampled_text, total_char_count).
+    """
+    import random
+
+    transcripts = [
+        (m.source_video_id, m.raw_transcript)
+        for m in moments
+        if m.raw_transcript and m.raw_transcript.strip()
+    ]
+    if not transcripts:
+        return ("", 0)
+
+    total_chars = sum(len(t) for _, t in transcripts)
+
+    # Small: use everything
+    if total_chars <= 20_000:
+        text = "\n\n---\n\n".join(t for _, t in transcripts)
+        return (text, total_chars)
+
+    # Medium: first 300 chars from each moment
+    if total_chars <= 60_000:
+        excerpts = []
+        budget = max_chars
+        for _, t in transcripts:
+            chunk = t[:300]
+            if budget - len(chunk) < 0:
+                break
+            excerpts.append(chunk)
+            budget -= len(chunk)
+        text = "\n\n---\n\n".join(excerpts)
+        return (text, total_chars)
+
+    # Large: random sample with optional topic diversity from Redis
+    topic_map: dict[str, list[str]] = {}
+    try:
+        import redis as _redis
+        settings = get_settings()
+        r = _redis.from_url(settings.redis_url)
+        try:
+            video_ids = {str(vid) for vid, _ in transcripts}
+            for vid in video_ids:
+                raw = r.get(f"chrysopedia:classification:{vid}")
+                if not raw:
+                    continue
+                classification = json.loads(raw)
+                if not isinstance(classification, list):
+                    continue
+                for item in classification:
+                    if not isinstance(item, dict):
+                        continue
+                    cat = item.get("topic_category", "unknown")
+                    moment_id = item.get("moment_id")
+                    if moment_id:
+                        # Normalise to str so IDs match the str(m.id) keys below
+                        topic_map.setdefault(cat, []).append(str(moment_id))
+        finally:
+            # Release the connection even when a lookup or parse fails
+            r.close()
+    except Exception as exc:
+        # Best-effort: fall back to random sampling without topic diversity.
+        # Partial topic data is discarded so the sample is not skewed towards
+        # whichever videos happened to be read before the failure.
+        logger.warning(
+            "Topic lookup failed for creator_id=%s, sampling without topic "
+            "diversity: %s",
+            creator_id, exc,
+        )
+        topic_map = {}
+
+    rng = random.Random(creator_id)
+
+    if topic_map:
+        # Interleave from different categories for diversity
+        ordered = []
+        cat_lists = list(topic_map.values())
+        rng.shuffle(cat_lists)
+        while any(cat_lists):
+            for cat in cat_lists:
+                if cat:
+                    ordered.append(cat.pop(0))
+            cat_lists = [c for c in cat_lists if c]
+        # Map moment IDs back to transcripts
+        moment_lookup = {str(m.id): m.raw_transcript for m in moments if m.raw_transcript}
+        diverse_transcripts = [
+            moment_lookup[mid] for mid in ordered if mid in moment_lookup
+        ]
+        if diverse_transcripts:
+            transcripts_list = diverse_transcripts
+        else:
+            transcripts_list = [t for _, t in transcripts]
+    else:
+        transcripts_list = [t for _, t in transcripts]
+        rng.shuffle(transcripts_list)
+
+    excerpts = []
+    budget = max_chars
+    for t in transcripts_list:
+        chunk = t[:600]
+        if budget - len(chunk) < 0:
+            break
+        excerpts.append(chunk)
+        budget -= len(chunk)
+
+    text = "\n\n---\n\n".join(excerpts)
+    return (text, total_chars)
+
+
+@celery_app.task(bind=True, max_retries=2, default_retry_delay=60)
+def extract_personality_profile(self, creator_id: str) -> str:
+    """Extract a personality profile from a creator's transcripts via LLM.
+
+    Aggregates and samples transcripts from all of the creator's key moments,
+    sends them to the LLM with the personality_extraction prompt, validates
+    the response, and stores the profile as JSONB on Creator.personality_profile.
+
+    Returns the creator_id for chain compatibility.
+
+    Failures, including unparseable or schema-invalid LLM output, are handed
+    to Celery's retry mechanism by the outer exception handler.
+    """
+    from datetime import datetime, timezone
+
+    start = time.monotonic()
+    logger.info("Personality extraction starting for creator_id=%s", creator_id)
+    _emit_event(creator_id, "personality_extraction", "start")
+
+    session = _get_sync_session()
+    try:
+        # Load creator
+        creator = session.execute(
+            select(Creator).where(Creator.id == creator_id)
+        ).scalar_one_or_none()
+        if not creator:
+            logger.error("Creator not found: %s", creator_id)
+            _emit_event(
+                creator_id, "personality_extraction", "error",
+                payload={"error": "creator_not_found"},
+            )
+            return creator_id
+
+        # Load all key moments with transcripts for this creator
+        moments = (
+            session.execute(
+                select(KeyMoment)
+                .join(SourceVideo, KeyMoment.source_video_id == SourceVideo.id)
+                .where(SourceVideo.creator_id == creator.id)
+                .where(KeyMoment.raw_transcript.isnot(None))
+            )
+            .scalars()
+            .all()
+        )
+
+        if not moments:
+            logger.warning(
+                "No transcripts found for creator_id=%s (%s), skipping extraction",
+                creator_id, creator.name,
+            )
+            _emit_event(
+                creator_id, "personality_extraction", "complete",
+                payload={"skipped": True, "reason": "no_transcripts"},
+            )
+            return creator_id
+
+        # Sample transcripts
+        sampled_text, total_chars = _sample_creator_transcripts(
+            moments, creator_id,
+        )
+
+        if not sampled_text.strip():
+            logger.warning(
+                "Empty transcript sample for creator_id=%s, skipping", creator_id,
+            )
+            _emit_event(
+                creator_id, "personality_extraction", "complete",
+                payload={"skipped": True, "reason": "empty_sample"},
+            )
+            return creator_id
+
+        # Load prompt and call LLM
+        system_prompt = _load_prompt("personality_extraction.txt")
+        user_prompt = (
+            f"Creator: {creator.name}\n\n"
+            f"Transcript excerpts ({len(moments)} moments, {total_chars} total chars, "
+            f"sample below):\n\n{sampled_text}"
+        )
+
+        llm = _get_llm_client()
+        callback = _make_llm_callback(
+            creator_id, "personality_extraction",
+            system_prompt=system_prompt,
+            user_prompt=user_prompt,
+        )
+
+        response = llm.complete(
+            system_prompt=system_prompt,
+            user_prompt=user_prompt,
+            response_model=object,  # triggers JSON mode
+            on_complete=callback,
+        )
+
+        # Parse and validate
+        from schemas import PersonalityProfile as ProfileValidator
+        try:
+            raw_profile = json.loads(str(response))
+        except json.JSONDecodeError as jde:
+            logger.warning(
+                "LLM returned invalid JSON for creator_id=%s, retrying: %s",
+                creator_id, jde,
+            )
+            # Re-raise instead of calling self.retry() here: the outer handler
+            # schedules the retry, and calling retry() in both places would
+            # enqueue the task twice (retry() raises celery's Retry, which the
+            # outer `except Exception` would catch and retry again).
+            raise
+
+        try:
+            validated = ProfileValidator.model_validate(raw_profile)
+        except ValidationError as ve:
+            logger.warning(
+                "LLM profile failed validation for creator_id=%s, retrying: %s",
+                creator_id, ve,
+            )
+            # Outer handler schedules the single retry (see above)
+            raise
+
+        # Build final profile dict with metadata
+        profile_dict = validated.model_dump()
+        profile_dict["_metadata"] = {
+            "extracted_at": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
+            "transcript_sample_size": total_chars,
+            "moments_count": len(moments),
+            # NOTE(review): finish_reason is a stop reason, not a model name, so
+            # it is stored under its own key. Whether the response exposes a
+            # `model` attribute is not visible here - confirm against the client.
+            "model_used": getattr(response, "model", None) or "unknown",
+            "finish_reason": getattr(response, "finish_reason", None),
+        }
+
+        # Low sample size note
+        if total_chars < 500:
+            profile_dict["_metadata"]["low_sample_size"] = True
+
+        # Store on creator
+        creator.personality_profile = profile_dict
+        session.commit()
+
+        elapsed = time.monotonic() - start
+        _emit_event(
+            creator_id, "personality_extraction", "complete",
+            duration_ms=int(elapsed * 1000),
+            payload={
+                "moments_count": len(moments),
+                "transcript_chars": total_chars,
+                "sample_chars": len(sampled_text),
+            },
+        )
+        logger.info(
+            "Personality extraction completed for creator_id=%s (%s) in %.1fs — "
+            "%d moments, %d chars sampled",
+            creator_id, creator.name, elapsed, len(moments), len(sampled_text),
+        )
+        return creator_id
+
+    except Exception as exc:
+        if isinstance(exc, (self.MaxRetriesExceededError,)):
+            raise
+        session.rollback()
+        _emit_event(
+            creator_id, "personality_extraction", "error",
+            payload={"error": str(exc)[:500]},
+        )
+        logger.error(
+            "Personality extraction failed for creator_id=%s: %s", creator_id, exc,
+        )
+        raise self.retry(exc=exc)
+    finally:
+        session.close()
diff --git a/backend/routers/admin.py b/backend/routers/admin.py
index b8058dc..08e3797 100644
--- a/backend/routers/admin.py
+++ b/backend/routers/admin.py
@@ -236,3 +236,29 @@ async def get_impersonation_log(
         )
         for log, admin_name, target_name in rows
     ]
+
+
+@router.post("/creators/{slug}/extract-profile")
+async def extract_creator_profile(
+    slug: str,
+    _admin: Annotated[User, Depends(_require_admin)],
+    session: Annotated[AsyncSession, Depends(get_session)],
+):
+    """Queue personality profile extraction for a creator. Admin only."""
+    from models import Creator
+
+    result = await session.execute(
+        select(Creator).where(Creator.slug == slug)
+    )
+    creator = result.scalar_one_or_none()
+    if creator is None:
+        raise HTTPException(
+            status_code=status.HTTP_404_NOT_FOUND,
+            detail=f"Creator not found: {slug}",
+        )
+
+    from pipeline.stages import extract_personality_profile
+    extract_personality_profile.delay(str(creator.id))
+
+    logger.info("Queued personality extraction for creator=%s (%s)", slug, creator.id)
+    return {"status": "queued", "creator_id": str(creator.id)}
diff --git a/backend/schemas.py b/backend/schemas.py
index 7c480e6..3fd8b91 100644
--- a/backend/schemas.py
+++ b/backend/schemas.py
@@ -732,3 +732,40 @@ class FollowedCreatorItem(BaseModel):
     creator_name: str
     creator_slug: str
     followed_at: datetime
+
+
+# ── Personality Profile (LLM output validation) ─────────────────────────────
+
+
+class VocabularyProfile(BaseModel):
+    signature_phrases: list[str] = []
+    jargon_level: str = "mixed"
+    filler_words: list[str] = []
+    distinctive_terms: list[str] = []
+    sound_descriptions: list[str] = []
+
+
+class ToneProfile(BaseModel):
+    formality: str = "conversational"
+    energy: str = "moderate"
+    humor: str = "none"
+    teaching_style: str = ""
+    descriptors: list[str] = []
+
+
+class StyleMarkersProfile(BaseModel):
+    explanation_approach: str = "step-by-step"
+    uses_analogies: bool = False
+    analogy_examples: list[str] = []
+    sound_words: list[str] = []
+    self_references: str = ""
+    audience_engagement: str = ""
+    pacing: str = "moderate"
+
+
+class PersonalityProfile(BaseModel):
+    """Validates LLM-generated personality profile before storage."""
+    vocabulary: VocabularyProfile = Field(default_factory=VocabularyProfile)
+    tone: ToneProfile = Field(default_factory=ToneProfile)
+    style_markers: StyleMarkersProfile = Field(default_factory=StyleMarkersProfile)
+    summary: str = ""
diff --git a/prompts/personality_extraction.txt b/prompts/personality_extraction.txt
new file mode 100644
index 0000000..ae40a58
--- /dev/null
+++ b/prompts/personality_extraction.txt
@@ -0,0 +1,42 @@
+You are a music production educator analyst. You will receive transcript excerpts from a single creator's tutorials. Your task is to identify what makes this creator's communication style DISTINCTIVE — not universal traits shared by all educators.
+
+Analyze the transcripts for:
+
+1. **Vocabulary patterns**: Signature phrases they repeat, jargon level (beginner-friendly vs advanced), filler words or verbal tics, distinctive terminology or invented words, how they name sounds or techniques.
+
+2. **Tone**: Formality level, energy (calm/methodical vs enthusiastic/hype), humor style (dry, self-deprecating, none), teaching warmth, use of encouragement or critique.
+
+3. **Style markers**: How they explain concepts (step-by-step vs intuitive/exploratory), use of analogies or metaphors, onomatopoeia or sound words, self-references and personal anecdotes, how they address the audience, pacing and rhythm of explanation.
+
+Focus on what makes THIS creator stand out. Ignore generic traits like "knowledgeable about music production" or "explains things clearly" — those apply to everyone.
+
+You MUST respond with ONLY valid JSON matching this exact structure:
+
+{
+  "vocabulary": {
+    "signature_phrases": ["phrase1", "phrase2"],
+    "jargon_level": "beginner-friendly | intermediate | advanced | mixed",
+    "filler_words": ["um", "like"],
+    "distinctive_terms": ["term1", "term2"],
+    "sound_descriptions": ["how they describe sounds"]
+  },
+  "tone": {
+    "formality": "casual | conversational | professional | academic",
+    "energy": "calm | moderate | high | variable",
+    "humor": "none | occasional | frequent | core-style",
+    "teaching_style": "one short descriptor, e.g. 'encouraging coach' or 'no-nonsense mentor'",
+    "descriptors": ["adjective1", "adjective2", "adjective3"]
+  },
+  "style_markers": {
+    "explanation_approach": "step-by-step | exploratory | demo-first | theory-then-practice",
+    "uses_analogies": true,
+    "analogy_examples": ["example1"],
+    "sound_words": ["onomatopoeia they use"],
+    "self_references": "how they reference themselves or their experience",
+    "audience_engagement": "how they address/involve the viewer",
+    "pacing": "fast | moderate | slow | variable"
+  },
+  "summary": "One paragraph (3-5 sentences) capturing what makes this creator's voice distinctive. Be specific — reference actual phrases or patterns from the transcripts."
+}
+
+No markdown code fences, no explanation, no preamble — just the raw JSON object.