import { Cron } from 'croner';
import type { LibSQLDatabase } from 'drizzle-orm/libsql';
import type * as schema from '../db/schema/index';
import type { Channel, Platform, PlatformContentMetadata } from '../types/index';
import type {
  PlatformRegistry,
  PlatformSource,
  FetchRecentContentOptions,
} from '../sources/platform-source';
import type { RateLimiter } from './rate-limiter';
import { YtDlpError, execYtDlp, parseSingleJson } from '../sources/yt-dlp';
import type { EventBus } from './event-bus';
import { matchesKeywordFilter } from './keyword-filter';
import {
  getEnabledChannels,
  updateChannel,
} from '../db/repositories/channel-repository';
import {
  createContentItem,
  getRecentContentIds,
  getContentByPlatformContentId,
  updateContentItem,
} from '../db/repositories/content-repository';
import { getPlatformSettings } from '../db/repositories/platform-settings-repository';
import { upsertPlaylists } from '../db/repositories/playlist-repository';

// ── Types ──

/** Diagnostic snapshot of one channel's cron job, as reported by `SchedulerService.getState()`. */
export interface ChannelJobState {
  channelId: number;
  channelName: string;
  platform: Platform;
  /** Value of croner's `isBusy()` for this job at snapshot time. */
  isRunning: boolean;
  /** Value of croner's `nextRun()`, or null when it reports none. */
  nextRun: Date | null;
  lastCheckedAt: string | null;
  lastCheckStatus: string | null;
}

/** Diagnostic snapshot of the whole scheduler. */
export interface SchedulerState {
  running: boolean;
  channelCount: number;
  channels: ChannelJobState[];
}

/** Outcome of a single `SchedulerService.checkChannel()` call. */
export interface CheckChannelResult {
  channelId: number;
  channelName: string;
  /** Number of content items actually inserted by this check. */
  newItems: number;
  /** Number of items returned by the platform source, before dedup and keyword filtering. */
  totalFetched: number;
  status: 'success' | 'error' | 'rate_limited' | 'already_running';
}

/** Check outcomes that are written to the channel's `lastCheckStatus`. */
type PersistedCheckStatus = Exclude<CheckChannelResult['status'], 'already_running'>;

/** Optional configuration for the scheduler service. */
export interface SchedulerOptions {
  /** Called when a new content item is inserted — used to auto-enqueue for download. */
  onNewContent?: (contentItemId: number) => void;
  /** Event bus for broadcasting scan lifecycle events to WebSocket clients. */
  eventBus?: EventBus;
}

// ── Scheduler Service ──

/**
 * Manages per-channel cron jobs for content monitoring.
 *
 * Loads enabled channels from the database, creates Cron jobs that periodically
 * check for new content via platform sources, deduplicates against existing
 * records, and inserts new items with `monitored` status.
 */
export class SchedulerService {
  private readonly db: LibSQLDatabase<typeof schema>;
  private readonly platformRegistry: PlatformRegistry;
  private readonly rateLimiter: RateLimiter;
  private readonly onNewContent?: (contentItemId: number) => void;
  private readonly eventBus?: EventBus;
  /** Active cron job per channel id. */
  private readonly jobs = new Map<number, Cron>();
  /**
   * Latest known channel snapshot per scheduled channel id. Cron callbacks and
   * getState() read from here; persistCheckStatus() keeps the check fields fresh.
   */
  private readonly channelCache = new Map<number, Channel>();
  /** Channel ids with a check in progress (per-channel overlap lock). */
  private readonly activeChecks = new Set<number>();
  /** Abort controller of each in-progress check, used by cancelScan(). */
  private readonly activeAbortControllers = new Map<number, AbortController>();
  private running = false;

  constructor(
    db: LibSQLDatabase<typeof schema>,
    platformRegistry: PlatformRegistry,
    rateLimiter: RateLimiter,
    options?: SchedulerOptions
  ) {
    this.db = db;
    this.platformRegistry = platformRegistry;
    this.rateLimiter = rateLimiter;
    this.onNewContent = options?.onNewContent;
    this.eventBus = options?.eventBus;
  }

