import { eq, and, desc, asc, sql } from 'drizzle-orm';
import { type LibSQLDatabase } from 'drizzle-orm/libsql';
import type * as schema from '../schema/index';
import { queueItems, contentItems, channels } from '../schema/index';
import type { QueueItem, QueueStatus } from '../../types/index';

// ── Types ──

/** Fields needed to create a new queue item. */
export interface CreateQueueItemData {
  /** ID of the content item this queue entry refers to. */
  contentItemId: number;
  /** Ordering priority; higher values are returned first. Defaults to 0. */
  priority?: number;
  /** Value stored in the `maxAttempts` column. Defaults to 3. */
  maxAttempts?: number;
}

/** Optional fields when updating queue item status. */
export interface UpdateQueueItemFields {
  error?: string | null;
  startedAt?: string | null;
  completedAt?: string | null;
  attempts?: number;
}

/** Database handle accepted by every repository function in this module. */
type Db = LibSQLDatabase<typeof schema>;

// ── Repository Functions ──

/**
 * Insert a new queue item.
 *
 * `priority` falls back to 0 and `maxAttempts` to 3 when omitted.
 *
 * @param db - Database handle.
 * @param data - Values for the new row.
 * @returns The created row.
 * @throws Error if the INSERT ... RETURNING statement yields no row.
 */
export async function createQueueItem(
  db: Db,
  data: CreateQueueItemData
): Promise<QueueItem> {
  const inserted = await db
    .insert(queueItems)
    .values({
      contentItemId: data.contentItemId,
      priority: data.priority ?? 0,
      maxAttempts: data.maxAttempts ?? 3,
    })
    .returning();

  const created = inserted[0];
  if (created === undefined) {
    // Fail with a descriptive error rather than a TypeError from inside mapRow.
    throw new Error(
      `Failed to create queue item for content item ${data.contentItemId}`
    );
  }
  return mapRow(created);
}

/**
 * Get a queue item by ID.
 *
 * @param db - Database handle.
 * @param id - Primary key of the queue item.
 * @returns The queue item, or null if not found.
 */
export async function getQueueItemById(
  db: Db,
  id: number
): Promise<QueueItem | null> {
  const rows = await db
    .select()
    .from(queueItems)
    .where(eq(queueItems.id, id))
    .limit(1);

  const row = rows[0];
  return row !== undefined ? mapRow(row) : null;
}

/**
 * Build the shared SELECT used by the list queries: every queue_items column
 * plus the content item title and channel name.
 *
 * Both joins are LEFT JOINs, so a queue item is still returned (with null
 * display fields) when no matching content item or channel row exists.
 */
function selectQueueItemsWithNames(db: Db) {
  return db
    .select({
      id: queueItems.id,
      contentItemId: queueItems.contentItemId,
      status: queueItems.status,
      priority: queueItems.priority,
      attempts: queueItems.attempts,
      maxAttempts: queueItems.maxAttempts,
      error: queueItems.error,
      startedAt: queueItems.startedAt,
      completedAt: queueItems.completedAt,
      createdAt: queueItems.createdAt,
      updatedAt: queueItems.updatedAt,
      contentTitle: contentItems.title,
      channelName: channels.name,
    })
    .from(queueItems)
    .leftJoin(contentItems, eq(queueItems.contentItemId, contentItems.id))
    .leftJoin(channels, eq(contentItems.channelId, channels.id));
}

/**
 * Get queue items by status, ordered by priority DESC then createdAt ASC.
 *
 * @param db - Database handle.
 * @param status - Status to filter on.
 * @returns Matching queue items, including content title and channel name.
 */
export async function getQueueItemsByStatus(
  db: Db,
  status: QueueStatus
): Promise<QueueItem[]> {
  const rows = await selectQueueItemsWithNames(db)
    .where(eq(queueItems.status, status))
    .orderBy(desc(queueItems.priority), asc(queueItems.createdAt));

  return rows.map(mapJoinedRow);
}

/**
 * Get all queue items ordered by priority DESC then createdAt ASC.
 *
 * @param db - Database handle.
 * @returns Every queue item, including content title and channel name.
 */
export async function getAllQueueItems(db: Db): Promise<QueueItem[]> {
  const rows = await selectQueueItemsWithNames(db).orderBy(
    desc(queueItems.priority),
    asc(queueItems.createdAt)
  );

  return rows.map(mapJoinedRow);
}

/**
 * Get pending queue items ordered by priority DESC then createdAt ASC.
 * Optional limit to constrain the result set (for concurrency control).
 *
 * @param db - Database handle.
 * @param limit - Maximum number of rows to return; no LIMIT clause is added
 *   when omitted.
 * @returns Pending queue items without joined display names.
 */
export async function getPendingQueueItems(
  db: Db,
  limit?: number
): Promise<QueueItem[]> {
  const pending = db
    .select()
    .from(queueItems)
    .where(eq(queueItems.status, 'pending'))
    .orderBy(desc(queueItems.priority), asc(queueItems.createdAt));

  // Branch instead of reassigning the builder so no type assertion is needed.
  const rows =
    limit === undefined ? await pending : await pending.limit(limit);

  return rows.map(mapRow);
}

