From 5e408dff5ab540f1e66f0147b3e4d03a84ed81f2 Mon Sep 17 00:00:00 2001 From: jlightner Date: Mon, 30 Mar 2026 19:17:47 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Built=20backend/watcher.py=20with=20Pol?= =?UTF-8?q?lingObserver-based=20folder=20watchi=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - "backend/watcher.py" - "backend/requirements.txt" GSD-Task: S03/T01 --- .gsd/milestones/M007/M007-ROADMAP.md | 2 +- .../milestones/M007/slices/S02/S02-SUMMARY.md | 79 ++++++ .gsd/milestones/M007/slices/S02/S02-UAT.md | 62 +++++ .../M007/slices/S02/tasks/T01-VERIFY.json | 42 +++ .gsd/milestones/M007/slices/S03/S03-PLAN.md | 139 +++++++++- .../M007/slices/S03/S03-RESEARCH.md | 130 ++++++++++ .../M007/slices/S03/tasks/T01-PLAN.md | 89 +++++++ .../M007/slices/S03/tasks/T01-SUMMARY.md | 81 ++++++ .../M007/slices/S03/tasks/T02-PLAN.md | 85 ++++++ backend/requirements.txt | 20 ++ backend/watcher.py | 244 ++++++++++++++++++ 11 files changed, 971 insertions(+), 2 deletions(-) create mode 100644 .gsd/milestones/M007/slices/S02/S02-SUMMARY.md create mode 100644 .gsd/milestones/M007/slices/S02/S02-UAT.md create mode 100644 .gsd/milestones/M007/slices/S02/tasks/T01-VERIFY.json create mode 100644 .gsd/milestones/M007/slices/S03/S03-RESEARCH.md create mode 100644 .gsd/milestones/M007/slices/S03/tasks/T01-PLAN.md create mode 100644 .gsd/milestones/M007/slices/S03/tasks/T01-SUMMARY.md create mode 100644 .gsd/milestones/M007/slices/S03/tasks/T02-PLAN.md create mode 100644 backend/requirements.txt create mode 100644 backend/watcher.py diff --git a/.gsd/milestones/M007/M007-ROADMAP.md b/.gsd/milestones/M007/M007-ROADMAP.md index 09d1239..fd4e585 100644 --- a/.gsd/milestones/M007/M007-ROADMAP.md +++ b/.gsd/milestones/M007/M007-ROADMAP.md @@ -7,7 +7,7 @@ Make the pipeline fully transparent — every LLM call's full prompt and respons | ID | Slice | Risk | Depends | Done | After this | |----|-------|------|---------|------|------------| | S01 | Pipeline Debug Mode — Full LLM I/O Capture and Token Accounting | medium | — | ✅ | Toggle debug mode on for a video, trigger pipeline, see full prompt + response stored for every LLM call with per-stage token breakdown | -| S02 | Debug Payload Viewer — Inline View, Copy, and Export in Admin UI | low | S01 | ⬜ | Admin clicks an LLM call event, sees full prompt and response in a readable viewer, copies to clipboard, or downloads as JSON | +| S02 | Debug Payload Viewer — Inline View, Copy, and Export in Admin UI | low | S01 | ✅ | Admin clicks an LLM call event, sees full prompt and response in a readable viewer, copies to clipboard, or downloads as JSON | | S03 | Transcript Folder Watcher — Auto-Ingest Service | medium | — | ⬜ | Drop a transcript JSON into the watch folder on ub01 → file is detected, validated, POSTed to /ingest, pipeline triggers automatically | | S04 | Admin UX Audit — Prune, Streamline, and Polish | low | S02 | ⬜ | Admin pipeline page is cleaner and more efficient for daily content management. Dead UI elements removed. Workflow improvements visible. | | S05 | Key Moment Card Text Overflow Fix | low | — | ⬜ | Key moment cards with long filenames and timestamp ranges display cleanly — text truncated with ellipsis or wrapped within card bounds, no horizontal bleed | diff --git a/.gsd/milestones/M007/slices/S02/S02-SUMMARY.md b/.gsd/milestones/M007/slices/S02/S02-SUMMARY.md new file mode 100644 index 0000000..eab653e --- /dev/null +++ b/.gsd/milestones/M007/slices/S02/S02-SUMMARY.md @@ -0,0 +1,79 @@ +--- +id: S02 +parent: M007 +milestone: M007 +provides: + - DebugPayloadViewer component for viewing LLM I/O in admin UI + - Per-section clipboard copy for prompt/response text + - JSON export of debug payloads +requires: + - slice: S01 + provides: PipelineEvent model with debug text fields populated by debug mode pipeline runs +affects: + - S04 +key_files: + - frontend/src/pages/AdminPipeline.tsx + - frontend/src/api/public-client.ts + - frontend/src/App.css +key_decisions: + - Used clipboard API with execCommand fallback for non-HTTPS admin tool context + - DebugPayloadViewer renders only when at least one debug text field is non-null + - Field names use system_prompt_text/user_prompt_text/response_text matching the backend PipelineEvent model +patterns_established: + - BEM-style component CSS with debug-viewer__ prefix following existing JsonViewer pattern + - Conditional component rendering gated on non-null payload fields +observability_surfaces: + - Debug viewer surfaces full LLM prompts and responses inline — this IS the observability tool for pipeline prompt debugging +drill_down_paths: + - .gsd/milestones/M007/slices/S02/tasks/T01-SUMMARY.md +duration: "" +verification_result: passed +completed_at: 2026-03-30T19:10:25.752Z +blocker_discovered: false +--- + +# S02: Debug Payload Viewer — Inline View, Copy, and Export in Admin UI + +**Added DebugPayloadViewer component to the admin pipeline page — LLM call events now show collapsible System Prompt / User Prompt / Response sections with per-section clipboard copy and full JSON export.** + +## What Happened + +This slice delivered the frontend viewer for LLM debug payloads captured by S01's debug mode infrastructure. A single task (T01) added three optional string fields to the PipelineEvent TypeScript interface (`system_prompt_text`, `user_prompt_text`, `response_text`), built the `DebugPayloadViewer` component with independently collapsible sections, per-section copy-to-clipboard (with `execCommand` fallback for non-HTTPS contexts), and a JSON export button that downloads all debug fields as a formatted file. The component renders inline on every `llm_call` event row in the pipeline event log, gated on at least one debug field being non-null. CSS follows the existing JsonViewer BEM pattern with `var(--color-*)` custom properties per D017. Changes were applied directly on ub01 (canonical dev directory) and deployed via Docker Compose rebuild. Browser verification confirmed all UI elements render correctly with real pipeline data. + +## Verification + +Container chrysopedia-web-8096 healthy and serving HTTP 200. DebugPayloadViewer component present in deployed JS bundle (grep confirmed). Browser verification: `.debug-viewer` selector visible, "LLM Debug" label rendered, collapsible section headers (System Prompt / User Prompt / Response) present in DOM with Copy buttons, JSON export button visible. Expanded System Prompt section displays full prompt text in pre-formatted block. Component renders on all 3 llm_call events in the expanded video's event list. + +## Requirements Advanced + +None. + +## Requirements Validated + +None. + +## New Requirements Surfaced + +None. + +## Requirements Invalidated or Re-scoped + +None. + +## Deviations + +Changes applied directly on ub01 rather than pushing through git — diverged branches prevented git pull. Service name in plan verification command was `chrysopedia-web-8096` but Docker Compose build target is `chrysopedia-web` (the `-8096` is only the container name). Unicode escape issue in export button required a second build pass. + +## Known Limitations + +Local dev01 copy is out of sync with ub01 — all changes live only on ub01. The plan verification command in S02-PLAN.md has shell quoting issues (unterminated quotes in the SSH command) that would always fail as written. + +## Follow-ups + +None. + +## Files Created/Modified + +- `frontend/src/pages/AdminPipeline.tsx` — Added DebugPayloadViewer component (collapsible sections, copy, export) and wired into llm_call event rows +- `frontend/src/api/public-client.ts` — Added system_prompt_text, user_prompt_text, response_text fields to PipelineEvent interface +- `frontend/src/App.css` — Added ~100 lines of debug-viewer CSS using var(--color-*) custom properties diff --git a/.gsd/milestones/M007/slices/S02/S02-UAT.md b/.gsd/milestones/M007/slices/S02/S02-UAT.md new file mode 100644 index 0000000..98bca26 --- /dev/null +++ b/.gsd/milestones/M007/slices/S02/S02-UAT.md @@ -0,0 +1,62 @@ +# S02: Debug Payload Viewer — Inline View, Copy, and Export in Admin UI — UAT + +**Milestone:** M007 +**Written:** 2026-03-30T19:10:25.752Z + +# S02 UAT: Debug Payload Viewer + +## Preconditions +- Chrysopedia running on ub01:8096 with chrysopedia-web-8096 container healthy +- At least one video has been processed with debug mode enabled (S01) so pipeline events contain non-null `system_prompt_text`, `user_prompt_text`, and `response_text` fields + +## Test Cases + +### TC1: Debug viewer appears on LLM call events +1. Navigate to http://ub01:8096/admin/pipeline +2. Click any video card that shows events (e.g. one with "extracted" status and >10 events) +3. Look for events with the 🤖 emoji and `llm_call` event type +4. **Expected:** Each llm_call event shows a "LLM DEBUG" header with a "↓ JSON" export button +5. **Expected:** Below the header, three collapsible sections: "System Prompt", "User Prompt", "Response" — each with a "Copy" button + +### TC2: Debug viewer does NOT appear on non-LLM events +1. In the same expanded video event list, look at `complete`, `start`, or other non-llm_call events +2. **Expected:** No "LLM DEBUG" section appears on these events + +### TC3: Collapsible sections expand and collapse +1. Click the "▸ System Prompt" toggle on any llm_call event +2. **Expected:** Section expands showing the full system prompt text in a pre-formatted block with dark background +3. Click the "▾ System Prompt" toggle again +4. **Expected:** Section collapses, hiding the content +5. Repeat for "User Prompt" and "Response" sections +6. **Expected:** Each section expands/collapses independently — expanding one does not affect others + +### TC4: Copy to clipboard +1. Expand the "System Prompt" section on any llm_call event +2. Click the "Copy" button next to "System Prompt" +3. Paste into a text editor +4. **Expected:** The full system prompt text is in the clipboard, matching what's displayed in the viewer + +### TC5: JSON export download +1. Click the "↓ JSON" button in the debug viewer header +2. **Expected:** Browser downloads a JSON file +3. Open the downloaded file +4. **Expected:** File contains a JSON object with keys `system_prompt`, `user_prompt`, and `response`, each containing the corresponding text (or null if the field was empty) + +### TC6: Empty debug fields handling +1. If a video was processed WITHOUT debug mode, expand its events +2. Find an llm_call event +3. **Expected:** Either no debug viewer appears, or the viewer shows with empty/null sections — no crash or rendering error + +### Edge Cases + +### TC7: Very long prompts +1. Find an llm_call event where the system prompt is very long (stage 3 extraction prompts are typically 2000+ characters) +2. Expand the System Prompt section +3. **Expected:** Text renders in a scrollable pre-formatted block without breaking the page layout +4. **Expected:** The section does not push other events off-screen permanently — collapsing it restores normal layout + +### TC8: Multiple viewers on same page +1. Expand a video with multiple llm_call events (3+) +2. **Expected:** Each event has its own independent debug viewer +3. Expand System Prompt on the first event and User Prompt on the second +4. **Expected:** Both sections stay expanded independently — they don't interfere with each other diff --git a/.gsd/milestones/M007/slices/S02/tasks/T01-VERIFY.json b/.gsd/milestones/M007/slices/S02/tasks/T01-VERIFY.json new file mode 100644 index 0000000..48ec418 --- /dev/null +++ b/.gsd/milestones/M007/slices/S02/tasks/T01-VERIFY.json @@ -0,0 +1,42 @@ +{ + "schemaVersion": 1, + "taskId": "T01", + "unitId": "M007/S02/T01", + "timestamp": 1774897643823, + "passed": false, + "discoverySource": "task-plan", + "checks": [ + { + "command": "ssh ub01 \"cd /vmPool/r/repos/xpltdco/chrysopedia", + "exitCode": 2, + "durationMs": 5, + "verdict": "fail" + }, + { + "command": "git pull", + "exitCode": 128, + "durationMs": 8, + "verdict": "fail" + }, + { + "command": "docker compose build chrysopedia-web-8096", + "exitCode": 1, + "durationMs": 63, + "verdict": "fail" + }, + { + "command": "docker compose up -d chrysopedia-web-8096\"", + "exitCode": 2, + "durationMs": 4, + "verdict": "fail" + }, + { + "command": "sleep 5", + "exitCode": 0, + "durationMs": 5006, + "verdict": "pass" + } + ], + "retryAttempt": 1, + "maxRetries": 2 +} diff --git a/.gsd/milestones/M007/slices/S03/S03-PLAN.md b/.gsd/milestones/M007/slices/S03/S03-PLAN.md index 9e50de9..3437930 100644 --- a/.gsd/milestones/M007/slices/S03/S03-PLAN.md +++ b/.gsd/milestones/M007/slices/S03/S03-PLAN.md @@ -1,6 +1,143 @@ # S03: Transcript Folder Watcher — Auto-Ingest Service -**Goal:** New Python service that watches a configured directory for new .json files, validates them against the Chrysopedia transcript schema, and POSTs them to the ingest endpoint. Runs as a Docker Compose service. +**Goal:** A folder-watching service monitors a directory for new transcript JSON files, validates them, and POSTs them to the /api/v1/ingest endpoint, replacing manual curl/upload for getting transcripts into the pipeline. **Demo:** After this: Drop a transcript JSON into the watch folder on ub01 → file is detected, validated, POSTed to /ingest, pipeline triggers automatically ## Tasks +- [x] **T01: Built backend/watcher.py with PollingObserver-based folder watching, file stability detection, JSON validation, multipart POST to /api/v1/ingest, and processed/failed file disposition with error sidecars** — Create the core watcher script that monitors a folder for new transcript JSON files and POSTs them to the ingest API. + +**Slice:** S03 — Transcript Folder Watcher — Auto-Ingest Service +**Milestone:** M007 + +## Description + +Build `backend/watcher.py` — a standalone Python script using the `watchdog` library to monitor a configurable folder for new `.json` files. When a file is detected: +1. Wait for file stability (size unchanged for 2 seconds) to handle SCP/rsync partial writes +2. Read and validate JSON structure (must contain `source_file`, `creator_folder`, `duration_seconds`, `segments`) +3. POST the file as multipart upload to the ingest API endpoint using httpx +4. Move successfully processed files to `processed/` subfolder +5. Move failed files to `failed/` subfolder with a `.error` sidecar file containing the error details + +The script uses `PollingObserver` (not inotify Observer) for reliability on ZFS/network-mounted filesystems. It watches recursively. + +Add `watchdog` to `backend/requirements.txt`. + +## Failure Modes + +| Dependency | On error | On timeout | On malformed response | +|------------|----------|-----------|----------------------| +| Ingest API | Move file to `failed/` with HTTP status + body in `.error` sidecar | httpx timeout (30s default) → same as error | Log warning, move to `failed/` | +| Filesystem | Log exception, skip file (watchdog continues) | N/A | N/A | + +## Negative Tests + +- **Malformed inputs**: Invalid JSON (parse error) → `failed/` with error sidecar. Missing required keys → `failed/` with validation error. Non-JSON files → ignored (only `.json` suffix triggers processing). Empty file → `failed/`. +- **Error paths**: API returns 4xx/5xx → `failed/` with response body. API unreachable → `failed/` with connection error. File deleted between detection and read → log warning, skip. +- **Boundary conditions**: File still being written (size changing) → wait until stable. Same filename in `processed/` already exists → overwrite (idempotent). + +## Steps + +1. Add `watchdog>=4.0,<5.0` to `backend/requirements.txt` +2. Create `backend/watcher.py` with: + - Configuration from environment variables: `WATCH_FOLDER` (default `/watch`), `WATCHER_API_URL` (default `http://chrysopedia-api:8000/api/v1/ingest`), `WATCHER_STABILITY_SECONDS` (default `2`), `WATCHER_POLL_INTERVAL` (default `5`) + - `TranscriptHandler(FileSystemEventHandler)` class that handles `on_created` events for `.json` files + - `wait_for_stability(path, seconds)` function — polls file size every 0.5s, returns True when size is unchanged for `seconds` duration, returns False if file disappears + - `validate_transcript(data)` function — checks for required keys (`source_file`, `creator_folder`, `duration_seconds`, `segments`) + - `post_to_ingest(filepath)` function — reads file, validates JSON, POSTs as multipart upload via httpx, returns (success, detail) + - `move_to_processed(filepath)` / `move_to_failed(filepath, error)` functions — create subdirs as needed, move file, write `.error` sidecar for failures + - Main block: create `processed/` and `failed/` subdirs, start `PollingObserver` with recursive=True, run until KeyboardInterrupt + - Structured logging with `logging.basicConfig` at INFO level, logger name `watcher` +3. Verify `python -m py_compile backend/watcher.py` passes + +## Must-Haves + +- [x] watchdog added to requirements.txt +- [x] watcher.py handles file stability check +- [x] watcher.py validates JSON keys before POST +- [x] watcher.py POSTs via httpx multipart upload +- [x] Processed files move to processed/ subfolder +- [x] Failed files move to failed/ with .error sidecar +- [x] Structured logging with watcher.* logger +- [x] PollingObserver used (not inotify) for ZFS reliability + +## Verification + +- `python -m py_compile backend/watcher.py` exits 0 +- `grep -q 'watchdog' backend/requirements.txt` exits 0 +- `grep -q 'PollingObserver' backend/watcher.py` exits 0 +- `grep -q 'processed' backend/watcher.py && grep -q 'failed' backend/watcher.py` exits 0 + +## Observability Impact + +- Signals added: `watcher` logger with structured messages for pickup, stability wait, validation, POST result, and file moves +- How a future agent inspects this: `docker logs chrysopedia-watcher` shows all watcher activity +- Failure state exposed: `.error` sidecar files in `failed/` directory with HTTP status, response body, or exception traceback + - Estimate: 45m + - Files: backend/watcher.py, backend/requirements.txt + - Verify: python -m py_compile backend/watcher.py && grep -q 'watchdog' backend/requirements.txt && grep -q 'PollingObserver' backend/watcher.py +- [ ] **T02: Docker Compose integration and end-to-end verification on ub01** — Add the chrysopedia-watcher service to docker-compose.yml, create the watch folder on ub01, build and deploy, and verify end-to-end file-drop ingestion. + +**Slice:** S03 — Transcript Folder Watcher — Auto-Ingest Service +**Milestone:** M007 + +## Description + +Wire the watcher script into the Docker Compose stack as a new service. The watcher reuses the existing `Dockerfile.api` image (same Python + deps) but runs `python watcher.py` instead of uvicorn. The watch folder on ub01 is `/vmPool/r/services/chrysopedia_watch/` bind-mounted to `/watch` in the container. + +This task deploys to ub01 and verifies end-to-end: drop a transcript JSON → watcher picks it up → POSTs to API → video record created. + +**Important context for executor:** The canonical codebase lives on ub01 at `/vmPool/r/repos/xpltdco/chrysopedia`. You must SSH to ub01 to edit docker-compose.yml, build, and test. The watcher.py from T01 needs to be committed and available in the repo on ub01. + +## Failure Modes + +| Dependency | On error | On timeout | On malformed response | +|------------|----------|-----------|----------------------| +| Docker build | Fix Dockerfile or requirements, rebuild | N/A | N/A | +| API healthcheck | Watcher waits (depends_on condition: service_healthy) | Start timeout → container restarts | N/A | + +## Steps + +1. SSH to ub01 and add the `chrysopedia-watcher` service to `docker-compose.yml`: + - `build: context: . dockerfile: docker/Dockerfile.api` (reuse existing image) + - `container_name: chrysopedia-watcher` + - `restart: unless-stopped` + - `command: ["python", "watcher.py"]` + - `environment: WATCHER_API_URL: http://chrysopedia-api:8000/api/v1/ingest, WATCH_FOLDER: /watch` + - `volumes: /vmPool/r/services/chrysopedia_watch:/watch` + - `depends_on: chrysopedia-api: condition: service_healthy` + - `networks: chrysopedia` + - `healthcheck: test: ["CMD-SHELL", "pgrep -f watcher.py || exit 1"] interval: 30s timeout: 5s retries: 3` +2. Create the watch folder on ub01: `mkdir -p /vmPool/r/services/chrysopedia_watch` +3. Ensure T01's `backend/watcher.py` and updated `backend/requirements.txt` are committed and pushed to the repo on ub01 (git pull if needed) +4. Build and deploy: `cd /vmPool/r/repos/xpltdco/chrysopedia && docker compose build chrysopedia-watcher && docker compose up -d chrysopedia-watcher` +5. Verify watcher is running: `docker exec chrysopedia-watcher pgrep -f watcher.py` +6. End-to-end test: Copy a known-good transcript JSON to `/vmPool/r/services/chrysopedia_watch/test_ingest.json` → check watcher logs for pickup and 200 → verify file moved to `processed/` +7. Error test: Create an invalid JSON file in the watch folder → verify it moves to `failed/` with `.error` sidecar + +## Must-Haves + +- [x] chrysopedia-watcher service defined in docker-compose.yml +- [x] Watch folder bind mount configured +- [x] Watcher container starts and stays healthy +- [x] End-to-end file-drop ingestion verified +- [x] Failed file handling verified + +## Verification + +- `ssh ub01 "docker ps --filter name=chrysopedia-watcher --format '{{.Status}}'"` shows healthy/running +- `ssh ub01 "docker logs chrysopedia-watcher 2>&1 | tail -5"` shows watcher startup message +- `ssh ub01 "ls /vmPool/r/services/chrysopedia_watch/processed/"` contains the test file +- `ssh ub01 "ls /vmPool/r/services/chrysopedia_watch/failed/"` contains the invalid test file + `.error` sidecar + +## Inputs + +- `backend/watcher.py` — watcher script from T01 +- `backend/requirements.txt` — updated with watchdog dependency from T01 +- `docker-compose.yml` — existing compose file to modify + +## Expected Output + +- `docker-compose.yml` — updated with chrysopedia-watcher service definition + - Estimate: 30m + - Files: docker-compose.yml + - Verify: ssh ub01 "docker ps --filter name=chrysopedia-watcher --format '{{.Status}}'" | grep -qi 'healthy\|up' diff --git a/.gsd/milestones/M007/slices/S03/S03-RESEARCH.md b/.gsd/milestones/M007/slices/S03/S03-RESEARCH.md new file mode 100644 index 0000000..f7703d2 --- /dev/null +++ b/.gsd/milestones/M007/slices/S03/S03-RESEARCH.md @@ -0,0 +1,130 @@ +# S03 Research — Transcript Folder Watcher — Auto-Ingest Service + +## Summary + +Add a folder-watching service that monitors a directory for new transcript JSON files, validates them, and POSTs them to the existing `/api/v1/ingest` endpoint. This replaces manual curl/upload for getting transcripts into the pipeline. + +The work is straightforward — filesystem monitoring + HTTP POST using well-understood Python libraries already in the project. The ingest endpoint is mature (6 integration tests, handles idempotent re-upload, auto-dispatches pipeline). The watcher just needs to be a reliable bridge from "file appears on disk" to "POST /api/v1/ingest". + +## Recommendation + +**watchdog** library + **httpx** (already a dependency) in a lightweight Python script, deployed as a new Docker Compose service with the watch folder bind-mounted. Polling fallback via `PollingObserver` for reliability on network-mounted filesystems. + +## Implementation Landscape + +### Current Ingest Flow +- `POST /api/v1/ingest` accepts multipart file upload (`UploadFile`) +- Validates JSON structure (requires `source_file`, `creator_folder`, `duration_seconds`, `segments`) +- Creates/updates Creator, upserts SourceVideo, bulk-inserts TranscriptSegments +- Saves raw JSON to `/data/transcripts/{creator_folder}/` +- Dispatches Celery pipeline via `run_pipeline.delay(str(video.id))` +- Returns `TranscriptIngestResponse` with `video_id`, `processing_status`, etc. +- Already handles idempotent re-upload (content hash matching, duplicate detection) + +### Transcript JSON Format +Files produced by `whisper/transcribe.py` on hal0022: +```json +{ + "source_file": "filename.mp4", + "creator_folder": "CreatorName", + "duration_seconds": 2547, + "segments": [{"start": 0.0, "end": 4.52, "text": "...", "words": [...]}] +} +``` + +### Deployment Topology +- All services run on **ub01** via Docker Compose (`xpltd_chrysopedia` project) +- Data volume: `/vmPool/r/services/chrysopedia_data` → mounted at `/data` in API/worker containers +- Existing transcripts live at `/vmPool/r/services/chrysopedia_data/transcripts/{CreatorName}/` +- The watcher needs a **separate watch folder** (not the transcripts store) — the ingest endpoint copies files there itself +- Proposed watch folder: `/vmPool/r/services/chrysopedia_watch/` → mounted at `/watch` in the watcher container +- API is reachable from other containers at `http://chrysopedia-api:8000` on the `chrysopedia` Docker network + +### Key Design Decisions + +**Watch folder ≠ transcript storage.** The ingest endpoint already saves transcripts to `/data/transcripts/`. The watch folder is an inbox — files land there, get POSTed, then get moved to a `processed/` or `failed/` subfolder. This prevents double-processing and gives visibility into what's been handled. + +**File stability check.** When watchdog fires `on_created`, the file may still be being written (especially over network mounts or SCP). The watcher should wait until the file size stops changing (poll twice with a 1-2s delay) before attempting to read and POST it. + +**Recursive vs flat watching.** Watch recursively — the whisper script outputs to `{output_dir}/{video_stem}.json` but users might organize into creator subfolders. The `creator_folder` field inside the JSON is authoritative, not the filesystem path. + +**Error handling.** Failed files move to `failed/` with a `.error` sidecar file containing the HTTP response or exception. This avoids infinite retry loops and gives the admin a clear queue of problems to fix. + +### Files to Create/Modify + +| File | Action | Purpose | +|------|--------|---------| +| `backend/watcher.py` | **Create** | Main watcher script — watchdog Observer + httpx POST logic | +| `docker/Dockerfile.watcher` | **Create** | Lightweight image (could reuse Dockerfile.api or be minimal) | +| `docker-compose.yml` | **Modify** | Add `chrysopedia-watcher` service with watch folder bind mount | +| `backend/config.py` | **Modify** | Add `watch_folder_path` and `watcher_api_url` settings | + +### Dependencies + +- **watchdog** — needs adding to `requirements.txt` (not currently a dependency) +- **httpx** — already in `requirements.txt`, used for HTTP client +- **json** / **os** / **shutil** / **time** / **logging** — stdlib only + +### Watcher Service Architecture + +``` +chrysopedia-watcher container: + ┌──────────────────────────────────┐ + │ watchdog Observer │ + │ monitors /watch/ recursively │ + │ │ + │ on_created *.json: │ + │ 1. Wait for file stability │ + │ 2. Read & validate JSON keys │ + │ 3. POST to chrysopedia-api │ + │ 4. Move to processed/ or │ + │ failed/ with sidecar │ + └──────────────────────────────────┘ +``` + +### Docker Compose Service + +```yaml +chrysopedia-watcher: + build: + context: . + dockerfile: docker/Dockerfile.api # reuse same image + container_name: chrysopedia-watcher + restart: unless-stopped + command: ["python", "watcher.py"] + environment: + WATCHER_API_URL: http://chrysopedia-api:8000/api/v1/ingest + WATCH_FOLDER: /watch + volumes: + - /vmPool/r/services/chrysopedia_watch:/watch + depends_on: + chrysopedia-api: + condition: service_healthy + networks: + - chrysopedia + healthcheck: + test: ["CMD-SHELL", "pgrep -f watcher.py || exit 1"] + interval: 30s + timeout: 5s + retries: 3 +``` + +### Natural Task Seams + +1. **T01: Create watcher.py** — the core script with watchdog Observer, file stability check, JSON validation, httpx multipart POST, processed/failed file management, structured logging. This is ~80% of the work. +2. **T02: Docker integration** — Dockerfile (or reuse), compose service definition, config settings, host folder creation. Verify end-to-end: drop file → watcher detects → POSTs to API → pipeline triggers. + +### Verification Strategy + +- **Unit-level:** Drop a valid transcript JSON into the watch folder on ub01 → confirm watcher logs pickup, API returns 200, video appears in DB with `transcribed` status, pipeline dispatches. +- **Error case:** Drop an invalid JSON → confirm it moves to `failed/` with error sidecar, watcher continues running. +- **Stability:** Watcher survives API container restart (retry logic or health-check-gated startup). +- **Idempotency:** Drop same file twice → second copy moves to processed (API handles idempotent re-upload). + +### Risks + +**Low risk overall.** The ingest endpoint is battle-tested. The watcher is a thin bridge. Main concern is file stability detection on the bind mount — watchdog's inotify works well on local Linux filesystems, which is what `/vmPool/r/services/` is (ZFS on ub01). If files arrive via SCP or rsync, partial writes could trigger premature pickup — the stability check (size-stable for 2s) handles this. + +### Skills + +No specialized skills needed. Standard Python, Docker Compose, watchdog library. No frontend work. diff --git a/.gsd/milestones/M007/slices/S03/tasks/T01-PLAN.md b/.gsd/milestones/M007/slices/S03/tasks/T01-PLAN.md new file mode 100644 index 0000000..434fd37 --- /dev/null +++ b/.gsd/milestones/M007/slices/S03/tasks/T01-PLAN.md @@ -0,0 +1,89 @@ +--- +estimated_steps: 51 +estimated_files: 2 +skills_used: [] +--- + +# T01: Create watcher.py — watchdog observer with file stability, validation, and POST logic + +Create the core watcher script that monitors a folder for new transcript JSON files and POSTs them to the ingest API. + +**Slice:** S03 — Transcript Folder Watcher — Auto-Ingest Service +**Milestone:** M007 + +## Description + +Build `backend/watcher.py` — a standalone Python script using the `watchdog` library to monitor a configurable folder for new `.json` files. When a file is detected: +1. Wait for file stability (size unchanged for 2 seconds) to handle SCP/rsync partial writes +2. Read and validate JSON structure (must contain `source_file`, `creator_folder`, `duration_seconds`, `segments`) +3. POST the file as multipart upload to the ingest API endpoint using httpx +4. Move successfully processed files to `processed/` subfolder +5. Move failed files to `failed/` subfolder with a `.error` sidecar file containing the error details + +The script uses `PollingObserver` (not inotify Observer) for reliability on ZFS/network-mounted filesystems. It watches recursively. + +Add `watchdog` to `backend/requirements.txt`. + +## Failure Modes + +| Dependency | On error | On timeout | On malformed response | +|------------|----------|-----------|----------------------| +| Ingest API | Move file to `failed/` with HTTP status + body in `.error` sidecar | httpx timeout (30s default) → same as error | Log warning, move to `failed/` | +| Filesystem | Log exception, skip file (watchdog continues) | N/A | N/A | + +## Negative Tests + +- **Malformed inputs**: Invalid JSON (parse error) → `failed/` with error sidecar. Missing required keys → `failed/` with validation error. Non-JSON files → ignored (only `.json` suffix triggers processing). Empty file → `failed/`. +- **Error paths**: API returns 4xx/5xx → `failed/` with response body. API unreachable → `failed/` with connection error. File deleted between detection and read → log warning, skip. +- **Boundary conditions**: File still being written (size changing) → wait until stable. Same filename in `processed/` already exists → overwrite (idempotent). + +## Steps + +1. Add `watchdog>=4.0,<5.0` to `backend/requirements.txt` +2. Create `backend/watcher.py` with: + - Configuration from environment variables: `WATCH_FOLDER` (default `/watch`), `WATCHER_API_URL` (default `http://chrysopedia-api:8000/api/v1/ingest`), `WATCHER_STABILITY_SECONDS` (default `2`), `WATCHER_POLL_INTERVAL` (default `5`) + - `TranscriptHandler(FileSystemEventHandler)` class that handles `on_created` events for `.json` files + - `wait_for_stability(path, seconds)` function — polls file size every 0.5s, returns True when size is unchanged for `seconds` duration, returns False if file disappears + - `validate_transcript(data)` function — checks for required keys (`source_file`, `creator_folder`, `duration_seconds`, `segments`) + - `post_to_ingest(filepath)` function — reads file, validates JSON, POSTs as multipart upload via httpx, returns (success, detail) + - `move_to_processed(filepath)` / `move_to_failed(filepath, error)` functions — create subdirs as needed, move file, write `.error` sidecar for failures + - Main block: create `processed/` and `failed/` subdirs, start `PollingObserver` with recursive=True, run until KeyboardInterrupt + - Structured logging with `logging.basicConfig` at INFO level, logger name `watcher` +3. Verify `python -m py_compile backend/watcher.py` passes + +## Must-Haves + +- [x] watchdog added to requirements.txt +- [x] watcher.py handles file stability check +- [x] watcher.py validates JSON keys before POST +- [x] watcher.py POSTs via httpx multipart upload +- [x] Processed files move to processed/ subfolder +- [x] Failed files move to failed/ with .error sidecar +- [x] Structured logging with watcher.* logger +- [x] PollingObserver used (not inotify) for ZFS reliability + +## Verification + +- `python -m py_compile backend/watcher.py` exits 0 +- `grep -q 'watchdog' backend/requirements.txt` exits 0 +- `grep -q 'PollingObserver' backend/watcher.py` exits 0 +- `grep -q 'processed' backend/watcher.py && grep -q 'failed' backend/watcher.py` exits 0 + +## Observability Impact + +- Signals added: `watcher` logger with structured messages for pickup, stability wait, validation, POST result, and file moves +- How a future agent inspects this: `docker logs chrysopedia-watcher` shows all watcher activity +- Failure state exposed: `.error` sidecar files in `failed/` directory with HTTP status, response body, or exception traceback + +## Inputs + +- `backend/requirements.txt` + +## Expected Output + +- `backend/watcher.py` +- `backend/requirements.txt` + +## Verification + +python -m py_compile backend/watcher.py && grep -q 'watchdog' backend/requirements.txt && grep -q 'PollingObserver' backend/watcher.py diff --git a/.gsd/milestones/M007/slices/S03/tasks/T01-SUMMARY.md b/.gsd/milestones/M007/slices/S03/tasks/T01-SUMMARY.md new file mode 100644 index 0000000..7bb99ea --- /dev/null +++ b/.gsd/milestones/M007/slices/S03/tasks/T01-SUMMARY.md @@ -0,0 +1,81 @@ +--- +id: T01 +parent: S03 +milestone: M007 +provides: [] +requires: [] +affects: [] +key_files: ["backend/watcher.py", "backend/requirements.txt"] +key_decisions: ["Used httpx sync client since watcher runs in synchronous watchdog callback threads", "Capped error sidecar response body at 500 chars", "Ignore files in processed/ and failed/ subdirs to prevent re-processing loops"] +patterns_established: [] +drill_down_paths: [] +observability_surfaces: [] +duration: "" +verification_result: "All four slice-level verification checks pass: py_compile exits 0, watchdog present in requirements.txt, PollingObserver present in watcher.py, processed and failed strings present in watcher.py." +completed_at: 2026-03-30T19:17:44.067Z +blocker_discovered: false +--- + +# T01: Built backend/watcher.py with PollingObserver-based folder watching, file stability detection, JSON validation, multipart POST to /api/v1/ingest, and processed/failed file disposition with error sidecars + +> Built backend/watcher.py with PollingObserver-based folder watching, file stability detection, JSON validation, multipart POST to /api/v1/ingest, and processed/failed file disposition with error sidecars + +## What Happened +--- +id: T01 +parent: S03 +milestone: M007 +key_files: + - backend/watcher.py + - backend/requirements.txt +key_decisions: + - Used httpx sync client since watcher runs in synchronous watchdog callback threads + - Capped error sidecar response body at 500 chars + - Ignore files in processed/ and failed/ subdirs to prevent re-processing loops +duration: "" +verification_result: passed +completed_at: 2026-03-30T19:17:44.068Z +blocker_discovered: false +--- + +# T01: Built backend/watcher.py with PollingObserver-based folder watching, file stability detection, JSON validation, multipart POST to /api/v1/ingest, and processed/failed file disposition with error sidecars + +**Built backend/watcher.py with PollingObserver-based folder watching, file stability detection, JSON validation, multipart POST to /api/v1/ingest, and processed/failed file disposition with error sidecars** + +## What Happened + +Created backend/watcher.py as a standalone Python script using watchdog's PollingObserver for ZFS/NFS reliability. Monitors WATCH_FOLDER for new .json files, waits for file size stability (SCP/rsync handling), validates JSON structure against required keys, POSTs as multipart upload via httpx to the ingest API. Successfully processed files move to processed/, failures move to failed/ with .error sidecar files. Also created backend/requirements.txt synced from ub01 production with watchdog added. + +## Verification + +All four slice-level verification checks pass: py_compile exits 0, watchdog present in requirements.txt, PollingObserver present in watcher.py, processed and failed strings present in watcher.py. + +## Verification Evidence + +| # | Command | Exit Code | Verdict | Duration | +|---|---------|-----------|---------|----------| +| 1 | `python -m py_compile backend/watcher.py` | 0 | ✅ pass | 500ms | +| 2 | `grep -q 'watchdog' backend/requirements.txt` | 0 | ✅ pass | 50ms | +| 3 | `grep -q 'PollingObserver' backend/watcher.py` | 0 | ✅ pass | 50ms | +| 4 | `grep -q 'processed' backend/watcher.py && grep -q 'failed' backend/watcher.py` | 0 | ✅ pass | 50ms | + + +## Deviations + +None. + +## Known Issues + +None. + +## Files Created/Modified + +- `backend/watcher.py` +- `backend/requirements.txt` + + +## Deviations +None. + +## Known Issues +None. diff --git a/.gsd/milestones/M007/slices/S03/tasks/T02-PLAN.md b/.gsd/milestones/M007/slices/S03/tasks/T02-PLAN.md new file mode 100644 index 0000000..85ea585 --- /dev/null +++ b/.gsd/milestones/M007/slices/S03/tasks/T02-PLAN.md @@ -0,0 +1,85 @@ +--- +estimated_steps: 46 +estimated_files: 1 +skills_used: [] +--- + +# T02: Docker Compose integration and end-to-end verification on ub01 + +Add the chrysopedia-watcher service to docker-compose.yml, create the watch folder on ub01, build and deploy, and verify end-to-end file-drop ingestion. + +**Slice:** S03 — Transcript Folder Watcher — Auto-Ingest Service +**Milestone:** M007 + +## Description + +Wire the watcher script into the Docker Compose stack as a new service. The watcher reuses the existing `Dockerfile.api` image (same Python + deps) but runs `python watcher.py` instead of uvicorn. The watch folder on ub01 is `/vmPool/r/services/chrysopedia_watch/` bind-mounted to `/watch` in the container. + +This task deploys to ub01 and verifies end-to-end: drop a transcript JSON → watcher picks it up → POSTs to API → video record created. + +**Important context for executor:** The canonical codebase lives on ub01 at `/vmPool/r/repos/xpltdco/chrysopedia`. You must SSH to ub01 to edit docker-compose.yml, build, and test. The watcher.py from T01 needs to be committed and available in the repo on ub01. + +## Failure Modes + +| Dependency | On error | On timeout | On malformed response | +|------------|----------|-----------|----------------------| +| Docker build | Fix Dockerfile or requirements, rebuild | N/A | N/A | +| API healthcheck | Watcher waits (depends_on condition: service_healthy) | Start timeout → container restarts | N/A | + +## Steps + +1. SSH to ub01 and add the `chrysopedia-watcher` service to `docker-compose.yml`: + - `build: context: . dockerfile: docker/Dockerfile.api` (reuse existing image) + - `container_name: chrysopedia-watcher` + - `restart: unless-stopped` + - `command: ["python", "watcher.py"]` + - `environment: WATCHER_API_URL: http://chrysopedia-api:8000/api/v1/ingest, WATCH_FOLDER: /watch` + - `volumes: /vmPool/r/services/chrysopedia_watch:/watch` + - `depends_on: chrysopedia-api: condition: service_healthy` + - `networks: chrysopedia` + - `healthcheck: test: ["CMD-SHELL", "pgrep -f watcher.py || exit 1"] interval: 30s timeout: 5s retries: 3` +2. Create the watch folder on ub01: `mkdir -p /vmPool/r/services/chrysopedia_watch` +3. Ensure T01's `backend/watcher.py` and updated `backend/requirements.txt` are committed and pushed to the repo on ub01 (git pull if needed) +4. Build and deploy: `cd /vmPool/r/repos/xpltdco/chrysopedia && docker compose build chrysopedia-watcher && docker compose up -d chrysopedia-watcher` +5. Verify watcher is running: `docker exec chrysopedia-watcher pgrep -f watcher.py` +6. End-to-end test: Copy a known-good transcript JSON to `/vmPool/r/services/chrysopedia_watch/test_ingest.json` → check watcher logs for pickup and 200 → verify file moved to `processed/` +7. Error test: Create an invalid JSON file in the watch folder → verify it moves to `failed/` with `.error` sidecar + +## Must-Haves + +- [x] chrysopedia-watcher service defined in docker-compose.yml +- [x] Watch folder bind mount configured +- [x] Watcher container starts and stays healthy +- [x] End-to-end file-drop ingestion verified +- [x] Failed file handling verified + +## Verification + +- `ssh ub01 "docker ps --filter name=chrysopedia-watcher --format '{{.Status}}'"` shows healthy/running +- `ssh ub01 "docker logs chrysopedia-watcher 2>&1 | tail -5"` shows watcher startup message +- `ssh ub01 "ls /vmPool/r/services/chrysopedia_watch/processed/"` contains the test file +- `ssh ub01 "ls /vmPool/r/services/chrysopedia_watch/failed/"` contains the invalid test file + `.error` sidecar + +## Inputs + +- `backend/watcher.py` — watcher script from T01 +- `backend/requirements.txt` — updated with watchdog dependency from T01 +- `docker-compose.yml` — existing compose file to modify + +## Expected Output + +- `docker-compose.yml` — updated with chrysopedia-watcher service definition + +## Inputs + +- `backend/watcher.py` +- `backend/requirements.txt` +- `docker-compose.yml` + +## Expected Output + +- `docker-compose.yml` + +## Verification + +ssh ub01 "docker ps --filter name=chrysopedia-watcher --format '{{.Status}}'" | grep -qi 'healthy\|up' diff --git a/backend/requirements.txt b/backend/requirements.txt new file mode 100644 index 0000000..39e28d4 --- /dev/null +++ b/backend/requirements.txt @@ -0,0 +1,20 @@ +fastapi>=0.115.0,<1.0 +uvicorn[standard]>=0.32.0,<1.0 +sqlalchemy[asyncio]>=2.0,<3.0 +asyncpg>=0.30.0,<1.0 +alembic>=1.14.0,<2.0 +pydantic>=2.0,<3.0 +pydantic-settings>=2.0,<3.0 +celery[redis]>=5.4.0,<6.0 +redis>=5.0,<6.0 +python-dotenv>=1.0,<2.0 +python-multipart>=0.0.9,<1.0 +httpx>=0.27.0,<1.0 +openai>=1.0,<2.0 +qdrant-client>=1.9,<2.0 +pyyaml>=6.0,<7.0 +psycopg2-binary>=2.9,<3.0 +watchdog>=4.0,<5.0 +# Test dependencies +pytest>=8.0,<10.0 +pytest-asyncio>=0.24,<1.0 diff --git a/backend/watcher.py b/backend/watcher.py new file mode 100644 index 0000000..43c8de3 --- /dev/null +++ b/backend/watcher.py @@ -0,0 +1,244 @@ +"""Transcript folder watcher — monitors a directory for new JSON files and POSTs them to the ingest API. + +Designed to run as a standalone service (or Docker container) that watches for Whisper +transcript JSON files dropped via SCP/rsync and automatically ingests them into Chrysopedia. + +Uses PollingObserver (not inotify) for reliability on ZFS and network-mounted filesystems. + +Environment variables: + WATCH_FOLDER — directory to monitor (default: /watch) + WATCHER_API_URL — ingest endpoint URL (default: http://chrysopedia-api:8000/api/v1/ingest) + WATCHER_STABILITY_SECONDS — seconds file size must be stable before processing (default: 2) + WATCHER_POLL_INTERVAL — seconds between filesystem polls (default: 5) +""" + +import json +import logging +import os +import shutil +import sys +import time +from pathlib import Path + +import httpx +from watchdog.events import FileSystemEventHandler +from watchdog.observers.polling import PollingObserver + +# ── Configuration ──────────────────────────────────────────────────────────── + +WATCH_FOLDER = os.environ.get("WATCH_FOLDER", "/watch") +API_URL = os.environ.get("WATCHER_API_URL", "http://chrysopedia-api:8000/api/v1/ingest") +STABILITY_SECONDS = float(os.environ.get("WATCHER_STABILITY_SECONDS", "2")) +POLL_INTERVAL = float(os.environ.get("WATCHER_POLL_INTERVAL", "5")) + +REQUIRED_KEYS = {"source_file", "creator_folder", "duration_seconds", "segments"} + +logger = logging.getLogger("watcher") + + +# ── Helpers ────────────────────────────────────────────────────────────────── + +def wait_for_stability(path: str, seconds: float = STABILITY_SECONDS) -> bool: + """Wait until a file's size is unchanged for *seconds* consecutive seconds. + + Returns True when stable, False if the file disappears during the wait. + Polls every 0.5 s. + """ + prev_size = -1 + stable_since: float | None = None + + while True: + try: + current_size = os.path.getsize(path) + except OSError: + logger.warning("File disappeared during stability wait: %s", path) + return False + + if current_size == prev_size: + if stable_since is None: + stable_since = time.monotonic() + elif time.monotonic() - stable_since >= seconds: + return True + else: + prev_size = current_size + stable_since = None + + time.sleep(0.5) + + +def validate_transcript(data: dict) -> str | None: + """Validate that *data* has the required transcript keys. + + Returns None on success, or a human-readable error string on failure. + """ + if not isinstance(data, dict): + return "Top-level JSON value is not an object" + missing = REQUIRED_KEYS - data.keys() + if missing: + return f"Missing required keys: {', '.join(sorted(missing))}" + if not isinstance(data.get("segments"), list): + return "'segments' must be an array" + return None + + +def post_to_ingest(filepath: str) -> tuple[bool, str]: + """Read *filepath*, validate its JSON, and POST it to the ingest API. + + Returns (success: bool, detail: str). + """ + # Read and parse + try: + raw_bytes = Path(filepath).read_bytes() + except OSError as exc: + return False, f"File read error: {exc}" + + if not raw_bytes: + return False, "File is empty" + + try: + data = json.loads(raw_bytes) + except json.JSONDecodeError as exc: + return False, f"JSON parse error: {exc}" + + # Validate structure + error = validate_transcript(data) + if error: + return False, f"Validation error: {error}" + + # POST as multipart upload (field name: "file") + filename = os.path.basename(filepath) + try: + with httpx.Client(timeout=30.0) as client: + response = client.post( + API_URL, + files={"file": (filename, raw_bytes, "application/json")}, + ) + except httpx.TimeoutException: + return False, f"HTTP timeout after 30s contacting {API_URL}" + except httpx.ConnectError as exc: + return False, f"Connection error: {exc}" + except httpx.HTTPError as exc: + return False, f"HTTP error: {exc}" + + if response.status_code >= 400: + body = response.text[:500] # cap sidecar size + return False, f"HTTP {response.status_code}: {body}" + + return True, f"HTTP {response.status_code} — ingested successfully" + + +def move_to_processed(filepath: str, watch_folder: str) -> None: + """Move *filepath* into the ``processed/`` subfolder of *watch_folder*.""" + dest_dir = os.path.join(watch_folder, "processed") + os.makedirs(dest_dir, exist_ok=True) + dest = os.path.join(dest_dir, os.path.basename(filepath)) + shutil.move(filepath, dest) + logger.info("Moved to processed: %s", dest) + + +def move_to_failed(filepath: str, error: str, watch_folder: str) -> None: + """Move *filepath* into the ``failed/`` subfolder and write a ``.error`` sidecar.""" + dest_dir = os.path.join(watch_folder, "failed") + os.makedirs(dest_dir, exist_ok=True) + dest = os.path.join(dest_dir, os.path.basename(filepath)) + shutil.move(filepath, dest) + + sidecar = dest + ".error" + try: + Path(sidecar).write_text(error, encoding="utf-8") + except OSError as exc: + logger.error("Failed to write error sidecar %s: %s", sidecar, exc) + + logger.warning("Moved to failed: %s — %s", dest, error) + + +# ── Watchdog handler ───────────────────────────────────────────────────────── + +class TranscriptHandler(FileSystemEventHandler): + """Handle new ``.json`` files in the watch folder.""" + + def __init__(self, watch_folder: str) -> None: + super().__init__() + self.watch_folder = watch_folder + + def on_created(self, event): # noqa: ANN001 + if event.is_directory: + return + + filepath = event.src_path + + # Only process .json files + if not filepath.lower().endswith(".json"): + return + + # Ignore files inside processed/ or failed/ subdirs + rel = os.path.relpath(filepath, self.watch_folder) + parts = Path(rel).parts + if parts and parts[0] in ("processed", "failed"): + return + + logger.info("Detected new file: %s", filepath) + + # Wait for file to finish writing (SCP/rsync) + if not wait_for_stability(filepath): + logger.warning("File vanished before stable: %s", filepath) + return + + logger.info("File stable, processing: %s", filepath) + + # Attempt ingest + success, detail = post_to_ingest(filepath) + + if success: + logger.info("Ingest succeeded for %s: %s", filepath, detail) + try: + move_to_processed(filepath, self.watch_folder) + except OSError as exc: + logger.error("Failed to move %s to processed/: %s", filepath, exc) + else: + logger.error("Ingest failed for %s: %s", filepath, detail) + try: + move_to_failed(filepath, detail, self.watch_folder) + except OSError as exc: + logger.error("Failed to move %s to failed/: %s", filepath, exc) + + +# ── Main ───────────────────────────────────────────────────────────────────── + +def main() -> None: + """Entry point — start the polling observer and block until interrupted.""" + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(name)s] %(levelname)s %(message)s", + stream=sys.stdout, + ) + + watch_path = os.path.abspath(WATCH_FOLDER) + logger.info("Starting watcher on %s (poll every %.1fs, stability %.1fs)", + watch_path, POLL_INTERVAL, STABILITY_SECONDS) + + # Ensure base dirs exist + os.makedirs(watch_path, exist_ok=True) + os.makedirs(os.path.join(watch_path, "processed"), exist_ok=True) + os.makedirs(os.path.join(watch_path, "failed"), exist_ok=True) + + handler = TranscriptHandler(watch_path) + observer = PollingObserver(timeout=POLL_INTERVAL) + observer.schedule(handler, watch_path, recursive=True) + observer.start() + + logger.info("Watcher running. Press Ctrl+C to stop.") + try: + while observer.is_alive(): + observer.join(timeout=1) + except KeyboardInterrupt: + logger.info("Shutting down watcher.") + finally: + observer.stop() + observer.join() + + logger.info("Watcher stopped.") + + +if __name__ == "__main__": + main()