feat: Added _build_compose_user_prompt(), _compose_into_existing(), and…

- "backend/pipeline/stages.py"

GSD-Task: S04/T01
This commit is contained in:
jlightner 2026-04-03 01:29:21 +00:00
parent dc18d0a543
commit d709c9edce
10 changed files with 828 additions and 3 deletions

View file

@ -8,7 +8,7 @@ Restructure technique pages to be broader (per-creator+category across videos),
|----|-------|------|---------|------|------------|
| S01 | Synthesis Prompt v5 — Nested Sections + Citations | high | — | ✅ | Run test harness with new prompt → output has list-of-objects body_sections with H2/H3 nesting, citation markers on key claims, broader page scope. |
| S02 | Composition Prompt + Test Harness Compose Mode | high | S01 | ✅ | Run test harness --compose mode with existing page + new moments → merged output with deduplication, new sections, updated citations. |
| S03 | Data Model + Migration | low | — | | Alembic migration runs clean. API response includes body_sections_format and source_videos fields. |
| S04 | Pipeline Compose-or-Create Logic | high | S01, S02, S03 | ⬜ | Process two COPYCATT videos. Second video's moments composed into existing page. technique_page_videos has both video IDs. |
| S05 | Frontend — Nested Rendering, TOC, Citations | medium | S03 | ⬜ | Format-2 page renders with TOC, nested sections, clickable citations. Format-1 pages unchanged. |
| S06 | Admin UI — Multi-Source Pipeline Management | medium | S03, S04 | ⬜ | Admin view for multi-source page shows source dropdown, composition history, per-video chunking inspection. |

View file

@ -0,0 +1,93 @@
---
id: S03
parent: M014
milestone: M014
provides:
- body_sections_format column on technique_pages (default 'v1')
- technique_page_videos association table
- SourceVideoSummary schema
- source_videos field on TechniquePageDetail API response
- body_sections accepts list | dict | None
requires:
[]
affects:
- S04
- S05
- S06
key_files:
- alembic/versions/012_multi_source_format.py
- backend/models.py
- backend/schemas.py
- backend/routers/techniques.py
key_decisions:
- Used TIMESTAMP (not WITH TIME ZONE) for added_at to stay consistent with existing schema convention
patterns_established:
- Association table pattern with dual CASCADE FKs and unique constraint for many-to-many with metadata (added_at)
- body_sections_format discriminator column for handling multiple content formats in the same table
observability_surfaces:
- none
drill_down_paths:
- .gsd/milestones/M014/slices/S03/tasks/T01-SUMMARY.md
- .gsd/milestones/M014/slices/S03/tasks/T02-SUMMARY.md
duration: ""
verification_result: passed
completed_at: 2026-04-03T01:20:27.897Z
blocker_discovered: false
---
# S03: Data Model + Migration
**Added body_sections_format column, technique_page_videos association table, and wired both into the API response for multi-source technique pages.**
## What Happened
This slice laid the data foundation for M014's multi-source, nested-section technique pages. Two tasks delivered three changes:
**T01 — Schema + Migration:** Created Alembic migration 012 adding `body_sections_format` (VARCHAR(20), NOT NULL, default 'v1') to technique_pages and a new `technique_page_videos` association table with dual CASCADE foreign keys and a unique constraint on (technique_page_id, source_video_id). Updated SQLAlchemy models with the new `TechniquePageVideo` class and `body_sections_format` column on `TechniquePage`. Widened the Pydantic `body_sections` type from `dict | None` to `list | dict | None` to support both v1 (dict) and v2 (list-of-objects) formats. Added `SourceVideoSummary` schema and `source_videos` field to `TechniquePageDetail`.
**T02 — API Wiring:** Updated `get_technique()` to eagerly load `source_video_links` → `source_video` via chained `selectinload` calls, and to build the `source_videos` list from association table rows. Ran migration on ub01 and verified the API returns both new fields with correct defaults (`body_sections_format: "v1"`, `source_videos: []`).
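The list-building step can be sketched in plain Python. This is a minimal stand-in, not the real router code: the dataclasses and the summary dict fields (`id`, `title`) are hypothetical placeholders for the actual ORM rows and `SourceVideoSummary` schema, which aren't shown here.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical stand-ins for the ORM rows; real attribute names on the
# SourceVideo model and SourceVideoSummary schema may differ.
@dataclass
class SourceVideo:
    id: str
    title: str

@dataclass
class TechniquePageVideo:
    source_video: SourceVideo
    added_at: Optional[str] = None

def build_source_videos(source_video_links):
    """Map association-table rows to the summary dicts returned by the API."""
    return [
        {"id": link.source_video.id, "title": link.source_video.title}
        for link in source_video_links
    ]
```

A page with no association rows serializes to an empty list rather than null, matching the verified default response (`source_videos: []`).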
All existing technique pages continue to work unchanged — the v1 default ensures backward compatibility.
## Verification
All slice-level checks passed:
1. `from models import TechniquePageVideo, TechniquePage; assert hasattr(TechniquePage, 'body_sections_format')` → OK
2. `from schemas import SourceVideoSummary, TechniquePageDetail` → OK
3. `alembic upgrade head` on ub01 Docker → clean (migration 012 applied)
4. `curl` to live API for existing technique → `body_sections_format: "v1"` and `source_videos: []` present in response
## Requirements Advanced
None.
## Requirements Validated
None.
## New Requirements Surfaced
None.
## Requirements Invalidated or Re-scoped
None.
## Deviations
None.
## Known Limitations
None.
## Follow-ups
None.
## Files Created/Modified
- `alembic/versions/012_multi_source_format.py` — New migration: body_sections_format column + technique_page_videos table
- `backend/models.py` — Added TechniquePageVideo model, body_sections_format column, source_video_links relationship
- `backend/schemas.py` — Widened body_sections type, added SourceVideoSummary, added source_videos to TechniquePageDetail
- `backend/routers/techniques.py` — Eager-load source_video_links, build source_videos list in technique detail response

View file

@ -0,0 +1,48 @@
# S03: Data Model + Migration — UAT
**Milestone:** M014
**Written:** 2026-04-03T01:20:27.897Z
## UAT: S03 — Data Model + Migration
### Preconditions
- Chrysopedia stack running on ub01 (all containers healthy)
- Migration 012 already applied via `docker exec chrysopedia-api alembic upgrade head`
- At least one technique page exists in the database
### Test 1: Migration Applied Cleanly
1. SSH to ub01: `ssh ub01`
2. Check migration history: `docker exec chrysopedia-api alembic current`
3. **Expected:** Output includes revision for 012_multi_source_format (head)
4. Verify column exists: `docker exec chrysopedia-db psql -U chrysopedia -c "\d technique_pages" | grep body_sections_format`
5. **Expected:** `body_sections_format | character varying(20) | not null | ... | 'v1'`
6. Verify table exists: `docker exec chrysopedia-db psql -U chrysopedia -c "\d technique_page_videos"`
7. **Expected:** Table with columns: id (uuid), technique_page_id (uuid), source_video_id (uuid), added_at (timestamp)
### Test 2: Existing Pages Have v1 Default
1. Query: `docker exec chrysopedia-db psql -U chrysopedia -tAc "SELECT DISTINCT body_sections_format FROM technique_pages"`
2. **Expected:** Only `v1` returned (all existing rows defaulted)
### Test 3: API Response Includes New Fields
1. Get a slug: `SLUG=$(docker exec chrysopedia-db psql -U chrysopedia -tAc "SELECT slug FROM technique_pages LIMIT 1")`
2. Fetch detail: `curl -s http://ub01:8096/api/v1/techniques/$SLUG | python3 -m json.tool`
3. **Expected:** Response contains `"body_sections_format": "v1"` and `"source_videos": []`
### Test 4: Empty source_videos Is an Array, Not Null
1. Same curl as Test 3
2. Parse: `curl -s http://ub01:8096/api/v1/techniques/$SLUG | python3 -c "import sys,json; d=json.load(sys.stdin); assert isinstance(d['source_videos'], list); assert len(d['source_videos']) == 0; print('OK')"`
3. **Expected:** Prints OK (empty array, not null or missing)
### Test 5: Unique Constraint on Association Table
1. Insert a test row: `docker exec chrysopedia-db psql -U chrysopedia -c "INSERT INTO technique_page_videos (id, technique_page_id, source_video_id) SELECT gen_random_uuid(), tp.id, sv.id FROM technique_pages tp, source_videos sv LIMIT 1"`
2. Repeat the same insert
3. **Expected:** Second insert fails with unique constraint violation (uq_page_video)
4. Cleanup: `docker exec chrysopedia-db psql -U chrysopedia -c "DELETE FROM technique_page_videos"`
### Test 6: CASCADE Delete Behavior
1. Note: This is destructive — use only on test data or verify constraint definition instead
2. Verify FK definitions: `docker exec chrysopedia-db psql -U chrysopedia -c "SELECT conname, confdeltype FROM pg_constraint WHERE conrelid = 'technique_page_videos'::regclass AND contype = 'f'"`
3. **Expected:** Both foreign keys show `confdeltype = 'c'` (CASCADE)
### Edge Cases
- **Migration downgrade:** `docker exec chrysopedia-api alembic downgrade -1` should drop technique_page_videos table and body_sections_format column cleanly (run only in test environment)

View file

@ -0,0 +1,16 @@
{
"schemaVersion": 1,
"taskId": "T02",
"unitId": "M014/S03/T02",
"timestamp": 1775179172299,
"passed": true,
"discoverySource": "task-plan",
"checks": [
{
"command": "ssh ub01 'docker exec chrysopedia-api alembic upgrade head'",
"exitCode": 0,
"durationMs": 768,
"verdict": "pass"
}
]
}

View file

@ -1,6 +1,124 @@
# S04: Pipeline Compose-or-Create Logic
**Goal:** Stage 5 detects existing technique pages by creator+category and uses the compose prompt to merge new video content into them. All pages get body_sections_format='v2' and technique_page_videos rows tracking contributing videos.
**Demo:** After this: Process two COPYCATT videos. Second video's moments composed into existing page. technique_page_videos has both video IDs.
## Tasks
- [x] **T01: Added _build_compose_user_prompt(), _compose_into_existing(), and compose-or-create branching to stage5_synthesis with body_sections_format='v2' and TechniquePageVideo tracking** — Add two helper functions to stages.py and modify the stage5_synthesis per-category loop to detect existing pages and branch to the compose path.
## Steps
1. Add `TechniquePageVideo` to the imports from `models` at line ~27.
2. Add `_build_compose_user_prompt(existing_page, existing_moments, new_moments, creator_name)` helper function:
- Takes an existing `TechniquePage` ORM object, a list of `KeyMoment` ORM objects (existing), a list of `(KeyMoment, dict)` tuples (new moments with classification), and creator name string
- Serialize existing page to dict matching SynthesizedPage shape: title, slug, topic_category, summary, body_sections, signal_chains, plugins, source_quality
- Format existing moments as `[0]-[N-1]` using `_build_moments_text()` pattern but from plain KeyMoment objects (not tuples with cls_info — existing moments don't have classification data, use empty dict)
- Format new moments as `[N]-[N+M-1]` using `_build_moments_text()` with offset indices applied
- Build XML-tagged user prompt: `<existing_page>`, `<existing_moments>`, `<new_moments>`, `<creator>` tags (same structure as test_harness.py's `build_compose_prompt()`)
- Return the user prompt string
3. Add `_compose_into_existing(existing_page, existing_moments, new_moment_group, category, creator_name, system_prompt, llm, model_override, modality, hard_limit, video_id, run_id)` helper function:
- Load compose system prompt via `_load_prompt('stage5_compose.txt', video_id=video_id)`
- Call `_build_compose_user_prompt()` to build user prompt
- Estimate tokens via `estimate_max_tokens()`
- Call `llm.complete()` with compose system prompt, response_model=SynthesisResult, same callback/param pattern as `_synthesize_chunk()`
- Parse via `_safe_parse_llm_response()` and return SynthesisResult
4. Modify the per-category loop in `stage5_synthesis()` (around line 1200):
- **Before** the existing chunked synthesis block, add compose detection:
```python
existing_page = session.execute(
select(TechniquePage).where(
TechniquePage.creator_id == video.creator_id,
func.lower(TechniquePage.topic_category) == func.lower(category),
)
).scalars().first()
```
- If `existing_page` is found, load its linked moments:
```python
existing_moments = session.execute(
select(KeyMoment)
.where(KeyMoment.technique_page_id == existing_page.id)
.order_by(KeyMoment.start_time)
).scalars().all()
```
- If existing_page AND existing_moments → compose path: call `_compose_into_existing()`, use result.pages as synthesized_pages
- Log INFO: 'Stage 5: Composing into existing page \'%s\' (%d existing moments + %d new moments)'
- If >1 page matches, log WARNING about multiple matches and proceed with first
- If no existing_page → fall through to existing synthesis block (unchanged)
- Wrap in `else` so existing chunked synthesis only runs when not composing
5. In the persist block (around line 1380), after the `if existing:` / `else:` branch that creates/updates the page:
- Set `page.body_sections_format = 'v2'` on every page (both new and updated)
- Add TechniquePageVideo INSERT:
```python
from sqlalchemy.dialects.postgresql import insert as pg_insert
stmt = pg_insert(TechniquePageVideo.__table__).values(
technique_page_id=page.id,
source_video_id=video.id,
).on_conflict_do_nothing()
session.execute(stmt)
```
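The offset-index logic in step 2 can be sketched as pure Python. This is a simplification under stated assumptions: the function name mirrors the plan, but the per-moment formatting here is just `[i] title`, not the real `_build_moments_text()` output, and the page JSON is passed in pre-serialized.

```python
# Minimal sketch of _build_compose_user_prompt()'s index offsetting:
# existing moments get [0]..[N-1], new moments continue at [N]..[N+M-1].
def build_compose_user_prompt(existing_page_json, existing_titles, new_titles, creator):
    existing_text = "\n".join(
        f"[{i}] {title}" for i, title in enumerate(existing_titles)
    )
    offset = len(existing_titles)  # new moments continue numbering at [N]
    new_text = "\n".join(
        f"[{offset + i}] {title}" for i, title in enumerate(new_titles)
    )
    return (
        f"<existing_page>\n{existing_page_json}\n</existing_page>\n"
        f"<existing_moments>\n{existing_text}\n</existing_moments>\n"
        f"<new_moments>\n{new_text}\n</new_moments>\n"
        f"<creator>{creator}</creator>"
    )

prompt = build_compose_user_prompt(
    '{"title": "Vocal Chain"}',
    ["De-essing", "Compression", "Saturation"],  # -> [0]-[2]
    ["Parallel compression", "Limiting"],        # -> [3]-[4]
    "COPYCATT",
)
```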
## Must-Haves
- [ ] `_build_compose_user_prompt()` produces XML-tagged prompt with correct offset indices
- [ ] `_compose_into_existing()` calls LLM with compose system prompt and returns SynthesisResult
- [ ] Compose-or-create decision queries DB by creator_id + LOWER(topic_category)
- [ ] Existing synthesis path unchanged when no existing page found
- [ ] body_sections_format = 'v2' set on all pages
- [ ] TechniquePageVideo row inserted for every page+video combination
- [ ] Case-insensitive category matching (func.lower)
## Failure Modes
| Dependency | On error | On timeout | On malformed response |
|------------|----------|-----------|----------------------|
| LLM (compose) | _safe_parse_llm_response retries once, then raises (existing retry mechanism) | Celery task retry (max_retries=3) | SynthesisResult validation rejects, retry with feedback |
| DB (existing page query) | Exception propagates to stage-level handler, triggers retry | Same | N/A |
## Negative Tests
- No existing page for creator+category → falls through to standard synthesis (no compose)
- Existing page found but zero linked moments → should still compose (empty existing_moments list)
- Multiple pages match creator+category → uses first, logs warning
- Estimate: 1.5h
- Files: backend/pipeline/stages.py
- Verify: cd /home/aux/projects/content-to-kb-automator && PYTHONPATH=backend python -c "from pipeline.stages import _build_compose_user_prompt, _compose_into_existing; print('imports OK')" && grep -q 'body_sections_format' backend/pipeline/stages.py && grep -q 'TechniquePageVideo' backend/pipeline/stages.py && grep -q 'stage5_compose' backend/pipeline/stages.py
- [ ] **T02: Write unit tests for compose pipeline logic** — Create test_compose_pipeline.py covering compose prompt construction, compose-or-create branching, TechniquePageVideo insertion, and body_sections_format setting.
## Steps
1. Create `backend/pipeline/test_compose_pipeline.py`.
2. Write test fixtures:
- Mock KeyMoment objects (using simple namedtuples or dataclasses with .title, .summary, .content_type, .start_time, .end_time, .plugins, .raw_transcript, .id, .technique_page_id, .source_video_id)
- Mock TechniquePage object with .id, .title, .slug, .topic_category, .summary, .body_sections, .signal_chains, .plugins, .source_quality, .creator_id
- Use `unittest.mock` for DB session and LLM client
3. Test `_build_compose_user_prompt()`:
- **test_compose_prompt_xml_structure**: verify output contains `<existing_page>`, `<existing_moments>`, `<new_moments>`, `<creator>` tags
- **test_compose_prompt_offset_indices**: with 3 existing moments and 2 new moments, verify existing use [0]-[2] and new use [3]-[4]
- **test_compose_prompt_empty_existing_moments**: 0 existing, N new → new moments start at [0]
- **test_compose_prompt_page_json**: verify existing page serialized as JSON within `<existing_page>` tags
4. Test compose-or-create branching:
- **test_compose_branch_triggered**: mock session.execute to return an existing page + moments for the same creator+category → verify `_compose_into_existing` is called (patch it)
- **test_create_branch_no_existing**: mock session.execute to return None for existing page query → verify `_synthesize_chunk` is called instead
- **test_category_case_insensitive**: verify query uses func.lower for category matching (inspect the query or test with mixed-case input)
5. Test TechniquePageVideo and body_sections_format:
- **test_body_sections_format_v2**: verify pages created by both compose and create paths have body_sections_format='v2'
- **test_technique_page_video_inserted**: verify INSERT with on_conflict_do_nothing is executed after page persist
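Mocking the chained `session.execute(...).scalars().first()` / `.all()` calls is the fiddly part of the branching tests. A stdlib-only sketch, with `fake_page` and `fake_moments` as stand-ins rather than real ORM objects:

```python
from unittest.mock import MagicMock

# Stand-ins for the existing TechniquePage and its linked KeyMoments.
fake_page = MagicMock(name="existing_page")
fake_moments = [MagicMock(name="moment0"), MagicMock(name="moment1")]

session = MagicMock()
# Configure the chained call shape the compose-detection queries use.
# (A single return_value chain answers every execute() the same way;
# use side_effect for per-query results.)
session.execute.return_value.scalars.return_value.first.return_value = fake_page
session.execute.return_value.scalars.return_value.all.return_value = fake_moments

# The code under test would now take the compose branch:
existing_page = session.execute("page query").scalars().first()
existing_moments = session.execute("moments query").scalars().all()
```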
## Must-Haves
- [ ] At least 4 tests for _build_compose_user_prompt covering XML structure, offset math, empty existing, page JSON
- [ ] At least 2 tests for branching logic (compose triggered vs create fallback)
- [ ] At least 1 test for body_sections_format = 'v2'
- [ ] At least 1 test for TechniquePageVideo insertion
- [ ] All tests pass with `python -m pytest backend/pipeline/test_compose_pipeline.py -v`
- Estimate: 1h
- Files: backend/pipeline/test_compose_pipeline.py
- Verify: cd /home/aux/projects/content-to-kb-automator && python -m pytest backend/pipeline/test_compose_pipeline.py -v

View file

@ -0,0 +1,146 @@
# S04 Research: Pipeline Compose-or-Create Logic
## Summary
This slice modifies stage 5 (synthesis) in `backend/pipeline/stages.py` to:
1. Detect when a technique page already exists for the same creator+category (from a prior video)
2. Use the compose prompt (`stage5_compose.txt`) instead of synthesis prompt when composing
3. Store the v2 `body_sections_format` on newly created/updated pages
4. Populate the `technique_page_videos` association table so pages track all contributing videos
The compose prompt, Pydantic schemas, and test harness are all done (S01, S02). The data model and migration are done (S03). This slice wires them together in the live pipeline.
## Recommendation
Targeted research. The pattern is clear from reading the existing code — the compose-or-create decision is a conditional branch inside the per-category loop in `stage5_synthesis()`. The main risk is getting the LLM call plumbing right (same shape as `_synthesize_chunk` but using compose prompt and different user prompt format). No new technology. No ambiguous requirements.
## Implementation Landscape
### File: `backend/pipeline/stages.py` (2102 lines)
**Current flow (stage5_synthesis, line 1127):**
1. Load video, moments, creator, classification data
2. Group moments by topic_category
3. For each category group: synthesize via `_synthesize_chunk()` (or chunk+merge for large groups)
4. Persist pages: check for existing page by slug or by prior_page_ids (from Redis snapshot), create or update
5. Link moments to pages, set `processing_status = complete`
**What changes:**
- After grouping moments, before calling LLM: query DB for existing `TechniquePage` with same `creator_id` + `topic_category`
- If found → compose path: load existing page's body_sections + moments, build compose prompt via logic similar to `build_compose_prompt()` from test_harness.py, call LLM with `stage5_compose.txt` system prompt, parse result as `SynthesisResult`
- If not found → create path: existing synthesis flow (unchanged)
- After persisting page: INSERT into `technique_page_videos` (upsert pattern due to unique constraint)
- Set `body_sections_format = 'v2'` on all newly created/updated pages (both compose and create paths)
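The case-insensitive keying that the compose-or-create decision depends on can be illustrated in isolation. A sketch with plain dicts standing in for KeyMoment ORM objects — it shows why `func.lower()` matters given that LLM-generated topic categories have inconsistent casing:

```python
from collections import defaultdict

def group_by_category(moments):
    """Group moments by lowercased topic_category, so 'Mixing' and
    'mixing' land in the same bucket — mirroring the LOWER() DB match."""
    groups = defaultdict(list)
    for m in moments:
        groups[m["topic_category"].lower()].append(m)
    return dict(groups)

moments = [
    {"title": "Glue compression", "topic_category": "Mixing"},
    {"title": "Bus saturation", "topic_category": "mixing"},
    {"title": "Tempo mapping", "topic_category": "Arrangement"},
]
groups = group_by_category(moments)
```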
**Key helpers to reuse:**
- `_build_moments_text()` (line 957) — formats moments for prompt, returns (text, tags)
- `_synthesize_chunk()` (line 983) — single-category synthesis LLM call
- `_safe_parse_llm_response()` (line 325) — parse + truncation detection + retry
- `_load_prompt()` (line 244) — loads from prompts/ directory
- `estimate_max_tokens()` — from `pipeline.llm_client`
- `_make_llm_callback()` (line 137) — observability callback
- `_build_request_params()` (line 189) — LLM request config
- `_capture_pipeline_metadata()` (line 884) — for version snapshots
**New import needed:** `TechniquePageVideo` from `models`
### File: `backend/pipeline/test_harness.py`
Contains `build_compose_prompt()` (line 332) which is the reference implementation for building the compose user prompt. The pipeline stage will need its own version that works with real `KeyMoment` ORM objects instead of `MockKeyMoment` test doubles. The structure is identical — XML tags with `<existing_page>`, `<existing_moments>`, `<new_moments>`, `<creator>`.
### Compose prompt inputs needed:
1. **existing_page** — JSON of the existing TechniquePage's SynthesizedPage-compatible dict (title, slug, topic_category, summary, body_sections, signal_chains, plugins, source_quality)
2. **existing_moments** — formatted text of key moments already linked to this page (from previous video(s))
3. **new_moments** — formatted text of moments from the current video, with offset indices starting at N
4. **creator** — creator name
### The compose-or-create decision point:
For each `(creator_id, topic_category)` group in the current video's moments:
```
existing_page = SELECT FROM technique_pages
WHERE creator_id = ? AND LOWER(topic_category) = LOWER(?)
LIMIT 1
```
- If `existing_page` exists → compose path
- If not → create path (current synthesis flow)
The existing code already does slug-based and prior_page_ids-based matching at *persist* time (line ~1320). The compose decision needs to happen *before* the LLM call, not after. This is the key architectural change — the detection moves earlier in the flow.
### TechniquePageVideo population:
After creating or updating a `TechniquePage`, insert a row:
```python
from sqlalchemy.dialects.postgresql import insert as pg_insert
stmt = pg_insert(TechniquePageVideo).values(
technique_page_id=page.id,
source_video_id=video_id,
).on_conflict_do_nothing()
session.execute(stmt)
```
The `on_conflict_do_nothing` handles the unique constraint gracefully for reprocessing scenarios.
### Loading existing moments for compose prompt:
When composing, we need the existing page's linked moments:
```python
existing_moments = session.execute(
select(KeyMoment)
.where(KeyMoment.technique_page_id == existing_page.id)
.order_by(KeyMoment.start_time)
).scalars().all()
```
These get formatted as indices [0]-[N-1]. The new video's moments for this category get indices [N]-[N+M-1].
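The index bookkeeping above reduces to two ranges:

```python
def citation_index_ranges(n_existing, n_new):
    """Existing moments occupy [0]..[N-1]; the new video's moments
    continue at [N]..[N+M-1]."""
    existing = list(range(n_existing))
    new = list(range(n_existing, n_existing + n_new))
    return existing, new

existing_idx, new_idx = citation_index_ranges(3, 2)
```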
### body_sections_format tracking:
- New pages created via synthesis → `body_sections_format = 'v2'` (the v5 prompt always outputs v2)
- Pages updated via compose → `body_sections_format = 'v2'` (compose prompt also outputs v2)
- Existing v1 pages NOT processed in this pipeline run → unchanged (backward compatible)
The column already has `default='v1'` and `server_default='v1'` from S03 migration.
## Seams / Task Decomposition
### T01: Compose detection + compose LLM call helper
- Add `_build_compose_user_prompt()` helper (similar to test harness's `build_compose_prompt()` but using real ORM objects)
- Add `_compose_into_existing()` helper that takes an existing page + existing moments + new moments, calls LLM with compose system prompt, returns `SynthesisResult`
- This is the riskiest piece — get it testable before wiring into the main flow
### T02: Wire compose-or-create into stage5_synthesis + TechniquePageVideo
- Modify the per-category loop: before calling `_synthesize_chunk()`, check for existing page
- If found and has moments → call `_compose_into_existing()`
- If not → existing synthesis path
- After persisting page: insert `TechniquePageVideo` row
- Set `body_sections_format = 'v2'` on all pages touched
- Add `TechniquePageVideo` to imports
### T03: Integration test / verification
- Test with mock LLM to verify compose path is triggered correctly
- Verify TechniquePageVideo rows are created
- Verify body_sections_format is set
- The roadmap demo says "Process two COPYCATT videos" — this is an end-to-end verification on ub01, not something we can unit test here
## Constraints
1. **Sync SQLAlchemy only** — stages.py uses sync sessions (Celery is sync). No async.
2. **stage5_merge.txt doesn't exist** — `_merge_pages_by_slug` references it but the file is missing. This is pre-existing; don't fix it in this slice.
3. **Redis classification data** — stage 4 stores classification in Redis with 24h TTL. If the second video is processed >24h after the first, classification data for the first video may be gone. The compose flow loads existing *moments from the DB*, not classification data — so this isn't a blocker.
4. **Existing page detection must be case-insensitive** — KNOWLEDGE.md notes "LLM-generated topic categories have inconsistent casing". Use `func.lower()` for category matching.
5. **The compose prompt** (`stage5_compose.txt`) is self-contained — no runtime import from synthesis prompt needed (per S02 decision).
6. **Existing _load_prior_pages Redis snapshot** — currently used for reprocessing the *same* video. The compose flow is for a *different* video contributing to the same page. These are separate mechanisms — compose uses DB query, reprocess uses Redis snapshot. Don't conflate them.
## Risks
1. **Compose LLM output quality** — First time the compose prompt runs against real LLM. May need prompt tuning. Mitigated by: the prompt was carefully designed in S02, and `_safe_parse_llm_response` handles parse failures with retry.
2. **Multiple pages per category** — If a creator has 2+ pages in the same category (from chunked synthesis), the compose detection query returns only one. The current code handles this with `LIMIT 1` / `.first()`. Worth logging when >1 exists but proceeding with the first match.
3. **Moment count growth** — Composing many moments into a single page may exceed context limits. The existing truncation recovery (split-in-half retry) doesn't apply to compose since we can't split the existing page. Mitigated by: compose prompts are typically shorter than synthesis (existing page is already summarized).
## Verification Strategy
1. **Unit tests** for `_build_compose_user_prompt()` — XML structure, citation offset math (same pattern as test_harness_compose.py tests)
2. **Unit test** for compose-or-create branching logic — mock the DB query result
3. **Integration check** on ub01: process a video, verify page created with format v2 and TechniquePageVideo row. Then process a second video for same creator — verify compose path triggered, page updated, second TechniquePageVideo row added.

View file

@ -0,0 +1,103 @@
---
estimated_steps: 67
estimated_files: 1
skills_used: []
---
# T01: Add compose helpers and wire compose-or-create logic into stage5_synthesis
Add two helper functions to stages.py and modify the stage5_synthesis per-category loop to detect existing pages and branch to the compose path.
## Steps
1. Add `TechniquePageVideo` to the imports from `models` at line ~27.
2. Add `_build_compose_user_prompt(existing_page, existing_moments, new_moments, creator_name)` helper function:
- Takes an existing `TechniquePage` ORM object, a list of `KeyMoment` ORM objects (existing), a list of `(KeyMoment, dict)` tuples (new moments with classification), and creator name string
- Serialize existing page to dict matching SynthesizedPage shape: title, slug, topic_category, summary, body_sections, signal_chains, plugins, source_quality
- Format existing moments as `[0]-[N-1]` using `_build_moments_text()` pattern but from plain KeyMoment objects (not tuples with cls_info — existing moments don't have classification data, use empty dict)
- Format new moments as `[N]-[N+M-1]` using `_build_moments_text()` with offset indices applied
- Build XML-tagged user prompt: `<existing_page>`, `<existing_moments>`, `<new_moments>`, `<creator>` tags (same structure as test_harness.py's `build_compose_prompt()`)
- Return the user prompt string
3. Add `_compose_into_existing(existing_page, existing_moments, new_moment_group, category, creator_name, system_prompt, llm, model_override, modality, hard_limit, video_id, run_id)` helper function:
- Load compose system prompt via `_load_prompt('stage5_compose.txt', video_id=video_id)`
- Call `_build_compose_user_prompt()` to build user prompt
- Estimate tokens via `estimate_max_tokens()`
- Call `llm.complete()` with compose system prompt, response_model=SynthesisResult, same callback/param pattern as `_synthesize_chunk()`
- Parse via `_safe_parse_llm_response()` and return SynthesisResult
4. Modify the per-category loop in `stage5_synthesis()` (around line 1200):
- **Before** the existing chunked synthesis block, add compose detection:
```python
existing_page = session.execute(
select(TechniquePage).where(
TechniquePage.creator_id == video.creator_id,
func.lower(TechniquePage.topic_category) == func.lower(category),
)
).scalars().first()
```
- If `existing_page` is found, load its linked moments:
```python
existing_moments = session.execute(
select(KeyMoment)
.where(KeyMoment.technique_page_id == existing_page.id)
.order_by(KeyMoment.start_time)
).scalars().all()
```
- If existing_page AND existing_moments → compose path: call `_compose_into_existing()`, use result.pages as synthesized_pages
- Log INFO: 'Stage 5: Composing into existing page \'%s\' (%d existing moments + %d new moments)'
- If >1 page matches, log WARNING about multiple matches and proceed with first
- If no existing_page → fall through to existing synthesis block (unchanged)
- Wrap in `else` so existing chunked synthesis only runs when not composing
5. In the persist block (around line 1380), after the `if existing:` / `else:` branch that creates/updates the page:
- Set `page.body_sections_format = 'v2'` on every page (both new and updated)
- Add TechniquePageVideo INSERT:
```python
from sqlalchemy.dialects.postgresql import insert as pg_insert
stmt = pg_insert(TechniquePageVideo.__table__).values(
technique_page_id=page.id,
source_video_id=video.id,
).on_conflict_do_nothing()
session.execute(stmt)
```
## Must-Haves
- [ ] `_build_compose_user_prompt()` produces XML-tagged prompt with correct offset indices
- [ ] `_compose_into_existing()` calls LLM with compose system prompt and returns SynthesisResult
- [ ] Compose-or-create decision queries DB by creator_id + LOWER(topic_category)
- [ ] Existing synthesis path unchanged when no existing page found
- [ ] body_sections_format = 'v2' set on all pages
- [ ] TechniquePageVideo row inserted for every page+video combination
- [ ] Case-insensitive category matching (func.lower)
## Failure Modes
| Dependency | On error | On timeout | On malformed response |
|------------|----------|-----------|----------------------|
| LLM (compose) | _safe_parse_llm_response retries once, then raises (existing retry mechanism) | Celery task retry (max_retries=3) | SynthesisResult validation rejects, retry with feedback |
| DB (existing page query) | Exception propagates to stage-level handler, triggers retry | Same | N/A |
## Negative Tests
- No existing page for creator+category → falls through to standard synthesis (no compose)
- Existing page found but zero linked moments → should still compose (empty existing_moments list)
- Multiple pages match creator+category → uses first, logs warning
## Inputs
- `backend/pipeline/stages.py` — existing stage5_synthesis function to modify
- `backend/pipeline/test_harness.py` — reference implementation of build_compose_prompt() for prompt structure
- `backend/models.py` — TechniquePageVideo model, body_sections_format column
- `prompts/stage5_compose.txt` — compose system prompt loaded by _compose_into_existing()
- `backend/pipeline/schemas.py` — SynthesisResult schema for LLM response parsing
## Expected Output
- `backend/pipeline/stages.py` — modified with _build_compose_user_prompt(), _compose_into_existing(), compose-or-create branch, TechniquePageVideo INSERT, body_sections_format='v2'
## Verification
cd /home/aux/projects/content-to-kb-automator && PYTHONPATH=backend python -c "from pipeline.stages import _build_compose_user_prompt, _compose_into_existing; print('imports OK')" && grep -q 'body_sections_format' backend/pipeline/stages.py && grep -q 'TechniquePageVideo' backend/pipeline/stages.py && grep -q 'stage5_compose' backend/pipeline/stages.py


@ -0,0 +1,78 @@
---
id: T01
parent: S04
milestone: M014
provides: []
requires: []
affects: []
key_files: ["backend/pipeline/stages.py"]
key_decisions: ["Compose detection queries all matching pages and warns on multiple matches, uses first", "pg_insert with on_conflict_do_nothing for idempotent TechniquePageVideo inserts"]
patterns_established: []
drill_down_paths: []
observability_surfaces: []
duration: ""
verification_result: "PYTHONPATH=backend python import of _build_compose_user_prompt and _compose_into_existing succeeded. grep confirmed body_sections_format, TechniquePageVideo, and stage5_compose strings present in stages.py."
completed_at: 2026-04-03T01:29:17.901Z
blocker_discovered: false
---
# T01: Added _build_compose_user_prompt(), _compose_into_existing(), and compose-or-create branching to stage5_synthesis with body_sections_format='v2' and TechniquePageVideo tracking
## What Happened
Added two compose helper functions and wired compose-or-create detection into the stage5_synthesis per-category loop. When an existing technique page matches by creator_id + LOWER(topic_category), the compose path calls the LLM with stage5_compose.txt instead of standard synthesis. All pages now get body_sections_format='v2' and TechniquePageVideo rows tracking contributing videos via idempotent pg_insert with on_conflict_do_nothing.
## Verification
PYTHONPATH=backend python import of _build_compose_user_prompt and _compose_into_existing succeeded. grep confirmed body_sections_format, TechniquePageVideo, and stage5_compose strings present in stages.py.
## Verification Evidence
| # | Command | Exit Code | Verdict | Duration |
|---|---------|-----------|---------|----------|
| 1 | `PYTHONPATH=backend python -c "from pipeline.stages import _build_compose_user_prompt, _compose_into_existing; print('imports OK')"` | 0 | ✅ pass | 2000ms |
| 2 | `grep -q 'body_sections_format' backend/pipeline/stages.py` | 0 | ✅ pass | 100ms |
| 3 | `grep -q 'TechniquePageVideo' backend/pipeline/stages.py` | 0 | ✅ pass | 100ms |
| 4 | `grep -q 'stage5_compose' backend/pipeline/stages.py` | 0 | ✅ pass | 100ms |
## Deviations
Used default=str in json.dumps() for page serialization to handle UUID/datetime fields — not in plan but necessary for robustness.
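The `default=str` fallback can be illustrated in isolation: `json.dumps` calls the `default` hook for any value it cannot serialize natively, so UUID and datetime fields degrade to their string forms instead of raising `TypeError`. The field names below are illustrative, not the actual page schema.

```python
import json
import uuid
import datetime

# UUID and datetime are not JSON-serializable natively; default=str
# converts any such value via str() instead of raising TypeError.
page = {
    "id": uuid.UUID("12345678-1234-5678-1234-567812345678"),
    "updated_at": datetime.datetime(2026, 4, 3, 1, 29, 17),
}
serialized = json.dumps(page, indent=2, default=str)
```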
## Known Issues
None.
## Files Created/Modified
- `backend/pipeline/stages.py`


@ -0,0 +1,55 @@
---
estimated_steps: 25
estimated_files: 1
skills_used: []
---
# T02: Write unit tests for compose pipeline logic
Create test_compose_pipeline.py covering compose prompt construction, compose-or-create branching, TechniquePageVideo insertion, and body_sections_format setting.
## Steps
1. Create `backend/pipeline/test_compose_pipeline.py`.
2. Write test fixtures:
- Mock KeyMoment objects (using simple namedtuples or dataclasses with .title, .summary, .content_type, .start_time, .end_time, .plugins, .raw_transcript, .id, .technique_page_id, .source_video_id)
- Mock TechniquePage object with .id, .title, .slug, .topic_category, .summary, .body_sections, .signal_chains, .plugins, .source_quality, .creator_id
- Use `unittest.mock` for DB session and LLM client
3. Test `_build_compose_user_prompt()`:
- **test_compose_prompt_xml_structure**: verify output contains `<existing_page>`, `<existing_moments>`, `<new_moments>`, `<creator>` tags
- **test_compose_prompt_offset_indices**: with 3 existing moments and 2 new moments, verify existing use [0]-[2] and new use [3]-[4]
- **test_compose_prompt_empty_existing_moments**: 0 existing, N new → new moments start at [0]
- **test_compose_prompt_page_json**: verify existing page serialized as JSON within `<existing_page>` tags
4. Test compose-or-create branching:
- **test_compose_branch_triggered**: mock session.execute to return an existing page + moments for the same creator+category → verify `_compose_into_existing` is called (patch it)
- **test_create_branch_no_existing**: mock session.execute to return None for existing page query → verify `_synthesize_chunk` is called instead
- **test_category_case_insensitive**: verify query uses func.lower for category matching (inspect the query or test with mixed-case input)
5. Test TechniquePageVideo and body_sections_format:
- **test_body_sections_format_v2**: verify pages created by both compose and create paths have body_sections_format='v2'
- **test_technique_page_video_inserted**: verify INSERT with on_conflict_do_nothing is executed after page persist
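The offset arithmetic behind test_compose_prompt_offset_indices can be exercised against a standalone re-implementation. Here `number_new_moments` is a hypothetical stand-in for the indexing logic inside `_build_compose_user_prompt`, not an import from the codebase.

```python
from types import SimpleNamespace

def number_new_moments(existing_count, new_moments):
    # Existing moments keep [0]..[N-1]; new moments continue at [N].
    return [f"[{existing_count + i}] {m.title}"
            for i, m in enumerate(new_moments)]

def test_offset_indices():
    # 3 existing + 2 new: new moments must be numbered [3] and [4].
    new = [SimpleNamespace(title="EQ moves"),
           SimpleNamespace(title="Sidechain trick")]
    lines = number_new_moments(3, new)
    assert lines[0].startswith("[3]")
    assert lines[1].startswith("[4]")

def test_empty_existing():
    # 0 existing: new moments start at [0].
    new = [SimpleNamespace(title="Saturation")]
    assert number_new_moments(0, new)[0].startswith("[0]")

test_offset_indices()
test_empty_existing()
```

The real tests would assert the same invariants against the actual prompt string returned by `_build_compose_user_prompt`.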
## Must-Haves
- [ ] At least 4 tests for _build_compose_user_prompt covering XML structure, offset math, empty existing, page JSON
- [ ] At least 2 tests for branching logic (compose triggered vs create fallback)
- [ ] At least 1 test for body_sections_format = 'v2'
- [ ] At least 1 test for TechniquePageVideo insertion
- [ ] All tests pass with `python -m pytest backend/pipeline/test_compose_pipeline.py -v`
## Inputs
- `backend/pipeline/stages.py` — modified by T01 with compose helpers and wiring
- `backend/pipeline/test_harness_compose.py` — reference for compose prompt test patterns
- `backend/models.py` — TechniquePageVideo model shape for mock construction
## Expected Output
- `backend/pipeline/test_compose_pipeline.py` — unit tests for compose prompt construction, branching, TechniquePageVideo, body_sections_format
## Verification
cd /home/aux/projects/content-to-kb-automator && python -m pytest backend/pipeline/test_compose_pipeline.py -v


@ -24,6 +24,8 @@ from sqlalchemy import create_engine, func, select
from sqlalchemy.orm import Session, sessionmaker
from config import get_settings
from sqlalchemy.dialects.postgresql import insert as pg_insert
from models import (
Creator,
KeyMoment,
@ -33,6 +35,7 @@ from models import (
SourceVideo,
TechniquePage,
TechniquePageVersion,
TechniquePageVideo,
TranscriptSegment,
)
from pipeline.embedding_client import EmbeddingClient
@ -980,6 +983,117 @@ def _build_moments_text(
return "\n\n".join(moments_lines), all_tags
def _build_compose_user_prompt(
existing_page: TechniquePage,
existing_moments: list[KeyMoment],
new_moments: list[tuple[KeyMoment, dict]],
creator_name: str,
) -> str:
"""Build the user prompt for composing new moments into an existing page.
Existing moments keep indices [0]-[N-1].
New moments get indices [N]-[N+M-1].
XML-tagged prompt structure matches test_harness.py build_compose_prompt().
"""
category = existing_page.topic_category or "Uncategorized"
# Serialize existing page to dict matching SynthesizedPage shape
sq = existing_page.source_quality
sq_value = sq.value if hasattr(sq, "value") else sq
page_dict = {
"title": existing_page.title,
"slug": existing_page.slug,
"topic_category": existing_page.topic_category,
"summary": existing_page.summary,
"body_sections": existing_page.body_sections,
"signal_chains": existing_page.signal_chains,
"plugins": existing_page.plugins,
"source_quality": sq_value,
}
# Format existing moments [0]-[N-1] using _build_moments_text pattern
# Existing moments don't have classification data — use empty dict
existing_as_tuples = [(m, {}) for m in existing_moments]
existing_text, _ = _build_moments_text(existing_as_tuples, category)
# Format new moments [N]-[N+M-1] with offset indices
n = len(existing_moments)
new_lines = []
for i, (m, cls_info) in enumerate(new_moments):
tags = cls_info.get("topic_tags", [])
new_lines.append(
f"[{n + i}] Title: {m.title}\n"
f" Summary: {m.summary}\n"
f" Content type: {m.content_type.value}\n"
f" Time: {m.start_time:.1f}s - {m.end_time:.1f}s\n"
f" Plugins: {', '.join(m.plugins) if m.plugins else 'none'}\n"
f" Category: {category}\n"
f" Tags: {', '.join(tags) if tags else 'none'}\n"
f" Transcript excerpt: {(m.raw_transcript or '')[:300]}"
)
new_text = "\n\n".join(new_lines)
page_json = json.dumps(page_dict, indent=2, ensure_ascii=False, default=str)
return (
f"<existing_page>\n{page_json}\n</existing_page>\n"
f"<existing_moments>\n{existing_text}\n</existing_moments>\n"
f"<new_moments>\n{new_text}\n</new_moments>\n"
f"<creator>{creator_name}</creator>"
)
def _compose_into_existing(
existing_page: TechniquePage,
existing_moments: list[KeyMoment],
new_moment_group: list[tuple[KeyMoment, dict]],
category: str,
creator_name: str,
system_prompt: str,
llm: LLMClient,
model_override: str | None,
modality: str,
hard_limit: int,
video_id: str,
run_id: str | None,
) -> SynthesisResult:
"""Compose new moments into an existing technique page via LLM.
Loads the compose system prompt, builds the compose user prompt, and
calls the LLM with the same retry/parse pattern as _synthesize_chunk().
"""
compose_prompt = _load_prompt("stage5_compose.txt", video_id=video_id)
user_prompt = _build_compose_user_prompt(
existing_page, existing_moments, new_moment_group, creator_name,
)
estimated_input = estimate_max_tokens(
compose_prompt, user_prompt,
stage="stage5_synthesis", hard_limit=hard_limit,
)
logger.info(
"Stage 5: Composing into '%s': %d existing + %d new moments, max_tokens=%d",
existing_page.slug, len(existing_moments), len(new_moment_group), estimated_input,
)
raw = llm.complete(
compose_prompt, user_prompt, response_model=SynthesisResult,
on_complete=_make_llm_callback(
video_id, "stage5_synthesis",
system_prompt=compose_prompt, user_prompt=user_prompt,
run_id=run_id, context_label=f"compose:{category}",
request_params=_build_request_params(
estimated_input, model_override, modality, "SynthesisResult", hard_limit,
),
),
modality=modality, model_override=model_override, max_tokens=estimated_input,
)
return _safe_parse_llm_response(
raw, SynthesisResult, llm, compose_prompt, user_prompt,
modality=modality, model_override=model_override, max_tokens=estimated_input,
)
def _synthesize_chunk(
chunk: list[tuple[KeyMoment, dict]],
category: str,
@ -1198,8 +1312,52 @@ def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str:
for _, cls_info in moment_group:
all_tags.update(cls_info.get("topic_tags", []))
# ── Compose-or-create detection ────────────────────────
# Check if an existing technique page already covers this
# creator + category combination (from a prior video run).
compose_matches = session.execute(
select(TechniquePage).where(
TechniquePage.creator_id == video.creator_id,
func.lower(TechniquePage.topic_category) == func.lower(category),
)
).scalars().all()
if len(compose_matches) > 1:
logger.warning(
"Stage 5: Multiple existing pages (%d) match creator=%s category='%s'. "
"Using first match '%s'.",
len(compose_matches), video.creator_id, category,
compose_matches[0].slug,
)
compose_target = compose_matches[0] if compose_matches else None
if compose_target is not None:
# Load existing moments linked to this page
existing_moments = session.execute(
select(KeyMoment)
.where(KeyMoment.technique_page_id == compose_target.id)
.order_by(KeyMoment.start_time)
).scalars().all()
logger.info(
"Stage 5: Composing into existing page '%s' "
"(%d existing moments + %d new moments)",
compose_target.slug,
len(existing_moments),
len(moment_group),
)
compose_result = _compose_into_existing(
compose_target, existing_moments, moment_group,
category, creator_name, system_prompt,
llm, model_override, modality, hard_limit,
video_id, run_id,
)
synthesized_pages = list(compose_result.pages)
# ── Chunked synthesis with truncation recovery ─────────
elif len(moment_group) <= chunk_size:
# Small group — try single LLM call first
try:
result = _synthesize_chunk(
@ -1379,6 +1537,16 @@ def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str:
pages_created += 1
# Set body_sections_format on every page (new or updated)
page.body_sections_format = "v2"
# Track contributing video via TechniquePageVideo
stmt = pg_insert(TechniquePageVideo.__table__).values(
technique_page_id=page.id,
source_video_id=video.id,
).on_conflict_do_nothing()
session.execute(stmt)
# Link moments to the technique page using moment_indices
if page_moment_indices: