diff --git a/src/index.ts b/src/index.ts index 854899a..598ac22 100644 --- a/src/index.ts +++ b/src/index.ts @@ -148,6 +148,7 @@ async function main(): Promise { ); }); }, + eventBus, }); // Attach scheduler to server so routes can notify it diff --git a/src/server/routes/websocket.ts b/src/server/routes/websocket.ts index d3fc0a9..77412d3 100644 --- a/src/server/routes/websocket.ts +++ b/src/server/routes/websocket.ts @@ -1,7 +1,16 @@ import websocket from '@fastify/websocket'; import type { FastifyInstance } from 'fastify'; import type { WebSocket } from 'ws'; -import type { DownloadEventBus, DownloadProgressPayload, DownloadCompletePayload, DownloadFailedPayload } from '../../services/event-bus'; +import type { + DownloadEventBus, + DownloadProgressPayload, + DownloadCompletePayload, + DownloadFailedPayload, + ScanStartedPayload, + ScanItemDiscoveredPayload, + ScanCompletePayload, + ScanErrorPayload, +} from '../../services/event-bus'; /** * WebSocket route plugin. @@ -39,16 +48,43 @@ export async function websocketRoutes( sendJson(socket, { type: 'download:failed', ...data }); }; - // Subscribe to event bus + // Subscribe to download events eventBus.onDownload('download:progress', onProgress); eventBus.onDownload('download:complete', onComplete); eventBus.onDownload('download:failed', onFailed); + // Create listeners for scan event types + const onScanStarted = (data: ScanStartedPayload) => { + sendJson(socket, { type: 'scan:started', ...data }); + }; + + const onScanItemDiscovered = (data: ScanItemDiscoveredPayload) => { + sendJson(socket, { type: 'scan:item-discovered', ...data }); + }; + + const onScanComplete = (data: ScanCompletePayload) => { + sendJson(socket, { type: 'scan:complete', ...data }); + }; + + const onScanError = (data: ScanErrorPayload) => { + sendJson(socket, { type: 'scan:error', ...data }); + }; + + // Subscribe to scan events + eventBus.onScan('scan:started', onScanStarted); + eventBus.onScan('scan:item-discovered', onScanItemDiscovered); + eventBus.onScan('scan:complete', onScanComplete); + eventBus.onScan('scan:error', onScanError); + // Cleanup on disconnect const cleanup = () => { eventBus.offDownload('download:progress', onProgress); eventBus.offDownload('download:complete', onComplete); eventBus.offDownload('download:failed', onFailed); + eventBus.offScan('scan:started', onScanStarted); + eventBus.offScan('scan:item-discovered', onScanItemDiscovered); + eventBus.offScan('scan:complete', onScanComplete); + eventBus.offScan('scan:error', onScanError); console.log('[websocket] client disconnected'); }; diff --git a/src/services/event-bus.ts b/src/services/event-bus.ts index e529738..ad77c55 100644 --- a/src/services/event-bus.ts +++ b/src/services/event-bus.ts @@ -1,6 +1,7 @@ import { EventEmitter } from 'node:events'; +import type { ContentItem } from '../types/index'; -// ── Event Payload Types ── +// ── Download Event Payload Types ── export interface DownloadProgressPayload { contentItemId: number; @@ -18,7 +19,33 @@ export interface DownloadFailedPayload { error: string; } -// ── Event Map ── +// ── Scan Event Payload Types ── + +export interface ScanStartedPayload { + channelId: number; + channelName: string; +} + +export interface ScanItemDiscoveredPayload { + channelId: number; + channelName: string; + item: ContentItem; +} + +export interface ScanCompletePayload { + channelId: number; + channelName: string; + newItems: number; + totalFetched: number; +} + +export interface ScanErrorPayload { + channelId: number; + channelName: string; + error: string; +} + +// ── Event Maps ── export interface DownloadEventMap { 'download:progress': [DownloadProgressPayload]; @@ -26,17 +53,23 @@ export interface DownloadEventMap { 'download:failed': [DownloadFailedPayload]; } +export interface ScanEventMap { + 'scan:started': [ScanStartedPayload]; + 'scan:item-discovered': [ScanItemDiscoveredPayload]; + 'scan:complete': [ScanCompletePayload]; + 'scan:error': [ScanErrorPayload]; +} + // ── Typed Event Bus ── /** - * Typed EventEmitter for download events. - * Decouples download progress producers (DownloadService) from + * Typed EventEmitter for download and scan events. + * Decouples event producers (DownloadService, SchedulerService) from * consumers (WebSocket route, logging, etc). */ -export class DownloadEventBus extends EventEmitter { - /** - * Emit a typed download event. - */ +export class EventBus extends EventEmitter { + // ── Download events ── + emitDownload( event: K, ...args: DownloadEventMap[K] @@ -44,9 +77,6 @@ export class DownloadEventBus extends EventEmitter { return this.emit(event, ...args); } - /** - * Subscribe to a typed download event. - */ onDownload( event: K, listener: (...args: DownloadEventMap[K]) => void @@ -54,13 +84,38 @@ export class DownloadEventBus extends EventEmitter { return this.on(event, listener as (...args: unknown[]) => void); } - /** - * Unsubscribe from a typed download event. - */ offDownload( event: K, listener: (...args: DownloadEventMap[K]) => void ): this { return this.off(event, listener as (...args: unknown[]) => void); } + + // ── Scan events ── + + emitScan( + event: K, + ...args: ScanEventMap[K] + ): boolean { + return this.emit(event, ...args); + } + + onScan( + event: K, + listener: (...args: ScanEventMap[K]) => void + ): this { + return this.on(event, listener as (...args: unknown[]) => void); + } + + offScan( + event: K, + listener: (...args: ScanEventMap[K]) => void + ): this { + return this.off(event, listener as (...args: unknown[]) => void); + } } + +/** @deprecated Use EventBus instead. */ +export const DownloadEventBus = EventBus; +/** @deprecated Use EventBus instead. */ +export type DownloadEventBus = EventBus; diff --git a/src/services/scheduler.ts b/src/services/scheduler.ts index 5949ec7..d117d6d 100644 --- a/src/services/scheduler.ts +++ b/src/services/scheduler.ts @@ -5,6 +5,7 @@ import type { Channel, Platform, PlatformContentMetadata } from '../types/index' import type { PlatformRegistry, FetchRecentContentOptions } from '../sources/platform-source'; import type { RateLimiter } from './rate-limiter'; import { YtDlpError } from '../sources/yt-dlp'; +import type { EventBus } from './event-bus'; import { getEnabledChannels, updateChannel, @@ -45,6 +46,8 @@ export interface CheckChannelResult { export interface SchedulerOptions { /** Called when a new content item is inserted — used to auto-enqueue for download. */ onNewContent?: (contentItemId: number) => void; + /** Event bus for broadcasting scan lifecycle events to WebSocket clients. */ + eventBus?: EventBus; } // ── Scheduler Service ── @@ -61,6 +64,7 @@ export class SchedulerService { private readonly platformRegistry: PlatformRegistry; private readonly rateLimiter: RateLimiter; private readonly onNewContent?: (contentItemId: number) => void; + private readonly eventBus?: EventBus; private readonly jobs = new Map(); private readonly channelCache = new Map(); private readonly activeChecks = new Set(); @@ -76,6 +80,7 @@ export class SchedulerService { this.platformRegistry = platformRegistry; this.rateLimiter = rateLimiter; this.onNewContent = options?.onNewContent; + this.eventBus = options?.eventBus; } /** @@ -166,6 +171,12 @@ export class SchedulerService { this.activeChecks.add(channel.id); + // Emit scan:started before any async work + this.eventBus?.emitScan('scan:started', { + channelId: channel.id, + channelName: channel.name, + }); + console.log( `[scheduler] Checking channel ${channel.id} ("${channel.name}") on ${channel.platform}` ); @@ -225,6 +236,12 @@ export class SchedulerService { }); if (created) { insertedCount++; + // Broadcast the new item to WebSocket clients + this.eventBus?.emitScan('scan:item-discovered', { + channelId: channel.id, + channelName: channel.name, + item: created, + }); // Only auto-enqueue monitored items if (this.onNewContent && created.monitored) { this.onNewContent(created.id); @@ -244,6 +261,13 @@ export class SchedulerService { `[scheduler] Check complete for channel ${channel.id}: ${insertedCount} new items (${items.length} fetched, ${existingIds.size} existing)` ); + this.eventBus?.emitScan('scan:complete', { + channelId: channel.id, + channelName: channel.name, + newItems: insertedCount, + totalFetched: items.length, + }); + return { channelId: channel.id, channelName: channel.name, @@ -272,6 +296,12 @@ export class SchedulerService { this.rateLimiter.reportError(channel.platform); + this.eventBus?.emitScan('scan:error', { + channelId: channel.id, + channelName: channel.name, + error: err instanceof Error ? err.message : String(err), + }); + console.error( `[scheduler] Check failed for channel ${channel.id} ("${channel.name}"): ${status}`, err instanceof Error ? err.message : err