  /**
   * Load all enabled channels and create cron jobs for each.
   * Returns the number of channels loaded.
   */
  async start(): Promise<number> {
    const channels = await getEnabledChannels(this.db);
    for (const channel of channels) {
      this.createJob(channel);
    }
    this.running = true;
    console.log(`[scheduler] Started with ${channels.length} channels`);
    return channels.length;
  }

  /**
   * Stop all active cron jobs. Checks already in progress are not aborted
   * (use cancelScan() for that).
   */
  stop(): void {
    for (const [channelId, job] of this.jobs) {
      job.stop();
      console.log(`[scheduler] Stopped job for channel ${channelId}`);
    }
    this.jobs.clear();
    this.channelCache.clear();
    this.running = false;
    console.log('[scheduler] Stopped');
  }

  /** Create a cron job for a newly added channel (no-op when monitoring is disabled). */
  addChannel(channel: Channel): void {
    if (!channel.monitoringEnabled) return;
    this.createJob(channel);
    console.log(
      `[scheduler] Added job for channel ${channel.id} ("${channel.name}") — interval: ${channel.checkInterval}m`
    );
  }

  /** Stop and remove the cron job for a deleted channel. */
  removeChannel(channelId: number): void {
    const job = this.jobs.get(channelId);
    if (job) {
      job.stop();
      this.jobs.delete(channelId);
      this.channelCache.delete(channelId);
      console.log(`[scheduler] Removed job for channel ${channelId}`);
    }
  }

  /** Update a channel's cron job (remove old, create new with updated interval). */
  updateChannel(channel: Channel): void {
    this.removeChannel(channel.id);
    if (channel.monitoringEnabled) {
      this.createJob(channel);
      console.log(
        `[scheduler] Updated job for channel ${channel.id} ("${channel.name}") — interval: ${channel.checkInterval}m`
      );
    }
  }

