feat: real-time scan streaming with fire-and-forget API and cancel support

- Scan endpoint returns 202 immediately, runs in background
- Items appear in real-time via WebSocket scan:item-discovered events
- Phase 1 (fast flat-playlist) runs first with discoveryOnly flag
- Phase 2 (slow enrichment) runs as background post-scan pass
- Added POST /api/v1/channel/:id/scan-cancel endpoint
- AbortController support in scheduler for scan cancellation
- Frontend: Scan button toggles to Stop button during scan
- Frontend: Live item count shown during scanning
- Frontend: useCancelScan hook for cancel functionality
- Moved tubearr config to local Docker volume (SQLite on CIFS fix)
This commit is contained in:
jlightner 2026-04-03 21:43:23 +00:00
parent 24dbf79ac0
commit 4546ddb4ea
6 changed files with 259 additions and 43 deletions

View file

@ -86,9 +86,9 @@ export function useDeleteChannel() {
export interface ScanChannelResult {
channelId: number;
channelName: string;
newItems: number;
totalFetched: number;
status: 'success' | 'error' | 'rate_limited' | 'already_running';
newItems?: number;
totalFetched?: number;
status: 'started' | 'success' | 'error' | 'rate_limited' | 'already_running';
}
export interface ScanAllResult {
@ -98,18 +98,22 @@ export interface ScanAllResult {
// ── Scan Mutations ──
/** Trigger a manual scan for a single channel. */
/**
* Trigger a manual scan for a single channel.
* Returns immediately with status 'started' progress is streamed via WebSocket.
*/
export function useScanChannel(id: number) {
const queryClient = useQueryClient();
return useMutation({
mutationFn: () =>
apiClient.post<ScanChannelResult>(`/api/v1/channel/${id}/scan`),
onSuccess: () => {
queryClient.invalidateQueries({ queryKey: channelKeys.all });
queryClient.invalidateQueries({ queryKey: channelKeys.detail(id) });
queryClient.invalidateQueries({ queryKey: contentKeys.byChannel(id) });
},
});
}
/** Cancel an in-progress scan for a channel. */
export function useCancelScan(id: number) {
return useMutation({
mutationFn: () =>
apiClient.post<{ channelId: number; cancelled: boolean }>(`/api/v1/channel/${id}/scan-cancel`),
});
}

View file

@ -19,10 +19,11 @@ import {
Music,
RefreshCw,
Save,
Square,
Trash2,
Users,
} from 'lucide-react';
import { useChannel, useUpdateChannel, useDeleteChannel, useScanChannel, useSetMonitoringMode } from '../api/hooks/useChannels';
import { useChannel, useUpdateChannel, useDeleteChannel, useScanChannel, useCancelScan, useSetMonitoringMode } from '../api/hooks/useChannels';
import { useChannelContentPaginated, useDownloadContent, useToggleMonitored, useBulkMonitored, useCollectMonitored, type ChannelContentFilters } from '../api/hooks/useContent';
import { useChannelPlaylists, useRefreshPlaylists } from '../api/hooks/usePlaylists';
import { useFormatProfiles } from '../api/hooks/useFormatProfiles';
@ -166,6 +167,7 @@ export function ChannelDetail() {
const deleteChannel = useDeleteChannel();
const downloadContent = useDownloadContent();
const scanChannel = useScanChannel(channelId);
const cancelScan = useCancelScan(channelId);
const collectMonitored = useCollectMonitored(channelId);
const setMonitoringMode = useSetMonitoringMode(channelId);
const toggleMonitored = useToggleMonitored(channelId);
@ -173,7 +175,7 @@ export function ChannelDetail() {
const bulkMonitored = useBulkMonitored(channelId);
// ── Scan state (WebSocket-driven) ──
const { scanning: scanInProgress, newItemCount: _scanNewItemCount } = useScanProgress(channelId);
const { scanning: scanInProgress, newItemCount: scanNewItemCount } = useScanProgress(channelId);
// ── Local state ──
const [showDeleteConfirm, setShowDeleteConfirm] = useState(false);
@ -256,16 +258,8 @@ export function ChannelDetail() {
onSuccess: (result) => {
if (result.status === 'already_running') {
toast('Scan already in progress', 'info');
} else if (result.status === 'rate_limited') {
toast('Rate limited — try again later', 'info');
} else if (result.status === 'error') {
toast('Scan failed — check server logs', 'error');
} else {
// Scan completed synchronously (fast — small channel or cached)
const msg = result.newItems > 0
? `Found ${result.newItems} new item${result.newItems === 1 ? '' : 's'}`
: 'No new content';
toast(msg, 'success');
} else if (result.status === 'started') {
toast('Scan started — items will appear as they\'re found', 'success');
}
},
onError: (err) => {
@ -274,6 +268,21 @@ export function ChannelDetail() {
});
}, [scanChannel, toast]);
const handleCancelScan = useCallback(() => {
cancelScan.mutate(undefined, {
onSuccess: (result) => {
if (result.cancelled) {
toast('Scan cancelled', 'info');
} else {
toast('No scan in progress', 'info');
}
},
onError: (err) => {
toast(err instanceof Error ? err.message : 'Failed to cancel scan', 'error');
},
});
}, [cancelScan, toast]);
const handleCollect = useCallback(() => {
collectMonitored.mutate(undefined, {
onSuccess: (result) => {
@ -973,18 +982,20 @@ export function ChannelDetail() {
</select>
<button
onClick={handleScan}
disabled={scanChannel.isPending || scanInProgress}
title={scanInProgress ? 'Scan in progress…' : 'Refresh & Scan'}
className="btn btn-ghost"
style={{ padding: '4px 10px', fontSize: 'var(--font-size-xs)', opacity: (scanChannel.isPending || scanInProgress) ? 0.6 : 1 }}
onClick={scanInProgress ? handleCancelScan : handleScan}
disabled={scanChannel.isPending || cancelScan.isPending}
title={scanInProgress ? 'Cancel scan' : 'Refresh & Scan'}
className={scanInProgress ? 'btn btn-danger' : 'btn btn-ghost'}
style={{ padding: '4px 10px', fontSize: 'var(--font-size-xs)', opacity: (scanChannel.isPending || cancelScan.isPending) ? 0.6 : 1 }}
>
{(scanChannel.isPending || scanInProgress) ? (
{scanInProgress ? (
<Square size={12} />
) : scanChannel.isPending ? (
<Loader size={12} style={{ animation: 'spin 1s linear infinite' }} />
) : (
<RefreshCw size={12} />
)}
{scanInProgress ? 'Scanning…' : 'Scan'}
{scanInProgress ? `Stop (${scanNewItemCount})` : 'Scan'}
</button>
<button
@ -1284,18 +1295,20 @@ export function ChannelDetail() {
</span>
<div style={{ display: 'flex', alignItems: 'center', gap: 'var(--space-2)' }}>
<button
onClick={handleScan}
disabled={scanChannel.isPending || scanInProgress}
title={scanInProgress ? 'Scan in progress…' : 'Refresh & Scan'}
className="btn btn-ghost"
style={{ opacity: (scanChannel.isPending || scanInProgress) ? 0.6 : 1 }}
onClick={scanInProgress ? handleCancelScan : handleScan}
disabled={scanChannel.isPending || cancelScan.isPending}
title={scanInProgress ? 'Cancel scan' : 'Refresh & Scan'}
className={scanInProgress ? 'btn btn-danger' : 'btn btn-ghost'}
style={{ opacity: (scanChannel.isPending || cancelScan.isPending) ? 0.6 : 1 }}
>
{(scanChannel.isPending || scanInProgress) ? (
{scanInProgress ? (
<Square size={14} />
) : scanChannel.isPending ? (
<Loader size={14} style={{ animation: 'spin 1s linear infinite' }} />
) : (
<RefreshCw size={14} />
)}
{scanInProgress ? 'Scanning…' : scanChannel.isPending ? 'Scanning…' : 'Scan'}
{scanInProgress ? `Stop Scan (${scanNewItemCount} found)` : 'Scan'}
</button>
<button
onClick={handleCollect}

View file

@ -101,8 +101,61 @@ export async function scanRoutes(fastify: FastifyInstance): Promise<void> {
});
}
const result = await fastify.scheduler.checkChannel(channel);
return result;
// Check if already scanning — return immediately
if (fastify.scheduler.isScanning(id)) {
return reply.status(200).send({
channelId: id,
channelName: channel.name,
newItems: 0,
totalFetched: 0,
status: 'already_running',
});
}
// Fire-and-forget: start the scan in the background
// Progress is streamed via WebSocket scan events
console.log(`[scan] Fire-and-forget scan for channel ${id} — returning 202`);
fastify.scheduler.checkChannel(channel).catch((err) => {
fastify.log.error(
{ err, channelId: id },
'[scan] Background scan failed for channel %d',
id
);
});
reply.status(202).send({
channelId: id,
channelName: channel.name,
status: 'started',
});
return;
}
);
// ── POST /api/v1/channel/:id/scan-cancel ──
fastify.post<{ Params: { id: string } }>(
'/api/v1/channel/:id/scan-cancel',
async (request, reply) => {
const id = parseInt(request.params.id, 10);
if (isNaN(id)) {
return reply.status(400).send({
statusCode: 400,
error: 'Bad Request',
message: 'Channel ID must be a number',
});
}
if (!fastify.scheduler) {
return reply.status(503).send({
statusCode: 503,
error: 'Service Unavailable',
message: 'Scheduler is not running',
});
}
const cancelled = fastify.scheduler.cancelScan(id);
return { channelId: id, cancelled };
}
);

View file

@ -2,7 +2,7 @@ import { Cron } from 'croner';
import type { LibSQLDatabase } from 'drizzle-orm/libsql';
import type * as schema from '../db/schema/index';
import type { Channel, Platform, PlatformContentMetadata } from '../types/index';
import type { PlatformRegistry, FetchRecentContentOptions } from '../sources/platform-source';
import type { PlatformRegistry, PlatformSource, FetchRecentContentOptions } from '../sources/platform-source';
import type { RateLimiter } from './rate-limiter';
import { YtDlpError } from '../sources/yt-dlp';
import type { EventBus } from './event-bus';
@ -13,6 +13,8 @@ import {
import {
createContentItem,
getRecentContentIds,
getContentByPlatformContentId,
updateContentItem,
} from '../db/repositories/content-repository';
import { getPlatformSettings } from '../db/repositories/platform-settings-repository';
@ -68,6 +70,7 @@ export class SchedulerService {
private readonly jobs = new Map<number, Cron>();
private readonly channelCache = new Map<number, Channel>();
private readonly activeChecks = new Set<number>();
private readonly activeAbortControllers = new Map<number, AbortController>();
private running = false;
constructor(
@ -154,7 +157,7 @@ export class SchedulerService {
*
* Returns a structured result with item counts and status.
*/
async checkChannel(channel: Channel): Promise<CheckChannelResult> {
async checkChannel(channel: Channel, signal?: AbortSignal): Promise<CheckChannelResult> {
// Per-channel lock — reject overlap before any async work
if (this.activeChecks.has(channel.id)) {
console.log(
@ -169,7 +172,16 @@ export class SchedulerService {
};
}
// Create AbortController for this scan if no external signal provided
const abortController = new AbortController();
const effectiveSignal = signal ?? abortController.signal;
// Link external signal to our controller if provided
if (signal) {
signal.addEventListener('abort', () => abortController.abort(signal.reason), { once: true });
}
this.activeChecks.add(channel.id);
this.activeAbortControllers.set(channel.id, abortController);
// Emit scan:started before any async work
this.eventBus?.emitScan('scan:started', {
@ -203,11 +215,13 @@ export class SchedulerService {
await getRecentContentIds(this.db, channel.id)
);
// 5. Fetch recent content with hybrid options
// 5. Fetch content — discovery-only (fast Phase 1, skip slow enrichment)
const fetchOptions: FetchRecentContentOptions = {
limit: scanLimit,
existingIds,
rateLimitDelay,
discoveryOnly: true,
signal: effectiveSignal,
};
const items: PlatformContentMetadata[] =
await source.fetchRecentContent(channel, fetchOptions);
@ -217,9 +231,28 @@ export class SchedulerService {
(item) => !existingIds.has(item.platformContentId)
);
// 7. Insert new items
// 7. Insert new items (check abort between each)
let insertedCount = 0;
for (const item of newItems) {
// Check if scan was cancelled
if (effectiveSignal.aborted) {
console.log(
`[scheduler] Scan cancelled for channel ${channel.id} ("${channel.name}") after ${insertedCount} items`
);
this.eventBus?.emitScan('scan:complete', {
channelId: channel.id,
channelName: channel.name,
newItems: insertedCount,
totalFetched: items.length,
});
return {
channelId: channel.id,
channelName: channel.name,
newItems: insertedCount,
totalFetched: items.length,
status: 'success',
};
}
// Scheduler discovers *new* content (future), so 'all' and 'future' → monitored
const monitored = channel.monitoringMode === 'all' || channel.monitoringMode === 'future';
const created = await createContentItem(this.db, {
@ -268,6 +301,19 @@ export class SchedulerService {
totalFetched: items.length,
});
// 9. Background Phase 2: enrich newly inserted items with full metadata
// This runs after the scan result is returned — enrichment updates DB records
// and triggers a final cache invalidation when done.
if (insertedCount > 0 && !effectiveSignal.aborted) {
this.enrichNewItems(channel, newItems, existingIds, rateLimitDelay, source, effectiveSignal)
.catch((err) => {
console.error(
`[scheduler] Background enrichment failed for channel ${channel.id}:`,
err instanceof Error ? err.message : err
);
});
}
return {
channelId: channel.id,
channelName: channel.name,
@ -316,6 +362,7 @@ export class SchedulerService {
};
} finally {
this.activeChecks.delete(channel.id);
this.activeAbortControllers.delete(channel.id);
}
}
@ -326,6 +373,18 @@ export class SchedulerService {
return this.activeChecks.has(channelId);
}
/**
* Cancel an in-progress scan for a channel.
* Returns true if a scan was running and was cancelled.
*/
cancelScan(channelId: number): boolean {
const controller = this.activeAbortControllers.get(channelId);
if (!controller) return false;
controller.abort('User cancelled');
console.log(`[scheduler] Scan cancel requested for channel ${channelId}`);
return true;
}
/**
* Get the current state of the scheduler for diagnostic inspection.
*/
@ -352,6 +411,76 @@ export class SchedulerService {
};
}
/**
* Background Phase 2: re-fetch full metadata for newly discovered items
* and update their DB records with enriched data (publishedAt, duration, etc).
* Runs after the main scan completes items are already visible to the user.
*/
private async enrichNewItems(
channel: Channel,
discoveredItems: PlatformContentMetadata[],
existingIds: Set<string>,
rateLimitDelay: number,
source: PlatformSource,
signal: AbortSignal,
): Promise<void> {
const newPlatformIds = discoveredItems
.filter((item) => !existingIds.has(item.platformContentId))
.map((item) => item.platformContentId);
if (newPlatformIds.length === 0) return;
console.log(
`[scheduler] Phase 2: enriching ${newPlatformIds.length} items for channel ${channel.id}`
);
// Re-fetch with enrichment (discoveryOnly: false)
const enrichedItems = await source.fetchRecentContent(channel, {
limit: newPlatformIds.length + existingIds.size,
existingIds,
rateLimitDelay,
discoveryOnly: false,
signal,
});
// Build lookup by platformContentId
const enrichedMap = new Map<string, PlatformContentMetadata>();
for (const item of enrichedItems) {
enrichedMap.set(item.platformContentId, item);
}
// Update DB records with enriched data
let enrichedCount = 0;
for (const platformContentId of newPlatformIds) {
if (signal.aborted) break;
const enriched = enrichedMap.get(platformContentId);
if (!enriched) continue;
// Look up the DB record by platformContentId
const dbItem = await getContentByPlatformContentId(this.db, channel.id, platformContentId);
if (!dbItem) continue;
// Only update if enrichment provides new data
const updates: Record<string, unknown> = {};
if (enriched.publishedAt && !dbItem.publishedAt) {
updates.publishedAt = enriched.publishedAt;
}
if (enriched.duration != null && dbItem.duration == null) {
updates.duration = enriched.duration;
}
if (Object.keys(updates).length > 0) {
await updateContentItem(this.db, dbItem.id, updates);
enrichedCount++;
}
}
console.log(
`[scheduler] Phase 2 complete for channel ${channel.id}: ${enrichedCount} items enriched`
);
}
// ── Internal ──
/**

View file

@ -16,6 +16,10 @@ export interface FetchRecentContentOptions {
existingIds?: Set<string>;
/** Milliseconds to wait between per-item enrichment calls. Default: 1000 */
rateLimitDelay?: number;
/** If true, skip Phase 2 enrichment and return discovery-phase results only. */
discoveryOnly?: boolean;
/** AbortSignal for cancellation support. */
signal?: AbortSignal;
}
// ── Interface ──

View file

@ -95,6 +95,8 @@ export class YouTubeSource implements PlatformSource {
const limit = options?.limit ?? 50;
const existingIds = options?.existingIds ?? new Set<string>();
const rateLimitDelay = options?.rateLimitDelay ?? 1000;
const discoveryOnly = options?.discoveryOnly ?? false;
const signal = options?.signal;
// ── Phase 1: Fast discovery via --flat-playlist ──
const flatResult = await execYtDlp(
@ -111,6 +113,11 @@ export class YouTubeSource implements PlatformSource {
const flatEntries = parseJsonLines(flatResult.stdout);
const discoveredItems = flatEntries.map((entry) => mapEntry(entry));
// If discovery-only, skip Phase 2 entirely — caller gets fast results
if (discoveryOnly) {
return discoveredItems;
}
// ── Phase 2: Enrich new items with upload_date ──
const newItems = discoveredItems.filter(
(item) => !existingIds.has(item.platformContentId)
@ -129,6 +136,12 @@ export class YouTubeSource implements PlatformSource {
const enrichedMap = new Map<string, PlatformContentMetadata>();
for (let i = 0; i < newItems.length; i++) {
// Check cancellation between enrichment calls
if (signal?.aborted) {
console.log(`[youtube] Phase 2 aborted after ${i} enrichments`);
break;
}
const item = newItems[i];
// Rate limit delay between enrichment calls (skip before first)