feat: Added typed scan event pipeline — EventBus emits scan:started/ite…

- "src/services/event-bus.ts"
- "src/services/scheduler.ts"
- "src/index.ts"
- "src/server/routes/websocket.ts"

GSD-Task: S05/T01
This commit is contained in:
jlightner 2026-04-03 07:03:12 +00:00
parent 1711389d9c
commit cdd1128632
4 changed files with 138 additions and 16 deletions

View file

@ -148,6 +148,7 @@ async function main(): Promise<void> {
); );
}); });
}, },
eventBus,
}); });
// Attach scheduler to server so routes can notify it // Attach scheduler to server so routes can notify it

View file

@ -1,7 +1,16 @@
import websocket from '@fastify/websocket'; import websocket from '@fastify/websocket';
import type { FastifyInstance } from 'fastify'; import type { FastifyInstance } from 'fastify';
import type { WebSocket } from 'ws'; import type { WebSocket } from 'ws';
import type { DownloadEventBus, DownloadProgressPayload, DownloadCompletePayload, DownloadFailedPayload } from '../../services/event-bus'; import type {
DownloadEventBus,
DownloadProgressPayload,
DownloadCompletePayload,
DownloadFailedPayload,
ScanStartedPayload,
ScanItemDiscoveredPayload,
ScanCompletePayload,
ScanErrorPayload,
} from '../../services/event-bus';
/** /**
* WebSocket route plugin. * WebSocket route plugin.
@ -39,16 +48,43 @@ export async function websocketRoutes(
sendJson(socket, { type: 'download:failed', ...data }); sendJson(socket, { type: 'download:failed', ...data });
}; };
// Subscribe to event bus // Subscribe to download events
eventBus.onDownload('download:progress', onProgress); eventBus.onDownload('download:progress', onProgress);
eventBus.onDownload('download:complete', onComplete); eventBus.onDownload('download:complete', onComplete);
eventBus.onDownload('download:failed', onFailed); eventBus.onDownload('download:failed', onFailed);
// Create listeners for scan event types
const onScanStarted = (data: ScanStartedPayload) => {
sendJson(socket, { type: 'scan:started', ...data });
};
const onScanItemDiscovered = (data: ScanItemDiscoveredPayload) => {
sendJson(socket, { type: 'scan:item-discovered', ...data });
};
const onScanComplete = (data: ScanCompletePayload) => {
sendJson(socket, { type: 'scan:complete', ...data });
};
const onScanError = (data: ScanErrorPayload) => {
sendJson(socket, { type: 'scan:error', ...data });
};
// Subscribe to scan events
eventBus.onScan('scan:started', onScanStarted);
eventBus.onScan('scan:item-discovered', onScanItemDiscovered);
eventBus.onScan('scan:complete', onScanComplete);
eventBus.onScan('scan:error', onScanError);
// Cleanup on disconnect // Cleanup on disconnect
const cleanup = () => { const cleanup = () => {
eventBus.offDownload('download:progress', onProgress); eventBus.offDownload('download:progress', onProgress);
eventBus.offDownload('download:complete', onComplete); eventBus.offDownload('download:complete', onComplete);
eventBus.offDownload('download:failed', onFailed); eventBus.offDownload('download:failed', onFailed);
eventBus.offScan('scan:started', onScanStarted);
eventBus.offScan('scan:item-discovered', onScanItemDiscovered);
eventBus.offScan('scan:complete', onScanComplete);
eventBus.offScan('scan:error', onScanError);
console.log('[websocket] client disconnected'); console.log('[websocket] client disconnected');
}; };

View file

@ -1,6 +1,7 @@
import { EventEmitter } from 'node:events'; import { EventEmitter } from 'node:events';
import type { ContentItem } from '../types/index';
// ── Event Payload Types ── // ── Download Event Payload Types ──
export interface DownloadProgressPayload { export interface DownloadProgressPayload {
contentItemId: number; contentItemId: number;
@ -18,7 +19,33 @@ export interface DownloadFailedPayload {
error: string; error: string;
} }
// ── Event Map ── // ── Scan Event Payload Types ──
export interface ScanStartedPayload {
channelId: number;
channelName: string;
}
export interface ScanItemDiscoveredPayload {
channelId: number;
channelName: string;
item: ContentItem;
}
export interface ScanCompletePayload {
channelId: number;
channelName: string;
newItems: number;
totalFetched: number;
}
export interface ScanErrorPayload {
channelId: number;
channelName: string;
error: string;
}
// ── Event Maps ──
export interface DownloadEventMap { export interface DownloadEventMap {
'download:progress': [DownloadProgressPayload]; 'download:progress': [DownloadProgressPayload];
@ -26,17 +53,23 @@ export interface DownloadEventMap {
'download:failed': [DownloadFailedPayload]; 'download:failed': [DownloadFailedPayload];
} }
export interface ScanEventMap {
'scan:started': [ScanStartedPayload];
'scan:item-discovered': [ScanItemDiscoveredPayload];
'scan:complete': [ScanCompletePayload];
'scan:error': [ScanErrorPayload];
}
// ── Typed Event Bus ── // ── Typed Event Bus ──
/** /**
* Typed EventEmitter for download events. * Typed EventEmitter for download and scan events.
* Decouples download progress producers (DownloadService) from * Decouples event producers (DownloadService, SchedulerService) from
* consumers (WebSocket route, logging, etc). * consumers (WebSocket route, logging, etc).
*/ */
export class DownloadEventBus extends EventEmitter { export class EventBus extends EventEmitter {
/** // ── Download events ──
* Emit a typed download event.
*/
emitDownload<K extends keyof DownloadEventMap>( emitDownload<K extends keyof DownloadEventMap>(
event: K, event: K,
...args: DownloadEventMap[K] ...args: DownloadEventMap[K]
@ -44,9 +77,6 @@ export class DownloadEventBus extends EventEmitter {
return this.emit(event, ...args); return this.emit(event, ...args);
} }
/**
* Subscribe to a typed download event.
*/
onDownload<K extends keyof DownloadEventMap>( onDownload<K extends keyof DownloadEventMap>(
event: K, event: K,
listener: (...args: DownloadEventMap[K]) => void listener: (...args: DownloadEventMap[K]) => void
@ -54,13 +84,38 @@ export class DownloadEventBus extends EventEmitter {
return this.on(event, listener as (...args: unknown[]) => void); return this.on(event, listener as (...args: unknown[]) => void);
} }
/**
* Unsubscribe from a typed download event.
*/
offDownload<K extends keyof DownloadEventMap>( offDownload<K extends keyof DownloadEventMap>(
event: K, event: K,
listener: (...args: DownloadEventMap[K]) => void listener: (...args: DownloadEventMap[K]) => void
): this { ): this {
return this.off(event, listener as (...args: unknown[]) => void); return this.off(event, listener as (...args: unknown[]) => void);
} }
// ── Scan events ──
emitScan<K extends keyof ScanEventMap>(
event: K,
...args: ScanEventMap[K]
): boolean {
return this.emit(event, ...args);
} }
onScan<K extends keyof ScanEventMap>(
event: K,
listener: (...args: ScanEventMap[K]) => void
): this {
return this.on(event, listener as (...args: unknown[]) => void);
}
offScan<K extends keyof ScanEventMap>(
event: K,
listener: (...args: ScanEventMap[K]) => void
): this {
return this.off(event, listener as (...args: unknown[]) => void);
}
}
/** @deprecated Use EventBus instead. */
export const DownloadEventBus = EventBus;
/** @deprecated Use EventBus instead. */
export type DownloadEventBus = EventBus;

View file

@ -5,6 +5,7 @@ import type { Channel, Platform, PlatformContentMetadata } from '../types/index'
import type { PlatformRegistry, FetchRecentContentOptions } from '../sources/platform-source'; import type { PlatformRegistry, FetchRecentContentOptions } from '../sources/platform-source';
import type { RateLimiter } from './rate-limiter'; import type { RateLimiter } from './rate-limiter';
import { YtDlpError } from '../sources/yt-dlp'; import { YtDlpError } from '../sources/yt-dlp';
import type { EventBus } from './event-bus';
import { import {
getEnabledChannels, getEnabledChannels,
updateChannel, updateChannel,
@ -45,6 +46,8 @@ export interface CheckChannelResult {
export interface SchedulerOptions { export interface SchedulerOptions {
/** Called when a new content item is inserted — used to auto-enqueue for download. */ /** Called when a new content item is inserted — used to auto-enqueue for download. */
onNewContent?: (contentItemId: number) => void; onNewContent?: (contentItemId: number) => void;
/** Event bus for broadcasting scan lifecycle events to WebSocket clients. */
eventBus?: EventBus;
} }
// ── Scheduler Service ── // ── Scheduler Service ──
@ -61,6 +64,7 @@ export class SchedulerService {
private readonly platformRegistry: PlatformRegistry; private readonly platformRegistry: PlatformRegistry;
private readonly rateLimiter: RateLimiter; private readonly rateLimiter: RateLimiter;
private readonly onNewContent?: (contentItemId: number) => void; private readonly onNewContent?: (contentItemId: number) => void;
private readonly eventBus?: EventBus;
private readonly jobs = new Map<number, Cron>(); private readonly jobs = new Map<number, Cron>();
private readonly channelCache = new Map<number, Channel>(); private readonly channelCache = new Map<number, Channel>();
private readonly activeChecks = new Set<number>(); private readonly activeChecks = new Set<number>();
@ -76,6 +80,7 @@ export class SchedulerService {
this.platformRegistry = platformRegistry; this.platformRegistry = platformRegistry;
this.rateLimiter = rateLimiter; this.rateLimiter = rateLimiter;
this.onNewContent = options?.onNewContent; this.onNewContent = options?.onNewContent;
this.eventBus = options?.eventBus;
} }
/** /**
@ -166,6 +171,12 @@ export class SchedulerService {
this.activeChecks.add(channel.id); this.activeChecks.add(channel.id);
// Emit scan:started before any async work
this.eventBus?.emitScan('scan:started', {
channelId: channel.id,
channelName: channel.name,
});
console.log( console.log(
`[scheduler] Checking channel ${channel.id} ("${channel.name}") on ${channel.platform}` `[scheduler] Checking channel ${channel.id} ("${channel.name}") on ${channel.platform}`
); );
@ -225,6 +236,12 @@ export class SchedulerService {
}); });
if (created) { if (created) {
insertedCount++; insertedCount++;
// Broadcast the new item to WebSocket clients
this.eventBus?.emitScan('scan:item-discovered', {
channelId: channel.id,
channelName: channel.name,
item: created,
});
// Only auto-enqueue monitored items // Only auto-enqueue monitored items
if (this.onNewContent && created.monitored) { if (this.onNewContent && created.monitored) {
this.onNewContent(created.id); this.onNewContent(created.id);
@ -244,6 +261,13 @@ export class SchedulerService {
`[scheduler] Check complete for channel ${channel.id}: ${insertedCount} new items (${items.length} fetched, ${existingIds.size} existing)` `[scheduler] Check complete for channel ${channel.id}: ${insertedCount} new items (${items.length} fetched, ${existingIds.size} existing)`
); );
this.eventBus?.emitScan('scan:complete', {
channelId: channel.id,
channelName: channel.name,
newItems: insertedCount,
totalFetched: items.length,
});
return { return {
channelId: channel.id, channelId: channel.id,
channelName: channel.name, channelName: channel.name,
@ -272,6 +296,12 @@ export class SchedulerService {
this.rateLimiter.reportError(channel.platform); this.rateLimiter.reportError(channel.platform);
this.eventBus?.emitScan('scan:error', {
channelId: channel.id,
channelName: channel.name,
error: err instanceof Error ? err.message : String(err),
});
console.error( console.error(
`[scheduler] Check failed for channel ${channel.id} ("${channel.name}"): ${status}`, `[scheduler] Check failed for channel ${channel.id} ("${channel.name}"): ${status}`,
err instanceof Error ? err.message : err err instanceof Error ? err.message : err