fix: direct per-video enrichment and migration statement breakpoints
All checks were successful
CI / test (push) Successful in 20s

- Rewrote scheduler enrichNewItems to fetch metadata per-video directly
  instead of redundantly re-running Phase 1 discovery. Background enrichment
  now calls yt-dlp --dump-json per video ID, updating publishedAt and
  duration on DB records. Emits scan:complete when enrichment finishes
  so clients know to refetch.

- Added missing --> statement-breakpoint markers to migration 0011.
  Without them, Drizzle concatenated the three ALTER TABLE statements
  and SQLite only executed the first one, leaving embed_thumbnail and
  sponsor_block_remove columns missing from format_profiles.
This commit is contained in:
jlightner 2026-04-04 08:23:23 +00:00
parent 8d77ae248a
commit ad16cc6141
2 changed files with 81 additions and 49 deletions

View file

@@ -1,4 +1,4 @@
-- Add YouTube enhancement columns to format_profiles -- Add YouTube enhancement columns to format_profiles
ALTER TABLE format_profiles ADD COLUMN embed_chapters INTEGER NOT NULL DEFAULT 0; ALTER TABLE format_profiles ADD COLUMN embed_chapters INTEGER NOT NULL DEFAULT 0;--> statement-breakpoint
ALTER TABLE format_profiles ADD COLUMN embed_thumbnail INTEGER NOT NULL DEFAULT 0; ALTER TABLE format_profiles ADD COLUMN embed_thumbnail INTEGER NOT NULL DEFAULT 0;--> statement-breakpoint
ALTER TABLE format_profiles ADD COLUMN sponsor_block_remove TEXT; -- comma-separated: 'sponsor,selfpromo,interaction,intro,outro,preview,music_offtopic,filler' ALTER TABLE format_profiles ADD COLUMN sponsor_block_remove TEXT;

View file