  /**
   * Check a channel for new content.
   *
   * 1. Check per-channel lock (reject overlap)
   * 2. Acquire rate limiter slot for the platform
   * 3. Fetch recent content via platform source
   * 4. Deduplicate against existing content
   * 5. Insert new items with `monitored` status
   * 6. Update channel's lastCheckedAt and lastCheckStatus (DB and cached snapshot)
   *
   * @param channel - Channel to check.
   * @param signal - Optional external abort signal. It is forwarded into an
   *   internal controller, so both the caller and `cancelScan()` can stop the scan.
   * @returns A structured result with item counts and status. Errors are caught
   *   and reported as `'error'` / `'rate_limited'` rather than thrown.
   */
  async checkChannel(channel: Channel, signal?: AbortSignal): Promise<CheckChannelResult> {
    // Per-channel lock — reject overlap before any async work
    if (this.activeChecks.has(channel.id)) {
      console.log(
        `[scheduler] Skipping channel ${channel.id} ("${channel.name}") — already running`
      );
      return {
        channelId: channel.id,
        channelName: channel.name,
        newItems: 0,
        totalFetched: 0,
        status: 'already_running',
      };
    }

    // Always observe our own controller's signal: it is the one cancelScan()
    // aborts, and the external signal (if any) is linked into it.
    const abortController = createLinkedAbortController(signal);
    const effectiveSignal = abortController.signal;

    this.activeChecks.add(channel.id);
    this.activeAbortControllers.set(channel.id, abortController);

    // Emit scan:started before any async work
    this.eventBus?.emitScan('scan:started', {
      channelId: channel.id,
      channelName: channel.name,
    });

    console.log(
      `[scheduler] Checking channel ${channel.id} ("${channel.name}") on ${channel.platform}`
    );

    try {
      // 1. Rate limit
      await this.rateLimiter.acquire(channel.platform);

      // 2. Get platform source
      const source = this.platformRegistry.get(channel.platform);
      if (!source) {
        throw new Error(
          `No platform source registered for "${channel.platform}"`
        );
      }

      // 3. Load platform settings for scan limit and rate limit delay
      const platformSettingsRow = await getPlatformSettings(this.db, channel.platform);
      const baseScanLimit = platformSettingsRow?.scanLimit ?? 500;
      const rateLimitDelay = platformSettingsRow?.rateLimitDelay ?? 1000;

      // First scan (lastCheckedAt === null) → grab full catalog up to 999.
      // Cron-triggered checks read channelCache, which persistCheckStatus()
      // refreshes, so this only applies until the first recorded check.
      const scanLimit = channel.lastCheckedAt === null
        ? Math.max(baseScanLimit, 999)
        : baseScanLimit;

      // 4. Load existing content IDs for dedup gating
      const existingIds = new Set(await getRecentContentIds(this.db, channel.id));

      // 5. Fetch content — discovery-only (fast Phase 1, skip slow enrichment)
      const fetchOptions: FetchRecentContentOptions = {
        limit: scanLimit,
        existingIds,
        rateLimitDelay,
        discoveryOnly: true,
        signal: effectiveSignal,
      };
      const items: PlatformContentMetadata[] = await source.fetchRecentContent(channel, fetchOptions);

      // 6. Deduplicate — filter out items already known
      const newItems = items.filter(
        (item) => !existingIds.has(item.platformContentId)
      );

      // 6b. Apply keyword filter — exclude/include patterns from channel settings
      const filteredItems = newItems.filter((item) =>
        matchesKeywordFilter(item.title, channel.includeKeywords, channel.excludeKeywords)
      );
      if (filteredItems.length < newItems.length) {
        console.log(
          `[scheduler] Keyword filter: ${newItems.length - filteredItems.length} of ${newItems.length} new items filtered out for channel ${channel.id}`
        );
      }

      // Scheduler discovers *new* content (future), so 'all' and 'future' → monitored.
      // Loop-invariant, so computed once.
      const monitored =
        channel.monitoringMode === 'all' || channel.monitoringMode === 'future';

      // 7. Insert new items (check abort between each)
      let insertedCount = 0;
      for (const item of filteredItems) {
        // Check if scan was cancelled
        if (effectiveSignal.aborted) {
          console.log(
            `[scheduler] Scan cancelled for channel ${channel.id} ("${channel.name}") after ${insertedCount} items`
          );
          this.eventBus?.emitScan('scan:complete', {
            channelId: channel.id,
            channelName: channel.name,
            newItems: insertedCount,
            totalFetched: items.length,
          });
          return {
            channelId: channel.id,
            channelName: channel.name,
            newItems: insertedCount,
            totalFetched: items.length,
            status: 'success',
          };
        }

        const created = await createContentItem(this.db, {
          channelId: channel.id,
          title: item.title,
          platformContentId: item.platformContentId,
          url: item.url,
          contentType: item.contentType,
          duration: item.duration,
          thumbnailUrl: item.thumbnailUrl,
          publishedAt: item.publishedAt ?? null,
          status: 'monitored',
          monitored,
        });

        if (created) {
          insertedCount++;

          // Broadcast the new item to WebSocket clients
          this.eventBus?.emitScan('scan:item-discovered', {
            channelId: channel.id,
            channelName: channel.name,
            item: created,
          });

          // Only auto-enqueue monitored items
          if (this.onNewContent && created.monitored) {
            this.onNewContent(created.id);
          }
        }
      }

      // 8. Update channel status (a DB failure here falls through to the catch below)
      await this.persistCheckStatus(channel.id, 'success');

      this.rateLimiter.reportSuccess(channel.platform);

      console.log(
        `[scheduler] Check complete for channel ${channel.id}: ${insertedCount} new items (${items.length} fetched, ${existingIds.size} existing)`
      );

      this.eventBus?.emitScan('scan:complete', {
        channelId: channel.id,
        channelName: channel.name,
        newItems: insertedCount,
        totalFetched: items.length,
      });

      // 9. Background Phase 2: enrich newly inserted items with full metadata.
      // Not awaited — runs after the scan result is returned and emits its own
      // scan:complete when done.
      if (insertedCount > 0 && !effectiveSignal.aborted) {
        void this.enrichNewItems(channel, filteredItems, existingIds, rateLimitDelay, source, effectiveSignal)
          .catch((err: unknown) => {
            console.error(
              `[scheduler] Background enrichment failed for channel ${channel.id}:`,
              err instanceof Error ? err.message : err
            );
          });
      }

      // 10. Best-effort playlist sync (K011 sidecar pattern)
      // Runs after content scan succeeds — failure never affects the scan result.
      if (source.fetchPlaylists && !effectiveSignal.aborted) {
        void this.syncPlaylists(channel, source)
          .catch((err: unknown) => {
            console.error(
              `[scheduler] Playlist sync failed for channel ${channel.id}:`,
              err instanceof Error ? err.message : err
            );
          });
      }

      return {
        channelId: channel.id,
        channelName: channel.name,
        newItems: insertedCount,
        totalFetched: items.length,
        status: 'success',
      };
    } catch (err) {
      // Determine status based on error type
      const isRateLimit = err instanceof YtDlpError && err.isRateLimit;
      const status = isRateLimit ? 'rate_limited' : 'error';

      // Update channel status — failure here is logged, not rethrown
      try {
        await this.persistCheckStatus(channel.id, status);
      } catch (updateErr) {
        console.error(
          `[scheduler] Failed to update status for channel ${channel.id}:`,
          updateErr
        );
      }

      this.rateLimiter.reportError(channel.platform);

      this.eventBus?.emitScan('scan:error', {
        channelId: channel.id,
        channelName: channel.name,
        error: err instanceof Error ? err.message : String(err),
      });

      console.error(
        `[scheduler] Check failed for channel ${channel.id} ("${channel.name}"): ${status}`,
        err instanceof Error ? err.message : err
      );

      return {
        channelId: channel.id,
        channelName: channel.name,
        newItems: 0,
        totalFetched: 0,
        status,
      };
    } finally {
      this.activeChecks.delete(channel.id);
      this.activeAbortControllers.delete(channel.id);
    }
  }

