import type { LibSQLDatabase } from 'drizzle-orm/libsql';
import type * as schema from '../db/schema/index';
import type { PlatformRegistry } from '../sources/platform-source';
import type { QueueService } from './queue';
import { getChannelById } from '../db/repositories/channel-repository';
import { createContentItem } from '../db/repositories/content-repository';
import type { Platform, PlatformContentMetadata } from '../types/index';

// ── Types ──

/** Database handle typed against the project schema. */
type Db = LibSQLDatabase<typeof schema>;

/** Summary counters returned by {@link BackCatalogImportService.importChannel}. */
export interface ImportResult {
  /** Number of items the platform source returned. */
  found: number;
  /** Number of items newly inserted into the database. */
  imported: number;
  /** Number of items skipped because they already existed. */
  skipped: number;
}

// ── Service ──

/**
 * Fetches all content for a channel from their platform source,
 * deduplicates against existing content, inserts new items with
 * 'monitored' status, and enqueues them at priority -10 (below
 * normal priority 0, so regular scheduled downloads take precedence).
 */
export class BackCatalogImportService {
  /** Queue priority for back-catalog items; yields to normal priority (0). */
  private static readonly ENQUEUE_PRIORITY = -10;

  /** Limit passed to `fetchRecentContent` when `fetchAllContent` is unavailable. */
  private static readonly FALLBACK_FETCH_LIMIT = 10_000;

  constructor(
    private readonly db: Db,
    private readonly platformRegistry: PlatformRegistry,
    private readonly queueService: QueueService
  ) {}

  /**
   * Import all content for a channel, deduplicate, insert, and enqueue.
   *
   * @param channelId - The channel ID to import for
   * @param order - 'newest' (natural order) or 'oldest' (reversed, oldest enqueued first)
   * @returns Counts of found, imported, and skipped (duplicate) items
   * @throws Error if the channel does not exist or no platform source is
   *   registered for its platform; errors raised while fetching content are
   *   logged and rethrown unchanged.
   */
  async importChannel(
    channelId: number,
    order: 'newest' | 'oldest' = 'newest'
  ): Promise<ImportResult> {
    // 1. Look up channel
    const channel = await getChannelById(this.db, channelId);
    if (!channel) {
      console.log(
        `[import] Channel ${channelId} not found — aborting import`
      );
      throw new Error(`Channel ${channelId} not found`);
    }

    const platform = channel.platform as Platform;
    console.log(
      `[import] Starting back-catalog import for channel ${channelId} (${platform}, order=${order})`
    );

    // 2. Get platform source
    const source = this.platformRegistry.get(platform);
    if (!source) {
      console.log(
        `[import] No platform source registered for ${platform} — aborting import for channel ${channelId}`
      );
      throw new Error(`No platform source for ${platform}`);
    }

    // 3. Fetch all content (or fall back to fetchRecentContent with high limit)
    let allContent: PlatformContentMetadata[];
    try {
      if (source.fetchAllContent) {
        allContent = await source.fetchAllContent(channel);
      } else {
        // Fallback for platforms without fetchAllContent (e.g. SoundCloud)
        allContent = await source.fetchRecentContent(channel, {
          limit: BackCatalogImportService.FALLBACK_FETCH_LIMIT,
        });
      }
    } catch (err) {
      const msg = err instanceof Error ? err.message : String(err);
      console.log(
        `[import] Failed to fetch content for channel ${channelId} (${platform}): ${msg}`
      );
      throw err;
    }

    const found = allContent.length;
    console.log(
      `[import] Fetched ${found} items for channel ${channelId} (${platform})`
    );

    // 4. If order === 'oldest', reverse so oldest items get enqueued first.
    //    Reverse a copy: Array.prototype.reverse mutates in place, and the
    //    array belongs to the platform source (it may be cached or reused).
    const ordered =
      order === 'oldest' ? [...allContent].reverse() : allContent;

    // Back-catalog is *existing* content, so 'all' and 'existing' → monitored.
    // This depends only on the channel, so compute it once outside the loop.
    const monitored =
      channel.monitoringMode === 'all' ||
      channel.monitoringMode === 'existing';
    const status = monitored ? 'monitored' : 'ignored';

    // 5. Deduplicate, insert, and enqueue
    let imported = 0;
    let skipped = 0;

    for (const item of ordered) {
      // createContentItem returns null if duplicate (dedup on channelId + platformContentId)
      const contentItem = await createContentItem(this.db, {
        channelId: channel.id,
        title: item.title,
        platformContentId: item.platformContentId,
        url: item.url,
        contentType: item.contentType,
        duration: item.duration,
        thumbnailUrl: item.thumbnailUrl,
        publishedAt: item.publishedAt ?? null,
        status,
        monitored,
      });

      if (!contentItem) {
        skipped++;
        continue;
      }

      imported++;

      // Enqueue at priority -10 — yields to normal priority (0) items
      try {
        await this.queueService.enqueue(
          contentItem.id,
          BackCatalogImportService.ENQUEUE_PRIORITY
        );
      } catch (enqueueErr) {
        // Individual enqueue failures don't abort the import
        const msg =
          enqueueErr instanceof Error
            ? enqueueErr.message
            : String(enqueueErr);
        console.log(
          `[import] Failed to enqueue content item ${contentItem.id} for channel ${channelId}: ${msg}`
        );
      }
    }

    console.log(
      `[import] Import complete: ${imported} imported, ${skipped} duplicates, ${found} total for channel ${channelId} (${platform})`
    );

    return { found, imported, skipped };
  }
}