tubearr/src/services/scheduler.ts
jlightner f494d31e60 fix: raise default scan limit from 100 to 500, use 999 for initial scans
- Default scanLimit increased to 500 (was 100, missing most channel content)
- First scan (lastCheckedAt === null) uses max(scanLimit, 999) for full catalog
- Discovery timeout scales with limit: 60s base + 30s per 500 items
- Updated platform-settings-repository defaults to match
2026-04-03 22:07:24 +00:00

539 lines
17 KiB
TypeScript

import { Cron } from 'croner';
import type { LibSQLDatabase } from 'drizzle-orm/libsql';
import type * as schema from '../db/schema/index';
import type { Channel, Platform, PlatformContentMetadata } from '../types/index';
import type { PlatformRegistry, PlatformSource, FetchRecentContentOptions } from '../sources/platform-source';
import type { RateLimiter } from './rate-limiter';
import { YtDlpError } from '../sources/yt-dlp';
import type { EventBus } from './event-bus';
import {
getEnabledChannels,
updateChannel,
} from '../db/repositories/channel-repository';
import {
createContentItem,
getRecentContentIds,
getContentByPlatformContentId,
updateContentItem,
} from '../db/repositories/content-repository';
import { getPlatformSettings } from '../db/repositories/platform-settings-repository';
// ── Types ──
/**
 * Snapshot of a single channel's scheduled job, as assembled by
 * `SchedulerService.getState()` for diagnostic inspection.
 */
export interface ChannelJobState {
/** ID of the channel the job belongs to (the key of the scheduler's job map). */
channelId: number;
/** Name of the cached channel; reported as 'unknown' when the channel is not in the scheduler's cache. */
channelName: string;
/** Platform of the cached channel. */
platform: Platform;
/** Whether the cron job reports itself as busy (value of the job's `isBusy()`). */
isRunning: boolean;
/** Next scheduled run reported by the cron job, or null when it reports none. */
nextRun: Date | null;
/** `lastCheckedAt` of the cached channel, or null when unavailable. */
lastCheckedAt: string | null;
/** `lastCheckStatus` of the cached channel, or null when unavailable. */
lastCheckStatus: string | null;
}
/** Overall scheduler snapshot returned by `SchedulerService.getState()`. */
export interface SchedulerState {
/** True after `start()` has completed and until `stop()` is called. */
running: boolean;
/** Number of cron jobs currently registered. */
channelCount: number;
/** One entry per registered cron job. */
channels: ChannelJobState[];
}
/** Structured outcome of a single `SchedulerService.checkChannel()` call. */
export interface CheckChannelResult {
/** ID of the channel that was checked. */
channelId: number;
/** Name of the channel that was checked. */
channelName: string;
/** Number of content items actually inserted; 0 for failed or skipped checks. */
newItems: number;
/** Number of items returned by the platform source; 0 for failed or skipped checks. */
totalFetched: number;
/**
 * Outcome of the check:
 * - 'success': the check finished (also reported when a scan is cancelled mid-insert)
 * - 'rate_limited': the check failed with a `YtDlpError` flagged as a rate limit
 * - 'error': the check failed for any other reason
 * - 'already_running': another check for the same channel was in progress, nothing was done
 */
status: 'success' | 'error' | 'rate_limited' | 'already_running';
}
/** Optional configuration for the scheduler service. */
export interface SchedulerOptions {
/**
 * Called with the ID of a newly inserted content item — used to auto-enqueue for download.
 * Only invoked for inserted items whose `monitored` flag is set.
 */
onNewContent?: (contentItemId: number) => void;
/**
 * Event bus for broadcasting scan lifecycle events to WebSocket clients.
 * The scheduler emits 'scan:started', 'scan:item-discovered', 'scan:complete' and 'scan:error'.
 */
eventBus?: EventBus;
}
// ── Scheduler Service ──
/**
 * Manages per-channel cron jobs for content monitoring.
 *
 * Loads enabled channels from the database, creates Cron jobs that periodically
 * check for new content via platform sources, deduplicates against existing
 * records, and inserts new items with `monitored` status.
 *
 * Each channel has at most one cron job and at most one scan in flight. The
 * service keeps a per-channel cache of the `Channel` record used by scheduled
 * runs; the cache is refreshed with the check status every time a scan writes
 * `lastCheckedAt` / `lastCheckStatus` to the database.
 */