  /**
   * Check whether a channel scan is currently in progress.
   */
  isScanning(channelId: number): boolean {
    return this.activeChecks.has(channelId);
  }

  /**
   * Cancel an in-progress scan for a channel.
   * Returns true if a scan was running and its abort signal was triggered.
   */
  cancelScan(channelId: number): boolean {
    const controller = this.activeAbortControllers.get(channelId);
    if (!controller) return false;
    controller.abort('User cancelled');
    console.log(`[scheduler] Scan cancel requested for channel ${channelId}`);
    return true;
  }

  /**
   * Get the current state of the scheduler for diagnostic inspection.
   */
  getState(): SchedulerState {
    const channels: ChannelJobState[] = [];
    for (const [channelId, job] of this.jobs) {
      const channel = this.channelCache.get(channelId);
      channels.push({
        channelId,
        channelName: channel?.name ?? 'unknown',
        platform: (channel?.platform ?? 'unknown') as Platform,
        isRunning: job.isBusy(),
        nextRun: job.nextRun() ?? null,
        lastCheckedAt: channel?.lastCheckedAt ?? null,
        lastCheckStatus: channel?.lastCheckStatus ?? null,
      });
    }
    return {
      running: this.running,
      channelCount: this.jobs.size,
      channels,
    };
  }

  /**
   * Persist a check outcome to the DB and mirror it into the cached channel
   * snapshot. Without the cache refresh, cron-triggered checks would keep
   * seeing the snapshot from job creation (e.g. `lastCheckedAt === null`,
   * which forces a full-catalog scan every time).
   *
   * Only channels that still have a cache entry are refreshed, so a channel
   * removed mid-check is not re-added. Throws if the DB update fails; in that
   * case the cache is left untouched.
   */
  private async persistCheckStatus(
    channelId: number,
    status: PersistedCheckStatus,
  ): Promise<void> {
    const lastCheckedAt = new Date().toISOString();
    await updateChannel(this.db, channelId, {
      lastCheckedAt,
      lastCheckStatus: status,
    });

    const cached = this.channelCache.get(channelId);
    if (cached) {
      this.channelCache.set(channelId, {
        ...cached,
        lastCheckedAt,
        lastCheckStatus: status,
      });
    }
  }