/**
 * Update a queue item's status and optional fields. Sets updatedAt to now.
 *
 * Only fields of `updates` that are not `undefined` are written; an explicit
 * `null` is passed through as the new column value.
 *
 * @param db - Database handle.
 * @param id - Primary key of the queue item to update.
 * @param status - New status value.
 * @param updates - Additional columns to write alongside the status.
 * @returns The updated item, or null if not found.
 */
export async function updateQueueItemStatus(
  db: Db,
  id: number,
  status: QueueStatus,
  updates?: UpdateQueueItemFields
): Promise<QueueItem | null> {
  const setData: Record<string, unknown> = {
    status,
    // Evaluated by the database, not by the application clock.
    updatedAt: sql`(datetime('now'))`,
  };

  if (updates?.error !== undefined) setData.error = updates.error;
  if (updates?.startedAt !== undefined) setData.startedAt = updates.startedAt;
  if (updates?.completedAt !== undefined) {
    setData.completedAt = updates.completedAt;
  }
  if (updates?.attempts !== undefined) setData.attempts = updates.attempts;

  const updated = await db
    .update(queueItems)
    .set(setData)
    .where(eq(queueItems.id, id))
    .returning();

  const row = updated[0];
  return row !== undefined ? mapRow(row) : null;
}

/**
 * Count queue items grouped by status.
 *
 * @param db - Database handle.
 * @returns A record of counts keyed by status. The statuses pending,
 *   downloading, completed, failed and cancelled are always present and
 *   report 0 when no rows have that status.
 */
export async function countQueueItemsByStatus(
  db: Db
): Promise<Record<QueueStatus, number>> {
  const rows = await db
    .select({
      status: queueItems.status,
      count: sql<number>`count(*)`,
    })
    .from(queueItems)
    .groupBy(queueItems.status);

  // Seed with zeros: GROUP BY emits no row for a status that has no items.
  const counts: Record<string, number> = {
    pending: 0,
    downloading: 0,
    completed: 0,
    failed: 0,
    cancelled: 0,
  };

  for (const row of rows) {
    // Number() normalises whatever representation the driver uses for count(*).
    counts[row.status] = Number(row.count);
  }

  return counts as Record<QueueStatus, number>;
}

/**
 * Delete a queue item by ID.
 *
 * @param db - Database handle.
 * @param id - Primary key of the queue item to delete.
 * @returns True if a row was deleted.
 */
export async function deleteQueueItem(db: Db, id: number): Promise<boolean> {
  const deleted = await db
    .delete(queueItems)
    .where(eq(queueItems.id, id))
    .returning({ id: queueItems.id });

  return deleted.length > 0;
}

/**
 * Get a queue item by content item ID (for dedup checking before enqueue).
 *
 * The query carries no ORDER BY, so if several entries exist for the same
 * content item, which one is returned is up to the database.
 *
 * @param db - Database handle.
 * @param contentItemId - ID of the content item to look up.
 * @returns The queue item, or null if no entry exists for this content item.
 */
export async function getQueueItemByContentItemId(
  db: Db,
  contentItemId: number
): Promise<QueueItem | null> {
  const rows = await db
    .select()
    .from(queueItems)
    .where(eq(queueItems.contentItemId, contentItemId))
    .limit(1);

  const row = rows[0];
  return row !== undefined ? mapRow(row) : null;
}

// ── Row Mapping ──

/**
 * Map a plain queue_items row (no JOINs). Used by CRUD operations that don't
 * need display names; the result therefore carries no contentTitle or
 * channelName.
 */
function mapRow(row: typeof queueItems.$inferSelect): QueueItem {
  return {
    id: row.id,
    contentItemId: row.contentItemId,
    status: row.status as QueueStatus,
    priority: row.priority,
    attempts: row.attempts,
    maxAttempts: row.maxAttempts,
    error: row.error,
    startedAt: row.startedAt,
    completedAt: row.completedAt,
    createdAt: row.createdAt,
    updatedAt: row.updatedAt,
  };
}

/** Joined row shape from explicit `.select({...})` with LEFT JOIN content_items and channels. */
interface JoinedQueueRow {
  id: number;
  contentItemId: number;
  status: string;
  priority: number;
  attempts: number;
  maxAttempts: number;
  error: string | null;
  startedAt: string | null;
  completedAt: string | null;
  createdAt: string;
  updatedAt: string;
  /** Null when the LEFT JOIN found no content item. */
  contentTitle: string | null;
  /** Null when the LEFT JOIN found no channel. */
  channelName: string | null;
}

/** Map a joined queue row (with content title and channel name from LEFT JOINs). */
function mapJoinedRow(row: JoinedQueueRow): QueueItem {
  return {
    id: row.id,
    contentItemId: row.contentItemId,
    status: row.status as QueueStatus,
    priority: row.priority,
    attempts: row.attempts,
    maxAttempts: row.maxAttempts,
    error: row.error,
    startedAt: row.startedAt,
    completedAt: row.completedAt,
    createdAt: row.createdAt,
    updatedAt: row.updatedAt,
    // Normalise a missing value to null so callers see one "absent" form.
    contentTitle: row.contentTitle ?? null,
    channelName: row.channelName ?? null,
  };
}