export class SchedulerService {
  private readonly db: LibSQLDatabase<typeof schema>;
  private readonly platformRegistry: PlatformRegistry;
  private readonly rateLimiter: RateLimiter;
  private readonly onNewContent?: (contentItemId: number) => void;
  private readonly eventBus?: EventBus;
  /** Active cron jobs keyed by channel ID. */
  private readonly jobs = new Map<number, Cron>();
  /** Channel records used by scheduled runs and by `getState()`, keyed by channel ID. */
  private readonly channelCache = new Map<number, Channel>();
  /** IDs of channels with a scan currently in progress (per-channel lock). */
  private readonly activeChecks = new Set<number>();
  /** Abort controllers of in-progress scans, keyed by channel ID. */
  private readonly activeAbortControllers = new Map<number, AbortController>();
  private running = false;

  /**
   * @param db Database handle passed to the repository functions.
   * @param platformRegistry Registry used to look up the source for a channel's platform.
   * @param rateLimiter Per-platform rate limiter acquired before every scan.
   * @param options Optional callbacks / event bus.
   */
  constructor(
    db: LibSQLDatabase<typeof schema>,
    platformRegistry: PlatformRegistry,
    rateLimiter: RateLimiter,
    options?: SchedulerOptions
  ) {
    this.db = db;
    this.platformRegistry = platformRegistry;
    this.rateLimiter = rateLimiter;
    this.onNewContent = options?.onNewContent;
    this.eventBus = options?.eventBus;
  }

  /**
   * Load all enabled channels and create cron jobs for each.
   * Returns the number of channels loaded.
   */
  async start(): Promise<number> {
    const channels = await getEnabledChannels(this.db);
    for (const channel of channels) {
      this.createJob(channel);
    }
    this.running = true;
    console.log(`[scheduler] Started with ${channels.length} channels`);
    return channels.length;
  }

  /**
   * Stop all active cron jobs and clear the channel cache.
   * Scans that are already in flight are not aborted by this call.
   */
  stop(): void {
    for (const [channelId, job] of this.jobs) {
      job.stop();
      console.log(`[scheduler] Stopped job for channel ${channelId}`);
    }
    this.jobs.clear();
    this.channelCache.clear();
    this.running = false;
    console.log('[scheduler] Stopped');
  }

  /** Create a cron job for a newly added channel. No-op when monitoring is disabled. */
  addChannel(channel: Channel): void {
    if (!channel.monitoringEnabled) return;
    this.createJob(channel);
    console.log(
      `[scheduler] Added job for channel ${channel.id} ("${channel.name}") — interval: ${channel.checkInterval}m`
    );
  }

  /** Stop and remove the cron job for a deleted channel. */
  removeChannel(channelId: number): void {
    const job = this.jobs.get(channelId);
    if (job) {
      job.stop();
      this.jobs.delete(channelId);
      this.channelCache.delete(channelId);
      console.log(`[scheduler] Removed job for channel ${channelId}`);
    }
  }

  /** Update a channel's cron job (remove old, create new with updated interval). */
  updateChannel(channel: Channel): void {
    this.removeChannel(channel.id);
    if (channel.monitoringEnabled) {
      this.createJob(channel);
      console.log(
        `[scheduler] Updated job for channel ${channel.id} ("${channel.name}") — interval: ${channel.checkInterval}m`
      );
    }
  }