  /**
   * Background Phase 2: enrich newly discovered items by fetching full
   * metadata per item with `yt-dlp --dump-json`. Fills in `publishedAt` and
   * `duration` on the DB record only where they are still empty.
   * Per-item failures are logged and skipped.
   *
   * When finished (or aborted), emits a second `scan:complete` event for the
   * channel, with `newItems` = number of records actually updated.
   *
   * @param existingIds - IDs known before the scan; items in this set are skipped.
   * @param rateLimitDelay - Milliseconds to wait between consecutive yt-dlp calls.
   * @param signal - Checked before each item; when aborted, enrichment stops early.
   */
  private async enrichNewItems(
    channel: Channel,
    discoveredItems: PlatformContentMetadata[],
    existingIds: Set<string>,
    rateLimitDelay: number,
    _source: PlatformSource,
    signal: AbortSignal,
  ): Promise<void> {
    // Defensive: the caller already passes deduplicated items.
    const newItems = discoveredItems.filter(
      (item) => !existingIds.has(item.platformContentId)
    );
    if (newItems.length === 0) return;

    console.log(
      `[scheduler] Phase 2: enriching ${newItems.length} items for channel ${channel.id}`
    );

    let enrichedCount = 0;

    for (const [i, item] of newItems.entries()) {
      if (signal.aborted) {
        console.log(
          `[scheduler] Phase 2 aborted for channel ${channel.id} after ${enrichedCount}/${newItems.length} items`
        );
        break;
      }

      // Rate limit delay between enrichment calls (skip before first)
      if (i > 0 && rateLimitDelay > 0) {
        await sleep(rateLimitDelay);
      }

      try {
        // Use the item's original URL (platform-agnostic) instead of
        // hardcoding YouTube — SoundCloud / generic items have their own URLs.
        const contentUrl = item.url || `https://www.youtube.com/watch?v=${item.platformContentId}`;
        const enrichResult = await execYtDlp(
          ['--dump-json', '--no-playlist', contentUrl],
          { timeout: 15_000 }
        );
        const enrichData = parseSingleJson(enrichResult.stdout) as Record<string, unknown>;

        // Narrow untrusted JSON fields instead of casting them.
        const { upload_date: rawUploadDate, duration: rawDuration } = enrichData;
        const publishedAt = parseUploadDate(
          typeof rawUploadDate === 'string' ? rawUploadDate : undefined
        );
        const duration = typeof rawDuration === 'number' ? rawDuration : null;

        // Look up the DB record
        const dbItem = await getContentByPlatformContentId(this.db, channel.id, item.platformContentId);
        if (!dbItem) continue;

        // Build update payload — only fill fields the record is still missing
        const updates: { publishedAt?: string; duration?: number } = {};
        if (publishedAt && !dbItem.publishedAt) {
          updates.publishedAt = publishedAt;
        }
        if (duration != null && dbItem.duration == null) {
          updates.duration = duration;
        }

        if (Object.keys(updates).length > 0) {
          await updateContentItem(this.db, dbItem.id, updates);
          enrichedCount++;
        }
      } catch (err) {
        // Tolerate individual enrichment failures
        console.warn(
          `[scheduler] Enrichment failed for ${item.platformContentId}: ${err instanceof Error ? err.message : err}`
        );
      }
    }

    console.log(
      `[scheduler] Phase 2 complete for channel ${channel.id}: ${enrichedCount}/${newItems.length} items enriched`
    );

    // Notify clients that enrichment is done — they should refetch content
    this.eventBus?.emitScan('scan:complete', {
      channelId: channel.id,
      channelName: channel.name,
      newItems: enrichedCount,
      totalFetched: newItems.length,
    });
  }

