feat: Built backend/watcher.py with PollingObserver-based folder watchi…

- "backend/watcher.py"
- "backend/requirements.txt"

GSD-Task: S03/T01
jlightner 2026-03-30 19:17:47 +00:00
parent 07eaa7f309
commit 5e408dff5a
11 changed files with 971 additions and 2 deletions

@ -7,7 +7,7 @@ Make the pipeline fully transparent — every LLM call's full prompt and respons
| ID | Slice | Risk | Depends | Done | After this |
|----|-------|------|---------|------|------------|
| S01 | Pipeline Debug Mode — Full LLM I/O Capture and Token Accounting | medium | — | ✅ | Toggle debug mode on for a video, trigger pipeline, see full prompt + response stored for every LLM call with per-stage token breakdown |
| S02 | Debug Payload Viewer — Inline View, Copy, and Export in Admin UI | low | S01 | | Admin clicks an LLM call event, sees full prompt and response in a readable viewer, copies to clipboard, or downloads as JSON |
| S03 | Transcript Folder Watcher — Auto-Ingest Service | medium | — | ⬜ | Drop a transcript JSON into the watch folder on ub01 → file is detected, validated, POSTed to /ingest, pipeline triggers automatically |
| S04 | Admin UX Audit — Prune, Streamline, and Polish | low | S02 | ⬜ | Admin pipeline page is cleaner and more efficient for daily content management. Dead UI elements removed. Workflow improvements visible. |
| S05 | Key Moment Card Text Overflow Fix | low | — | ⬜ | Key moment cards with long filenames and timestamp ranges display cleanly — text truncated with ellipsis or wrapped within card bounds, no horizontal bleed |

@ -0,0 +1,79 @@
---
id: S02
parent: M007
milestone: M007
provides:
- DebugPayloadViewer component for viewing LLM I/O in admin UI
- Per-section clipboard copy for prompt/response text
- JSON export of debug payloads
requires:
- slice: S01
provides: PipelineEvent model with debug text fields populated by debug mode pipeline runs
affects:
- S04
key_files:
- frontend/src/pages/AdminPipeline.tsx
- frontend/src/api/public-client.ts
- frontend/src/App.css
key_decisions:
- Used clipboard API with execCommand fallback for non-HTTPS admin tool context
- DebugPayloadViewer renders only when at least one debug text field is non-null
- Field names use system_prompt_text/user_prompt_text/response_text matching the backend PipelineEvent model
patterns_established:
- BEM-style component CSS with debug-viewer__ prefix following existing JsonViewer pattern
- Conditional component rendering gated on non-null payload fields
observability_surfaces:
- Debug viewer surfaces full LLM prompts and responses inline — this IS the observability tool for pipeline prompt debugging
drill_down_paths:
- .gsd/milestones/M007/slices/S02/tasks/T01-SUMMARY.md
duration: ""
verification_result: passed
completed_at: 2026-03-30T19:10:25.752Z
blocker_discovered: false
---
# S02: Debug Payload Viewer — Inline View, Copy, and Export in Admin UI
**Added DebugPayloadViewer component to the admin pipeline page — LLM call events now show collapsible System Prompt / User Prompt / Response sections with per-section clipboard copy and full JSON export.**
## What Happened
This slice delivered the frontend viewer for LLM debug payloads captured by S01's debug mode infrastructure. A single task (T01) added three optional string fields to the PipelineEvent TypeScript interface (`system_prompt_text`, `user_prompt_text`, `response_text`), built the `DebugPayloadViewer` component with independently collapsible sections, per-section copy-to-clipboard (with `execCommand` fallback for non-HTTPS contexts), and a JSON export button that downloads all debug fields as a formatted file. The component renders inline on every `llm_call` event row in the pipeline event log, gated on at least one debug field being non-null. CSS follows the existing JsonViewer BEM pattern with `var(--color-*)` custom properties per D017. Changes were applied directly on ub01 (canonical dev directory) and deployed via Docker Compose rebuild. Browser verification confirmed all UI elements render correctly with real pipeline data.
## Verification
Container chrysopedia-web-8096 healthy and serving HTTP 200. DebugPayloadViewer component present in deployed JS bundle (grep confirmed). Browser verification: `.debug-viewer` selector visible, "LLM Debug" label rendered, collapsible section headers (System Prompt / User Prompt / Response) present in DOM with Copy buttons, JSON export button visible. Expanded System Prompt section displays full prompt text in pre-formatted block. Component renders on all 3 llm_call events in the expanded video's event list.
## Requirements Advanced
None.
## Requirements Validated
None.
## New Requirements Surfaced
None.
## Requirements Invalidated or Re-scoped
None.
## Deviations
Changes applied directly on ub01 rather than pushing through git — diverged branches prevented git pull. Service name in plan verification command was `chrysopedia-web-8096` but Docker Compose build target is `chrysopedia-web` (the `-8096` is only the container name). Unicode escape issue in export button required a second build pass.
## Known Limitations
Local dev01 copy is out of sync with ub01 — all changes live only on ub01. The plan verification command in S02-PLAN.md has shell quoting issues (unterminated quotes in the SSH command) that would always fail as written.
## Follow-ups
None.
## Files Created/Modified
- `frontend/src/pages/AdminPipeline.tsx` — Added DebugPayloadViewer component (collapsible sections, copy, export) and wired into llm_call event rows
- `frontend/src/api/public-client.ts` — Added system_prompt_text, user_prompt_text, response_text fields to PipelineEvent interface
- `frontend/src/App.css` — Added ~100 lines of debug-viewer CSS using var(--color-*) custom properties

@ -0,0 +1,62 @@
# S02: Debug Payload Viewer — Inline View, Copy, and Export in Admin UI — UAT
**Milestone:** M007
**Written:** 2026-03-30T19:10:25.752Z
# S02 UAT: Debug Payload Viewer
## Preconditions
- Chrysopedia running on ub01:8096 with chrysopedia-web-8096 container healthy
- At least one video has been processed with debug mode enabled (S01) so pipeline events contain non-null `system_prompt_text`, `user_prompt_text`, and `response_text` fields
## Test Cases
### TC1: Debug viewer appears on LLM call events
1. Navigate to http://ub01:8096/admin/pipeline
2. Click any video card that shows events (e.g. one with "extracted" status and >10 events)
3. Look for events with the 🤖 emoji and `llm_call` event type
4. **Expected:** Each llm_call event shows an "LLM DEBUG" header with a "↓ JSON" export button
5. **Expected:** Below the header, three collapsible sections: "System Prompt", "User Prompt", "Response" — each with a "Copy" button
### TC2: Debug viewer does NOT appear on non-LLM events
1. In the same expanded video event list, look at `complete`, `start`, or other non-llm_call events
2. **Expected:** No "LLM DEBUG" section appears on these events
### TC3: Collapsible sections expand and collapse
1. Click the "▸ System Prompt" toggle on any llm_call event
2. **Expected:** Section expands showing the full system prompt text in a pre-formatted block with dark background
3. Click the "▾ System Prompt" toggle again
4. **Expected:** Section collapses, hiding the content
5. Repeat for "User Prompt" and "Response" sections
6. **Expected:** Each section expands/collapses independently — expanding one does not affect others
### TC4: Copy to clipboard
1. Expand the "System Prompt" section on any llm_call event
2. Click the "Copy" button next to "System Prompt"
3. Paste into a text editor
4. **Expected:** The full system prompt text is in the clipboard, matching what's displayed in the viewer
### TC5: JSON export download
1. Click the "↓ JSON" button in the debug viewer header
2. **Expected:** Browser downloads a JSON file
3. Open the downloaded file
4. **Expected:** File contains a JSON object with keys `system_prompt`, `user_prompt`, and `response`, each containing the corresponding text (or null if the field was empty)
### TC6: Empty debug fields handling
1. If a video was processed WITHOUT debug mode, expand its events
2. Find an llm_call event
3. **Expected:** Either no debug viewer appears, or the viewer shows with empty/null sections — no crash or rendering error
### Edge Cases
### TC7: Very long prompts
1. Find an llm_call event where the system prompt is very long (stage 3 extraction prompts are typically 2000+ characters)
2. Expand the System Prompt section
3. **Expected:** Text renders in a scrollable pre-formatted block without breaking the page layout
4. **Expected:** The section does not push other events off-screen permanently — collapsing it restores normal layout
### TC8: Multiple viewers on same page
1. Expand a video with multiple llm_call events (3+)
2. **Expected:** Each event has its own independent debug viewer
3. Expand System Prompt on the first event and User Prompt on the second
4. **Expected:** Both sections stay expanded independently — they don't interfere with each other

@ -0,0 +1,42 @@
{
"schemaVersion": 1,
"taskId": "T01",
"unitId": "M007/S02/T01",
"timestamp": 1774897643823,
"passed": false,
"discoverySource": "task-plan",
"checks": [
{
"command": "ssh ub01 \"cd /vmPool/r/repos/xpltdco/chrysopedia",
"exitCode": 2,
"durationMs": 5,
"verdict": "fail"
},
{
"command": "git pull",
"exitCode": 128,
"durationMs": 8,
"verdict": "fail"
},
{
"command": "docker compose build chrysopedia-web-8096",
"exitCode": 1,
"durationMs": 63,
"verdict": "fail"
},
{
"command": "docker compose up -d chrysopedia-web-8096\"",
"exitCode": 2,
"durationMs": 4,
"verdict": "fail"
},
{
"command": "sleep 5",
"exitCode": 0,
"durationMs": 5006,
"verdict": "pass"
}
],
"retryAttempt": 1,
"maxRetries": 2
}

@ -1,6 +1,143 @@
# S03: Transcript Folder Watcher — Auto-Ingest Service
**Goal:** A folder-watching service monitors a directory for new transcript JSON files, validates them, and POSTs them to the /api/v1/ingest endpoint, replacing manual curl/upload for getting transcripts into the pipeline.
**Demo:** After this: Drop a transcript JSON into the watch folder on ub01 → file is detected, validated, POSTed to /ingest, pipeline triggers automatically
## Tasks
- [x] **T01: Built backend/watcher.py with PollingObserver-based folder watching, file stability detection, JSON validation, multipart POST to /api/v1/ingest, and processed/failed file disposition with error sidecars** — Create the core watcher script that monitors a folder for new transcript JSON files and POSTs them to the ingest API.
**Slice:** S03 — Transcript Folder Watcher — Auto-Ingest Service
**Milestone:** M007
## Description
Build `backend/watcher.py` — a standalone Python script using the `watchdog` library to monitor a configurable folder for new `.json` files. When a file is detected:
1. Wait for file stability (size unchanged for 2 seconds) to handle SCP/rsync partial writes
2. Read and validate JSON structure (must contain `source_file`, `creator_folder`, `duration_seconds`, `segments`)
3. POST the file as multipart upload to the ingest API endpoint using httpx
4. Move successfully processed files to `processed/` subfolder
5. Move failed files to `failed/` subfolder with a `.error` sidecar file containing the error details
The script uses `PollingObserver` (not inotify Observer) for reliability on ZFS/network-mounted filesystems. It watches recursively.
Add `watchdog` to `backend/requirements.txt`.
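The stability wait in step 1 can be sketched as follows. This is a minimal sketch under the timings described in this plan (size unchanged for 2 seconds, polled at 0.5s intervals); parameter names here are illustrative, and the real script reads its window from `WATCHER_STABILITY_SECONDS`:

```python
import os
import time

def wait_for_stability(path: str, seconds: float = 2.0, poll: float = 0.5) -> bool:
    """Return True once the file size has held steady for `seconds`;
    return False if the file disappears while waiting."""
    try:
        last_size = os.path.getsize(path)
    except OSError:
        return False  # file vanished before we could read it
    stable_since = time.monotonic()
    while time.monotonic() - stable_since < seconds:
        time.sleep(poll)
        try:
            size = os.path.getsize(path)
        except OSError:
            return False  # file deleted between detection and read
        if size != last_size:
            last_size = size
            stable_since = time.monotonic()  # size changed; restart the clock
    return True
```

Restarting the clock on every size change is what makes slow SCP/rsync writes safe: the function only returns once a full quiet window has elapsed.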
## Failure Modes
| Dependency | On error | On timeout | On malformed response |
|------------|----------|-----------|----------------------|
| Ingest API | Move file to `failed/` with HTTP status + body in `.error` sidecar | httpx timeout (30s default) → same as error | Log warning, move to `failed/` |
| Filesystem | Log exception, skip file (watchdog continues) | N/A | N/A |
## Negative Tests
- **Malformed inputs**: Invalid JSON (parse error) → `failed/` with error sidecar. Missing required keys → `failed/` with validation error. Non-JSON files → ignored (only `.json` suffix triggers processing). Empty file → `failed/`.
- **Error paths**: API returns 4xx/5xx → `failed/` with response body. API unreachable → `failed/` with connection error. File deleted between detection and read → log warning, skip.
- **Boundary conditions**: File still being written (size changing) → wait until stable. Same filename in `processed/` already exists → overwrite (idempotent).
## Steps
1. Add `watchdog>=4.0,<5.0` to `backend/requirements.txt`
2. Create `backend/watcher.py` with:
- Configuration from environment variables: `WATCH_FOLDER` (default `/watch`), `WATCHER_API_URL` (default `http://chrysopedia-api:8000/api/v1/ingest`), `WATCHER_STABILITY_SECONDS` (default `2`), `WATCHER_POLL_INTERVAL` (default `5`)
- `TranscriptHandler(FileSystemEventHandler)` class that handles `on_created` events for `.json` files
- `wait_for_stability(path, seconds)` function — polls file size every 0.5s, returns True when size is unchanged for `seconds` duration, returns False if file disappears
- `validate_transcript(data)` function — checks for required keys (`source_file`, `creator_folder`, `duration_seconds`, `segments`)
- `post_to_ingest(filepath)` function — reads file, validates JSON, POSTs as multipart upload via httpx, returns (success, detail)
- `move_to_processed(filepath)` / `move_to_failed(filepath, error)` functions — create subdirs as needed, move file, write `.error` sidecar for failures
- Main block: create `processed/` and `failed/` subdirs, start `PollingObserver` with recursive=True, run until KeyboardInterrupt
- Structured logging with `logging.basicConfig` at INFO level, logger name `watcher`
3. Verify `python -m py_compile backend/watcher.py` passes
## Must-Haves
- [x] watchdog added to requirements.txt
- [x] watcher.py handles file stability check
- [x] watcher.py validates JSON keys before POST
- [x] watcher.py POSTs via httpx multipart upload
- [x] Processed files move to processed/ subfolder
- [x] Failed files move to failed/ with .error sidecar
- [x] Structured logging with watcher.* logger
- [x] PollingObserver used (not inotify) for ZFS reliability
## Verification
- `python -m py_compile backend/watcher.py` exits 0
- `grep -q 'watchdog' backend/requirements.txt` exits 0
- `grep -q 'PollingObserver' backend/watcher.py` exits 0
- `grep -q 'processed' backend/watcher.py && grep -q 'failed' backend/watcher.py` exits 0
## Observability Impact
- Signals added: `watcher` logger with structured messages for pickup, stability wait, validation, POST result, and file moves
- How a future agent inspects this: `docker logs chrysopedia-watcher` shows all watcher activity
- Failure state exposed: `.error` sidecar files in `failed/` directory with HTTP status, response body, or exception traceback
- Estimate: 45m
- Files: backend/watcher.py, backend/requirements.txt
- Verify: python -m py_compile backend/watcher.py && grep -q 'watchdog' backend/requirements.txt && grep -q 'PollingObserver' backend/watcher.py
- [ ] **T02: Docker Compose integration and end-to-end verification on ub01** — Add the chrysopedia-watcher service to docker-compose.yml, create the watch folder on ub01, build and deploy, and verify end-to-end file-drop ingestion.
**Slice:** S03 — Transcript Folder Watcher — Auto-Ingest Service
**Milestone:** M007
## Description
Wire the watcher script into the Docker Compose stack as a new service. The watcher reuses the existing `Dockerfile.api` image (same Python + deps) but runs `python watcher.py` instead of uvicorn. The watch folder on ub01 is `/vmPool/r/services/chrysopedia_watch/` bind-mounted to `/watch` in the container.
This task deploys to ub01 and verifies end-to-end: drop a transcript JSON → watcher picks it up → POSTs to API → video record created.
**Important context for executor:** The canonical codebase lives on ub01 at `/vmPool/r/repos/xpltdco/chrysopedia`. You must SSH to ub01 to edit docker-compose.yml, build, and test. The watcher.py from T01 needs to be committed and available in the repo on ub01.
## Failure Modes
| Dependency | On error | On timeout | On malformed response |
|------------|----------|-----------|----------------------|
| Docker build | Fix Dockerfile or requirements, rebuild | N/A | N/A |
| API healthcheck | Watcher waits (depends_on condition: service_healthy) | Start timeout → container restarts | N/A |
## Steps
1. SSH to ub01 and add the `chrysopedia-watcher` service to `docker-compose.yml`:
- `build: context: . dockerfile: docker/Dockerfile.api` (reuse existing image)
- `container_name: chrysopedia-watcher`
- `restart: unless-stopped`
- `command: ["python", "watcher.py"]`
- `environment: WATCHER_API_URL: http://chrysopedia-api:8000/api/v1/ingest, WATCH_FOLDER: /watch`
- `volumes: /vmPool/r/services/chrysopedia_watch:/watch`
- `depends_on: chrysopedia-api: condition: service_healthy`
- `networks: chrysopedia`
- `healthcheck: test: ["CMD-SHELL", "pgrep -f watcher.py || exit 1"] interval: 30s timeout: 5s retries: 3`
2. Create the watch folder on ub01: `mkdir -p /vmPool/r/services/chrysopedia_watch`
3. Ensure T01's `backend/watcher.py` and updated `backend/requirements.txt` are committed and pushed to the repo on ub01 (git pull if needed)
4. Build and deploy: `cd /vmPool/r/repos/xpltdco/chrysopedia && docker compose build chrysopedia-watcher && docker compose up -d chrysopedia-watcher`
5. Verify watcher is running: `docker exec chrysopedia-watcher pgrep -f watcher.py`
6. End-to-end test: Copy a known-good transcript JSON to `/vmPool/r/services/chrysopedia_watch/test_ingest.json` → check watcher logs for pickup and 200 → verify file moved to `processed/`
7. Error test: Create an invalid JSON file in the watch folder → verify it moves to `failed/` with `.error` sidecar
## Must-Haves
- [x] chrysopedia-watcher service defined in docker-compose.yml
- [x] Watch folder bind mount configured
- [x] Watcher container starts and stays healthy
- [x] End-to-end file-drop ingestion verified
- [x] Failed file handling verified
## Verification
- `ssh ub01 "docker ps --filter name=chrysopedia-watcher --format '{{.Status}}'"` shows healthy/running
- `ssh ub01 "docker logs chrysopedia-watcher 2>&1 | tail -5"` shows watcher startup message
- `ssh ub01 "ls /vmPool/r/services/chrysopedia_watch/processed/"` contains the test file
- `ssh ub01 "ls /vmPool/r/services/chrysopedia_watch/failed/"` contains the invalid test file + `.error` sidecar
## Inputs
- `backend/watcher.py` — watcher script from T01
- `backend/requirements.txt` — updated with watchdog dependency from T01
- `docker-compose.yml` — existing compose file to modify
## Expected Output
- `docker-compose.yml` — updated with chrysopedia-watcher service definition
- Estimate: 30m
- Files: docker-compose.yml
- Verify: ssh ub01 "docker ps --filter name=chrysopedia-watcher --format '{{.Status}}'" | grep -qi 'healthy\|up'

@ -0,0 +1,130 @@
# S03 Research — Transcript Folder Watcher — Auto-Ingest Service
## Summary
Add a folder-watching service that monitors a directory for new transcript JSON files, validates them, and POSTs them to the existing `/api/v1/ingest` endpoint. This replaces manual curl/upload for getting transcripts into the pipeline.
The work is straightforward — filesystem monitoring + HTTP POST using well-understood Python libraries already in the project. The ingest endpoint is mature (6 integration tests, handles idempotent re-upload, auto-dispatches pipeline). The watcher just needs to be a reliable bridge from "file appears on disk" to "POST /api/v1/ingest".
## Recommendation
**watchdog** library + **httpx** (already a dependency) in a lightweight Python script, deployed as a new Docker Compose service with the watch folder bind-mounted. Polling fallback via `PollingObserver` for reliability on network-mounted filesystems.
## Implementation Landscape
### Current Ingest Flow
- `POST /api/v1/ingest` accepts multipart file upload (`UploadFile`)
- Validates JSON structure (requires `source_file`, `creator_folder`, `duration_seconds`, `segments`)
- Creates/updates Creator, upserts SourceVideo, bulk-inserts TranscriptSegments
- Saves raw JSON to `/data/transcripts/{creator_folder}/`
- Dispatches Celery pipeline via `run_pipeline.delay(str(video.id))`
- Returns `TranscriptIngestResponse` with `video_id`, `processing_status`, etc.
- Already handles idempotent re-upload (content hash matching, duplicate detection)
### Transcript JSON Format
Files produced by `whisper/transcribe.py` on hal0022:
```json
{
"source_file": "filename.mp4",
"creator_folder": "CreatorName",
"duration_seconds": 2547,
"segments": [{"start": 0.0, "end": 4.52, "text": "...", "words": [...]}]
}
```
### Deployment Topology
- All services run on **ub01** via Docker Compose (`xpltd_chrysopedia` project)
- Data volume: `/vmPool/r/services/chrysopedia_data` → mounted at `/data` in API/worker containers
- Existing transcripts live at `/vmPool/r/services/chrysopedia_data/transcripts/{CreatorName}/`
- The watcher needs a **separate watch folder** (not the transcripts store), because the ingest endpoint already saves incoming transcripts into the store itself
- Proposed watch folder: `/vmPool/r/services/chrysopedia_watch/` → mounted at `/watch` in the watcher container
- API is reachable from other containers at `http://chrysopedia-api:8000` on the `chrysopedia` Docker network
### Key Design Decisions
**Watch folder ≠ transcript storage.** The ingest endpoint already saves transcripts to `/data/transcripts/`. The watch folder is an inbox — files land there, get POSTed, then get moved to a `processed/` or `failed/` subfolder. This prevents double-processing and gives visibility into what's been handled.
**File stability check.** When watchdog fires `on_created`, the file may still be in the middle of being written (especially over network mounts or SCP). The watcher should wait until the file size stops changing (poll twice with a 1-2s delay) before attempting to read and POST it.
**Recursive vs flat watching.** Watch recursively — the whisper script outputs to `{output_dir}/{video_stem}.json` but users might organize into creator subfolders. The `creator_folder` field inside the JSON is authoritative, not the filesystem path.
**Error handling.** Failed files move to `failed/` with a `.error` sidecar file containing the HTTP response or exception. This avoids infinite retry loops and gives the admin a clear queue of problems to fix.
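The failed-file disposition described above can be sketched as follows. The sidecar naming (`<filename>.error` next to the moved file) is an assumption of this sketch:

```python
import shutil
from pathlib import Path

def move_to_failed(filepath, error: str) -> Path:
    """Move a file into a sibling failed/ subfolder and write an .error sidecar.

    Overwriting any previous attempt with the same name keeps the operation
    idempotent when a fixed file is re-dropped.
    """
    src = Path(filepath)
    failed_dir = src.parent / "failed"
    failed_dir.mkdir(parents=True, exist_ok=True)
    dest = failed_dir / src.name
    shutil.move(str(src), str(dest))
    # Sidecar carries the HTTP response or exception text for the admin queue.
    Path(str(dest) + ".error").write_text(error)
    return dest
```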
### Files to Create/Modify
| File | Action | Purpose |
|------|--------|---------|
| `backend/watcher.py` | **Create** | Main watcher script — watchdog Observer + httpx POST logic |
| `docker/Dockerfile.watcher` | **Create** | Lightweight image (could reuse Dockerfile.api or be minimal) |
| `docker-compose.yml` | **Modify** | Add `chrysopedia-watcher` service with watch folder bind mount |
| `backend/config.py` | **Modify** | Add `watch_folder_path` and `watcher_api_url` settings |
### Dependencies
- **watchdog** — needs adding to `requirements.txt` (not currently a dependency)
- **httpx** — already in `requirements.txt`, used for HTTP client
- **json** / **os** / **shutil** / **time** / **logging** — stdlib only
### Watcher Service Architecture
```
chrysopedia-watcher container:
┌──────────────────────────────────┐
│ watchdog Observer │
│ monitors /watch/ recursively │
│ │
│ on_created *.json: │
│ 1. Wait for file stability │
│ 2. Read & validate JSON keys │
│ 3. POST to chrysopedia-api │
│ 4. Move to processed/ or │
│ failed/ with sidecar │
└──────────────────────────────────┘
```
### Docker Compose Service
```yaml
chrysopedia-watcher:
build:
context: .
dockerfile: docker/Dockerfile.api # reuse same image
container_name: chrysopedia-watcher
restart: unless-stopped
command: ["python", "watcher.py"]
environment:
WATCHER_API_URL: http://chrysopedia-api:8000/api/v1/ingest
WATCH_FOLDER: /watch
volumes:
- /vmPool/r/services/chrysopedia_watch:/watch
depends_on:
chrysopedia-api:
condition: service_healthy
networks:
- chrysopedia
healthcheck:
test: ["CMD-SHELL", "pgrep -f watcher.py || exit 1"]
interval: 30s
timeout: 5s
retries: 3
```
### Natural Task Seams
1. **T01: Create watcher.py** — the core script with watchdog Observer, file stability check, JSON validation, httpx multipart POST, processed/failed file management, structured logging. This is ~80% of the work.
2. **T02: Docker integration** — Dockerfile (or reuse), compose service definition, config settings, host folder creation. Verify end-to-end: drop file → watcher detects → POSTs to API → pipeline triggers.
### Verification Strategy
- **Unit-level:** Drop a valid transcript JSON into the watch folder on ub01 → confirm watcher logs pickup, API returns 200, video appears in DB with `transcribed` status, pipeline dispatches.
- **Error case:** Drop an invalid JSON → confirm it moves to `failed/` with error sidecar, watcher continues running.
- **Stability:** Watcher survives API container restart (retry logic or health-check-gated startup).
- **Idempotency:** Drop same file twice → second copy moves to processed (API handles idempotent re-upload).
### Risks
**Low risk overall.** The ingest endpoint is battle-tested. The watcher is a thin bridge. Main concern is file stability detection on the bind mount — watchdog's inotify works well on local Linux filesystems, which is what `/vmPool/r/services/` is (ZFS on ub01). If files arrive via SCP or rsync, partial writes could trigger premature pickup — the stability check (size-stable for 2s) handles this.
### Skills
No specialized skills needed. Standard Python, Docker Compose, watchdog library. No frontend work.

@ -0,0 +1,89 @@
---
estimated_steps: 51
estimated_files: 2
skills_used: []
---
# T01: Create watcher.py — watchdog observer with file stability, validation, and POST logic
Create the core watcher script that monitors a folder for new transcript JSON files and POSTs them to the ingest API.
**Slice:** S03 — Transcript Folder Watcher — Auto-Ingest Service
**Milestone:** M007
## Description
Build `backend/watcher.py` — a standalone Python script using the `watchdog` library to monitor a configurable folder for new `.json` files. When a file is detected:
1. Wait for file stability (size unchanged for 2 seconds) to handle SCP/rsync partial writes
2. Read and validate JSON structure (must contain `source_file`, `creator_folder`, `duration_seconds`, `segments`)
3. POST the file as multipart upload to the ingest API endpoint using httpx
4. Move successfully processed files to `processed/` subfolder
5. Move failed files to `failed/` subfolder with a `.error` sidecar file containing the error details
The script uses `PollingObserver` (not inotify Observer) for reliability on ZFS/network-mounted filesystems. It watches recursively.
Add `watchdog` to `backend/requirements.txt`.
## Failure Modes
| Dependency | On error | On timeout | On malformed response |
|------------|----------|-----------|----------------------|
| Ingest API | Move file to `failed/` with HTTP status + body in `.error` sidecar | httpx timeout (30s default) → same as error | Log warning, move to `failed/` |
| Filesystem | Log exception, skip file (watchdog continues) | N/A | N/A |
## Negative Tests
- **Malformed inputs**: Invalid JSON (parse error) → `failed/` with error sidecar. Missing required keys → `failed/` with validation error. Non-JSON files → ignored (only `.json` suffix triggers processing). Empty file → `failed/`.
- **Error paths**: API returns 4xx/5xx → `failed/` with response body. API unreachable → `failed/` with connection error. File deleted between detection and read → log warning, skip.
- **Boundary conditions**: File still being written (size changing) → wait until stable. Same filename in `processed/` already exists → overwrite (idempotent).
## Steps
1. Add `watchdog>=4.0,<5.0` to `backend/requirements.txt`
2. Create `backend/watcher.py` with:
- Configuration from environment variables: `WATCH_FOLDER` (default `/watch`), `WATCHER_API_URL` (default `http://chrysopedia-api:8000/api/v1/ingest`), `WATCHER_STABILITY_SECONDS` (default `2`), `WATCHER_POLL_INTERVAL` (default `5`)
- `TranscriptHandler(FileSystemEventHandler)` class that handles `on_created` events for `.json` files
- `wait_for_stability(path, seconds)` function — polls file size every 0.5s, returns True when size is unchanged for `seconds` duration, returns False if file disappears
- `validate_transcript(data)` function — checks for required keys (`source_file`, `creator_folder`, `duration_seconds`, `segments`)
- `post_to_ingest(filepath)` function — reads file, validates JSON, POSTs as multipart upload via httpx, returns (success, detail)
- `move_to_processed(filepath)` / `move_to_failed(filepath, error)` functions — create subdirs as needed, move file, write `.error` sidecar for failures
- Main block: create `processed/` and `failed/` subdirs, start `PollingObserver` with recursive=True, run until KeyboardInterrupt
- Structured logging with `logging.basicConfig` at INFO level, logger name `watcher`
3. Verify `python -m py_compile backend/watcher.py` passes
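The event filter implied by the handler above, combined with the processed/-and-failed/ exclusion recorded in T01's key decisions, can be sketched with a hypothetical helper (`should_process` is an illustrative name, not from the source):

```python
from pathlib import Path

SKIP_DIRS = {"processed", "failed"}

def should_process(path) -> bool:
    """True for .json files that are not inside a processed/ or failed/ subdir."""
    p = Path(path)
    if p.suffix != ".json":
        return False  # only .json files trigger ingestion
    # The observer watches recursively, so files the watcher itself moved into
    # processed/ or failed/ would fire on_created again without this guard.
    return not SKIP_DIRS.intersection(p.parts)
```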
## Must-Haves
- [x] watchdog added to requirements.txt
- [x] watcher.py handles file stability check
- [x] watcher.py validates JSON keys before POST
- [x] watcher.py POSTs via httpx multipart upload
- [x] Processed files move to processed/ subfolder
- [x] Failed files move to failed/ with .error sidecar
- [x] Structured logging with watcher.* logger
- [x] PollingObserver used (not inotify) for ZFS reliability
## Verification
- `python -m py_compile backend/watcher.py` exits 0
- `grep -q 'watchdog' backend/requirements.txt` exits 0
- `grep -q 'PollingObserver' backend/watcher.py` exits 0
- `grep -q 'processed' backend/watcher.py && grep -q 'failed' backend/watcher.py` exits 0
## Observability Impact
- Signals added: `watcher` logger with structured messages for pickup, stability wait, validation, POST result, and file moves
- How a future agent inspects this: `docker logs chrysopedia-watcher` shows all watcher activity
- Failure state exposed: `.error` sidecar files in `failed/` directory with HTTP status, response body, or exception traceback
## Inputs
- `backend/requirements.txt`
## Expected Output
- `backend/watcher.py`
- `backend/requirements.txt`
## Verification
python -m py_compile backend/watcher.py && grep -q 'watchdog' backend/requirements.txt && grep -q 'PollingObserver' backend/watcher.py

@ -0,0 +1,81 @@
---
id: T01
parent: S03
milestone: M007
provides: []
requires: []
affects: []
key_files: ["backend/watcher.py", "backend/requirements.txt"]
key_decisions: ["Used httpx sync client since watcher runs in synchronous watchdog callback threads", "Capped error sidecar response body at 500 chars", "Ignore files in processed/ and failed/ subdirs to prevent re-processing loops"]
patterns_established: []
drill_down_paths: []
observability_surfaces: []
duration: ""
verification_result: "All four slice-level verification checks pass: py_compile exits 0, watchdog present in requirements.txt, PollingObserver present in watcher.py, processed and failed strings present in watcher.py."
completed_at: 2026-03-30T19:17:44.067Z
blocker_discovered: false
---
# T01: Built backend/watcher.py with PollingObserver-based folder watching, file stability detection, JSON validation, multipart POST to /api/v1/ingest, and processed/failed file disposition with error sidecars
> Built backend/watcher.py with PollingObserver-based folder watching, file stability detection, JSON validation, multipart POST to /api/v1/ingest, and processed/failed file disposition with error sidecars
## What Happened
Created backend/watcher.py as a standalone Python script using watchdog's PollingObserver for ZFS/NFS reliability. It monitors WATCH_FOLDER for new .json files, waits for file size to stabilize (so in-flight SCP/rsync transfers are not read early), validates the JSON structure against required keys, and POSTs each file as a multipart upload via httpx to the ingest API. Successfully processed files move to processed/; failures move to failed/ with .error sidecar files. Also created backend/requirements.txt, synced from ub01 production with watchdog added.
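The validation contract is small enough to show inline. This sketch restates the required-keys check from backend/watcher.py (REQUIRED_KEYS and the segments type check are copied from the script):

```python
import json

# Required top-level keys, as defined in backend/watcher.py
REQUIRED_KEYS = {"source_file", "creator_folder", "duration_seconds", "segments"}

def validate_transcript(data):
    """Return None if valid, else a human-readable error (mirrors the watcher)."""
    if not isinstance(data, dict):
        return "Top-level JSON value is not an object"
    missing = REQUIRED_KEYS - data.keys()
    if missing:
        return f"Missing required keys: {', '.join(sorted(missing))}"
    if not isinstance(data.get("segments"), list):
        return "'segments' must be an array"
    return None

good = json.loads('{"source_file": "a.mp4", "creator_folder": "c1", '
                  '"duration_seconds": 12.5, "segments": []}')
print(validate_transcript(good))                  # None
print(validate_transcript({"segments": "oops"}))  # Missing required keys: ...
```

Files failing any of these checks never reach the API; they go straight to `failed/` with the error string as the sidecar body.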
## Verification
All four slice-level verification checks pass: py_compile exits 0, watchdog present in requirements.txt, PollingObserver present in watcher.py, processed and failed strings present in watcher.py.
## Verification Evidence
| # | Command | Exit Code | Verdict | Duration |
|---|---------|-----------|---------|----------|
| 1 | `python -m py_compile backend/watcher.py` | 0 | ✅ pass | 500ms |
| 2 | `grep -q 'watchdog' backend/requirements.txt` | 0 | ✅ pass | 50ms |
| 3 | `grep -q 'PollingObserver' backend/watcher.py` | 0 | ✅ pass | 50ms |
| 4 | `grep -q 'processed' backend/watcher.py && grep -q 'failed' backend/watcher.py` | 0 | ✅ pass | 50ms |
## Deviations
None.
## Known Issues
None.
## Files Created/Modified
- `backend/watcher.py`
- `backend/requirements.txt`

---
estimated_steps: 46
estimated_files: 1
skills_used: []
---
# T02: Docker Compose integration and end-to-end verification on ub01
Add the chrysopedia-watcher service to docker-compose.yml, create the watch folder on ub01, build and deploy, and verify end-to-end file-drop ingestion.
**Slice:** S03 — Transcript Folder Watcher — Auto-Ingest Service
**Milestone:** M007
## Description
Wire the watcher script into the Docker Compose stack as a new service. The watcher reuses the existing `Dockerfile.api` image (same Python + deps) but runs `python watcher.py` instead of uvicorn. The watch folder on ub01 is `/vmPool/r/services/chrysopedia_watch/` bind-mounted to `/watch` in the container.
This task deploys to ub01 and verifies end-to-end: drop a transcript JSON → watcher picks it up → POSTs to API → video record created.
**Important context for executor:** The canonical codebase lives on ub01 at `/vmPool/r/repos/xpltdco/chrysopedia`. You must SSH to ub01 to edit docker-compose.yml, build, and test. The watcher.py from T01 needs to be committed and available in the repo on ub01.
## Failure Modes
| Dependency | On error | On timeout | On malformed response |
|------------|----------|-----------|----------------------|
| Docker build | Fix Dockerfile or requirements, rebuild | N/A | N/A |
| API healthcheck | Watcher waits (depends_on condition: service_healthy) | Start timeout → container restarts | N/A |
## Steps
1. SSH to ub01 and add the `chrysopedia-watcher` service to `docker-compose.yml`:
- `build: context: . dockerfile: docker/Dockerfile.api` (reuse existing image)
- `container_name: chrysopedia-watcher`
- `restart: unless-stopped`
- `command: ["python", "watcher.py"]`
- `environment: WATCHER_API_URL: http://chrysopedia-api:8000/api/v1/ingest, WATCH_FOLDER: /watch`
- `volumes: /vmPool/r/services/chrysopedia_watch:/watch`
- `depends_on: chrysopedia-api: condition: service_healthy`
- `networks: chrysopedia`
- `healthcheck: test: ["CMD-SHELL", "pgrep -f watcher.py || exit 1"] interval: 30s timeout: 5s retries: 3`
2. Create the watch folder on ub01: `mkdir -p /vmPool/r/services/chrysopedia_watch`
3. Ensure T01's `backend/watcher.py` and updated `backend/requirements.txt` are committed and pushed to the repo on ub01 (git pull if needed)
4. Build and deploy: `cd /vmPool/r/repos/xpltdco/chrysopedia && docker compose build chrysopedia-watcher && docker compose up -d chrysopedia-watcher`
5. Verify watcher is running: `docker exec chrysopedia-watcher pgrep -f watcher.py`
6. End-to-end test: Copy a known-good transcript JSON to `/vmPool/r/services/chrysopedia_watch/test_ingest.json` → check watcher logs for pickup and 200 → verify file moved to `processed/`
7. Error test: Create an invalid JSON file in the watch folder → verify it moves to `failed/` with `.error` sidecar
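The step-1 fragments assemble into roughly this service block (a sketch built only from the bullets above; indentation, ordering, and quoting in the final compose file on ub01 may differ):

```yaml
chrysopedia-watcher:
  build:
    context: .
    dockerfile: docker/Dockerfile.api   # reuse the existing API image
  container_name: chrysopedia-watcher
  restart: unless-stopped
  command: ["python", "watcher.py"]
  environment:
    WATCHER_API_URL: http://chrysopedia-api:8000/api/v1/ingest
    WATCH_FOLDER: /watch
  volumes:
    - /vmPool/r/services/chrysopedia_watch:/watch
  depends_on:
    chrysopedia-api:
      condition: service_healthy
  networks:
    - chrysopedia
  healthcheck:
    test: ["CMD-SHELL", "pgrep -f watcher.py || exit 1"]
    interval: 30s
    timeout: 5s
    retries: 3
```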
## Must-Haves
- [x] chrysopedia-watcher service defined in docker-compose.yml
- [x] Watch folder bind mount configured
- [x] Watcher container starts and stays healthy
- [x] End-to-end file-drop ingestion verified
- [x] Failed file handling verified
## Verification
- `ssh ub01 "docker ps --filter name=chrysopedia-watcher --format '{{.Status}}'"` shows healthy/running
- `ssh ub01 "docker logs chrysopedia-watcher 2>&1 | tail -5"` shows watcher startup message
- `ssh ub01 "ls /vmPool/r/services/chrysopedia_watch/processed/"` contains the test file
- `ssh ub01 "ls /vmPool/r/services/chrysopedia_watch/failed/"` contains the invalid test file + `.error` sidecar
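The failed-file disposition that step 7 verifies can also be exercised in isolation. This mirrors `move_to_failed` from backend/watcher.py in a temp directory (the 500-char cap on HTTP response bodies happens upstream in `post_to_ingest`, before the error string reaches this function):

```python
import os
import shutil
import tempfile
from pathlib import Path

# Mirrors move_to_failed() in backend/watcher.py: move the file into failed/
# and write a .error sidecar next to it.
def move_to_failed(filepath, error, watch_folder):
    dest_dir = os.path.join(watch_folder, "failed")
    os.makedirs(dest_dir, exist_ok=True)
    dest = os.path.join(dest_dir, os.path.basename(filepath))
    shutil.move(filepath, dest)
    Path(dest + ".error").write_text(error, encoding="utf-8")
    return dest

watch = tempfile.mkdtemp()
bad = os.path.join(watch, "bad.json")
Path(bad).write_text("not json", encoding="utf-8")
dest = move_to_failed(bad, "JSON parse error: Expecting value: line 1 column 1", watch)
print(os.path.exists(dest), os.path.exists(dest + ".error"))
```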
## Inputs
- `backend/watcher.py` — watcher script from T01
- `backend/requirements.txt` — updated with watchdog dependency from T01
- `docker-compose.yml` — existing compose file to modify
## Expected Output
- `docker-compose.yml` — updated with chrysopedia-watcher service definition
## Verification
ssh ub01 "docker ps --filter name=chrysopedia-watcher --format '{{.Status}}'" | grep -qi 'healthy\|up'

backend/requirements.txt
fastapi>=0.115.0,<1.0
uvicorn[standard]>=0.32.0,<1.0
sqlalchemy[asyncio]>=2.0,<3.0
asyncpg>=0.30.0,<1.0
alembic>=1.14.0,<2.0
pydantic>=2.0,<3.0
pydantic-settings>=2.0,<3.0
celery[redis]>=5.4.0,<6.0
redis>=5.0,<6.0
python-dotenv>=1.0,<2.0
python-multipart>=0.0.9,<1.0
httpx>=0.27.0,<1.0
openai>=1.0,<2.0
qdrant-client>=1.9,<2.0
pyyaml>=6.0,<7.0
psycopg2-binary>=2.9,<3.0
watchdog>=4.0,<5.0
# Test dependencies
pytest>=8.0,<10.0
pytest-asyncio>=0.24,<1.0

backend/watcher.py
"""Transcript folder watcher — monitors a directory for new JSON files and POSTs them to the ingest API.
Designed to run as a standalone service (or Docker container) that watches for Whisper
transcript JSON files dropped via SCP/rsync and automatically ingests them into Chrysopedia.
Uses PollingObserver (not inotify) for reliability on ZFS and network-mounted filesystems.
Environment variables:
WATCH_FOLDER directory to monitor (default: /watch)
WATCHER_API_URL ingest endpoint URL (default: http://chrysopedia-api:8000/api/v1/ingest)
WATCHER_STABILITY_SECONDS seconds file size must be stable before processing (default: 2)
WATCHER_POLL_INTERVAL seconds between filesystem polls (default: 5)
"""
import json
import logging
import os
import shutil
import sys
import time
from pathlib import Path
import httpx
from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver
# ── Configuration ────────────────────────────────────────────────────────────
WATCH_FOLDER = os.environ.get("WATCH_FOLDER", "/watch")
API_URL = os.environ.get("WATCHER_API_URL", "http://chrysopedia-api:8000/api/v1/ingest")
STABILITY_SECONDS = float(os.environ.get("WATCHER_STABILITY_SECONDS", "2"))
POLL_INTERVAL = float(os.environ.get("WATCHER_POLL_INTERVAL", "5"))
REQUIRED_KEYS = {"source_file", "creator_folder", "duration_seconds", "segments"}
logger = logging.getLogger("watcher")
# ── Helpers ──────────────────────────────────────────────────────────────────
def wait_for_stability(path: str, seconds: float = STABILITY_SECONDS) -> bool:
"""Wait until a file's size is unchanged for *seconds* consecutive seconds.
Returns True when stable, False if the file disappears during the wait.
Polls every 0.5 s.
"""
prev_size = -1
stable_since: float | None = None
while True:
try:
current_size = os.path.getsize(path)
except OSError:
logger.warning("File disappeared during stability wait: %s", path)
return False
if current_size == prev_size:
if stable_since is None:
stable_since = time.monotonic()
elif time.monotonic() - stable_since >= seconds:
return True
else:
prev_size = current_size
stable_since = None
time.sleep(0.5)
def validate_transcript(data: dict) -> str | None:
"""Validate that *data* has the required transcript keys.
Returns None on success, or a human-readable error string on failure.
"""
if not isinstance(data, dict):
return "Top-level JSON value is not an object"
missing = REQUIRED_KEYS - data.keys()
if missing:
return f"Missing required keys: {', '.join(sorted(missing))}"
if not isinstance(data.get("segments"), list):
return "'segments' must be an array"
return None
def post_to_ingest(filepath: str) -> tuple[bool, str]:
"""Read *filepath*, validate its JSON, and POST it to the ingest API.
Returns (success: bool, detail: str).
"""
# Read and parse
try:
raw_bytes = Path(filepath).read_bytes()
except OSError as exc:
return False, f"File read error: {exc}"
if not raw_bytes:
return False, "File is empty"
try:
data = json.loads(raw_bytes)
except json.JSONDecodeError as exc:
return False, f"JSON parse error: {exc}"
# Validate structure
error = validate_transcript(data)
if error:
return False, f"Validation error: {error}"
# POST as multipart upload (field name: "file")
filename = os.path.basename(filepath)
try:
with httpx.Client(timeout=30.0) as client:
response = client.post(
API_URL,
files={"file": (filename, raw_bytes, "application/json")},
)
except httpx.TimeoutException:
return False, f"HTTP timeout after 30s contacting {API_URL}"
except httpx.ConnectError as exc:
return False, f"Connection error: {exc}"
except httpx.HTTPError as exc:
return False, f"HTTP error: {exc}"
if response.status_code >= 400:
body = response.text[:500] # cap sidecar size
return False, f"HTTP {response.status_code}: {body}"
return True, f"HTTP {response.status_code} — ingested successfully"
def move_to_processed(filepath: str, watch_folder: str) -> None:
"""Move *filepath* into the ``processed/`` subfolder of *watch_folder*."""
dest_dir = os.path.join(watch_folder, "processed")
os.makedirs(dest_dir, exist_ok=True)
dest = os.path.join(dest_dir, os.path.basename(filepath))
shutil.move(filepath, dest)
logger.info("Moved to processed: %s", dest)
def move_to_failed(filepath: str, error: str, watch_folder: str) -> None:
"""Move *filepath* into the ``failed/`` subfolder and write a ``.error`` sidecar."""
dest_dir = os.path.join(watch_folder, "failed")
os.makedirs(dest_dir, exist_ok=True)
dest = os.path.join(dest_dir, os.path.basename(filepath))
shutil.move(filepath, dest)
sidecar = dest + ".error"
try:
Path(sidecar).write_text(error, encoding="utf-8")
except OSError as exc:
logger.error("Failed to write error sidecar %s: %s", sidecar, exc)
    logger.warning("Moved to failed: %s (%s)", dest, error)
# ── Watchdog handler ─────────────────────────────────────────────────────────
class TranscriptHandler(FileSystemEventHandler):
"""Handle new ``.json`` files in the watch folder."""
def __init__(self, watch_folder: str) -> None:
super().__init__()
self.watch_folder = watch_folder
def on_created(self, event): # noqa: ANN001
if event.is_directory:
return
filepath = event.src_path
# Only process .json files
if not filepath.lower().endswith(".json"):
return
# Ignore files inside processed/ or failed/ subdirs
rel = os.path.relpath(filepath, self.watch_folder)
parts = Path(rel).parts
if parts and parts[0] in ("processed", "failed"):
return
logger.info("Detected new file: %s", filepath)
# Wait for file to finish writing (SCP/rsync)
if not wait_for_stability(filepath):
logger.warning("File vanished before stable: %s", filepath)
return
logger.info("File stable, processing: %s", filepath)
# Attempt ingest
success, detail = post_to_ingest(filepath)
if success:
logger.info("Ingest succeeded for %s: %s", filepath, detail)
try:
move_to_processed(filepath, self.watch_folder)
except OSError as exc:
logger.error("Failed to move %s to processed/: %s", filepath, exc)
else:
logger.error("Ingest failed for %s: %s", filepath, detail)
try:
move_to_failed(filepath, detail, self.watch_folder)
except OSError as exc:
logger.error("Failed to move %s to failed/: %s", filepath, exc)
# ── Main ─────────────────────────────────────────────────────────────────────
def main() -> None:
"""Entry point — start the polling observer and block until interrupted."""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
stream=sys.stdout,
)
watch_path = os.path.abspath(WATCH_FOLDER)
logger.info("Starting watcher on %s (poll every %.1fs, stability %.1fs)",
watch_path, POLL_INTERVAL, STABILITY_SECONDS)
# Ensure base dirs exist
os.makedirs(watch_path, exist_ok=True)
os.makedirs(os.path.join(watch_path, "processed"), exist_ok=True)
os.makedirs(os.path.join(watch_path, "failed"), exist_ok=True)
handler = TranscriptHandler(watch_path)
observer = PollingObserver(timeout=POLL_INTERVAL)
observer.schedule(handler, watch_path, recursive=True)
observer.start()
logger.info("Watcher running. Press Ctrl+C to stop.")
try:
while observer.is_alive():
observer.join(timeout=1)
except KeyboardInterrupt:
logger.info("Shutting down watcher.")
finally:
observer.stop()
observer.join()
logger.info("Watcher stopped.")
if __name__ == "__main__":
main()