  /**
   * Check a channel for new content.
   *
   * 1. Check per-channel lock (reject overlap)
   * 2. Acquire rate limiter slot for the platform
   * 3. Fetch recent content via platform source
   * 4. Deduplicate against existing content
   * 5. Insert new items with `monitored` status
   * 6. Update channel's lastCheckedAt and lastCheckStatus
   *
   * Failures are reported through the returned status, never thrown.
   *
   * @param channel Channel to scan.
   * @param signal Optional external abort signal; aborting it cancels the scan
   *   exactly like `cancelScan()` does.
   * @returns A structured result with item counts and status.
   */
  async checkChannel(channel: Channel, signal?: AbortSignal): Promise<CheckChannelResult> {
    // Per-channel lock — reject overlap before any async work
    if (this.activeChecks.has(channel.id)) {
      console.log(
        `[scheduler] Skipping channel ${channel.id} ("${channel.name}") — already running`
      );
      return {
        channelId: channel.id,
        channelName: channel.name,
        newItems: 0,
        totalFetched: 0,
        status: 'already_running',
      };
    }

    // The internal controller's signal is ALWAYS the one the scan observes, so
    // both cancelScan() and an external signal can cancel it. (Observing the
    // external signal directly would make cancelScan() a no-op for such scans.)
    const abortController = new AbortController();
    const effectiveSignal = abortController.signal;
    if (signal) {
      if (signal.aborted) {
        // An 'abort' listener never fires for an already-aborted signal.
        abortController.abort(signal.reason);
      } else {
        signal.addEventListener('abort', () => abortController.abort(signal.reason), { once: true });
      }
    }

    this.activeChecks.add(channel.id);
    this.activeAbortControllers.set(channel.id, abortController);

    // Emit scan:started before any async work
    this.eventBus?.emitScan('scan:started', {
      channelId: channel.id,
      channelName: channel.name,
    });
    console.log(
      `[scheduler] Checking channel ${channel.id} ("${channel.name}") on ${channel.platform}`
    );

    try {
      // 1. Rate limit
      await this.rateLimiter.acquire(channel.platform);

      // 2. Get platform source
      const source = this.platformRegistry.get(channel.platform);
      if (!source) {
        throw new Error(
          `No platform source registered for "${channel.platform}"`
        );
      }

      // 3. Load platform settings for scan limit and rate limit delay
      const platformSettingsRow = await getPlatformSettings(this.db, channel.platform);
      const baseScanLimit = platformSettingsRow?.scanLimit ?? 500;
      const rateLimitDelay = platformSettingsRow?.rateLimitDelay ?? 1000;
      // First scan (lastCheckedAt === null) → grab full catalog up to 999
      const scanLimit = channel.lastCheckedAt === null
        ? Math.max(baseScanLimit, 999)
        : baseScanLimit;

      // 4. Load existing content IDs for dedup gating
      const existingIds = new Set(
        await getRecentContentIds(this.db, channel.id)
      );

      // 5. Fetch content — discovery-only (fast Phase 1, skip slow enrichment)
      const fetchOptions: FetchRecentContentOptions = {
        limit: scanLimit,
        existingIds,
        rateLimitDelay,
        discoveryOnly: true,
        signal: effectiveSignal,
      };
      const items: PlatformContentMetadata[] =
        await source.fetchRecentContent(channel, fetchOptions);

      // 6. Deduplicate — filter out items already known
      const newItems = items.filter(
        (item) => !existingIds.has(item.platformContentId)
      );

      // 7. Insert new items (check abort between each)
      let insertedCount = 0;
      for (const item of newItems) {
        // Check if scan was cancelled. A cancelled scan reports what it managed
        // to insert and deliberately leaves the channel's check status untouched.
        if (effectiveSignal.aborted) {
          console.log(
            `[scheduler] Scan cancelled for channel ${channel.id} ("${channel.name}") after ${insertedCount} items`
          );
          this.eventBus?.emitScan('scan:complete', {
            channelId: channel.id,
            channelName: channel.name,
            newItems: insertedCount,
            totalFetched: items.length,
          });
          return {
            channelId: channel.id,
            channelName: channel.name,
            newItems: insertedCount,
            totalFetched: items.length,
            status: 'success',
          };
        }

        // Scheduler discovers *new* content (future), so 'all' and 'future' → monitored
        const monitored = channel.monitoringMode === 'all' || channel.monitoringMode === 'future';
        const created = await createContentItem(this.db, {
          channelId: channel.id,
          title: item.title,
          platformContentId: item.platformContentId,
          url: item.url,
          contentType: item.contentType,
          duration: item.duration,
          thumbnailUrl: item.thumbnailUrl,
          publishedAt: item.publishedAt ?? null,
          status: 'monitored',
          monitored,
        });
        if (created) {
          insertedCount++;
          // Broadcast the new item to WebSocket clients
          this.eventBus?.emitScan('scan:item-discovered', {
            channelId: channel.id,
            channelName: channel.name,
            item: created,
          });
          // Only auto-enqueue monitored items
          if (this.onNewContent && created.monitored) {
            this.onNewContent(created.id);
          }
        }
      }

      // 8. Update channel status (DB first, then mirror into the cache so the
      //    next scheduled run no longer looks like a first scan)
      const checkedAt = new Date().toISOString();
      await updateChannel(this.db, channel.id, {
        lastCheckedAt: checkedAt,
        lastCheckStatus: 'success',
      });
      this.refreshCachedStatus(channel.id, {
        lastCheckedAt: checkedAt,
        lastCheckStatus: 'success',
      });
      this.rateLimiter.reportSuccess(channel.platform);
      console.log(
        `[scheduler] Check complete for channel ${channel.id}: ${insertedCount} new items (${items.length} fetched, ${existingIds.size} existing)`
      );
      this.eventBus?.emitScan('scan:complete', {
        channelId: channel.id,
        channelName: channel.name,
        newItems: insertedCount,
        totalFetched: items.length,
      });

      // 9. Background Phase 2: enrich newly inserted items with full metadata.
      // Deliberately not awaited — the scan result is returned immediately and
      // enrichment failures are only logged.
      if (insertedCount > 0 && !effectiveSignal.aborted) {
        this.enrichNewItems(channel, newItems, existingIds, rateLimitDelay, source, effectiveSignal)
          .catch((err) => {
            console.error(
              `[scheduler] Background enrichment failed for channel ${channel.id}:`,
              err instanceof Error ? err.message : err
            );
          });
      }

      return {
        channelId: channel.id,
        channelName: channel.name,
        newItems: insertedCount,
        totalFetched: items.length,
        status: 'success',
      };
    } catch (err) {
      // Determine status based on error type
      const isRateLimit =
        err instanceof YtDlpError && err.isRateLimit;
      const status = isRateLimit ? 'rate_limited' : 'error';

      // Update channel status (best effort — a failing status write must not
      // mask the original error)
      try {
        const checkedAt = new Date().toISOString();
        await updateChannel(this.db, channel.id, {
          lastCheckedAt: checkedAt,
          lastCheckStatus: status,
        });
        this.refreshCachedStatus(channel.id, {
          lastCheckedAt: checkedAt,
          lastCheckStatus: status,
        });
      } catch (updateErr) {
        console.error(
          `[scheduler] Failed to update status for channel ${channel.id}:`,
          updateErr
        );
      }

      this.rateLimiter.reportError(channel.platform);
      this.eventBus?.emitScan('scan:error', {
        channelId: channel.id,
        channelName: channel.name,
        error: err instanceof Error ? err.message : String(err),
      });
      console.error(
        `[scheduler] Check failed for channel ${channel.id} ("${channel.name}"): ${status}`,
        err instanceof Error ? err.message : err
      );
      return {
        channelId: channel.id,
        channelName: channel.name,
        newItems: 0,
        totalFetched: 0,
        status,
      };
    } finally {
      // Always release the per-channel lock, whatever the outcome
      this.activeChecks.delete(channel.id);
      this.activeAbortControllers.delete(channel.id);
    }
  }

