From e6faa05d1cdb52d1f4b90fce14985e69d8654d74 Mon Sep 17 00:00:00 2001 From: John Lightner Date: Tue, 24 Mar 2026 20:48:20 -0500 Subject: [PATCH] =?UTF-8?q?test(S03/T01):=20Add=20getCollectibleItems=20re?= =?UTF-8?q?pository=20query=20and=20collect/col=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - src/db/repositories/content-repository.ts - src/server/routes/collect.ts - src/server/index.ts - src/__tests__/collect-api.test.ts --- src/__tests__/collect-api.test.ts | 518 ++++++++++++++++++++++ src/db/repositories/content-repository.ts | 31 +- src/server/index.ts | 2 + src/server/routes/collect.ts | 140 ++++++ 4 files changed, 690 insertions(+), 1 deletion(-) create mode 100644 src/__tests__/collect-api.test.ts create mode 100644 src/server/routes/collect.ts diff --git a/src/__tests__/collect-api.test.ts b/src/__tests__/collect-api.test.ts new file mode 100644 index 0000000..9cdb40f --- /dev/null +++ b/src/__tests__/collect-api.test.ts @@ -0,0 +1,518 @@ +import { describe, it, expect, beforeAll, afterAll, vi } from 'vitest'; +import { mkdtempSync, rmSync, existsSync } from 'node:fs'; +import { join } from 'node:path'; +import { tmpdir } from 'node:os'; +import { type FastifyInstance } from 'fastify'; +import { initDatabaseAsync, closeDatabase } from '../db/index'; +import { runMigrations } from '../db/migrate'; +import { buildServer } from '../server/index'; +import { systemConfig } from '../db/schema/index'; +import { eq } from 'drizzle-orm'; +import { type LibSQLDatabase } from 'drizzle-orm/libsql'; +import type * as schema from '../db/schema/index'; +import { createChannel } from '../db/repositories/channel-repository'; +import { createContentItem, updateContentItem } from '../db/repositories/content-repository'; +import { createQueueItem } from '../db/repositories/queue-repository'; +import { QueueService } from '../services/queue'; +import type { DownloadService } from '../services/download'; +import type { ContentItem, Channel } from '../types/index'; + +/** + * Integration tests for the collect endpoints. + * + * POST /api/v1/channel/:id/collect — collect monitored items for one channel + * POST /api/v1/channel/collect-all — collect monitored items across all channels + * + * Both return { enqueued, skipped, errors, items } and enqueue via QueueService. + */ + +describe('Collect API', () => { + let server: FastifyInstance; + let db: LibSQLDatabase; + let apiKey: string; + let tmpDir: string; + let testChannel: Channel; + let testChannel2: Channel; + let queueService: QueueService; + let mockDownloadService: { + downloadItem: ReturnType; + }; + + beforeAll(async () => { + tmpDir = mkdtempSync(join(tmpdir(), 'tubearr-collect-api-')); + const dbPath = join(tmpDir, 'test.db'); + db = await initDatabaseAsync(dbPath); + await runMigrations(dbPath); + server = await buildServer({ db }); + + // Create mock download service and real queue service + mockDownloadService = { + downloadItem: vi.fn().mockResolvedValue(undefined), + }; + queueService = new QueueService( + db, + mockDownloadService as unknown as DownloadService, + 2 + ); + // Stop auto-processing so tests stay deterministic + queueService.stop(); + + (server as { downloadService: DownloadService | null }).downloadService = + mockDownloadService as unknown as DownloadService; + (server as { queueService: QueueService | null }).queueService = queueService; + + await server.ready(); + + // Read API key + const rows = await db + .select() + .from(systemConfig) + .where(eq(systemConfig.key, 'api_key')) + .limit(1); + apiKey = rows[0]?.value ?? ''; + expect(apiKey).toBeTruthy(); + + // Create two test channels for cross-channel testing + testChannel = await createChannel(db, { + name: 'Collect Test Channel 1', + platform: 'youtube', + platformId: 'UC_collect_test_1', + url: 'https://www.youtube.com/channel/UC_collect_test_1', + monitoringEnabled: true, + checkInterval: 360, + imageUrl: null, + metadata: null, + formatProfileId: null, + }); + + testChannel2 = await createChannel(db, { + name: 'Collect Test Channel 2', + platform: 'youtube', + platformId: 'UC_collect_test_2', + url: 'https://www.youtube.com/channel/UC_collect_test_2', + monitoringEnabled: true, + checkInterval: 360, + imageUrl: null, + metadata: null, + formatProfileId: null, + }); + }); + + afterAll(async () => { + queueService.stop(); + await server.close(); + closeDatabase(); + try { + if (tmpDir && existsSync(tmpDir)) { + rmSync(tmpDir, { recursive: true, force: true }); + } + } catch { + // Temp dir cleanup is best-effort on Windows (K004) + } + }); + + // ── Helpers ── + + function authed(opts: Record) { + return { + ...opts, + headers: { 'x-api-key': apiKey, ...(opts.headers as Record | undefined) }, + }; + } + + let contentCounter = 0; + async function createTestItem( + channelId: number, + overrides: { status?: string; monitored?: boolean; platformContentId?: string } = {} + ): Promise { + contentCounter++; + const item = await createContentItem(db, { + channelId, + title: `Collect Test Item ${contentCounter}`, + platformContentId: overrides.platformContentId ?? `vid_collect_${Date.now()}_${contentCounter}`, + url: 'https://www.youtube.com/watch?v=collect_test', + contentType: 'video', + duration: 300, + status: (overrides.status ?? 'monitored') as 'monitored', + monitored: overrides.monitored ?? true, + }); + return item!; + } + + // ── Auth gating ── + + describe('Authentication', () => { + it('returns 401 when no API key is provided for single channel', async () => { + const res = await server.inject({ + method: 'POST', + url: `/api/v1/channel/${testChannel.id}/collect`, + }); + expect(res.statusCode).toBe(401); + }); + + it('returns 401 when no API key is provided for collect-all', async () => { + const res = await server.inject({ + method: 'POST', + url: '/api/v1/channel/collect-all', + }); + expect(res.statusCode).toBe(401); + }); + }); + + // ── Validation ── + + describe('Validation', () => { + it('returns 400 for non-numeric channel ID', async () => { + const res = await server.inject( + authed({ method: 'POST', url: '/api/v1/channel/abc/collect' }) + ); + expect(res.statusCode).toBe(400); + expect(res.json().message).toContain('number'); + }); + + it('returns 404 for nonexistent channel ID', async () => { + const res = await server.inject( + authed({ method: 'POST', url: '/api/v1/channel/99999/collect' }) + ); + expect(res.statusCode).toBe(404); + expect(res.json().message).toContain('99999'); + }); + }); + + // ── 503 when queue service unavailable ── + + describe('Queue service unavailable', () => { + it('returns 503 for single channel when queue service is null', async () => { + const savedQueueService = (server as unknown as Record).queueService; + (server as { queueService: QueueService | null }).queueService = null; + + try { + const res = await server.inject( + authed({ method: 'POST', url: `/api/v1/channel/${testChannel.id}/collect` }) + ); + expect(res.statusCode).toBe(503); + expect(res.json().message).toContain('Queue service'); + } finally { + (server as { queueService: QueueService | null }).queueService = + savedQueueService as QueueService; + } + }); + + it('returns 503 for collect-all when queue service is null', async () => { + const savedQueueService = (server as unknown as Record).queueService; + (server as { queueService: QueueService | null }).queueService = null; + + try { + const res = await server.inject( + authed({ method: 'POST', url: '/api/v1/channel/collect-all' }) + ); + expect(res.statusCode).toBe(503); + expect(res.json().message).toContain('Queue service'); + } finally { + (server as { queueService: QueueService | null }).queueService = + savedQueueService as QueueService; + } + }); + }); + + // ── Happy path ── + + describe('Happy path — single channel', () => { + it('enqueues monitored items with eligible status', async () => { + const item1 = await createTestItem(testChannel.id, { status: 'monitored' }); + const item2 = await createTestItem(testChannel.id, { status: 'monitored' }); + + const res = await server.inject( + authed({ method: 'POST', url: `/api/v1/channel/${testChannel.id}/collect` }) + ); + + expect(res.statusCode).toBe(200); + const body = res.json(); + expect(body.enqueued).toBe(2); + expect(body.skipped).toBe(0); + expect(body.errors).toBe(0); + expect(body.items).toHaveLength(2); + + const enqueued = body.items.filter((i: { status: string }) => i.status === 'enqueued'); + expect(enqueued).toHaveLength(2); + const ids = enqueued.map((i: { contentItemId: number }) => i.contentItemId); + expect(ids).toContain(item1.id); + expect(ids).toContain(item2.id); + }); + + it('also enqueues items with status "failed"', async () => { + const item = await createTestItem(testChannel.id, { status: 'monitored' }); + await updateContentItem(db, item.id, { status: 'failed' }); + + const res = await server.inject( + authed({ method: 'POST', url: `/api/v1/channel/${testChannel.id}/collect` }) + ); + + expect(res.statusCode).toBe(200); + const body = res.json(); + // Should include the failed item (it's collectible — not downloaded/queued/downloading) + const itemResult = body.items.find((i: { contentItemId: number }) => i.contentItemId === item.id); + expect(itemResult).toBeDefined(); + expect(itemResult.status).toBe('enqueued'); + }); + }); + + // ── Empty result ── + + describe('Empty result', () => { + it('returns zeros when no collectible items exist', async () => { + // Create a new channel with no content + const emptyChannel = await createChannel(db, { + name: 'Empty Channel', + platform: 'youtube', + platformId: 'UC_empty_collect', + url: 'https://www.youtube.com/channel/UC_empty_collect', + monitoringEnabled: true, + checkInterval: 360, + imageUrl: null, + metadata: null, + formatProfileId: null, + }); + + const res = await server.inject( + authed({ method: 'POST', url: `/api/v1/channel/${emptyChannel.id}/collect` }) + ); + + expect(res.statusCode).toBe(200); + const body = res.json(); + expect(body.enqueued).toBe(0); + expect(body.skipped).toBe(0); + expect(body.errors).toBe(0); + expect(body.items).toHaveLength(0); + }); + }); + + // ── Exclusion filters ── + + describe('Exclusion filters', () => { + it('excludes items with monitored=false even if status is eligible', async () => { + const channel = await createChannel(db, { + name: 'Exclusion Test Channel', + platform: 'youtube', + platformId: 'UC_excl_test', + url: 'https://www.youtube.com/channel/UC_excl_test', + monitoringEnabled: true, + checkInterval: 360, + imageUrl: null, + metadata: null, + formatProfileId: null, + }); + + // Create unmonitored item with eligible status + await createTestItem(channel.id, { monitored: false, status: 'monitored' }); + + const res = await server.inject( + authed({ method: 'POST', url: `/api/v1/channel/${channel.id}/collect` }) + ); + + expect(res.statusCode).toBe(200); + expect(res.json().enqueued).toBe(0); + expect(res.json().items).toHaveLength(0); + }); + + it('excludes items with status "downloaded" even if monitored=true', async () => { + const channel = await createChannel(db, { + name: 'Downloaded Exclusion Channel', + platform: 'youtube', + platformId: 'UC_dl_excl', + url: 'https://www.youtube.com/channel/UC_dl_excl', + monitoringEnabled: true, + checkInterval: 360, + imageUrl: null, + metadata: null, + formatProfileId: null, + }); + + const item = await createTestItem(channel.id); + await updateContentItem(db, item.id, { status: 'downloaded' }); + + const res = await server.inject( + authed({ method: 'POST', url: `/api/v1/channel/${channel.id}/collect` }) + ); + + expect(res.statusCode).toBe(200); + expect(res.json().enqueued).toBe(0); + expect(res.json().items).toHaveLength(0); + }); + + it('excludes items with status "queued"', async () => { + const channel = await createChannel(db, { + name: 'Queued Exclusion Channel', + platform: 'youtube', + platformId: 'UC_q_excl', + url: 'https://www.youtube.com/channel/UC_q_excl', + monitoringEnabled: true, + checkInterval: 360, + imageUrl: null, + metadata: null, + formatProfileId: null, + }); + + const item = await createTestItem(channel.id); + await updateContentItem(db, item.id, { status: 'queued' }); + + const res = await server.inject( + authed({ method: 'POST', url: `/api/v1/channel/${channel.id}/collect` }) + ); + + expect(res.statusCode).toBe(200); + expect(res.json().enqueued).toBe(0); + expect(res.json().items).toHaveLength(0); + }); + + it('excludes items with status "downloading"', async () => { + const channel = await createChannel(db, { + name: 'Downloading Exclusion Channel', + platform: 'youtube', + platformId: 'UC_ding_excl', + url: 'https://www.youtube.com/channel/UC_ding_excl', + monitoringEnabled: true, + checkInterval: 360, + imageUrl: null, + metadata: null, + formatProfileId: null, + }); + + const item = await createTestItem(channel.id); + await updateContentItem(db, item.id, { status: 'downloading' }); + + const res = await server.inject( + authed({ method: 'POST', url: `/api/v1/channel/${channel.id}/collect` }) + ); + + expect(res.statusCode).toBe(200); + expect(res.json().enqueued).toBe(0); + expect(res.json().items).toHaveLength(0); + }); + }); + + // ── Dedup / skip ── + + describe('Dedup — already queued items', () => { + it('reports skipped count when items are already in the queue', async () => { + const channel = await createChannel(db, { + name: 'Dedup Test Channel', + platform: 'youtube', + platformId: 'UC_dedup_test', + url: 'https://www.youtube.com/channel/UC_dedup_test', + monitoringEnabled: true, + checkInterval: 360, + imageUrl: null, + metadata: null, + formatProfileId: null, + }); + + const item = await createTestItem(channel.id); + // Create a queue entry directly (simulates a concurrent enqueue that happened + // after getCollectibleItems ran but before we iterate to this item). + // The content status stays 'monitored' so getCollectibleItems returns it, + // but queueService.enqueue() will see the existing queue entry and throw. + await createQueueItem(db, { contentItemId: item.id }); + + const res = await server.inject( + authed({ method: 'POST', url: `/api/v1/channel/${channel.id}/collect` }) + ); + + expect(res.statusCode).toBe(200); + const body = res.json(); + expect(body.enqueued).toBe(0); + expect(body.skipped).toBe(1); + expect(body.items[0].status).toBe('skipped'); + }); + }); + + // ── Mixed results ── + + describe('Mixed results', () => { + it('correctly counts enqueued and skipped items in same batch', async () => { + const channel = await createChannel(db, { + name: 'Mixed Test Channel', + platform: 'youtube', + platformId: 'UC_mixed_test', + url: 'https://www.youtube.com/channel/UC_mixed_test', + monitoringEnabled: true, + checkInterval: 360, + imageUrl: null, + metadata: null, + formatProfileId: null, + }); + + const itemToSkip = await createTestItem(channel.id); + const itemToEnqueue = await createTestItem(channel.id); + + // Create a queue entry directly for the skip item (content status stays 'monitored') + await createQueueItem(db, { contentItemId: itemToSkip.id }); + + const res = await server.inject( + authed({ method: 'POST', url: `/api/v1/channel/${channel.id}/collect` }) + ); + + expect(res.statusCode).toBe(200); + const body = res.json(); + expect(body.enqueued).toBe(1); + expect(body.skipped).toBe(1); + expect(body.errors).toBe(0); + expect(body.items).toHaveLength(2); + + const skippedItem = body.items.find( + (i: { contentItemId: number }) => i.contentItemId === itemToSkip.id + ); + const enqueuedItem = body.items.find( + (i: { contentItemId: number }) => i.contentItemId === itemToEnqueue.id + ); + expect(skippedItem.status).toBe('skipped'); + expect(enqueuedItem.status).toBe('enqueued'); + }); + }); + + // ── Collect-all across channels ── + + describe('Collect-all', () => { + it('collects items across multiple channels', async () => { + const ch1 = await createChannel(db, { + name: 'CollectAll Ch1', + platform: 'youtube', + platformId: 'UC_coll_all_1', + url: 'https://www.youtube.com/channel/UC_coll_all_1', + monitoringEnabled: true, + checkInterval: 360, + imageUrl: null, + metadata: null, + formatProfileId: null, + }); + const ch2 = await createChannel(db, { + name: 'CollectAll Ch2', + platform: 'youtube', + platformId: 'UC_coll_all_2', + url: 'https://www.youtube.com/channel/UC_coll_all_2', + monitoringEnabled: true, + checkInterval: 360, + imageUrl: null, + metadata: null, + formatProfileId: null, + }); + + const item1 = await createTestItem(ch1.id); + const item2 = await createTestItem(ch2.id); + + const res = await server.inject( + authed({ method: 'POST', url: '/api/v1/channel/collect-all' }) + ); + + expect(res.statusCode).toBe(200); + const body = res.json(); + // Should include at least these two items (may include others from earlier tests) + expect(body.enqueued).toBeGreaterThanOrEqual(2); + const ids = body.items + .filter((i: { status: string }) => i.status === 'enqueued') + .map((i: { contentItemId: number }) => i.contentItemId); + expect(ids).toContain(item1.id); + expect(ids).toContain(item2.id); + }); + }); +}); diff --git a/src/db/repositories/content-repository.ts b/src/db/repositories/content-repository.ts index a371274..260946e 100644 --- a/src/db/repositories/content-repository.ts +++ b/src/db/repositories/content-repository.ts @@ -1,4 +1,4 @@ -import { eq, and, desc, like, sql, inArray } from 'drizzle-orm'; +import { eq, and, desc, like, sql, inArray, notInArray } from 'drizzle-orm'; import { type LibSQLDatabase } from 'drizzle-orm/libsql'; import type * as schema from '../schema/index'; import { contentItems } from '../schema/index'; @@ -338,6 +338,35 @@ export async function getContentCountsByChannelIds( return map; } +// ── Collectible Items ── + +/** + * Get content items eligible for collection (download enqueueing). + * Returns items where `monitored = true` AND status is NOT 'downloaded', + * 'queued', or 'downloading'. Optionally filtered by channelId. + */ +export async function getCollectibleItems( + db: Db, + channelId?: number +): Promise { + const conditions = [ + eq(contentItems.monitored, true), + notInArray(contentItems.status, ['downloaded', 'queued', 'downloading']), + ]; + + if (channelId !== undefined) { + conditions.push(eq(contentItems.channelId, channelId)); + } + + const rows = await db + .select() + .from(contentItems) + .where(and(...conditions)) + .orderBy(contentItems.id); + + return rows.map(mapRow); +} + // ── Row Mapping ── function mapRow(row: typeof contentItems.$inferSelect): ContentItem { diff --git a/src/server/index.ts b/src/server/index.ts index 48e4960..5d3c295 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -19,6 +19,7 @@ import { contentRoutes } from './routes/content'; import { notificationRoutes } from './routes/notification'; import { platformSettingsRoutes } from './routes/platform-settings'; import { scanRoutes } from './routes/scan'; +import { collectRoutes } from './routes/collect'; import { playlistRoutes } from './routes/playlist'; import type { SchedulerService } from '../services/scheduler'; import type { DownloadService } from '../services/download'; @@ -99,6 +100,7 @@ export async function buildServer(opts: BuildServerOptions): Promise { + + // ── POST /api/v1/channel/collect-all ── + // Registered first — literal path before parameterized (K019/K022) + fastify.post('/api/v1/channel/collect-all', async (_request, reply) => { + if (!fastify.queueService) { + return reply.status(503).send({ + statusCode: 503, + error: 'Service Unavailable', + message: 'Queue service is not initialized', + }); + } + + const items = await getCollectibleItems(fastify.db); + const result = await enqueueItems(fastify, items); + + fastify.log.info( + { enqueued: result.enqueued, skipped: result.skipped, errors: result.errors }, + '[collect] collect-all completed' + ); + + return result; + }); + + // ── POST /api/v1/channel/:id/collect ── + fastify.post<{ Params: { id: string } }>( + '/api/v1/channel/:id/collect', + async (request, reply) => { + const id = parseInt(request.params.id, 10); + if (isNaN(id)) { + return reply.status(400).send({ + statusCode: 400, + error: 'Bad Request', + message: 'Channel ID must be a number', + }); + } + + const channel = await getChannelById(fastify.db, id); + if (!channel) { + return reply.status(404).send({ + statusCode: 404, + error: 'Not Found', + message: `Channel with ID ${id} not found`, + }); + } + + if (!fastify.queueService) { + return reply.status(503).send({ + statusCode: 503, + error: 'Service Unavailable', + message: 'Queue service is not initialized', + }); + } + + const items = await getCollectibleItems(fastify.db, id); + const result = await enqueueItems(fastify, items); + + fastify.log.info( + { channelId: id, enqueued: result.enqueued, skipped: result.skipped, errors: result.errors }, + '[collect] channel %d collect completed', + id + ); + + return result; + } + ); +} + +// ── Helpers ── + +/** + * Enqueue a batch of content items with per-item error isolation. + * Items that are already queued are counted as "skipped". + * Unexpected errors are counted and logged but don't abort the batch. + */ +async function enqueueItems( + fastify: FastifyInstance, + items: Array<{ id: number }> +): Promise { + let enqueued = 0; + let skipped = 0; + let errors = 0; + const results: CollectItemResult[] = []; + + for (const item of items) { + try { + await fastify.queueService!.enqueue(item.id); + enqueued++; + results.push({ contentItemId: item.id, status: 'enqueued' }); + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + + if (message.includes('already in the queue')) { + skipped++; + results.push({ contentItemId: item.id, status: 'skipped' }); + } else { + errors++; + results.push({ contentItemId: item.id, status: 'error' }); + fastify.log.error( + { err, contentItemId: item.id }, + '[collect] error enqueueing content item %d', + item.id + ); + } + } + } + + return { enqueued, skipped, errors, items: results }; +}