@@ -4,7 +4,7 @@ import type * as schema from '../db/schema/index';
import type { Channel, Platform, PlatformContentMetadata } from '../types/index'; import type { Channel, Platform, PlatformContentMetadata } from '../types/index';
import type { PlatformRegistry, PlatformSource, FetchRecentContentOptions } from '../sources/platform-source'; import type { PlatformRegistry, PlatformSource, FetchRecentContentOptions } from '../sources/platform-source';
import type { RateLimiter } from './rate-limiter'; import type { RateLimiter } from './rate-limiter';
import { YtDlpError } from '../sources/yt-dlp'; import { YtDlpError, execYtDlp, parseSingleJson } from '../sources/yt-dlp';
import type { EventBus } from './event-bus'; import type { EventBus } from './event-bus';
import { matchesKeywordFilter } from './keyword-filter'; import { matchesKeywordFilter } from './keyword-filter';
import { import {
@@ -428,73 +428,93 @@ export class SchedulerService {
} }
/** /**
* Background Phase 2: re-fetch full metadata for newly discovered items * Background Phase 2: directly enrich newly discovered items by fetching
* and update their DB records with enriched data (publishedAt, duration, etc). * full metadata per-video. Updates DB records with publishedAt, duration,
* Runs after the main scan completes — items are already visible to the user. * and correct contentType. Emits scan:complete when done.
*/ */
private async enrichNewItems( private async enrichNewItems(
channel: Channel, channel: Channel,
discoveredItems: PlatformContentMetadata[], discoveredItems: PlatformContentMetadata[],
existingIds: Set<string>, existingIds: Set<string>,
rateLimitDelay: number, rateLimitDelay: number,
source: PlatformSource, _source: PlatformSource,
signal: AbortSignal, signal: AbortSignal,
): Promise<void> { ): Promise<void> {
const newPlatformIds = discoveredItems const newItems = discoveredItems
.filter((item) => !existingIds.has(item.platformContentId)) .filter((item) => !existingIds.has(item.platformContentId));
.map((item) => item.platformContentId);
if (newPlatformIds.length === 0) return; if (newItems.length === 0) return;
console.log( console.log(
`[scheduler] Phase 2: enriching ${newPlatformIds.length} items for channel ${channel.id}` `[scheduler] Phase 2: enriching ${newItems.length} items for channel ${channel.id}`
); );
// Re-fetch with enrichment (discoveryOnly: false) let enrichedCount = 0;
const enrichedItems = await source.fetchRecentContent(channel, {
limit: newPlatformIds.length + existingIds.size,
existingIds,
rateLimitDelay,
discoveryOnly: false,
signal,
});
// Build lookup by platformContentId for (let i = 0; i < newItems.length; i++) {
const enrichedMap = new Map<string, PlatformContentMetadata>(); if (signal.aborted) {
for (const item of enrichedItems) { console.log(
enrichedMap.set(item.platformContentId, item); `[scheduler] Phase 2 aborted for channel ${channel.id} after ${enrichedCount}/${newItems.length} items`
);
break;
} }
// Update DB records with enriched data const item = newItems[i];
let enrichedCount = 0;
for (const platformContentId of newPlatformIds) {
if (signal.aborted) break;
const enriched = enrichedMap.get(platformContentId); // Rate limit delay between enrichment calls (skip before first)
if (!enriched) continue; if (i > 0 && rateLimitDelay > 0) {
await sleep(rateLimitDelay);
}
// Look up the DB record by platformContentId try {
const dbItem = await getContentByPlatformContentId(this.db, channel.id, platformContentId); const videoUrl = `https://www.youtube.com/watch?v=${item.platformContentId}`;
const enrichResult = await execYtDlp(
['--dump-json', '--no-playlist', videoUrl],
{ timeout: 15_000 }
);
const enrichData = parseSingleJson(enrichResult.stdout) as Record<string, unknown>;
// Parse enriched fields
const publishedAt = parseUploadDate(enrichData.upload_date as string | undefined);
const duration = typeof enrichData.duration === 'number' ? enrichData.duration : null;
// Look up the DB record
const dbItem = await getContentByPlatformContentId(this.db, channel.id, item.platformContentId);
if (!dbItem) continue; if (!dbItem) continue;
// Only update if enrichment provides new data // Build update payload — only update fields that enrichment provides
const updates: Record<string, unknown> = {}; const updates: Record<string, unknown> = {};
if (enriched.publishedAt && !dbItem.publishedAt) { if (publishedAt && !dbItem.publishedAt) {
updates.publishedAt = enriched.publishedAt; updates.publishedAt = publishedAt;
} }
if (enriched.duration != null && dbItem.duration == null) { if (duration != null && dbItem.duration == null) {
updates.duration = enriched.duration; updates.duration = duration;
} }
if (Object.keys(updates).length > 0) { if (Object.keys(updates).length > 0) {
await updateContentItem(this.db, dbItem.id, updates); await updateContentItem(this.db, dbItem.id, updates);
enrichedCount++; enrichedCount++;
} }
} catch (err) {
// Tolerate individual enrichment failures
console.warn(
`[scheduler] Enrichment failed for ${item.platformContentId}: ${err instanceof Error ? err.message : err}`
);
}
} }
console.log( console.log(
`[scheduler] Phase 2 complete for channel ${channel.id}: ${enrichedCount} items enriched` `[scheduler] Phase 2 complete for channel ${channel.id}: ${enrichedCount}/${newItems.length} items enriched`
); );
// Notify clients that enrichment is done — they should refetch content
this.eventBus?.emitScan('scan:complete', {
channelId: channel.id,
channelName: channel.name,
newItems: enrichedCount,
totalFetched: newItems.length,
});
} }
// ── Internal ── // ── Internal ──
@@ -548,3 +568,15 @@ function minutesToCronPattern(minutes: number): string {
// For intervals >= 60 minutes, run every hour and use `interval` option // For intervals >= 60 minutes, run every hour and use `interval` option
return '0 * * * *'; return '0 * * * *';
} }
/**
 * Convert a yt-dlp `upload_date` value (YYYYMMDD) into an ISO-8601 UTC
 * timestamp string, e.g. "20260404" -> "2026-04-04T00:00:00Z".
 *
 * @param raw - Raw `upload_date` field from yt-dlp metadata, if present.
 * @returns ISO timestamp string, or null when the input is missing or is
 *          not an 8-digit date.
 */
function parseUploadDate(raw: string | undefined): string | null {
  // Require exactly eight digits: the previous length-only check let any
  // 8-character string through (e.g. "abcd0404"), yielding a malformed
  // "timestamp" that would be written into publishedAt.
  if (!raw || !/^\d{8}$/.test(raw)) return null;
  const y = raw.slice(0, 4);
  const m = raw.slice(4, 6);
  const d = raw.slice(6, 8);
  return `${y}-${m}-${d}T00:00:00Z`;
}
/**
 * Awaitable pause — resolves after `ms` milliseconds. Thin Promise wrapper
 * around setTimeout, used to space out rate-limited enrichment calls.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((done) => {
    setTimeout(() => done(), ms);
  });
}