  /**
   * Check whether a channel scan is currently in progress.
   */
  isScanning(channelId: number): boolean {
    return this.activeChecks.has(channelId);
  }

  /**
   * Cancel an in-progress scan for a channel.
   * Returns true if a scan was running and was cancelled.
   */
  cancelScan(channelId: number): boolean {
    const controller = this.activeAbortControllers.get(channelId);
    if (!controller) return false;
    controller.abort('User cancelled');
    console.log(`[scheduler] Scan cancel requested for channel ${channelId}`);
    return true;
  }

  /**
   * Get the current state of the scheduler for diagnostic inspection.
   */
  getState(): SchedulerState {
    const channels: ChannelJobState[] = [];
    for (const [channelId, job] of this.jobs) {
      const channel = this.channelCache.get(channelId);
      channels.push({
        channelId,
        channelName: channel?.name ?? 'unknown',
        platform: (channel?.platform ?? 'unknown') as Platform,
        isRunning: job.isBusy(),
        nextRun: job.nextRun() ?? null,
        lastCheckedAt: channel?.lastCheckedAt ?? null,
        lastCheckStatus: channel?.lastCheckStatus ?? null,
      });
    }
    return {
      running: this.running,
      channelCount: this.jobs.size,
      channels,
    };
  }

  /**
   * Background Phase 2: re-fetch full metadata for newly discovered items
   * and update their DB records with enriched data (publishedAt, duration).
   * Runs after the main scan completes — items are already visible to the user.
   *
   * Only fields that are still empty in the DB record are filled in; values
   * already stored are never overwritten.
   */
  private async enrichNewItems(
    channel: Channel,
    discoveredItems: PlatformContentMetadata[],
    existingIds: Set<string>,
    rateLimitDelay: number,
    source: PlatformSource,
    signal: AbortSignal,
  ): Promise<void> {
    const newPlatformIds = discoveredItems
      .filter((item) => !existingIds.has(item.platformContentId))
      .map((item) => item.platformContentId);
    if (newPlatformIds.length === 0) return;
    console.log(
      `[scheduler] Phase 2: enriching ${newPlatformIds.length} items for channel ${channel.id}`
    );

    // Re-fetch with enrichment (discoveryOnly: false)
    const enrichedItems = await source.fetchRecentContent(channel, {
      limit: newPlatformIds.length + existingIds.size,
      existingIds,
      rateLimitDelay,
      discoveryOnly: false,
      signal,
    });

    // Build lookup by platformContentId
    const enrichedMap = new Map<string, PlatformContentMetadata>();
    for (const item of enrichedItems) {
      enrichedMap.set(item.platformContentId, item);
    }

    // Update DB records with enriched data
    let enrichedCount = 0;
    for (const platformContentId of newPlatformIds) {
      if (signal.aborted) break;
      const enriched = enrichedMap.get(platformContentId);
      if (!enriched) continue;

      // Look up the DB record by platformContentId
      const dbItem = await getContentByPlatformContentId(this.db, channel.id, platformContentId);
      if (!dbItem) continue;

      // Only update if enrichment provides new data
      const updates: Record<string, unknown> = {};
      if (enriched.publishedAt && !dbItem.publishedAt) {
        updates.publishedAt = enriched.publishedAt;
      }
      if (enriched.duration != null && dbItem.duration == null) {
        updates.duration = enriched.duration;
      }
      if (Object.keys(updates).length > 0) {
        await updateContentItem(this.db, dbItem.id, updates);
        enrichedCount++;
      }
    }
    console.log(
      `[scheduler] Phase 2 complete for channel ${channel.id}: ${enrichedCount} items enriched`
    );
  }

