test(S03/T01): Add getCollectibleItems repository query and collect/col…

- src/db/repositories/content-repository.ts
- src/server/routes/collect.ts
- src/server/index.ts
- src/__tests__/collect-api.test.ts
This commit is contained in:
John Lightner 2026-03-24 20:48:20 -05:00
parent 0ef34b1d21
commit e6faa05d1c
4 changed files with 690 additions and 1 deletions

View file

@ -0,0 +1,518 @@
import { describe, it, expect, beforeAll, afterAll, vi } from 'vitest';
import { mkdtempSync, rmSync, existsSync } from 'node:fs';
import { join } from 'node:path';
import { tmpdir } from 'node:os';
import { type FastifyInstance } from 'fastify';
import { initDatabaseAsync, closeDatabase } from '../db/index';
import { runMigrations } from '../db/migrate';
import { buildServer } from '../server/index';
import { systemConfig } from '../db/schema/index';
import { eq } from 'drizzle-orm';
import { type LibSQLDatabase } from 'drizzle-orm/libsql';
import type * as schema from '../db/schema/index';
import { createChannel } from '../db/repositories/channel-repository';
import { createContentItem, updateContentItem } from '../db/repositories/content-repository';
import { createQueueItem } from '../db/repositories/queue-repository';
import { QueueService } from '../services/queue';
import type { DownloadService } from '../services/download';
import type { ContentItem, Channel } from '../types/index';
/**
* Integration tests for the collect endpoints.
*
* POST /api/v1/channel/:id/collect collect monitored items for one channel
* POST /api/v1/channel/collect-all collect monitored items across all channels
*
* Both return { enqueued, skipped, errors, items } and enqueue via QueueService.
*/
describe('Collect API', () => {
let server: FastifyInstance;
let db: LibSQLDatabase<typeof schema>;
let apiKey: string;
let tmpDir: string;
let testChannel: Channel;
let testChannel2: Channel;
let queueService: QueueService;
let mockDownloadService: {
downloadItem: ReturnType<typeof vi.fn>;
};
beforeAll(async () => {
tmpDir = mkdtempSync(join(tmpdir(), 'tubearr-collect-api-'));
const dbPath = join(tmpDir, 'test.db');
db = await initDatabaseAsync(dbPath);
await runMigrations(dbPath);
server = await buildServer({ db });
// Create mock download service and real queue service
mockDownloadService = {
downloadItem: vi.fn().mockResolvedValue(undefined),
};
queueService = new QueueService(
db,
mockDownloadService as unknown as DownloadService,
2
);
// Stop auto-processing so tests stay deterministic
queueService.stop();
(server as { downloadService: DownloadService | null }).downloadService =
mockDownloadService as unknown as DownloadService;
(server as { queueService: QueueService | null }).queueService = queueService;
await server.ready();
// Read API key
const rows = await db
.select()
.from(systemConfig)
.where(eq(systemConfig.key, 'api_key'))
.limit(1);
apiKey = rows[0]?.value ?? '';
expect(apiKey).toBeTruthy();
// Create two test channels for cross-channel testing
testChannel = await createChannel(db, {
name: 'Collect Test Channel 1',
platform: 'youtube',
platformId: 'UC_collect_test_1',
url: 'https://www.youtube.com/channel/UC_collect_test_1',
monitoringEnabled: true,
checkInterval: 360,
imageUrl: null,
metadata: null,
formatProfileId: null,
});
testChannel2 = await createChannel(db, {
name: 'Collect Test Channel 2',
platform: 'youtube',
platformId: 'UC_collect_test_2',
url: 'https://www.youtube.com/channel/UC_collect_test_2',
monitoringEnabled: true,
checkInterval: 360,
imageUrl: null,
metadata: null,
formatProfileId: null,
});
});
afterAll(async () => {
queueService.stop();
await server.close();
closeDatabase();
try {
if (tmpDir && existsSync(tmpDir)) {
rmSync(tmpDir, { recursive: true, force: true });
}
} catch {
// Temp dir cleanup is best-effort on Windows (K004)
}
});
// ── Helpers ──
function authed(opts: Record<string, unknown>) {
return {
...opts,
headers: { 'x-api-key': apiKey, ...(opts.headers as Record<string, string> | undefined) },
};
}
let contentCounter = 0;
async function createTestItem(
channelId: number,
overrides: { status?: string; monitored?: boolean; platformContentId?: string } = {}
): Promise<ContentItem> {
contentCounter++;
const item = await createContentItem(db, {
channelId,
title: `Collect Test Item ${contentCounter}`,
platformContentId: overrides.platformContentId ?? `vid_collect_${Date.now()}_${contentCounter}`,
url: 'https://www.youtube.com/watch?v=collect_test',
contentType: 'video',
duration: 300,
status: (overrides.status ?? 'monitored') as 'monitored',
monitored: overrides.monitored ?? true,
});
return item!;
}
// ── Auth gating ──
describe('Authentication', () => {
it('returns 401 when no API key is provided for single channel', async () => {
const res = await server.inject({
method: 'POST',
url: `/api/v1/channel/${testChannel.id}/collect`,
});
expect(res.statusCode).toBe(401);
});
it('returns 401 when no API key is provided for collect-all', async () => {
const res = await server.inject({
method: 'POST',
url: '/api/v1/channel/collect-all',
});
expect(res.statusCode).toBe(401);
});
});
// ── Validation ──
describe('Validation', () => {
it('returns 400 for non-numeric channel ID', async () => {
const res = await server.inject(
authed({ method: 'POST', url: '/api/v1/channel/abc/collect' })
);
expect(res.statusCode).toBe(400);
expect(res.json().message).toContain('number');
});
it('returns 404 for nonexistent channel ID', async () => {
const res = await server.inject(
authed({ method: 'POST', url: '/api/v1/channel/99999/collect' })
);
expect(res.statusCode).toBe(404);
expect(res.json().message).toContain('99999');
});
});
// ── 503 when queue service unavailable ──
describe('Queue service unavailable', () => {
it('returns 503 for single channel when queue service is null', async () => {
const savedQueueService = (server as unknown as Record<string, unknown>).queueService;
(server as { queueService: QueueService | null }).queueService = null;
try {
const res = await server.inject(
authed({ method: 'POST', url: `/api/v1/channel/${testChannel.id}/collect` })
);
expect(res.statusCode).toBe(503);
expect(res.json().message).toContain('Queue service');
} finally {
(server as { queueService: QueueService | null }).queueService =
savedQueueService as QueueService;
}
});
it('returns 503 for collect-all when queue service is null', async () => {
const savedQueueService = (server as unknown as Record<string, unknown>).queueService;
(server as { queueService: QueueService | null }).queueService = null;
try {
const res = await server.inject(
authed({ method: 'POST', url: '/api/v1/channel/collect-all' })
);
expect(res.statusCode).toBe(503);
expect(res.json().message).toContain('Queue service');
} finally {
(server as { queueService: QueueService | null }).queueService =
savedQueueService as QueueService;
}
});
});
// ── Happy path ──
describe('Happy path — single channel', () => {
it('enqueues monitored items with eligible status', async () => {
const item1 = await createTestItem(testChannel.id, { status: 'monitored' });
const item2 = await createTestItem(testChannel.id, { status: 'monitored' });
const res = await server.inject(
authed({ method: 'POST', url: `/api/v1/channel/${testChannel.id}/collect` })
);
expect(res.statusCode).toBe(200);
const body = res.json();
expect(body.enqueued).toBe(2);
expect(body.skipped).toBe(0);
expect(body.errors).toBe(0);
expect(body.items).toHaveLength(2);
const enqueued = body.items.filter((i: { status: string }) => i.status === 'enqueued');
expect(enqueued).toHaveLength(2);
const ids = enqueued.map((i: { contentItemId: number }) => i.contentItemId);
expect(ids).toContain(item1.id);
expect(ids).toContain(item2.id);
});
it('also enqueues items with status "failed"', async () => {
const item = await createTestItem(testChannel.id, { status: 'monitored' });
await updateContentItem(db, item.id, { status: 'failed' });
const res = await server.inject(
authed({ method: 'POST', url: `/api/v1/channel/${testChannel.id}/collect` })
);
expect(res.statusCode).toBe(200);
const body = res.json();
// Should include the failed item (it's collectible — not downloaded/queued/downloading)
const itemResult = body.items.find((i: { contentItemId: number }) => i.contentItemId === item.id);
expect(itemResult).toBeDefined();
expect(itemResult.status).toBe('enqueued');
});
});
// ── Empty result ──
describe('Empty result', () => {
it('returns zeros when no collectible items exist', async () => {
// Create a new channel with no content
const emptyChannel = await createChannel(db, {
name: 'Empty Channel',
platform: 'youtube',
platformId: 'UC_empty_collect',
url: 'https://www.youtube.com/channel/UC_empty_collect',
monitoringEnabled: true,
checkInterval: 360,
imageUrl: null,
metadata: null,
formatProfileId: null,
});
const res = await server.inject(
authed({ method: 'POST', url: `/api/v1/channel/${emptyChannel.id}/collect` })
);
expect(res.statusCode).toBe(200);
const body = res.json();
expect(body.enqueued).toBe(0);
expect(body.skipped).toBe(0);
expect(body.errors).toBe(0);
expect(body.items).toHaveLength(0);
});
});
// ── Exclusion filters ──
describe('Exclusion filters', () => {
it('excludes items with monitored=false even if status is eligible', async () => {
const channel = await createChannel(db, {
name: 'Exclusion Test Channel',
platform: 'youtube',
platformId: 'UC_excl_test',
url: 'https://www.youtube.com/channel/UC_excl_test',
monitoringEnabled: true,
checkInterval: 360,
imageUrl: null,
metadata: null,
formatProfileId: null,
});
// Create unmonitored item with eligible status
await createTestItem(channel.id, { monitored: false, status: 'monitored' });
const res = await server.inject(
authed({ method: 'POST', url: `/api/v1/channel/${channel.id}/collect` })
);
expect(res.statusCode).toBe(200);
expect(res.json().enqueued).toBe(0);
expect(res.json().items).toHaveLength(0);
});
it('excludes items with status "downloaded" even if monitored=true', async () => {
const channel = await createChannel(db, {
name: 'Downloaded Exclusion Channel',
platform: 'youtube',
platformId: 'UC_dl_excl',
url: 'https://www.youtube.com/channel/UC_dl_excl',
monitoringEnabled: true,
checkInterval: 360,
imageUrl: null,
metadata: null,
formatProfileId: null,
});
const item = await createTestItem(channel.id);
await updateContentItem(db, item.id, { status: 'downloaded' });
const res = await server.inject(
authed({ method: 'POST', url: `/api/v1/channel/${channel.id}/collect` })
);
expect(res.statusCode).toBe(200);
expect(res.json().enqueued).toBe(0);
expect(res.json().items).toHaveLength(0);
});
it('excludes items with status "queued"', async () => {
const channel = await createChannel(db, {
name: 'Queued Exclusion Channel',
platform: 'youtube',
platformId: 'UC_q_excl',
url: 'https://www.youtube.com/channel/UC_q_excl',
monitoringEnabled: true,
checkInterval: 360,
imageUrl: null,
metadata: null,
formatProfileId: null,
});
const item = await createTestItem(channel.id);
await updateContentItem(db, item.id, { status: 'queued' });
const res = await server.inject(
authed({ method: 'POST', url: `/api/v1/channel/${channel.id}/collect` })
);
expect(res.statusCode).toBe(200);
expect(res.json().enqueued).toBe(0);
expect(res.json().items).toHaveLength(0);
});
it('excludes items with status "downloading"', async () => {
const channel = await createChannel(db, {
name: 'Downloading Exclusion Channel',
platform: 'youtube',
platformId: 'UC_ding_excl',
url: 'https://www.youtube.com/channel/UC_ding_excl',
monitoringEnabled: true,
checkInterval: 360,
imageUrl: null,
metadata: null,
formatProfileId: null,
});
const item = await createTestItem(channel.id);
await updateContentItem(db, item.id, { status: 'downloading' });
const res = await server.inject(
authed({ method: 'POST', url: `/api/v1/channel/${channel.id}/collect` })
);
expect(res.statusCode).toBe(200);
expect(res.json().enqueued).toBe(0);
expect(res.json().items).toHaveLength(0);
});
});
// ── Dedup / skip ──
describe('Dedup — already queued items', () => {
it('reports skipped count when items are already in the queue', async () => {
const channel = await createChannel(db, {
name: 'Dedup Test Channel',
platform: 'youtube',
platformId: 'UC_dedup_test',
url: 'https://www.youtube.com/channel/UC_dedup_test',
monitoringEnabled: true,
checkInterval: 360,
imageUrl: null,
metadata: null,
formatProfileId: null,
});
const item = await createTestItem(channel.id);
// Create a queue entry directly (simulates a concurrent enqueue that happened
// after getCollectibleItems ran but before we iterate to this item).
// The content status stays 'monitored' so getCollectibleItems returns it,
// but queueService.enqueue() will see the existing queue entry and throw.
await createQueueItem(db, { contentItemId: item.id });
const res = await server.inject(
authed({ method: 'POST', url: `/api/v1/channel/${channel.id}/collect` })
);
expect(res.statusCode).toBe(200);
const body = res.json();
expect(body.enqueued).toBe(0);
expect(body.skipped).toBe(1);
expect(body.items[0].status).toBe('skipped');
});
});
// ── Mixed results ──
describe('Mixed results', () => {
it('correctly counts enqueued and skipped items in same batch', async () => {
const channel = await createChannel(db, {
name: 'Mixed Test Channel',
platform: 'youtube',
platformId: 'UC_mixed_test',
url: 'https://www.youtube.com/channel/UC_mixed_test',
monitoringEnabled: true,
checkInterval: 360,
imageUrl: null,
metadata: null,
formatProfileId: null,
});
const itemToSkip = await createTestItem(channel.id);
const itemToEnqueue = await createTestItem(channel.id);
// Create a queue entry directly for the skip item (content status stays 'monitored')
await createQueueItem(db, { contentItemId: itemToSkip.id });
const res = await server.inject(
authed({ method: 'POST', url: `/api/v1/channel/${channel.id}/collect` })
);
expect(res.statusCode).toBe(200);
const body = res.json();
expect(body.enqueued).toBe(1);
expect(body.skipped).toBe(1);
expect(body.errors).toBe(0);
expect(body.items).toHaveLength(2);
const skippedItem = body.items.find(
(i: { contentItemId: number }) => i.contentItemId === itemToSkip.id
);
const enqueuedItem = body.items.find(
(i: { contentItemId: number }) => i.contentItemId === itemToEnqueue.id
);
expect(skippedItem.status).toBe('skipped');
expect(enqueuedItem.status).toBe('enqueued');
});
});
// ── Collect-all across channels ──
describe('Collect-all', () => {
it('collects items across multiple channels', async () => {
const ch1 = await createChannel(db, {
name: 'CollectAll Ch1',
platform: 'youtube',
platformId: 'UC_coll_all_1',
url: 'https://www.youtube.com/channel/UC_coll_all_1',
monitoringEnabled: true,
checkInterval: 360,
imageUrl: null,
metadata: null,
formatProfileId: null,
});
const ch2 = await createChannel(db, {
name: 'CollectAll Ch2',
platform: 'youtube',
platformId: 'UC_coll_all_2',
url: 'https://www.youtube.com/channel/UC_coll_all_2',
monitoringEnabled: true,
checkInterval: 360,
imageUrl: null,
metadata: null,
formatProfileId: null,
});
const item1 = await createTestItem(ch1.id);
const item2 = await createTestItem(ch2.id);
const res = await server.inject(
authed({ method: 'POST', url: '/api/v1/channel/collect-all' })
);
expect(res.statusCode).toBe(200);
const body = res.json();
// Should include at least these two items (may include others from earlier tests)
expect(body.enqueued).toBeGreaterThanOrEqual(2);
const ids = body.items
.filter((i: { status: string }) => i.status === 'enqueued')
.map((i: { contentItemId: number }) => i.contentItemId);
expect(ids).toContain(item1.id);
expect(ids).toContain(item2.id);
});
});
});