  /**
   * Best-effort playlist sync for a channel.
   * Fetches playlists from the platform source and upserts them into the DB.
   * Failure is logged but never propagated (K011 sidecar pattern).
   */
  private async syncPlaylists(channel: Channel, source: PlatformSource): Promise<void> {
    if (!source.fetchPlaylists) return;

    try {
      const discoveryResults = await source.fetchPlaylists(channel);
      if (discoveryResults.length === 0) {
        console.log(`[scheduler] Playlist sync: no playlists found for channel ${channel.id}`);
        return;
      }

      const upserted = await upsertPlaylists(this.db, channel.id, discoveryResults);
      console.log(
        `[scheduler] Playlist sync complete for channel ${channel.id}: ${upserted.length} playlists synced`
      );
    } catch (err) {
      console.error(
        `[scheduler] Playlist sync error for channel ${channel.id}:`,
        err instanceof Error ? err.message : err
      );
    }
  }

  // ── Internal ──

  /**
   * Create a Cron job for a channel and cache its snapshot.
   * Uses croner's `interval` option (minimum seconds between runs) so that
   * intervals that are not a clean cron step are still honoured.
   */
  private createJob(channel: Channel): void {
    const intervalSeconds = channel.checkInterval * 60;
    const cronPattern = minutesToCronPattern(channel.checkInterval);

    const job = new Cron(
      cronPattern,
      {
        protect: true, // Prevent overlapping runs
        interval: intervalSeconds, // Minimum seconds between runs
      },
      async () => {
        // Read the cached snapshot (refreshed by updateChannel / persistCheckStatus)
        const current = this.channelCache.get(channel.id) ?? channel;
        await this.checkChannel(current);
      }
    );

    this.jobs.set(channel.id, job);
    this.channelCache.set(channel.id, channel);
  }
}

// ── Helpers ──

/**
 * Create an AbortController that is also aborted when `external` aborts.
 * Handles an already-aborted `external` signal, which would never fire 'abort'.
 * The forwarding listener is registered with `once` and is otherwise not
 * removed, so an upstream abort can still stop background work (enrichment)
 * that keeps using the returned signal after the check itself returns.
 */
function createLinkedAbortController(external?: AbortSignal): AbortController {
  const controller = new AbortController();
  if (external === undefined) return controller;

  const upstream: AbortSignal = external;
  if (upstream.aborted) {
    controller.abort(upstream.reason);
  } else {
    upstream.addEventListener('abort', () => controller.abort(upstream.reason), { once: true });
  }
  return controller;
}

/**
 * Convert a check interval in minutes to a cron pattern.
 * For whole-minute intervals that divide evenly into 60, use `* /{n} * * * *`.
 * For other intervals, use the closest reasonable pattern and rely on
 * croner's `interval` option for exact timing.
 */
function minutesToCronPattern(minutes: number): string {
  if (minutes <= 0) return '*/5 * * * *'; // Fallback: every 5 minutes

  // Integer check: fractional values such as 7.5 satisfy `60 % n === 0`
  // but would produce an invalid `*/7.5` step.
  if (Number.isInteger(minutes) && minutes < 60 && 60 % minutes === 0) {
    return `*/${minutes} * * * *`;
  }

  if (minutes === 60) {
    return '0 * * * *'; // Every hour
  }

  if (minutes < 60) {
    // Arbitrary sub-hour interval — run every minute, use `interval` option
    return '* * * * *';
  }

  // For intervals >= 60 minutes, run every hour and use `interval` option
  return '0 * * * *';
}

/**
 * Convert a yt-dlp `upload_date` (`YYYYMMDD`) to an ISO-8601 UTC midnight
 * timestamp. Returns null for missing, non-numeric, or out-of-range input.
 */
function parseUploadDate(raw: string | undefined): string | null {
  if (!raw) return null;
  const match = /^(\d{4})(\d{2})(\d{2})$/.exec(raw);
  if (!match) return null;

  const [, y, m, d] = match;
  const month = Number(m);
  const day = Number(d);
  if (month < 1 || month > 12 || day < 1 || day > 31) return null;

  return `${y}-${m}-${d}T00:00:00Z`;
}

/** Resolve after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}