  // ── Internal ──

  /**
   * Mirror a check status that was just written to the database into the
   * cached channel record. Without this, scheduled runs keep reading the
   * `lastCheckedAt` captured when the job was created, and `getState()`
   * reports stale status. Channels that are not cached are left alone.
   */
  private refreshCachedStatus(
    channelId: number,
    patch: Pick<Channel, 'lastCheckedAt' | 'lastCheckStatus'>
  ): void {
    const cached = this.channelCache.get(channelId);
    if (cached) {
      this.channelCache.set(channelId, { ...cached, ...patch });
    }
  }

  /**
   * Create a Cron job for a channel, replacing (and stopping) any job that is
   * already registered for the same channel ID.
   * Uses the interval option for arbitrary check intervals.
   */
  private createJob(channel: Channel): void {
    // Stop a previously registered job first; overwriting the map entry alone
    // would leave the old job firing with no way to stop it.
    this.jobs.get(channel.id)?.stop();

    // An invalid (non-finite or non-positive) check interval is scheduled with
    // the fallback pattern and no minimum spacing (interval 0).
    const intervalSeconds =
      Number.isFinite(channel.checkInterval) && channel.checkInterval > 0
        ? channel.checkInterval * 60
        : 0;
    const cronPattern = minutesToCronPattern(channel.checkInterval);
    const job = new Cron(
      cronPattern,
      {
        protect: true, // Prevent overlapping runs
        interval: intervalSeconds, // Minimum seconds between runs
      },
      async () => {
        // Refresh channel from cache (it may have been updated)
        const current = this.channelCache.get(channel.id) ?? channel;
        await this.checkChannel(current);
      }
    );
    this.jobs.set(channel.id, job);
    this.channelCache.set(channel.id, channel);
  }
}
// ── Helpers ──
/**
 * Convert a check interval in minutes to a cron pattern.
 *
 * The pattern only decides WHEN the job may fire; the exact spacing is enforced
 * by croner's `interval` option (minimum seconds between runs), so the pattern
 * must tick at least as finely as the requested interval requires:
 *
 * - invalid input (non-finite or `<= 0`): fallback, every 5 minutes
 * - whole-minute divisors of 60 (1, 2, 3, 5, 10, 15, ...): `* /{n} * * * *`
 * - 60, or any whole multiple of 60: top of every hour
 * - everything else (e.g. 25, 90, 7.5): every minute. An hourly pattern would
 *   round a 90-minute interval up to 120 minutes, and a fractional step such
 *   as 7.5 is not a valid cron step.
 *
 * @param minutes Desired check interval in minutes.
 * @returns A five-field cron pattern.
 */
function minutesToCronPattern(minutes: number): string {
  // NaN / Infinity / zero / negative → fallback: every 5 minutes
  if (!Number.isFinite(minutes) || minutes <= 0) return '*/5 * * * *';

  if (Number.isInteger(minutes)) {
    if (minutes < 60 && 60 % minutes === 0) {
      return `*/${minutes} * * * *`;
    }
    if (minutes % 60 === 0) {
      // 60, 120, 180, … — hour boundaries line up with the interval
      return '0 * * * *';
    }
  }

  // Arbitrary interval — tick every minute and let the `interval` option
  // enforce the actual spacing between runs
  return '* * * * *';
}