View file

@ -1,4 +1,4 @@
import { eq, and, desc, like, sql, inArray } from 'drizzle-orm';
import { eq, and, desc, like, sql, inArray, notInArray } from 'drizzle-orm';
import { type LibSQLDatabase } from 'drizzle-orm/libsql';
import type * as schema from '../schema/index';
import { contentItems } from '../schema/index';
@ -338,6 +338,35 @@ export async function getContentCountsByChannelIds(
return map;
}
// ── Collectible Items ──
/**
* Get content items eligible for collection (download enqueueing).
* Returns items where `monitored = true` AND status is NOT 'downloaded',
* 'queued', or 'downloading'. Optionally filtered by channelId.
*/
export async function getCollectibleItems(
db: Db,
channelId?: number
): Promise<ContentItem[]> {
const conditions = [
eq(contentItems.monitored, true),
notInArray(contentItems.status, ['downloaded', 'queued', 'downloading']),
];
if (channelId !== undefined) {
conditions.push(eq(contentItems.channelId, channelId));
}
const rows = await db
.select()
.from(contentItems)
.where(and(...conditions))
.orderBy(contentItems.id);
return rows.map(mapRow);
}
// ── Row Mapping ──
function mapRow(row: typeof contentItems.$inferSelect): ContentItem {

View file

@ -19,6 +19,7 @@ import { contentRoutes } from './routes/content';
import { notificationRoutes } from './routes/notification';
import { platformSettingsRoutes } from './routes/platform-settings';
import { scanRoutes } from './routes/scan';
import { collectRoutes } from './routes/collect';
import { playlistRoutes } from './routes/playlist';
import type { SchedulerService } from '../services/scheduler';
import type { DownloadService } from '../services/download';
@ -99,6 +100,7 @@ export async function buildServer(opts: BuildServerOptions): Promise<FastifyInst
await server.register(notificationRoutes);
await server.register(platformSettingsRoutes);
await server.register(scanRoutes);
await server.register(collectRoutes);
await server.register(playlistRoutes);
// ── Static file serving for the frontend SPA ──

View file

@ -0,0 +1,140 @@
import { type FastifyInstance } from 'fastify';
import { getCollectibleItems } from '../../db/repositories/content-repository';
import { getChannelById } from '../../db/repositories/channel-repository';
// ── Types ──
interface CollectItemResult {
contentItemId: number;
status: 'enqueued' | 'skipped' | 'error';
}
interface CollectResponse {
enqueued: number;
skipped: number;
errors: number;
items: CollectItemResult[];
}
// ── Route Plugin ──
/**
* Collect routes for batch-enqueueing monitored content items.
*
* Registers:
* POST /api/v1/channel/collect-all enqueue all collectible items across all channels
* POST /api/v1/channel/:id/collect enqueue collectible items for a single channel
*
* "Collectible" = monitored:true AND status NOT IN ('downloaded', 'queued', 'downloading').
*
* Route registration order: collect-all BEFORE :id/collect (K019/K022).
*/
export async function collectRoutes(fastify: FastifyInstance): Promise<void> {
// ── POST /api/v1/channel/collect-all ──
// Registered first — literal path before parameterized (K019/K022)
fastify.post('/api/v1/channel/collect-all', async (_request, reply) => {
if (!fastify.queueService) {
return reply.status(503).send({
statusCode: 503,
error: 'Service Unavailable',
message: 'Queue service is not initialized',
});
}
const items = await getCollectibleItems(fastify.db);
const result = await enqueueItems(fastify, items);
fastify.log.info(
{ enqueued: result.enqueued, skipped: result.skipped, errors: result.errors },
'[collect] collect-all completed'
);
return result;
});
// ── POST /api/v1/channel/:id/collect ──
fastify.post<{ Params: { id: string } }>(
'/api/v1/channel/:id/collect',
async (request, reply) => {
const id = parseInt(request.params.id, 10);
if (isNaN(id)) {
return reply.status(400).send({
statusCode: 400,
error: 'Bad Request',
message: 'Channel ID must be a number',
});
}
const channel = await getChannelById(fastify.db, id);
if (!channel) {
return reply.status(404).send({
statusCode: 404,
error: 'Not Found',
message: `Channel with ID ${id} not found`,
});
}
if (!fastify.queueService) {
return reply.status(503).send({
statusCode: 503,
error: 'Service Unavailable',
message: 'Queue service is not initialized',
});
}
const items = await getCollectibleItems(fastify.db, id);
const result = await enqueueItems(fastify, items);
fastify.log.info(
{ channelId: id, enqueued: result.enqueued, skipped: result.skipped, errors: result.errors },
'[collect] channel %d collect completed',
id
);
return result;
}
);
}
// ── Helpers ──
/**
* Enqueue a batch of content items with per-item error isolation.
* Items that are already queued are counted as "skipped".
* Unexpected errors are counted and logged but don't abort the batch.
*/
async function enqueueItems(
fastify: FastifyInstance,
items: Array<{ id: number }>
): Promise<CollectResponse> {
let enqueued = 0;
let skipped = 0;
let errors = 0;
const results: CollectItemResult[] = [];
for (const item of items) {
try {
await fastify.queueService!.enqueue(item.id);
enqueued++;
results.push({ contentItemId: item.id, status: 'enqueued' });
} catch (err: unknown) {
const message = err instanceof Error ? err.message : String(err);
if (message.includes('already in the queue')) {
skipped++;
results.push({ contentItemId: item.id, status: 'skipped' });
} else {
errors++;
results.push({ contentItemId: item.id, status: 'error' });
fastify.log.error(
{ err, contentItemId: item.id },
'[collect] error enqueueing content item %d',
item.id
);
}
}
}
return { enqueued, skipped, errors, items: results };
}