From ad16cc6141e29ec3b5619c9e68d940e4858af310 Mon Sep 17 00:00:00 2001 From: jlightner Date: Sat, 4 Apr 2026 08:23:23 +0000 Subject: [PATCH] fix: direct per-video enrichment and migration statement breakpoints - Rewrote scheduler enrichNewItems to fetch metadata per-video directly instead of redundantly re-running Phase 1 discovery. Background enrichment now calls yt-dlp --dump-json per video ID, updating publishedAt and duration on DB records. Emits scan:complete when enrichment finishes so clients know to refetch. - Added missing --> statement-breakpoint markers to migration 0011. Without them, Drizzle concatenated the three ALTER TABLE statements and SQLite only executed the first one, leaving embed_thumbnail and sponsor_block_remove columns missing from format_profiles. --- drizzle/0011_add_youtube_enhancements.sql | 6 +- src/services/scheduler.ts | 124 ++++++++++++++-------- 2 files changed, 81 insertions(+), 49 deletions(-) diff --git a/drizzle/0011_add_youtube_enhancements.sql b/drizzle/0011_add_youtube_enhancements.sql index 3e4d45d..8168c3a 100644 --- a/drizzle/0011_add_youtube_enhancements.sql +++ b/drizzle/0011_add_youtube_enhancements.sql @@ -1,4 +1,4 @@ -- Add YouTube enhancement columns to format_profiles -ALTER TABLE format_profiles ADD COLUMN embed_chapters INTEGER NOT NULL DEFAULT 0; -ALTER TABLE format_profiles ADD COLUMN embed_thumbnail INTEGER NOT NULL DEFAULT 0; -ALTER TABLE format_profiles ADD COLUMN sponsor_block_remove TEXT; -- comma-separated: 'sponsor,selfpromo,interaction,intro,outro,preview,music_offtopic,filler' +ALTER TABLE format_profiles ADD COLUMN embed_chapters INTEGER NOT NULL DEFAULT 0;--> statement-breakpoint +ALTER TABLE format_profiles ADD COLUMN embed_thumbnail INTEGER NOT NULL DEFAULT 0;--> statement-breakpoint +ALTER TABLE format_profiles ADD COLUMN sponsor_block_remove TEXT; diff --git a/src/services/scheduler.ts b/src/services/scheduler.ts index 955d028..df50d99 100644 --- a/src/services/scheduler.ts +++ 
b/src/services/scheduler.ts @@ -4,7 +4,7 @@ import type * as schema from '../db/schema/index'; import type { Channel, Platform, PlatformContentMetadata } from '../types/index'; import type { PlatformRegistry, PlatformSource, FetchRecentContentOptions } from '../sources/platform-source'; import type { RateLimiter } from './rate-limiter'; -import { YtDlpError } from '../sources/yt-dlp'; +import { YtDlpError, execYtDlp, parseSingleJson } from '../sources/yt-dlp'; import type { EventBus } from './event-bus'; import { matchesKeywordFilter } from './keyword-filter'; import { @@ -428,73 +428,93 @@ export class SchedulerService { } /** - * Background Phase 2: re-fetch full metadata for newly discovered items - * and update their DB records with enriched data (publishedAt, duration, etc). - * Runs after the main scan completes — items are already visible to the user. + * Background Phase 2: directly enrich newly discovered items by fetching + * full metadata per-video. Updates DB records with publishedAt and + * duration. Emits scan:complete when enrichment finishes. 
*/ private async enrichNewItems( channel: Channel, discoveredItems: PlatformContentMetadata[], existingIds: Set<string>, rateLimitDelay: number, - source: PlatformSource, + _source: PlatformSource, signal: AbortSignal, ): Promise<void> { - const newPlatformIds = discoveredItems - .filter((item) => !existingIds.has(item.platformContentId)) - .map((item) => item.platformContentId); + const newItems = discoveredItems + .filter((item) => !existingIds.has(item.platformContentId)); - if (newPlatformIds.length === 0) return; + if (newItems.length === 0) return; console.log( - `[scheduler] Phase 2: enriching ${newPlatformIds.length} items for channel ${channel.id}` + `[scheduler] Phase 2: enriching ${newItems.length} items for channel ${channel.id}` ); - // Re-fetch with enrichment (discoveryOnly: false) - const enrichedItems = await source.fetchRecentContent(channel, { - limit: newPlatformIds.length + existingIds.size, - existingIds, - rateLimitDelay, - discoveryOnly: false, - signal, - }); - - // Build lookup by platformContentId - const enrichedMap = new Map<string, PlatformContentMetadata>(); - for (const item of enrichedItems) { - enrichedMap.set(item.platformContentId, item); - } - - // Update DB records with enriched data let enrichedCount = 0; - for (const platformContentId of newPlatformIds) { - if (signal.aborted) break; - const enriched = enrichedMap.get(platformContentId); - if (!enriched) continue; - - // Look up the DB record by platformContentId - const dbItem = await getContentByPlatformContentId(this.db, channel.id, platformContentId); - if (!dbItem) continue; - - // Only update if enrichment provides new data - const updates: Record<string, unknown> = {}; - if (enriched.publishedAt && !dbItem.publishedAt) { - updates.publishedAt = enriched.publishedAt; - } - if (enriched.duration != null && dbItem.duration == null) { - updates.duration = enriched.duration; + for (let i = 0; i < newItems.length; i++) { + if (signal.aborted) { + console.log( + `[scheduler] Phase 2 aborted for channel ${channel.id} after 
${enrichedCount}/${newItems.length} items` + ); + break; } + const item = newItems[i]; + + // Rate limit delay between enrichment calls (skip before first) + if (i > 0 && rateLimitDelay > 0) { + await sleep(rateLimitDelay); + } + + try { + const videoUrl = `https://www.youtube.com/watch?v=${item.platformContentId}`; + const enrichResult = await execYtDlp( + ['--dump-json', '--no-playlist', videoUrl], + { timeout: 15_000 } + ); + + const enrichData = parseSingleJson(enrichResult.stdout) as Record<string, unknown>; + + // Parse enriched fields + const publishedAt = parseUploadDate(enrichData.upload_date as string | undefined); + const duration = typeof enrichData.duration === 'number' ? enrichData.duration : null; + + // Look up the DB record + const dbItem = await getContentByPlatformContentId(this.db, channel.id, item.platformContentId); + if (!dbItem) continue; + + // Build update payload — only update fields that enrichment provides + const updates: Record<string, unknown> = {}; + if (publishedAt && !dbItem.publishedAt) { + updates.publishedAt = publishedAt; + } + if (duration != null && dbItem.duration == null) { + updates.duration = duration; + } + + if (Object.keys(updates).length > 0) { + await updateContentItem(this.db, dbItem.id, updates); + enrichedCount++; + } + } catch (err) { + // Tolerate individual enrichment failures + console.warn( + `[scheduler] Enrichment failed for ${item.platformContentId}: ${err instanceof Error ? 
err.message : err}` + ); } } console.log( - `[scheduler] Phase 2 complete for channel ${channel.id}: ${enrichedCount} items enriched` + `[scheduler] Phase 2 complete for channel ${channel.id}: ${enrichedCount}/${newItems.length} items enriched` ); + + // Notify clients that enrichment is done — they should refetch content + this.eventBus?.emitScan('scan:complete', { + channelId: channel.id, + channelName: channel.name, + newItems: enrichedCount, + totalFetched: newItems.length, + }); } // ── Internal ── @@ -548,3 +568,15 @@ function minutesToCronPattern(minutes: number): string { // For intervals >= 60 minutes, run every hour and use `interval` option return '0 * * * *'; } + +function parseUploadDate(raw: string | undefined): string | null { + if (!raw || raw.length !== 8) return null; + const y = raw.slice(0, 4); + const m = raw.slice(4, 6); + const d = raw.slice(6, 8); + return `${y}-${m}-${d}T00:00:00Z`; +} + +function sleep(ms: number): Promise<void> { + return new Promise((resolve) => setTimeout(resolve, ms)); +}