Migrated git root from W:/programming/Projects/ to W:/programming/Projects/Tubearr/. Previous history preserved in Tubearr-full-backup.bundle in the parent directory. Completed milestones: M001 through M005. Active: M006/S02 (Add Channel UX).
884 lines
29 KiB
TypeScript
import {
|
|
describe,
|
|
it,
|
|
expect,
|
|
beforeAll,
|
|
beforeEach,
|
|
afterAll,
|
|
vi,
|
|
} from 'vitest';
|
|
import { mkdtempSync, rmSync, existsSync } from 'node:fs';
|
|
import { join } from 'node:path';
|
|
import { tmpdir } from 'node:os';
|
|
import { initDatabaseAsync, closeDatabase } from '../db/index';
|
|
import { runMigrations } from '../db/migrate';
|
|
import { RateLimiter } from '../services/rate-limiter';
|
|
import { SchedulerService } from '../services/scheduler';
|
|
import type { CheckChannelResult } from '../services/scheduler';
|
|
import { PlatformRegistry } from '../sources/platform-source';
|
|
import type { PlatformSource, FetchRecentContentOptions } from '../sources/platform-source';
|
|
import { YtDlpError } from '../sources/yt-dlp';
|
|
import {
|
|
createChannel,
|
|
getChannelById,
|
|
getEnabledChannels,
|
|
} from '../db/repositories/channel-repository';
|
|
import {
|
|
getContentByChannelId,
|
|
getRecentContentIds,
|
|
} from '../db/repositories/content-repository';
|
|
import * as contentRepo from '../db/repositories/content-repository';
|
|
import { Platform } from '../types/index';
|
|
import type {
|
|
Channel,
|
|
PlatformSourceMetadata,
|
|
PlatformContentMetadata,
|
|
} from '../types/index';
|
|
import type { LibSQLDatabase } from 'drizzle-orm/libsql';
|
|
import type * as schema from '../db/schema/index';
|
|
|
|
// ── Rate Limiter Tests ──
|
|
|
|
describe('RateLimiter', () => {
|
|
it('first acquire is immediate (no wait)', async () => {
|
|
const limiter = new RateLimiter({
|
|
[Platform.YouTube]: { minIntervalMs: 1000 },
|
|
});
|
|
|
|
const start = Date.now();
|
|
await limiter.acquire(Platform.YouTube);
|
|
const elapsed = Date.now() - start;
|
|
|
|
// First call should be near-instant
|
|
expect(elapsed).toBeLessThan(50);
|
|
});
|
|
|
|
it('enforces minimum interval between calls', async () => {
|
|
const limiter = new RateLimiter({
|
|
[Platform.YouTube]: { minIntervalMs: 100 },
|
|
});
|
|
|
|
await limiter.acquire(Platform.YouTube);
|
|
const start = Date.now();
|
|
await limiter.acquire(Platform.YouTube);
|
|
const elapsed = Date.now() - start;
|
|
|
|
// Second call should have waited ~100ms
|
|
expect(elapsed).toBeGreaterThanOrEqual(80);
|
|
expect(elapsed).toBeLessThan(200);
|
|
});
|
|
|
|
it('does not wait if enough time has passed', async () => {
|
|
const limiter = new RateLimiter({
|
|
[Platform.YouTube]: { minIntervalMs: 50 },
|
|
});
|
|
|
|
await limiter.acquire(Platform.YouTube);
|
|
// Wait longer than the interval
|
|
await new Promise((r) => setTimeout(r, 80));
|
|
|
|
const start = Date.now();
|
|
await limiter.acquire(Platform.YouTube);
|
|
const elapsed = Date.now() - start;
|
|
|
|
expect(elapsed).toBeLessThan(30);
|
|
});
|
|
|
|
it('tracks platforms independently', async () => {
|
|
const limiter = new RateLimiter({
|
|
[Platform.YouTube]: { minIntervalMs: 100 },
|
|
[Platform.SoundCloud]: { minIntervalMs: 100 },
|
|
});
|
|
|
|
await limiter.acquire(Platform.YouTube);
|
|
|
|
// SoundCloud first call should be instant even though YouTube just went
|
|
const start = Date.now();
|
|
await limiter.acquire(Platform.SoundCloud);
|
|
const elapsed = Date.now() - start;
|
|
|
|
expect(elapsed).toBeLessThan(30);
|
|
});
|
|
|
|
it('doubles effective interval on error (exponential backoff)', () => {
|
|
const limiter = new RateLimiter({
|
|
[Platform.YouTube]: { minIntervalMs: 1000 },
|
|
});
|
|
|
|
limiter.reportError(Platform.YouTube);
|
|
const state1 = limiter.getState()[Platform.YouTube];
|
|
expect(state1.errorCount).toBe(1);
|
|
expect(state1.effectiveIntervalMs).toBe(2000); // 1000 * 2^1
|
|
|
|
limiter.reportError(Platform.YouTube);
|
|
const state2 = limiter.getState()[Platform.YouTube];
|
|
expect(state2.errorCount).toBe(2);
|
|
expect(state2.effectiveIntervalMs).toBe(4000); // 1000 * 2^2
|
|
});
|
|
|
|
it('caps backoff at MAX_BACKOFF_MS (60s)', () => {
|
|
const limiter = new RateLimiter({
|
|
[Platform.YouTube]: { minIntervalMs: 1000 },
|
|
});
|
|
|
|
// 2^7 = 128, so 1000 * 128 = 128000 > 60000
|
|
for (let i = 0; i < 7; i++) {
|
|
limiter.reportError(Platform.YouTube);
|
|
}
|
|
|
|
const state = limiter.getState()[Platform.YouTube];
|
|
expect(state.effectiveIntervalMs).toBe(60_000);
|
|
expect(state.errorCount).toBe(7);
|
|
});
|
|
|
|
it('resets to minimum interval on success', () => {
|
|
const limiter = new RateLimiter({
|
|
[Platform.YouTube]: { minIntervalMs: 1000 },
|
|
});
|
|
|
|
limiter.reportError(Platform.YouTube);
|
|
limiter.reportError(Platform.YouTube);
|
|
expect(limiter.getState()[Platform.YouTube].effectiveIntervalMs).toBe(4000);
|
|
|
|
limiter.reportSuccess(Platform.YouTube);
|
|
const state = limiter.getState()[Platform.YouTube];
|
|
expect(state.errorCount).toBe(0);
|
|
expect(state.effectiveIntervalMs).toBe(1000);
|
|
});
|
|
|
|
it('getState returns correct structure for all platforms', () => {
|
|
const limiter = new RateLimiter({
|
|
[Platform.YouTube]: { minIntervalMs: 1000 },
|
|
[Platform.SoundCloud]: { minIntervalMs: 3000 },
|
|
});
|
|
|
|
const state = limiter.getState();
|
|
expect(state[Platform.YouTube]).toEqual({
|
|
lastCallTime: null,
|
|
errorCount: 0,
|
|
effectiveIntervalMs: 1000,
|
|
});
|
|
expect(state[Platform.SoundCloud]).toEqual({
|
|
lastCallTime: null,
|
|
errorCount: 0,
|
|
effectiveIntervalMs: 3000,
|
|
});
|
|
});
|
|
|
|
it('getState reflects updated lastCallTime after acquire', async () => {
|
|
const limiter = new RateLimiter({
|
|
[Platform.YouTube]: { minIntervalMs: 50 },
|
|
});
|
|
|
|
expect(limiter.getState()[Platform.YouTube].lastCallTime).toBeNull();
|
|
|
|
await limiter.acquire(Platform.YouTube);
|
|
const state = limiter.getState()[Platform.YouTube];
|
|
expect(state.lastCallTime).toBeTypeOf('number');
|
|
expect(state.lastCallTime! - Date.now()).toBeLessThan(100);
|
|
});
|
|
});
|
|
|
|
// ── Scheduler Tests ──
|
|
|
|
describe('SchedulerService', () => {
|
|
let db: LibSQLDatabase<typeof schema>;
|
|
let tmpDir: string;
|
|
let registry: PlatformRegistry;
|
|
let rateLimiter: RateLimiter;
|
|
let mockFetchRecentContent: ReturnType<typeof vi.fn>;
|
|
let mockResolveChannel: ReturnType<typeof vi.fn>;
|
|
|
|
// Build a mock PlatformSource that returns canned content
|
|
function buildMockRegistry(): {
|
|
registry: PlatformRegistry;
|
|
fetchFn: ReturnType<typeof vi.fn>;
|
|
resolveFn: ReturnType<typeof vi.fn>;
|
|
} {
|
|
const fetchFn = vi.fn<
|
|
(channel: Channel, options?: FetchRecentContentOptions) => Promise<PlatformContentMetadata[]>
|
|
>();
|
|
const resolveFn = vi.fn<
|
|
(url: string) => Promise<PlatformSourceMetadata>
|
|
>();
|
|
|
|
const mockSource: PlatformSource = {
|
|
resolveChannel: resolveFn,
|
|
fetchRecentContent: fetchFn,
|
|
};
|
|
|
|
const reg = new PlatformRegistry();
|
|
reg.register(Platform.YouTube, mockSource);
|
|
reg.register(Platform.SoundCloud, mockSource);
|
|
|
|
return { registry: reg, fetchFn, resolveFn };
|
|
}
|
|
|
|
function makeCannedContent(count: number, prefix = 'vid'): PlatformContentMetadata[] {
|
|
return Array.from({ length: count }, (_, i) => ({
|
|
platformContentId: `${prefix}_${i + 1}`,
|
|
title: `${prefix} Title ${i + 1}`,
|
|
url: `https://www.youtube.com/watch?v=${prefix}_${i + 1}`,
|
|
contentType: 'video' as const,
|
|
duration: 600,
|
|
thumbnailUrl: null,
|
|
publishedAt: null,
|
|
}));
|
|
}
|
|
|
|
beforeAll(async () => {
|
|
tmpDir = mkdtempSync(join(tmpdir(), 'tubearr-scheduler-'));
|
|
const dbPath = join(tmpDir, 'test.db');
|
|
db = await initDatabaseAsync(dbPath);
|
|
await runMigrations(dbPath);
|
|
});
|
|
|
|
beforeEach(() => {
|
|
const mock = buildMockRegistry();
|
|
registry = mock.registry;
|
|
mockFetchRecentContent = mock.fetchFn;
|
|
mockResolveChannel = mock.resolveFn;
|
|
rateLimiter = new RateLimiter({
|
|
[Platform.YouTube]: { minIntervalMs: 0 }, // No delay in tests
|
|
[Platform.SoundCloud]: { minIntervalMs: 0 },
|
|
});
|
|
});
|
|
|
|
afterAll(async () => {
|
|
closeDatabase();
|
|
try {
|
|
if (tmpDir && existsSync(tmpDir)) {
|
|
rmSync(tmpDir, { recursive: true, force: true });
|
|
}
|
|
} catch {
|
|
// K004: best-effort cleanup on Windows
|
|
}
|
|
});
|
|
|
|
// ── Helper to create a test channel in the DB ──
|
|
async function insertTestChannel(
|
|
overrides: Partial<Parameters<typeof createChannel>[1]> = {}
|
|
): Promise<Channel> {
|
|
const defaults = {
|
|
name: `Test Channel ${Date.now()}`,
|
|
platform: Platform.YouTube as Platform,
|
|
platformId: `UC_TEST_${Date.now()}_${Math.random().toString(36).slice(2, 6)}`,
|
|
url: 'https://www.youtube.com/@Test',
|
|
monitoringEnabled: true,
|
|
checkInterval: 60,
|
|
imageUrl: null,
|
|
metadata: null,
|
|
formatProfileId: null,
|
|
monitoringMode: 'all' as const,
|
|
};
|
|
return createChannel(db, { ...defaults, ...overrides });
|
|
}
|
|
|
|
// ── start() ──
|
|
|
|
describe('start()', () => {
|
|
it('loads enabled channels and creates cron jobs', async () => {
|
|
const channel = await insertTestChannel({ monitoringEnabled: true });
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
const count = await scheduler.start();
|
|
expect(count).toBeGreaterThanOrEqual(1);
|
|
|
|
const state = scheduler.getState();
|
|
expect(state.running).toBe(true);
|
|
expect(state.channelCount).toBeGreaterThanOrEqual(1);
|
|
|
|
// Find our channel in the state
|
|
const channelState = state.channels.find(
|
|
(c) => c.channelId === channel.id
|
|
);
|
|
expect(channelState).toBeDefined();
|
|
expect(channelState!.nextRun).toBeInstanceOf(Date);
|
|
|
|
scheduler.stop();
|
|
});
|
|
|
|
it('does not create jobs for disabled channels', async () => {
|
|
const disabledChannel = await insertTestChannel({
|
|
monitoringEnabled: false,
|
|
});
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
await scheduler.start();
|
|
const state = scheduler.getState();
|
|
|
|
const channelState = state.channels.find(
|
|
(c) => c.channelId === disabledChannel.id
|
|
);
|
|
expect(channelState).toBeUndefined();
|
|
|
|
scheduler.stop();
|
|
});
|
|
});
|
|
|
|
// ── checkChannel() ──
|
|
|
|
describe('checkChannel()', () => {
|
|
it('inserts new content items and updates lastCheckedAt/lastCheckStatus', async () => {
|
|
const channel = await insertTestChannel();
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
// Mock: return 3 new content items
|
|
mockFetchRecentContent.mockResolvedValueOnce(makeCannedContent(3));
|
|
|
|
await scheduler.checkChannel(channel);
|
|
|
|
// Verify content was inserted
|
|
const content = await getContentByChannelId(db, channel.id);
|
|
expect(content.length).toBe(3);
|
|
expect(content[0].status).toBe('monitored');
|
|
expect(content[0].platformContentId).toBeTruthy();
|
|
|
|
// Verify channel status was updated
|
|
const updated = await getChannelById(db, channel.id);
|
|
expect(updated!.lastCheckedAt).toBeTruthy();
|
|
expect(updated!.lastCheckStatus).toBe('success');
|
|
|
|
scheduler.stop();
|
|
});
|
|
|
|
it('skips duplicate content items', async () => {
|
|
const channel = await insertTestChannel();
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
const items = makeCannedContent(3, `dedup_${channel.id}`);
|
|
|
|
// First check — all 3 are new
|
|
mockFetchRecentContent.mockResolvedValueOnce(items);
|
|
await scheduler.checkChannel(channel);
|
|
|
|
const contentAfterFirst = await getContentByChannelId(db, channel.id);
|
|
expect(contentAfterFirst.length).toBe(3);
|
|
|
|
// Second check — same 3 items should be deduplicated
|
|
mockFetchRecentContent.mockResolvedValueOnce(items);
|
|
await scheduler.checkChannel(channel);
|
|
|
|
const contentAfterSecond = await getContentByChannelId(db, channel.id);
|
|
expect(contentAfterSecond.length).toBe(3); // No duplicates added
|
|
});
|
|
|
|
it('inserts only genuinely new items when some already exist', async () => {
|
|
const channel = await insertTestChannel();
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
// First check — insert 2 items
|
|
const existing = makeCannedContent(2, `partial_${channel.id}`);
|
|
mockFetchRecentContent.mockResolvedValueOnce(existing);
|
|
await scheduler.checkChannel(channel);
|
|
|
|
// Second check — return the same 2 + 1 new
|
|
const combined = [
|
|
...existing,
|
|
{
|
|
platformContentId: `partial_${channel.id}_new_1`,
|
|
title: 'New Item',
|
|
url: 'https://www.youtube.com/watch?v=new1',
|
|
contentType: 'video' as const,
|
|
duration: 300,
|
|
thumbnailUrl: null,
|
|
},
|
|
];
|
|
mockFetchRecentContent.mockResolvedValueOnce(combined);
|
|
await scheduler.checkChannel(channel);
|
|
|
|
const content = await getContentByChannelId(db, channel.id);
|
|
expect(content.length).toBe(3); // 2 existing + 1 new
|
|
});
|
|
|
|
it('sets lastCheckStatus to "error" on generic error', async () => {
|
|
const channel = await insertTestChannel();
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
mockFetchRecentContent.mockRejectedValueOnce(
|
|
new Error('Network timeout')
|
|
);
|
|
|
|
await scheduler.checkChannel(channel);
|
|
|
|
const updated = await getChannelById(db, channel.id);
|
|
expect(updated!.lastCheckStatus).toBe('error');
|
|
expect(updated!.lastCheckedAt).toBeTruthy();
|
|
});
|
|
|
|
it('sets lastCheckStatus to "rate_limited" on YtDlpError with isRateLimit', async () => {
|
|
const channel = await insertTestChannel();
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
mockFetchRecentContent.mockRejectedValueOnce(
|
|
new YtDlpError(
|
|
'HTTP Error 429: Too Many Requests',
|
|
'ERROR: HTTP Error 429: Too Many Requests',
|
|
1
|
|
)
|
|
);
|
|
|
|
await scheduler.checkChannel(channel);
|
|
|
|
const updated = await getChannelById(db, channel.id);
|
|
expect(updated!.lastCheckStatus).toBe('rate_limited');
|
|
});
|
|
|
|
it('calls rateLimiter.reportError on check failure', async () => {
|
|
const channel = await insertTestChannel();
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
const reportErrorSpy = vi.spyOn(rateLimiter, 'reportError');
|
|
|
|
mockFetchRecentContent.mockRejectedValueOnce(new Error('fail'));
|
|
|
|
await scheduler.checkChannel(channel);
|
|
|
|
expect(reportErrorSpy).toHaveBeenCalledWith(channel.platform);
|
|
});
|
|
|
|
it('calls rateLimiter.reportSuccess on successful check', async () => {
|
|
const channel = await insertTestChannel();
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
const reportSuccessSpy = vi.spyOn(rateLimiter, 'reportSuccess');
|
|
|
|
mockFetchRecentContent.mockResolvedValueOnce([]);
|
|
|
|
await scheduler.checkChannel(channel);
|
|
|
|
expect(reportSuccessSpy).toHaveBeenCalledWith(channel.platform);
|
|
});
|
|
|
|
// ── Return value tests ──
|
|
|
|
it('returns CheckChannelResult with correct newItems and totalFetched on success', async () => {
|
|
const channel = await insertTestChannel();
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
mockFetchRecentContent.mockResolvedValueOnce(makeCannedContent(5, `rv_${channel.id}`));
|
|
|
|
const result = await scheduler.checkChannel(channel);
|
|
|
|
expect(result).toEqual({
|
|
channelId: channel.id,
|
|
channelName: channel.name,
|
|
newItems: 5,
|
|
totalFetched: 5,
|
|
status: 'success',
|
|
} satisfies CheckChannelResult);
|
|
});
|
|
|
|
it('returns status "error" when the platform source throws', async () => {
|
|
const channel = await insertTestChannel();
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
mockFetchRecentContent.mockRejectedValueOnce(new Error('Network timeout'));
|
|
|
|
const result = await scheduler.checkChannel(channel);
|
|
|
|
expect(result.channelId).toBe(channel.id);
|
|
expect(result.channelName).toBe(channel.name);
|
|
expect(result.status).toBe('error');
|
|
expect(result.newItems).toBe(0);
|
|
expect(result.totalFetched).toBe(0);
|
|
});
|
|
|
|
it('returns status "rate_limited" when YtDlpError with isRateLimit is thrown', async () => {
|
|
const channel = await insertTestChannel();
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
mockFetchRecentContent.mockRejectedValueOnce(
|
|
new YtDlpError(
|
|
'HTTP Error 429: Too Many Requests',
|
|
'ERROR: HTTP Error 429: Too Many Requests',
|
|
1
|
|
)
|
|
);
|
|
|
|
const result = await scheduler.checkChannel(channel);
|
|
|
|
expect(result.channelId).toBe(channel.id);
|
|
expect(result.status).toBe('rate_limited');
|
|
expect(result.newItems).toBe(0);
|
|
expect(result.totalFetched).toBe(0);
|
|
});
|
|
|
|
// ── Per-channel lock tests ──
|
|
|
|
it('returns "already_running" when the same channel is checked concurrently', async () => {
|
|
const channel = await insertTestChannel();
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
// Mock a slow platform source — holds for 200ms
|
|
mockFetchRecentContent.mockImplementation(
|
|
() => new Promise((resolve) => setTimeout(() => resolve([]), 200))
|
|
);
|
|
|
|
// Start first check (will be in-flight for ~200ms)
|
|
const first = scheduler.checkChannel(channel);
|
|
|
|
// Immediately start second check — should hit the lock
|
|
const second = await scheduler.checkChannel(channel);
|
|
|
|
expect(second.status).toBe('already_running');
|
|
expect(second.channelId).toBe(channel.id);
|
|
expect(second.newItems).toBe(0);
|
|
expect(second.totalFetched).toBe(0);
|
|
|
|
// First check should complete normally
|
|
const firstResult = await first;
|
|
expect(firstResult.status).toBe('success');
|
|
});
|
|
|
|
it('releases the lock after completion — subsequent call works normally', async () => {
|
|
const channel = await insertTestChannel();
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
mockFetchRecentContent.mockResolvedValueOnce(makeCannedContent(2, `lock_release_${channel.id}`));
|
|
|
|
// First call completes
|
|
const first = await scheduler.checkChannel(channel);
|
|
expect(first.status).toBe('success');
|
|
|
|
// Second call should NOT be locked out
|
|
mockFetchRecentContent.mockResolvedValueOnce([]);
|
|
const second = await scheduler.checkChannel(channel);
|
|
expect(second.status).toBe('success');
|
|
});
|
|
|
|
it('releases the lock even when the check fails', async () => {
|
|
const channel = await insertTestChannel();
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
mockFetchRecentContent.mockRejectedValueOnce(new Error('Boom'));
|
|
|
|
// First call fails
|
|
const first = await scheduler.checkChannel(channel);
|
|
expect(first.status).toBe('error');
|
|
|
|
// Second call should NOT be locked out
|
|
mockFetchRecentContent.mockResolvedValueOnce([]);
|
|
const second = await scheduler.checkChannel(channel);
|
|
expect(second.status).toBe('success');
|
|
});
|
|
|
|
// ── onNewContent + monitored gate tests ──
|
|
|
|
it('calls onNewContent for each newly inserted (monitored) item', async () => {
|
|
const channel = await insertTestChannel();
|
|
const onNewContentSpy = vi.fn();
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter, {
|
|
onNewContent: onNewContentSpy,
|
|
});
|
|
|
|
mockFetchRecentContent.mockResolvedValueOnce(
|
|
makeCannedContent(3, `on_new_${channel.id}`)
|
|
);
|
|
|
|
const result = await scheduler.checkChannel(channel);
|
|
|
|
expect(result.newItems).toBe(3);
|
|
expect(onNewContentSpy).toHaveBeenCalledTimes(3);
|
|
// Each call receives the content item ID (a positive integer)
|
|
for (const call of onNewContentSpy.mock.calls) {
|
|
expect(call[0]).toBeTypeOf('number');
|
|
expect(call[0]).toBeGreaterThan(0);
|
|
}
|
|
|
|
scheduler.stop();
|
|
});
|
|
|
|
it('does NOT call onNewContent for items with monitored=false', async () => {
|
|
const channel = await insertTestChannel();
|
|
const onNewContentSpy = vi.fn();
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter, {
|
|
onNewContent: onNewContentSpy,
|
|
});
|
|
|
|
// Spy on createContentItem to return items with monitored=false
|
|
const createSpy = vi.spyOn(contentRepo, 'createContentItem').mockResolvedValue({
|
|
id: 9999,
|
|
channelId: channel.id,
|
|
title: 'Unmonitored Item',
|
|
platformContentId: 'unmon_1',
|
|
url: 'https://example.com/unmon_1',
|
|
contentType: 'video',
|
|
duration: 600,
|
|
thumbnailUrl: null,
|
|
filePath: null,
|
|
fileSize: null,
|
|
format: null,
|
|
qualityMetadata: null,
|
|
status: 'monitored',
|
|
monitored: false,
|
|
publishedAt: null,
|
|
downloadedAt: null,
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
mockFetchRecentContent.mockResolvedValueOnce(
|
|
makeCannedContent(2, `unmon_${channel.id}`)
|
|
);
|
|
|
|
await scheduler.checkChannel(channel);
|
|
|
|
// createContentItem was called for each new item
|
|
expect(createSpy).toHaveBeenCalledTimes(2);
|
|
// But onNewContent was NOT called because monitored=false
|
|
expect(onNewContentSpy).not.toHaveBeenCalled();
|
|
|
|
createSpy.mockRestore();
|
|
scheduler.stop();
|
|
});
|
|
|
|
it('passes publishedAt from platform metadata to createContentItem', async () => {
|
|
const channel = await insertTestChannel();
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
const items: PlatformContentMetadata[] = [
|
|
{
|
|
platformContentId: `pub_${channel.id}_1`,
|
|
title: 'Item With PublishedAt',
|
|
url: 'https://www.youtube.com/watch?v=pub1',
|
|
contentType: 'video',
|
|
duration: 300,
|
|
thumbnailUrl: null,
|
|
publishedAt: '2024-06-15T00:00:00.000Z',
|
|
},
|
|
];
|
|
mockFetchRecentContent.mockResolvedValueOnce(items);
|
|
|
|
await scheduler.checkChannel(channel);
|
|
|
|
// Verify the item was stored with publishedAt
|
|
const content = await getContentByChannelId(db, channel.id);
|
|
const inserted = content.find(c => c.platformContentId === `pub_${channel.id}_1`);
|
|
expect(inserted).toBeDefined();
|
|
expect(inserted!.publishedAt).toBe('2024-06-15T00:00:00.000Z');
|
|
|
|
scheduler.stop();
|
|
});
|
|
|
|
// ── monitoringMode-aware item creation tests ──
|
|
|
|
it("creates items with monitored=false when channel monitoringMode is 'none'", async () => {
|
|
const channel = await insertTestChannel({ monitoringMode: 'none', monitoringEnabled: false });
|
|
const onNewContentSpy = vi.fn();
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter, {
|
|
onNewContent: onNewContentSpy,
|
|
});
|
|
|
|
mockFetchRecentContent.mockResolvedValueOnce(
|
|
makeCannedContent(2, `mode_none_${channel.id}`)
|
|
);
|
|
|
|
const result = await scheduler.checkChannel(channel);
|
|
expect(result.status).toBe('success');
|
|
expect(result.newItems).toBe(2);
|
|
|
|
// Verify items were created with monitored=false
|
|
const content = await getContentByChannelId(db, channel.id);
|
|
expect(content.length).toBe(2);
|
|
for (const item of content) {
|
|
expect(item.monitored).toBe(false);
|
|
}
|
|
|
|
// onNewContent should NOT be called because monitored=false
|
|
expect(onNewContentSpy).not.toHaveBeenCalled();
|
|
|
|
scheduler.stop();
|
|
});
|
|
|
|
it("creates items with monitored=true when channel monitoringMode is 'future'", async () => {
|
|
const channel = await insertTestChannel({ monitoringMode: 'future' });
|
|
const onNewContentSpy = vi.fn();
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter, {
|
|
onNewContent: onNewContentSpy,
|
|
});
|
|
|
|
mockFetchRecentContent.mockResolvedValueOnce(
|
|
makeCannedContent(2, `mode_future_${channel.id}`)
|
|
);
|
|
|
|
const result = await scheduler.checkChannel(channel);
|
|
expect(result.status).toBe('success');
|
|
expect(result.newItems).toBe(2);
|
|
|
|
// Scheduler discovers *new* content (future), so 'future' → monitored=true
|
|
const content = await getContentByChannelId(db, channel.id);
|
|
expect(content.length).toBe(2);
|
|
for (const item of content) {
|
|
expect(item.monitored).toBe(true);
|
|
}
|
|
|
|
// onNewContent should be called because items are monitored
|
|
expect(onNewContentSpy).toHaveBeenCalledTimes(2);
|
|
|
|
scheduler.stop();
|
|
});
|
|
});
|
|
|
|
// ── addChannel() / removeChannel() ──
|
|
|
|
describe('addChannel() / removeChannel()', () => {
|
|
it('addChannel creates a new job', async () => {
|
|
const channel = await insertTestChannel();
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
scheduler.addChannel(channel);
|
|
|
|
const state = scheduler.getState();
|
|
const channelState = state.channels.find(
|
|
(c) => c.channelId === channel.id
|
|
);
|
|
expect(channelState).toBeDefined();
|
|
expect(channelState!.nextRun).toBeInstanceOf(Date);
|
|
|
|
scheduler.stop();
|
|
});
|
|
|
|
it('addChannel skips disabled channels', async () => {
|
|
const channel = await insertTestChannel({ monitoringEnabled: false });
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
scheduler.addChannel(channel);
|
|
|
|
const state = scheduler.getState();
|
|
const channelState = state.channels.find(
|
|
(c) => c.channelId === channel.id
|
|
);
|
|
expect(channelState).toBeUndefined();
|
|
|
|
scheduler.stop();
|
|
});
|
|
|
|
it('removeChannel stops and removes the job', async () => {
|
|
const channel = await insertTestChannel();
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
scheduler.addChannel(channel);
|
|
expect(
|
|
scheduler.getState().channels.find((c) => c.channelId === channel.id)
|
|
).toBeDefined();
|
|
|
|
scheduler.removeChannel(channel.id);
|
|
expect(
|
|
scheduler.getState().channels.find((c) => c.channelId === channel.id)
|
|
).toBeUndefined();
|
|
|
|
scheduler.stop();
|
|
});
|
|
|
|
it('removeChannel is safe for non-existent channel', () => {
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
// Should not throw
|
|
scheduler.removeChannel(99999);
|
|
scheduler.stop();
|
|
});
|
|
});
|
|
|
|
// ── updateChannel() ──
|
|
|
|
describe('updateChannel()', () => {
|
|
it('removes old job and creates new one', async () => {
|
|
const channel = await insertTestChannel({ checkInterval: 60 });
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
scheduler.addChannel(channel);
|
|
expect(
|
|
scheduler.getState().channels.find((c) => c.channelId === channel.id)
|
|
).toBeDefined();
|
|
|
|
// Update with different interval
|
|
const updatedChannel: Channel = {
|
|
...channel,
|
|
checkInterval: 120,
|
|
};
|
|
scheduler.updateChannel(updatedChannel);
|
|
|
|
// Should still have a job for this channel
|
|
const channelState = scheduler.getState().channels.find(
|
|
(c) => c.channelId === channel.id
|
|
);
|
|
expect(channelState).toBeDefined();
|
|
|
|
scheduler.stop();
|
|
});
|
|
|
|
it('removes job when monitoring is disabled', async () => {
|
|
const channel = await insertTestChannel({ monitoringEnabled: true });
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
scheduler.addChannel(channel);
|
|
|
|
const updatedChannel: Channel = {
|
|
...channel,
|
|
monitoringEnabled: false,
|
|
};
|
|
scheduler.updateChannel(updatedChannel);
|
|
|
|
const channelState = scheduler.getState().channels.find(
|
|
(c) => c.channelId === channel.id
|
|
);
|
|
expect(channelState).toBeUndefined();
|
|
|
|
scheduler.stop();
|
|
});
|
|
});
|
|
|
|
// ── getState() ──
|
|
|
|
describe('getState()', () => {
|
|
it('returns correct structure', async () => {
|
|
const channel = await insertTestChannel();
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
scheduler.addChannel(channel);
|
|
const state = scheduler.getState();
|
|
|
|
expect(state).toHaveProperty('running');
|
|
expect(state).toHaveProperty('channelCount');
|
|
expect(state).toHaveProperty('channels');
|
|
expect(Array.isArray(state.channels)).toBe(true);
|
|
|
|
const cs = state.channels[0];
|
|
expect(cs).toHaveProperty('channelId');
|
|
expect(cs).toHaveProperty('channelName');
|
|
expect(cs).toHaveProperty('platform');
|
|
expect(cs).toHaveProperty('isRunning');
|
|
expect(cs).toHaveProperty('nextRun');
|
|
|
|
scheduler.stop();
|
|
});
|
|
|
|
it('reports running: false before start and after stop', async () => {
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
expect(scheduler.getState().running).toBe(false);
|
|
|
|
await scheduler.start();
|
|
expect(scheduler.getState().running).toBe(true);
|
|
|
|
scheduler.stop();
|
|
expect(scheduler.getState().running).toBe(false);
|
|
});
|
|
});
|
|
|
|
// ── stop() ──
|
|
|
|
describe('stop()', () => {
|
|
it('stops all jobs and clears state', async () => {
|
|
const channel = await insertTestChannel();
|
|
const scheduler = new SchedulerService(db, registry, rateLimiter);
|
|
|
|
scheduler.addChannel(channel);
|
|
expect(scheduler.getState().channelCount).toBeGreaterThanOrEqual(1);
|
|
|
|
scheduler.stop();
|
|
expect(scheduler.getState().channelCount).toBe(0);
|
|
expect(scheduler.getState().channels).toEqual([]);
|
|
expect(scheduler.getState().running).toBe(false);
|
|
});
|
|
});
|
|
});
|