import type { LibSQLDatabase } from 'drizzle-orm/libsql'; import type * as schema from '../db/schema/index'; import type { DownloadService } from './download'; import { createQueueItem, getQueueItemById, getQueueItemsByStatus, getPendingQueueItems, updateQueueItemStatus, countQueueItemsByStatus, getQueueItemByContentItemId, } from '../db/repositories/queue-repository'; import { createHistoryEvent } from '../db/repositories/history-repository'; import { getContentItemById, updateContentItem } from '../db/repositories/content-repository'; import { getChannelById } from '../db/repositories/channel-repository'; import { getFormatProfileById, getDefaultFormatProfile } from '../db/repositories/format-profile-repository'; import type { QueueItem, QueueStatus } from '../types/index'; import type { NotificationEvent } from './notification'; import { appConfig } from '../config/index'; // ── Types ── type Db = LibSQLDatabase; export interface QueueServiceOptions { concurrency?: number; onDownloadComplete?: (event: NotificationEvent) => void; onDownloadFailed?: (event: NotificationEvent) => void; } export interface QueueState { pending: number; downloading: number; completed: number; failed: number; cancelled: number; paused: number; } // ── QueueService ── /** * Orchestrates the download queue lifecycle: enqueue, process with concurrency * control, retry on failure, cancel, pause/resume, and recover interrupted items on startup. * * Status transitions: * pending → downloading → completed | failed * failed → pending (retry) or failed (max attempts exhausted) * pending | failed → cancelled * pending | downloading → paused * paused → pending (resume) * * Concurrency is managed via an in-memory counter — Node's single-threaded * event loop ensures processNext() is not re-entrant within a single tick. */ export class QueueService { private activeCount = 0; private stopped = false; private concurrency: number; private readonly onDownloadComplete?: (event: NotificationEvent) => void; private readonly onDownloadFailed?: (event: NotificationEvent) => void; /** Maps queueItemId → AbortController for in-flight downloads (used by pause to cancel yt-dlp). */ private readonly activeAbortControllers = new Map(); constructor( private readonly db: Db, private readonly downloadService: DownloadService, options?: QueueServiceOptions | number ) { // Support both old (concurrency number) and new (options object) signatures if (typeof options === 'number') { this.concurrency = options; } else { this.concurrency = options?.concurrency ?? appConfig.concurrentDownloads; this.onDownloadComplete = options?.onDownloadComplete; this.onDownloadFailed = options?.onDownloadFailed; } } // ── Public API ── /** * Update the concurrency limit at runtime. * Immediately tries to pick up pending items if concurrency increased. */ setConcurrency(n: number): void { this.concurrency = n; console.log(`[queue] concurrency updated to ${n}`); this.processNext(); } /** * Enqueue a content item for download. Creates a queue item, updates the * content status to 'queued', records a 'grabbed' history event, and kicks * off processing. * * @throws Error if the content item is already queued or downloading. */ async enqueue(contentItemId: number, priority = 0): Promise { // Dedup check — don't allow double-enqueue const existing = await getQueueItemByContentItemId(this.db, contentItemId); if (existing) { const activeStatuses: QueueStatus[] = ['pending', 'downloading']; if (activeStatuses.includes(existing.status)) { throw new Error( `Content item ${contentItemId} is already in the queue with status '${existing.status}'` ); } } // Look up the content item so we can set channel info on history const contentItem = await getContentItemById(this.db, contentItemId); if (!contentItem) { throw new Error(`Content item ${contentItemId} not found`); } // Create queue item const queueItem = await createQueueItem(this.db, { contentItemId, priority, }); // Update content status to queued await updateContentItem(this.db, contentItemId, { status: 'queued' }); // Record grabbed history event await createHistoryEvent(this.db, { contentItemId, channelId: contentItem.channelId, eventType: 'grabbed', status: 'pending', details: { queueItemId: queueItem.id, title: contentItem.title }, }); console.log( `[queue] enqueue queueId=${queueItem.id} contentId=${contentItemId} status=pending priority=${priority}` ); // Kick off processing this.processNext(); return queueItem; } /** * Synchronous entry point that picks up pending items up to the concurrency * limit and fires off async processing for each. */ processNext(): void { if (this.stopped) return; const slots = this.concurrency - this.activeCount; if (slots <= 0) return; // Fetch pending items — async but we fire-and-forget getPendingQueueItems(this.db, slots).then((items) => { for (const item of items) { if (this.stopped) break; if (this.activeCount >= this.concurrency) break; this.activeCount++; this.processItem(item).catch((err) => { console.log( `[queue] unhandled error processing queueId=${item.id}: ${err instanceof Error ? err.message : String(err)}` ); }); } }).catch((err) => { console.log( `[queue] error fetching pending items: ${err instanceof Error ? err.message : String(err)}` ); }); } /** * Retry a failed queue item. Resets it to pending if under maxAttempts. * * @throws Error if item not found, not in failed status, or attempts exhausted. */ async retryItem(queueItemId: number): Promise { const item = await getQueueItemById(this.db, queueItemId); if (!item) { throw new Error(`Queue item ${queueItemId} not found`); } if (item.status !== 'failed') { throw new Error( `Cannot retry queue item ${queueItemId} — status is '${item.status}', expected 'failed'` ); } if (item.attempts >= item.maxAttempts) { throw new Error( `Cannot retry queue item ${queueItemId} — attempts (${item.attempts}) >= maxAttempts (${item.maxAttempts})` ); } const updated = await updateQueueItemStatus(this.db, queueItemId, 'pending', { error: null, }); // Reset content status to queued await updateContentItem(this.db, item.contentItemId, { status: 'queued' }); console.log( `[queue] retry queueId=${queueItemId} contentId=${item.contentItemId} status=pending attempts=${item.attempts}/${item.maxAttempts}` ); this.processNext(); return updated!; } /** * Cancel a queue item. Only pending or failed items can be cancelled. * * @throws Error if item not found or not in a cancellable status. */ async cancelItem(queueItemId: number): Promise { const item = await getQueueItemById(this.db, queueItemId); if (!item) { throw new Error(`Queue item ${queueItemId} not found`); } const cancellable: QueueStatus[] = ['pending', 'failed']; if (!cancellable.includes(item.status)) { throw new Error( `Cannot cancel queue item ${queueItemId} — status is '${item.status}', must be 'pending' or 'failed'` ); } const updated = await updateQueueItemStatus(this.db, queueItemId, 'cancelled'); console.log( `[queue] cancel queueId=${queueItemId} contentId=${item.contentItemId} status=cancelled` ); return updated!; } /** * Pause a queue item. Pending items are set to 'paused' immediately. * Downloading items have their yt-dlp process killed and are set to 'paused'. * * @throws Error if item not found or not in a pausable status. */ async pauseItem(queueItemId: number): Promise { const item = await getQueueItemById(this.db, queueItemId); if (!item) { throw new Error(`Queue item ${queueItemId} not found`); } const pausable: QueueStatus[] = ['pending', 'downloading']; if (!pausable.includes(item.status)) { throw new Error( `Cannot pause queue item ${queueItemId} — status is '${item.status}', must be 'pending' or 'downloading'` ); } // If downloading, abort the yt-dlp process if (item.status === 'downloading') { const controller = this.activeAbortControllers.get(queueItemId); if (controller) { controller.abort('paused'); } } const updated = await updateQueueItemStatus(this.db, queueItemId, 'paused'); console.log( `[queue] pause queueId=${queueItemId} contentId=${item.contentItemId} previousStatus=${item.status} status=paused` ); return updated!; } /** * Resume a paused queue item. Sets it back to 'pending' and triggers processing. * * @throws Error if item not found or not paused. */ async resumeItem(queueItemId: number): Promise { const item = await getQueueItemById(this.db, queueItemId); if (!item) { throw new Error(`Queue item ${queueItemId} not found`); } if (item.status !== 'paused') { throw new Error( `Cannot resume queue item ${queueItemId} — status is '${item.status}', expected 'paused'` ); } const updated = await updateQueueItemStatus(this.db, queueItemId, 'pending', { error: null, startedAt: null, }); // Reset content status to queued await updateContentItem(this.db, item.contentItemId, { status: 'queued' }); console.log( `[queue] resume queueId=${queueItemId} contentId=${item.contentItemId} status=pending` ); this.processNext(); return updated!; } /** * Recover items that were stuck in 'downloading' status after a crash/restart. * Resets them to 'pending' so they'll be picked up again. * * @returns Number of items recovered. */ async recoverOnStartup(): Promise { const stuckItems = await getQueueItemsByStatus(this.db, 'downloading'); for (const item of stuckItems) { await updateQueueItemStatus(this.db, item.id, 'pending', { startedAt: null, error: null, }); } if (stuckItems.length > 0) { console.log( `[queue] recovery: reset ${stuckItems.length} stuck item(s) from downloading → pending` ); } return stuckItems.length; } /** * Get current queue state — count of items by status. */ async getState(): Promise { return countQueueItemsByStatus(this.db); } /** * Stop processing — no new items will be picked up. * Items already downloading will finish. */ stop(): void { this.stopped = true; console.log('[queue] stopped — no new items will be processed'); } /** * Resume processing after stop(). */ start(): void { this.stopped = false; console.log('[queue] started — processing resumed'); this.processNext(); } /** * Infer platform from a URL for ad-hoc downloads. */ private inferPlatformFromUrl(url: string): string { if (url.includes('youtube.com') || url.includes('youtu.be')) return 'youtube'; if (url.includes('soundcloud.com')) return 'soundcloud'; return 'generic'; } // ── Internal ── /** * Process a single queue item: download the content, update status, * and record history events. */ private async processItem(queueItem: QueueItem): Promise { const logPrefix = `[queue] process queueId=${queueItem.id} contentId=${queueItem.contentItemId}`; const abortController = new AbortController(); this.activeAbortControllers.set(queueItem.id, abortController); try { // Transition to downloading await updateQueueItemStatus(this.db, queueItem.id, 'downloading', { startedAt: new Date().toISOString(), }); console.log(`${logPrefix} status=downloading`); // Look up content item and channel const contentItem = await getContentItemById(this.db, queueItem.contentItemId); if (!contentItem) { throw new Error(`Content item ${queueItem.contentItemId} not found`); } const channel = contentItem.channelId ? await getChannelById(this.db, contentItem.channelId) : null; if (contentItem.channelId && !channel) { throw new Error(`Channel ${contentItem.channelId} not found for content item ${contentItem.id}`); } // Resolve format profile: channel-specific > default > undefined let formatProfile = undefined; if (channel?.formatProfileId) { formatProfile = await getFormatProfileById(this.db, channel.formatProfileId) ?? undefined; } if (!formatProfile) { formatProfile = await getDefaultFormatProfile(this.db) ?? undefined; } // Execute download — ad-hoc items (no channel) pass null with platform/channelName overrides if (channel) { await this.downloadService.downloadItem(contentItem, channel, formatProfile); } else { // Ad-hoc download: infer platform from URL, use stored title metadata const platform = this.inferPlatformFromUrl(contentItem.url); await this.downloadService.downloadItem(contentItem, null, formatProfile, { platform: platform as import('../types/index').Platform, channelName: 'Ad-hoc', }); } // Success — mark completed await updateQueueItemStatus(this.db, queueItem.id, 'completed', { completedAt: new Date().toISOString(), }); // Record downloaded history event await createHistoryEvent(this.db, { contentItemId: queueItem.contentItemId, channelId: channel?.id ?? null, eventType: 'downloaded', status: 'completed', details: { queueItemId: queueItem.id, title: contentItem.title, attempts: queueItem.attempts + 1, }, }); console.log(`${logPrefix} status=completed`); // Fire notification callback (fire-and-forget) if (this.onDownloadComplete) { try { this.onDownloadComplete({ contentTitle: contentItem.title, channelName: channel?.name ?? 'Ad-hoc', platform: channel?.platform ?? 'generic', url: contentItem.url, filePath: contentItem.filePath ?? undefined, }); } catch (notifyErr) { console.log( `[queue] notification callback error: ${notifyErr instanceof Error ? notifyErr.message : String(notifyErr)}` ); } } } catch (err: unknown) { // If aborted due to pause, don't treat as a failure — status is already set by pauseItem if (abortController.signal.aborted) { console.log(`${logPrefix} aborted (paused)`); // Don't increment attempts or record failure — the item was paused by the user } else { const errorMsg = err instanceof Error ? err.message : String(err); const newAttempts = queueItem.attempts + 1; const exhausted = newAttempts >= queueItem.maxAttempts; const newStatus: QueueStatus = exhausted ? 'failed' : 'pending'; await updateQueueItemStatus(this.db, queueItem.id, newStatus, { attempts: newAttempts, error: errorMsg, ...(exhausted ? {} : { startedAt: null }), }); // Update content status to failed when attempts exhausted if (exhausted) { await updateContentItem(this.db, queueItem.contentItemId, { status: 'failed' }); } // Record failed history event const contentItem = await getContentItemById(this.db, queueItem.contentItemId); await createHistoryEvent(this.db, { contentItemId: queueItem.contentItemId, channelId: contentItem?.channelId ?? null, eventType: 'failed', status: newStatus, details: { queueItemId: queueItem.id, error: errorMsg, attempt: newAttempts, maxAttempts: queueItem.maxAttempts, exhausted, }, }); console.log( `${logPrefix} status=${newStatus} attempt=${newAttempts}/${queueItem.maxAttempts} error="${errorMsg.slice(0, 200)}"` ); // Fire failure notification callback when attempts exhausted (fire-and-forget) if (exhausted && this.onDownloadFailed) { try { // Look up channel for notification context let failedChannelName = 'Unknown'; let failedPlatform = 'unknown'; if (contentItem?.channelId) { const failedChannel = await getChannelById(this.db, contentItem.channelId); if (failedChannel) { failedChannelName = failedChannel.name; failedPlatform = failedChannel.platform; } } this.onDownloadFailed({ contentTitle: contentItem?.title ?? `Content #${queueItem.contentItemId}`, channelName: failedChannelName, platform: failedPlatform, url: contentItem?.url ?? '', error: errorMsg, attempt: newAttempts, maxAttempts: queueItem.maxAttempts, }); } catch (notifyErr) { console.log( `[queue] notification callback error: ${notifyErr instanceof Error ? notifyErr.message : String(notifyErr)}` ); } } } } finally { this.activeAbortControllers.delete(queueItem.id); this.activeCount--; this.processNext(); } } }