feat: Added personality extraction pipeline: prompt template, 3-tier tr…

- "prompts/personality_extraction.txt"
- "backend/pipeline/stages.py"
- "backend/schemas.py"
- "backend/routers/admin.py"

GSD-Task: S06/T02
This commit is contained in:
jlightner 2026-04-04 08:28:18 +00:00
parent 442d0ca48b
commit 15299232a8
4 changed files with 373 additions and 0 deletions

View file

@ -2592,3 +2592,271 @@ def stage_highlight_detection(self, video_id: str, run_id: str | None = None) ->
raise self.retry(exc=exc)
finally:
session.close()
# ── Personality profile extraction ───────────────────────────────────────────
def _sample_creator_transcripts(
moments: list,
creator_id: str,
max_chars: int = 40000,
) -> tuple[str, int]:
"""Sample transcripts from a creator's key moments, respecting size tiers.
- Small (<20K chars total): use all text.
- Medium (20K-60K): first 300 chars from each moment, up to budget.
- Large (>60K): random sample seeded by creator_id, attempts topic diversity
via Redis classification data.
Returns (sampled_text, total_char_count).
"""
import random
transcripts = [
(m.source_video_id, m.raw_transcript)
for m in moments
if m.raw_transcript and m.raw_transcript.strip()
]
if not transcripts:
return ("", 0)
total_chars = sum(len(t) for _, t in transcripts)
# Small: use everything
if total_chars <= 20_000:
text = "\n\n---\n\n".join(t for _, t in transcripts)
return (text, total_chars)
# Medium: first 300 chars from each moment
if total_chars <= 60_000:
excerpts = []
budget = max_chars
for _, t in transcripts:
chunk = t[:300]
if budget - len(chunk) < 0:
break
excerpts.append(chunk)
budget -= len(chunk)
text = "\n\n---\n\n".join(excerpts)
return (text, total_chars)
# Large: random sample with optional topic diversity from Redis
topic_map: dict[str, list[tuple[str, str]]] = {}
try:
import redis as _redis
settings = get_settings()
r = _redis.from_url(settings.redis_url)
video_ids = {str(vid) for vid, _ in transcripts}
for vid in video_ids:
raw = r.get(f"chrysopedia:classification:{vid}")
if raw:
classification = json.loads(raw)
if isinstance(classification, list):
for item in classification:
cat = item.get("topic_category", "unknown")
moment_id = item.get("moment_id")
if moment_id:
topic_map.setdefault(cat, []).append(moment_id)
r.close()
except Exception:
# Fall back to random sampling without topic diversity
pass
rng = random.Random(creator_id)
if topic_map:
# Interleave from different categories for diversity
ordered = []
cat_lists = list(topic_map.values())
rng.shuffle(cat_lists)
idx = 0
while any(cat_lists):
for cat in cat_lists:
if cat:
ordered.append(cat.pop(0))
cat_lists = [c for c in cat_lists if c]
# Map moment IDs back to transcripts
moment_lookup = {str(m.id): m.raw_transcript for m in moments if m.raw_transcript}
diverse_transcripts = [
moment_lookup[mid] for mid in ordered if mid in moment_lookup
]
if diverse_transcripts:
transcripts_list = diverse_transcripts
else:
transcripts_list = [t for _, t in transcripts]
else:
transcripts_list = [t for _, t in transcripts]
rng.shuffle(transcripts_list)
excerpts = []
budget = max_chars
for t in transcripts_list:
chunk = t[:600]
if budget - len(chunk) < 0:
break
excerpts.append(chunk)
budget -= len(chunk)
text = "\n\n---\n\n".join(excerpts)
return (text, total_chars)
@celery_app.task(bind=True, max_retries=2, default_retry_delay=60)
def extract_personality_profile(self, creator_id: str) -> str:
    """Extract a personality profile from a creator's transcripts via LLM.

    Aggregates and samples transcripts from all of the creator's key moments,
    sends them to the LLM with the personality_extraction prompt, validates
    the response, and stores the profile as JSONB on Creator.personality_profile.

    The task returns early (without retrying) when the creator does not exist,
    has no key moments with transcripts, or yields an empty sample. Any other
    failure -- including invalid JSON or a profile that fails validation --
    rolls back the session, emits an ``error`` event and schedules exactly one
    retry through the outer handler.

    Args:
        creator_id: ID of the creator to profile, as a string.

    Returns:
        The creator_id, for chain compatibility.
    """
    from datetime import datetime, timezone

    start = time.monotonic()
    logger.info("Personality extraction starting for creator_id=%s", creator_id)
    _emit_event(creator_id, "personality_extraction", "start")

    session = _get_sync_session()
    try:
        # Load creator
        creator = session.execute(
            select(Creator).where(Creator.id == creator_id)
        ).scalar_one_or_none()
        if not creator:
            logger.error("Creator not found: %s", creator_id)
            _emit_event(
                creator_id, "personality_extraction", "error",
                payload={"error": "creator_not_found"},
            )
            return creator_id

        # Load all key moments with transcripts for this creator
        moments = (
            session.execute(
                select(KeyMoment)
                .join(SourceVideo, KeyMoment.source_video_id == SourceVideo.id)
                .where(SourceVideo.creator_id == creator.id)
                .where(KeyMoment.raw_transcript.isnot(None))
            )
            .scalars()
            .all()
        )
        if not moments:
            logger.warning(
                "No transcripts found for creator_id=%s (%s), skipping extraction",
                creator_id, creator.name,
            )
            _emit_event(
                creator_id, "personality_extraction", "complete",
                payload={"skipped": True, "reason": "no_transcripts"},
            )
            return creator_id

        # Sample transcripts
        sampled_text, total_chars = _sample_creator_transcripts(
            moments, creator_id,
        )
        if not sampled_text.strip():
            logger.warning(
                "Empty transcript sample for creator_id=%s, skipping", creator_id,
            )
            _emit_event(
                creator_id, "personality_extraction", "complete",
                payload={"skipped": True, "reason": "empty_sample"},
            )
            return creator_id

        # Load prompt and call LLM
        system_prompt = _load_prompt("personality_extraction.txt")
        user_prompt = (
            f"Creator: {creator.name}\n\n"
            f"Transcript excerpts ({len(moments)} moments, {total_chars} total chars, "
            f"sample below):\n\n{sampled_text}"
        )
        llm = _get_llm_client()
        callback = _make_llm_callback(
            creator_id, "personality_extraction",
            system_prompt=system_prompt,
            user_prompt=user_prompt,
        )
        response = llm.complete(
            system_prompt=system_prompt,
            user_prompt=user_prompt,
            response_model=object,  # triggers JSON mode
            on_complete=callback,
        )

        # Parse and validate
        from schemas import PersonalityProfile as ProfileValidator

        # NOTE: the two handlers below re-raise the original error instead of
        # calling self.retry() themselves. self.retry() raises Celery's Retry
        # exception, which the outer `except Exception` would catch and retry
        # a second time; letting the outer handler own the retry keeps it to
        # a single re-queue per failure.
        try:
            raw_profile = json.loads(str(response))
        except json.JSONDecodeError as jde:
            logger.warning(
                "LLM returned invalid JSON for creator_id=%s, retrying: %s",
                creator_id, jde,
            )
            raise
        try:
            validated = ProfileValidator.model_validate(raw_profile)
        except ValidationError as ve:
            logger.warning(
                "LLM profile failed validation for creator_id=%s, retrying: %s",
                creator_id, ve,
            )
            raise

        # Build final profile dict with metadata
        profile_dict = validated.model_dump()
        profile_dict["_metadata"] = {
            # Stored as a naive UTC timestamp string.
            "extracted_at": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
            "transcript_sample_size": total_chars,
            "moments_count": len(moments),
            # NOTE(review): this used to be filled from response.finish_reason,
            # which is a stop reason rather than a model name. Assumes the LLM
            # response may expose a `model` attribute -- confirm against the
            # client; "unknown" is recorded when it does not.
            "model_used": getattr(response, "model", None) or "unknown",
            "finish_reason": getattr(response, "finish_reason", None),
        }
        # Low sample size note
        if total_chars < 500:
            profile_dict["_metadata"]["low_sample_size"] = True

        # Store on creator
        creator.personality_profile = profile_dict
        session.commit()

        elapsed = time.monotonic() - start
        _emit_event(
            creator_id, "personality_extraction", "complete",
            duration_ms=int(elapsed * 1000),
            payload={
                "moments_count": len(moments),
                "transcript_chars": total_chars,
                "sample_chars": len(sampled_text),
            },
        )
        logger.info(
            "Personality extraction completed for creator_id=%s (%s) in %.1fs — "
            "%d moments, %d chars sampled",
            creator_id, creator.name, elapsed, len(moments), len(sampled_text),
        )
        return creator_id
    except Exception as exc:
        if isinstance(exc, (self.MaxRetriesExceededError,)):
            raise
        session.rollback()
        _emit_event(
            creator_id, "personality_extraction", "error",
            payload={"error": str(exc)[:500]},
        )
        logger.error(
            "Personality extraction failed for creator_id=%s: %s", creator_id, exc,
        )
        # Single retry point for every failure raised inside the try block.
        raise self.retry(exc=exc)
    finally:
        session.close()

View file

@ -236,3 +236,29 @@ async def get_impersonation_log(
)
for log, admin_name, target_name in rows
]
@router.post("/creators/{slug}/extract-profile")
async def extract_creator_profile(
    slug: str,
    _admin: Annotated[User, Depends(_require_admin)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Enqueue the personality extraction task for the creator with this slug.

    Admin only. Responds with 404 when no creator matches ``slug``; otherwise
    hands the creator's ID to the background task and returns a
    ``{"status": "queued", "creator_id": ...}`` payload.
    """
    from models import Creator

    lookup = select(Creator).where(Creator.slug == slug)
    creator = (await session.execute(lookup)).scalar_one_or_none()
    if creator is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Creator not found: {slug}",
        )

    # Imported here, after the lookup, exactly where the task is needed.
    from pipeline.stages import extract_personality_profile

    creator_id = str(creator.id)
    extract_personality_profile.delay(creator_id)
    logger.info("Queued personality extraction for creator=%s (%s)", slug, creator_id)
    return {"status": "queued", "creator_id": creator_id}

View file

@ -732,3 +732,40 @@ class FollowedCreatorItem(BaseModel):
creator_name: str
creator_slug: str
followed_at: datetime
# ── Personality Profile (LLM output validation) ─────────────────────────────
class VocabularyProfile(BaseModel):
    """Vocabulary section of an LLM-generated personality profile.

    Mirrors the "vocabulary" object requested by the personality extraction
    prompt. Every field has a default, so a missing key validates.
    """

    # Phrases the creator repeats.
    signature_phrases: list[str] = []
    # The prompt offers "beginner-friendly | intermediate | advanced | mixed";
    # the type is a plain str, so other values are accepted as well.
    jargon_level: str = "mixed"
    # Filler words or verbal tics.
    filler_words: list[str] = []
    # Distinctive terminology.
    distinctive_terms: list[str] = []
    # How the creator describes sounds.
    sound_descriptions: list[str] = []
class ToneProfile(BaseModel):
    """Tone section of an LLM-generated personality profile.

    Mirrors the "tone" object requested by the personality extraction prompt.
    The option lists below come from that prompt; the fields are plain str,
    so values outside those lists are accepted as well.
    """

    # Prompt options: "casual | conversational | professional | academic".
    formality: str = "conversational"
    # Prompt options: "calm | moderate | high | variable".
    energy: str = "moderate"
    # Prompt options: "none | occasional | frequent | core-style".
    humor: str = "none"
    # One short free-text descriptor of the teaching style.
    teaching_style: str = ""
    # Free-text adjectives describing the tone.
    descriptors: list[str] = []
class StyleMarkersProfile(BaseModel):
    """Style-markers section of an LLM-generated personality profile.

    Mirrors the "style_markers" object requested by the personality extraction
    prompt. The option lists below come from that prompt; the str fields are
    not restricted to them.
    """

    # Prompt options: "step-by-step | exploratory | demo-first |
    # theory-then-practice".
    explanation_approach: str = "step-by-step"
    # Whether the creator uses analogies; examples go in analogy_examples.
    uses_analogies: bool = False
    analogy_examples: list[str] = []
    # Onomatopoeia the creator uses.
    sound_words: list[str] = []
    # Free text: how the creator references themselves or their experience.
    self_references: str = ""
    # Free text: how the creator addresses or involves the viewer.
    audience_engagement: str = ""
    # Prompt options: "fast | moderate | slow | variable".
    pacing: str = "moderate"
class PersonalityProfile(BaseModel):
    """Validates LLM-generated personality profile before storage.

    Top-level shape requested by the personality extraction prompt: three
    nested sections plus a free-text summary.

    NOTE(review): every field here and in the nested models has a default, so
    an empty object ``{}`` passes validation and yields an all-defaults
    profile; only wrongly-typed values are rejected. Confirm that is intended.
    """

    # Each section falls back to a default-constructed sub-model when the key
    # is absent from the input.
    vocabulary: VocabularyProfile = Field(default_factory=VocabularyProfile)
    tone: ToneProfile = Field(default_factory=ToneProfile)
    style_markers: StyleMarkersProfile = Field(default_factory=StyleMarkersProfile)
    # One-paragraph description of the creator's voice.
    summary: str = ""

View file

@ -0,0 +1,42 @@
You are a music production educator analyst. You will receive transcript excerpts from a single creator's tutorials. Your task is to identify what makes this creator's communication style DISTINCTIVE — not universal traits shared by all educators.
Analyze the transcripts for:
1. **Vocabulary patterns**: Signature phrases they repeat, jargon level (beginner-friendly vs advanced), filler words or verbal tics, distinctive terminology or invented words, how they name sounds or techniques.
2. **Tone**: Formality level, energy (calm/methodical vs enthusiastic/hype), humor style (dry, self-deprecating, none), teaching warmth, use of encouragement or critique.
3. **Style markers**: How they explain concepts (step-by-step vs intuitive/exploratory), use of analogies or metaphors, onomatopoeia or sound words, self-references and personal anecdotes, how they address the audience, pacing and rhythm of explanation.
Focus on what makes THIS creator stand out. Ignore generic traits like "knowledgeable about music production" or "explains things clearly" — those apply to everyone.
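You MUST respond with ONLY valid JSON matching this exact structure. Where a value below lists options separated by "|", output exactly one of those options, not the whole list: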
{
"vocabulary": {
"signature_phrases": ["phrase1", "phrase2"],
"jargon_level": "beginner-friendly | intermediate | advanced | mixed",
"filler_words": ["um", "like"],
"distinctive_terms": ["term1", "term2"],
"sound_descriptions": ["how they describe sounds"]
},
"tone": {
"formality": "casual | conversational | professional | academic",
"energy": "calm | moderate | high | variable",
"humor": "none | occasional | frequent | core-style",
"teaching_style": "one short descriptor, e.g. 'encouraging coach' or 'no-nonsense mentor'",
"descriptors": ["adjective1", "adjective2", "adjective3"]
},
"style_markers": {
"explanation_approach": "step-by-step | exploratory | demo-first | theory-then-practice",
"uses_analogies": true,
"analogy_examples": ["example1"],
"sound_words": ["onomatopoeia they use"],
"self_references": "how they reference themselves or their experience",
"audience_engagement": "how they address/involve the viewer",
"pacing": "fast | moderate | slow | variable"
},
"summary": "One paragraph (3-5 sentences) capturing what makes this creator's voice distinctive. Be specific — reference actual phrases or patterns from the transcripts."
}
No markdown code fences, no explanation, no preamble — just the raw JSON object.