diff --git a/.claude/settings.local.json b/.claude/settings.local.json index bfe07c9..ed09d09 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -14,7 +14,12 @@ "Bash(MSYS_NO_PATHCONV=1 docker inspect --format='{{json .State.Health}}' tubearr 2>&1)", "Bash(curl:*)", "Bash(python -c \"import sys,json; d=json.load\\(sys.stdin\\); print\\(f''''Channels: {len\\(d\\)}''''\\); [print\\(f'''' - {c[name]} \\({c[platform]}\\) monitoring={c.get\\(monitoringEnabled,?\\)} mode={c.get\\(monitoringMode,?\\)}''''\\) for c in d]\")", - "Bash(python -c \":*)" + "Bash(python -c \":*)", + "Bash(gh auth:*)", + "Bash(gh repo:*)", + "Bash(git remote:*)", + "Bash(git add:*)", + "Bash(git commit:*)" ] } } diff --git a/package-lock.json b/package-lock.json index b57c497..c20ab03 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,8 +9,10 @@ "version": "0.1.0", "dependencies": { "@fastify/cors": "^11.0.0", + "@fastify/middie": "^9.3.1", "@fastify/rate-limit": "^10.2.1", "@fastify/static": "^9.0.0", + "@fastify/websocket": "^11.2.0", "@libsql/client": "^0.14.0", "@tanstack/react-query": "^5.95.0", "croner": "^10.0.1", @@ -1369,6 +1371,29 @@ "dequal": "^2.0.3" } }, + "node_modules/@fastify/middie": { + "version": "9.3.1", + "resolved": "https://registry.npmjs.org/@fastify/middie/-/middie-9.3.1.tgz", + "integrity": "sha512-5uvKKF5zkocgsSiTyBU7AW2LmQ1Fwn4MNJ/8bORAuFwsQ0hqHjtpYaPqO79BkP4aqH5T7P3F2gJ3b3kerAIk7A==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/fastify" + }, + { + "type": "opencollective", + "url": "https://opencollective.com/fastify" + } + ], + "license": "MIT", + "dependencies": { + "@fastify/error": "^4.0.0", + "fastify-plugin": "^5.0.0", + "find-my-way": "^9.5.0", + "path-to-regexp": "^8.1.0", + "reusify": "^1.0.4" + } + }, "node_modules/@fastify/proxy-addr": { "version": "5.1.0", "resolved": "https://registry.npmjs.org/@fastify/proxy-addr/-/proxy-addr-5.1.0.tgz", @@ -1457,6 +1482,27 @@ "glob": "^13.0.0" } }, + "node_modules/@fastify/websocket": { + "version": "11.2.0", + "resolved": "https://registry.npmjs.org/@fastify/websocket/-/websocket-11.2.0.tgz", + "integrity": "sha512-3HrDPbAG1CzUCqnslgJxppvzaAZffieOVbLp1DAy1huCSynUWPifSvfdEDUR8HlJLp3sp1A36uOM2tJogADS8w==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/fastify" + }, + { + "type": "opencollective", + "url": "https://opencollective.com/fastify" + } + ], + "license": "MIT", + "dependencies": { + "duplexify": "^4.1.3", + "fastify-plugin": "^5.0.0", + "ws": "^8.16.0" + } + }, "node_modules/@jridgewell/gen-mapping": { "version": "0.3.13", "resolved": "https://registry.npmjs.org/@jridgewell/gen-mapping/-/gen-mapping-0.3.13.tgz", @@ -2794,6 +2840,18 @@ } } }, + "node_modules/duplexify": { + "version": "4.1.3", + "resolved": "https://registry.npmjs.org/duplexify/-/duplexify-4.1.3.tgz", + "integrity": "sha512-M3BmBhwJRZsSx38lZyhE53Csddgzl5R7xGJNk7CVddZD6CcmwMCH8J+7AprIrQKH7TonKxaCjcv27Qmf+sQ+oA==", + "license": "MIT", + "dependencies": { + "end-of-stream": "^1.4.1", + "inherits": "^2.0.3", + "readable-stream": "^3.1.1", + "stream-shift": "^1.0.2" + } + }, "node_modules/electron-to-chromium": { "version": "1.5.321", "resolved": "https://registry.npmjs.org/electron-to-chromium/-/electron-to-chromium-1.5.321.tgz", @@ -2801,6 +2859,15 @@ "dev": true, "license": "ISC" }, + "node_modules/end-of-stream": { + "version": "1.4.5", + "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.5.tgz", + "integrity": "sha512-ooEGc6HP26xXq/N+GCGOT0JKCLDGrq2bQUZrQ7gyrJiZANJ/8YDTxTpQBXGMn+WbIQXNVpyWymm7KYVICQnyOg==", + "license": "MIT", + "dependencies": { + "once": "^1.4.0" + } + }, "node_modules/env-paths": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/env-paths/-/env-paths-3.0.0.tgz", @@ -3497,6 +3564,15 @@ "node": ">=14.0.0" } }, + "node_modules/once": { + "version": "1.4.0", + "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", + "integrity": "sha512-lNaJgI+2Q5URQBkccEKHTQOPaXdUxnZZElQTZY0MFUAuaEqe1E+Nyvgdz/aIyNi6Z9MzO5dv1H8n58/GELp3+w==", + "license": "ISC", + "dependencies": { + "wrappy": "1" + } + }, "node_modules/path-scurry": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/path-scurry/-/path-scurry-2.0.2.tgz", @@ -3513,6 +3589,16 @@ "url": "https://github.com/sponsors/isaacs" } }, + "node_modules/path-to-regexp": { + "version": "8.3.0", + "resolved": "https://registry.npmjs.org/path-to-regexp/-/path-to-regexp-8.3.0.tgz", + "integrity": "sha512-7jdwVIRtsP8MYpdXSwOS0YdD0Du+qOoF/AEPIt88PcCFrZCzx41oxku1jD88hZBwbNUIEfpqvuhjFaMAqMTWnA==", + "license": "MIT", + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/express" + } + }, "node_modules/pathe": { "version": "2.0.3", "resolved": "https://registry.npmjs.org/pathe/-/pathe-2.0.3.tgz", @@ -3713,6 +3799,20 @@ "react-dom": ">=18" } }, + "node_modules/readable-stream": { + "version": "3.6.2", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.2.tgz", + "integrity": "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==", + "license": "MIT", + "dependencies": { + "inherits": "^2.0.3", + "string_decoder": "^1.1.1", + "util-deprecate": "^1.0.1" + }, + "engines": { + "node": ">= 6" + } + }, "node_modules/real-require": { "version": "0.2.0", "resolved": "https://registry.npmjs.org/real-require/-/real-require-0.2.0.tgz", @@ -3811,6 +3911,26 @@ "fsevents": "~2.3.2" } }, + "node_modules/safe-buffer": { + "version": "5.2.1", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz", + "integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "license": "MIT" + }, "node_modules/safe-regex2": { "version": "5.1.0", "resolved": "https://registry.npmjs.org/safe-regex2/-/safe-regex2-5.1.0.tgz", @@ -3980,6 +4100,21 @@ "dev": true, "license": "MIT" }, + "node_modules/stream-shift": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.3.tgz", + "integrity": "sha512-76ORR0DO1o1hlKwTbi/DM3EXWGf3ZJYO8cXX5RJwnul2DEg2oyoZyjLNoQM8WsvZiFKCRfC1O0J7iCvie3RZmQ==", + "license": "MIT" + }, + "node_modules/string_decoder": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz", + "integrity": "sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA==", + "license": "MIT", + "dependencies": { + "safe-buffer": "~5.2.0" + } + }, "node_modules/strip-literal": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/strip-literal/-/strip-literal-3.1.0.tgz", @@ -4588,6 +4723,12 @@ "browserslist": ">= 4.21.0" } }, + "node_modules/util-deprecate": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", + "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==", + "license": "MIT" + }, "node_modules/vite": { "version": "7.3.1", "resolved": "https://registry.npmjs.org/vite/-/vite-7.3.1.tgz", @@ -5234,6 +5375,12 @@ "node": ">=8" } }, + "node_modules/wrappy": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", + "integrity": "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==", + "license": "ISC" + }, "node_modules/ws": { "version": "8.20.0", "resolved": "https://registry.npmjs.org/ws/-/ws-8.20.0.tgz", diff --git a/package.json b/package.json index 36d0bd7..46b225a 100644 --- a/package.json +++ b/package.json @@ -19,8 +19,10 @@ }, "dependencies": { "@fastify/cors": "^11.0.0", + "@fastify/middie": "^9.3.1", "@fastify/rate-limit": "^10.2.1", "@fastify/static": "^9.0.0", + "@fastify/websocket": "^11.2.0", "@libsql/client": "^0.14.0", "@tanstack/react-query": "^5.95.0", "croner": "^10.0.1", diff --git a/src/__tests__/download-spawn.test.ts b/src/__tests__/download-spawn.test.ts new file mode 100644 index 0000000..f4b0e95 --- /dev/null +++ b/src/__tests__/download-spawn.test.ts @@ -0,0 +1,370 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { EventEmitter } from 'node:events'; +import { mkdtempSync, rmSync, existsSync, writeFileSync, mkdirSync } from 'node:fs'; +import { join } from 'node:path'; +import { tmpdir } from 'node:os'; +import { initDatabaseAsync, closeDatabase } from '../db/index'; +import { runMigrations } from '../db/migrate'; +import { createChannel } from '../db/repositories/channel-repository'; +import { createContentItem } from '../db/repositories/content-repository'; +import { DownloadService } from '../services/download'; +import { DownloadEventBus, type DownloadProgressPayload } from '../services/event-bus'; +import { QualityAnalyzer } from '../services/quality-analyzer'; +import { FileOrganizer } from '../services/file-organizer'; +import { CookieManager } from '../services/cookie-manager'; +import { RateLimiter } from '../services/rate-limiter'; +import type { ContentItem, Channel } from '../types/index'; + +// ── Mocks ── + +// Mock spawnYtDlp from yt-dlp module to return a fake child process +const spawnYtDlpMock = vi.fn(); +// Also mock execYtDlp for backward compat +const execYtDlpMock = vi.fn(); +vi.mock('../sources/yt-dlp', async (importOriginal) => { + const actual = (await importOriginal()) as Record; + return { + ...actual, + execYtDlp: (...args: unknown[]) => execYtDlpMock(...args), + spawnYtDlp: (...args: unknown[]) => spawnYtDlpMock(...args), + }; +}); + +// Mock fs.stat for file size +const statMock = vi.fn(); +vi.mock('node:fs/promises', async (importOriginal) => { + const actual = (await importOriginal()) as Record; + return { + ...actual, + stat: (...args: unknown[]) => statMock(...args), + }; +}); + +// ── Helpers ── + +let tmpDir: string; +let db: Awaited>; +let testChannel: Channel; +let testContentItem: ContentItem; + +/** + * Create a fake ChildProcess (EventEmitter) that mimics spawn behavior. + * stdout/stderr are passthrough EventEmitters with readable stream interface. + */ +function createFakeChild() { + const child = new EventEmitter() as EventEmitter & { + stdout: EventEmitter; + stderr: EventEmitter; + kill: ReturnType; + }; + child.stdout = new EventEmitter(); + child.stderr = new EventEmitter(); + child.kill = vi.fn(); + + // readline.createInterface expects a readable stream with Symbol.asyncIterator + // and pipe/unpipe methods. Simulate the minimum needed. + const stdoutStream = child.stdout as EventEmitter & { + [Symbol.asyncIterator]?: () => AsyncIterableIterator; + pipe?: (...args: unknown[]) => unknown; + unpipe?: (...args: unknown[]) => unknown; + resume?: () => void; + pause?: () => void; + setEncoding?: (enc: string) => void; + readable?: boolean; + }; + stdoutStream.pipe = vi.fn().mockReturnThis(); + stdoutStream.unpipe = vi.fn(); + stdoutStream.resume = vi.fn(); + stdoutStream.pause = vi.fn(); + stdoutStream.setEncoding = vi.fn(); + stdoutStream.readable = true; + + return child; +} + +async function setupDb(): Promise { + tmpDir = mkdtempSync(join(tmpdir(), 'tubearr-spawn-test-')); + const dbPath = join(tmpDir, 'test.db'); + db = await initDatabaseAsync(dbPath); + await runMigrations(dbPath); + + testChannel = await createChannel(db, { + name: 'Test Channel', + platform: 'youtube', + platformId: 'UC_spawn_test', + url: 'https://www.youtube.com/channel/UC_spawn_test', + imageUrl: null, + formatProfileId: null, + monitoringEnabled: true, + checkInterval: 360, + metadata: null, + }); + + testContentItem = (await createContentItem(db, { + channelId: testChannel.id, + title: 'Test Video', + platformContentId: 'vid_spawn_test', + url: 'https://www.youtube.com/watch?v=spawn_test', + contentType: 'video', + duration: 600, + status: 'monitored', + }))!; +} + +function cleanup(): void { + closeDatabase(); + try { + if (tmpDir && existsSync(tmpDir)) { + rmSync(tmpDir, { recursive: true, force: true }); + } + } catch { + // Windows cleanup best-effort + } +} + +function createMockDeps() { + const mediaPath = join(tmpDir, 'media'); + const cookiePath = join(tmpDir, 'cookies'); + mkdirSync(mediaPath, { recursive: true }); + mkdirSync(cookiePath, { recursive: true }); + + const rateLimiter = new RateLimiter({ + youtube: { minIntervalMs: 0 }, + soundcloud: { minIntervalMs: 0 }, + }); + const fileOrganizer = new FileOrganizer(mediaPath); + const qualityAnalyzer = new QualityAnalyzer(); + const cookieManager = new CookieManager(cookiePath); + + vi.spyOn(rateLimiter, 'acquire'); + vi.spyOn(rateLimiter, 'reportSuccess'); + vi.spyOn(rateLimiter, 'reportError'); + + vi.spyOn(qualityAnalyzer, 'analyze').mockResolvedValue({ + actualResolution: '1920x1080', + actualCodec: 'h264', + actualBitrate: '5.0 Mbps', + containerFormat: 'mp4', + qualityWarnings: [], + }); + + return { rateLimiter, fileOrganizer, qualityAnalyzer, cookieManager }; +} + +// ── Tests ── + +describe('DownloadService with EventBus (spawn path)', () => { + beforeEach(async () => { + vi.clearAllMocks(); + await setupDb(); + }); + + afterEach(cleanup); + + it('emits download:progress events during download', async () => { + const deps = createMockDeps(); + const eventBus = new DownloadEventBus(); + const service = new DownloadService( + db, + deps.rateLimiter, + deps.fileOrganizer, + deps.qualityAnalyzer, + deps.cookieManager, + eventBus + ); + + const outputPath = join(tmpDir, 'media', 'youtube', 'Test Channel', 'Test Video.mp4'); + mkdirSync(join(tmpDir, 'media', 'youtube', 'Test Channel'), { recursive: true }); + writeFileSync(outputPath, 'fake video data'); + + const fakeChild = createFakeChild(); + spawnYtDlpMock.mockReturnValueOnce(fakeChild); + statMock.mockResolvedValueOnce({ size: 15_000_000 }); + + // Track progress events + const progressEvents: DownloadProgressPayload[] = []; + eventBus.onDownload('download:progress', (data) => { + progressEvents.push(data); + }); + + // Start download (it's async — will resolve after child closes) + const downloadPromise = service.downloadItem(testContentItem, testChannel); + + // Simulate yt-dlp progress output + await new Promise((r) => setTimeout(r, 10)); + fakeChild.stdout.emit('data', Buffer.from('[download] 25.0% of ~100.00MiB at 2.00MiB/s ETA 00:37\n')); + fakeChild.stdout.emit('data', Buffer.from('[download] 50.0% of ~100.00MiB at 2.50MiB/s ETA 00:20\n')); + fakeChild.stdout.emit('data', Buffer.from('[download] 100% of 100.00MiB at 3.00MiB/s ETA 00:00\n')); + + // Emit final path (non-progress line) + fakeChild.stdout.emit('data', Buffer.from(outputPath + '\n')); + + // Signal process exit + fakeChild.stdout.emit('end'); + fakeChild.emit('close', 0); + + const result = await downloadPromise; + + expect(result.status).toBe('downloaded'); + expect(progressEvents.length).toBe(3); + expect(progressEvents[0]).toEqual({ + contentItemId: testContentItem.id, + percent: 25.0, + speed: '2.00MiB/s', + eta: '00:37', + }); + expect(progressEvents[1].percent).toBe(50.0); + expect(progressEvents[2].percent).toBe(100); + }); + + it('emits download:complete on successful download', async () => { + const deps = createMockDeps(); + const eventBus = new DownloadEventBus(); + const service = new DownloadService( + db, + deps.rateLimiter, + deps.fileOrganizer, + deps.qualityAnalyzer, + deps.cookieManager, + eventBus + ); + + const outputPath = join(tmpDir, 'media', 'youtube', 'Test Channel', 'Test Video.mp4'); + mkdirSync(join(tmpDir, 'media', 'youtube', 'Test Channel'), { recursive: true }); + writeFileSync(outputPath, 'fake video'); + + const fakeChild = createFakeChild(); + spawnYtDlpMock.mockReturnValueOnce(fakeChild); + statMock.mockResolvedValueOnce({ size: 5_000_000 }); + + let completeEventReceived = false; + eventBus.onDownload('download:complete', (data) => { + expect(data.contentItemId).toBe(testContentItem.id); + completeEventReceived = true; + }); + + const downloadPromise = service.downloadItem(testContentItem, testChannel); + + await new Promise((r) => setTimeout(r, 10)); + fakeChild.stdout.emit('data', Buffer.from(outputPath + '\n')); + fakeChild.stdout.emit('end'); + fakeChild.emit('close', 0); + + await downloadPromise; + expect(completeEventReceived).toBe(true); + }); + + it('emits download:failed on yt-dlp error exit', async () => { + const deps = createMockDeps(); + const eventBus = new DownloadEventBus(); + const service = new DownloadService( + db, + deps.rateLimiter, + deps.fileOrganizer, + deps.qualityAnalyzer, + deps.cookieManager, + eventBus + ); + + const fakeChild = createFakeChild(); + spawnYtDlpMock.mockReturnValueOnce(fakeChild); + + let failedEvent: { contentItemId: number; error: string } | null = null; + eventBus.onDownload('download:failed', (data) => { + failedEvent = data; + }); + + const downloadPromise = service.downloadItem(testContentItem, testChannel); + + await new Promise((r) => setTimeout(r, 10)); + fakeChild.stderr.emit('data', Buffer.from('ERROR: Video not found')); + fakeChild.stdout.emit('end'); + fakeChild.emit('close', 1); + + await expect(downloadPromise).rejects.toThrow(); + expect(failedEvent).not.toBeNull(); + expect(failedEvent!.contentItemId).toBe(testContentItem.id); + expect(failedEvent!.error).toContain('Video not found'); + }); + + it('uses spawn (not exec) when event bus is provided', async () => { + const deps = createMockDeps(); + const eventBus = new DownloadEventBus(); + const service = new DownloadService( + db, + deps.rateLimiter, + deps.fileOrganizer, + deps.qualityAnalyzer, + deps.cookieManager, + eventBus + ); + + const outputPath = join(tmpDir, 'media', 'youtube', 'Test Channel', 'Test Video.mp4'); + mkdirSync(join(tmpDir, 'media', 'youtube', 'Test Channel'), { recursive: true }); + writeFileSync(outputPath, 'data'); + + const fakeChild = createFakeChild(); + spawnYtDlpMock.mockReturnValueOnce(fakeChild); + statMock.mockResolvedValueOnce({ size: 1000 }); + + const downloadPromise = service.downloadItem(testContentItem, testChannel); + + await new Promise((r) => setTimeout(r, 10)); + fakeChild.stdout.emit('data', Buffer.from(outputPath + '\n')); + fakeChild.stdout.emit('end'); + fakeChild.emit('close', 0); + + await downloadPromise; + + // spawnYtDlp was called, execYtDlp was not + expect(spawnYtDlpMock).toHaveBeenCalledOnce(); + expect(execYtDlpMock).not.toHaveBeenCalled(); + + // Verify --newline and --progress were added to spawn args + const args = spawnYtDlpMock.mock.calls[0][0] as string[]; + expect(args).toContain('--newline'); + expect(args).toContain('--progress'); + }); + + it('preserves final path parsing from non-progress stdout lines', async () => { + const deps = createMockDeps(); + const eventBus = new DownloadEventBus(); + const service = new DownloadService( + db, + deps.rateLimiter, + deps.fileOrganizer, + deps.qualityAnalyzer, + deps.cookieManager, + eventBus + ); + + const outputPath = join(tmpDir, 'media', 'youtube', 'Test Channel', 'Test Video.mkv'); + mkdirSync(join(tmpDir, 'media', 'youtube', 'Test Channel'), { recursive: true }); + writeFileSync(outputPath, 'data'); + + const fakeChild = createFakeChild(); + spawnYtDlpMock.mockReturnValueOnce(fakeChild); + statMock.mockResolvedValueOnce({ size: 2000 }); + + const downloadPromise = service.downloadItem(testContentItem, testChannel); + + await new Promise((r) => setTimeout(r, 10)); + // Emit mixed output: progress lines + info lines + final path + fakeChild.stdout.emit( + 'data', + Buffer.from( + '[download] 50.0% of ~200.00MiB at 5.00MiB/s ETA 00:20\n' + + '[download] 100% of 200.00MiB at 5.00MiB/s ETA 00:00\n' + + '[Merger] Merging formats into "Test Video.mkv"\n' + + outputPath + + '\n' + ) + ); + fakeChild.stdout.emit('end'); + fakeChild.emit('close', 0); + + const result = await downloadPromise; + expect(result.filePath).toBe(outputPath); + expect(result.format).toBe('mkv'); + }); +}); diff --git a/src/__tests__/event-bus.test.ts b/src/__tests__/event-bus.test.ts new file mode 100644 index 0000000..81d5c8f --- /dev/null +++ b/src/__tests__/event-bus.test.ts @@ -0,0 +1,143 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { + DownloadEventBus, + type DownloadProgressPayload, + type DownloadCompletePayload, + type DownloadFailedPayload, +} from '../services/event-bus'; + +describe('DownloadEventBus', () => { + let bus: DownloadEventBus; + + beforeEach(() => { + bus = new DownloadEventBus(); + }); + + describe('emitDownload / onDownload', () => { + it('delivers download:progress events to subscribers', () => { + const handler = vi.fn(); + bus.onDownload('download:progress', handler); + + const payload: DownloadProgressPayload = { + contentItemId: 42, + percent: 55.3, + speed: '1.23MiB/s', + eta: '00:42', + }; + + bus.emitDownload('download:progress', payload); + + expect(handler).toHaveBeenCalledOnce(); + expect(handler).toHaveBeenCalledWith(payload); + }); + + it('delivers download:complete events to subscribers', () => { + const handler = vi.fn(); + bus.onDownload('download:complete', handler); + + const payload: DownloadCompletePayload = { contentItemId: 7 }; + bus.emitDownload('download:complete', payload); + + expect(handler).toHaveBeenCalledOnce(); + expect(handler).toHaveBeenCalledWith(payload); + }); + + it('delivers download:failed events to subscribers', () => { + const handler = vi.fn(); + bus.onDownload('download:failed', handler); + + const payload: DownloadFailedPayload = { + contentItemId: 99, + error: 'Video not found', + }; + bus.emitDownload('download:failed', payload); + + expect(handler).toHaveBeenCalledOnce(); + expect(handler).toHaveBeenCalledWith(payload); + }); + }); + + describe('multiple listeners', () => { + it('notifies all subscribers for the same event', () => { + const handler1 = vi.fn(); + const handler2 = vi.fn(); + bus.onDownload('download:progress', handler1); + bus.onDownload('download:progress', handler2); + + const payload: DownloadProgressPayload = { + contentItemId: 1, + percent: 10, + speed: '500KiB/s', + eta: '01:30', + }; + + bus.emitDownload('download:progress', payload); + + expect(handler1).toHaveBeenCalledOnce(); + expect(handler2).toHaveBeenCalledOnce(); + }); + + it('does not deliver events across different event types', () => { + const progressHandler = vi.fn(); + const completeHandler = vi.fn(); + bus.onDownload('download:progress', progressHandler); + bus.onDownload('download:complete', completeHandler); + + bus.emitDownload('download:complete', { contentItemId: 1 }); + + expect(progressHandler).not.toHaveBeenCalled(); + expect(completeHandler).toHaveBeenCalledOnce(); + }); + }); + + describe('offDownload', () => { + it('unsubscribes a listener so it no longer receives events', () => { + const handler = vi.fn(); + bus.onDownload('download:progress', handler); + + // Emit once — should fire + bus.emitDownload('download:progress', { + contentItemId: 1, + percent: 50, + speed: '1MiB/s', + eta: '00:30', + }); + expect(handler).toHaveBeenCalledOnce(); + + // Unsubscribe + bus.offDownload('download:progress', handler); + + // Emit again — should NOT fire + bus.emitDownload('download:progress', { + contentItemId: 1, + percent: 75, + speed: '1MiB/s', + eta: '00:15', + }); + expect(handler).toHaveBeenCalledOnce(); // still 1 + }); + }); + + describe('emitDownload return value', () => { + it('returns true when there are listeners', () => { + bus.onDownload('download:progress', vi.fn()); + const result = bus.emitDownload('download:progress', { + contentItemId: 1, + percent: 0, + speed: '', + eta: '', + }); + expect(result).toBe(true); + }); + + it('returns false when there are no listeners', () => { + const result = bus.emitDownload('download:progress', { + contentItemId: 1, + percent: 0, + speed: '', + eta: '', + }); + expect(result).toBe(false); + }); + }); +}); diff --git a/src/__tests__/progress-parser.test.ts b/src/__tests__/progress-parser.test.ts new file mode 100644 index 0000000..9e683c5 --- /dev/null +++ b/src/__tests__/progress-parser.test.ts @@ -0,0 +1,229 @@ +import { describe, it, expect } from 'vitest'; +import { parseProgressLine, isProgressLine } from '../services/progress-parser'; + +describe('parseProgressLine', () => { + describe('standard progress lines', () => { + it('parses a normal progress line', () => { + const result = parseProgressLine( + '[download] 45.2% of ~150.00MiB at 1.23MiB/s ETA 00:42' + ); + expect(result).toEqual({ + percent: 45.2, + speed: '1.23MiB/s', + eta: '00:42', + totalSize: '~150.00MiB', + }); + }); + + it('parses 100% completion', () => { + const result = parseProgressLine( + '[download] 100% of 150.00MiB at 2.50MiB/s ETA 00:00' + ); + expect(result).toEqual({ + percent: 100, + speed: '2.50MiB/s', + eta: '00:00', + totalSize: '150.00MiB', + }); + }); + + it('parses a line with 0% progress', () => { + const result = parseProgressLine( + '[download] 0.0% of ~500.00MiB at 0.50MiB/s ETA 16:40' + ); + expect(result).toEqual({ + percent: 0, + speed: '0.50MiB/s', + eta: '16:40', + totalSize: '~500.00MiB', + }); + }); + + it('parses a line with GiB total size', () => { + const result = parseProgressLine( + '[download] 12.3% of ~2.50GiB at 10.00MiB/s ETA 03:45' + ); + expect(result).toEqual({ + percent: 12.3, + speed: '10.00MiB/s', + eta: '03:45', + totalSize: '~2.50GiB', + }); + }); + + it('parses a line with KiB speed', () => { + const result = parseProgressLine( + '[download] 5.0% of ~80.00MiB at 512.00KiB/s ETA 02:37' + ); + expect(result).toEqual({ + percent: 5.0, + speed: '512.00KiB/s', + eta: '02:37', + totalSize: '~80.00MiB', + }); + }); + }); + + describe('unknown values', () => { + it('handles Unknown speed', () => { + const result = parseProgressLine( + '[download] 10.0% of ~150.00MiB at Unknown speed ETA Unknown' + ); + expect(result).toEqual({ + percent: 10.0, + speed: '', + eta: '', + totalSize: '~150.00MiB', + }); + }); + + it('handles Unknown ETA with known speed', () => { + const result = parseProgressLine( + '[download] 25.0% of ~300.00MiB at 5.00MiB/s ETA Unknown' + ); + expect(result).toEqual({ + percent: 25.0, + speed: '5.00MiB/s', + eta: '', + totalSize: '~300.00MiB', + }); + }); + }); + + describe('non-progress lines (returns null)', () => { + it('returns null for empty string', () => { + expect(parseProgressLine('')).toBeNull(); + }); + + it('returns null for whitespace', () => { + expect(parseProgressLine(' ')).toBeNull(); + }); + + it('returns null for info lines', () => { + expect( + parseProgressLine('[info] Writing video metadata as JSON to: file.json') + ).toBeNull(); + }); + + it('returns null for merge lines', () => { + expect( + parseProgressLine('[Merger] Merging formats into "video.mp4"') + ).toBeNull(); + }); + + it('returns null for postprocessor lines', () => { + expect( + parseProgressLine('[EmbedSubtitle] Embedding subtitles in "video.mp4"') + ).toBeNull(); + }); + + it('returns null for destination lines', () => { + expect( + parseProgressLine('[download] Destination: /path/to/file.mp4') + ).toBeNull(); + }); + + it('returns null for "has already been downloaded" lines', () => { + expect( + parseProgressLine('[download] Video abc123 has already been downloaded') + ).toBeNull(); + }); + + it('returns null for plain text output', () => { + expect(parseProgressLine('/path/to/downloaded/file.mp4')).toBeNull(); + }); + + it('returns null for Deleting lines', () => { + expect( + parseProgressLine('Deleting original file /tmp/video.webm') + ).toBeNull(); + }); + }); + + describe('edge cases', () => { + it('handles lines with leading whitespace', () => { + const result = parseProgressLine( + ' [download] 50.0% of ~200.00MiB at 3.00MiB/s ETA 00:33 ' + ); + expect(result).toEqual({ + percent: 50.0, + speed: '3.00MiB/s', + eta: '00:33', + totalSize: '~200.00MiB', + }); + }); + + it('clamps percent to 100 maximum', () => { + // Shouldn't happen in practice, but defensive + const result = parseProgressLine( + '[download] 105.0% of ~100.00MiB at 1.00MiB/s ETA 00:00' + ); + expect(result).not.toBeNull(); + expect(result!.percent).toBe(100); + }); + + it('handles no tilde prefix on total size', () => { + const result = parseProgressLine( + '[download] 75.0% of 120.00MiB at 4.00MiB/s ETA 00:10' + ); + expect(result).toEqual({ + percent: 75.0, + speed: '4.00MiB/s', + eta: '00:10', + totalSize: '120.00MiB', + }); + }); + }); + + describe('multi-stream download lines', () => { + it('parses the first stream progress normally', () => { + // When downloading video+audio separately, yt-dlp shows progress for each + const result = parseProgressLine( + '[download] 30.0% of ~80.00MiB at 2.00MiB/s ETA 00:28' + ); + expect(result).not.toBeNull(); + expect(result!.percent).toBe(30.0); + }); + + it('parses the second stream progress (resets to 0%)', () => { + // The second stream starts at 0% again + const result = parseProgressLine( + '[download] 5.0% of ~20.00MiB at 1.50MiB/s ETA 00:13' + ); + expect(result).not.toBeNull(); + expect(result!.percent).toBe(5.0); + }); + }); +}); + +describe('isProgressLine', () => { + it('returns true for a progress line', () => { + expect( + isProgressLine('[download] 45.2% of ~150.00MiB at 1.23MiB/s ETA 00:42') + ).toBe(true); + }); + + it('returns true for 100% line', () => { + expect( + isProgressLine('[download] 100% of 150.00MiB at 2.50MiB/s ETA 00:00') + ).toBe(true); + }); + + it('returns false for destination line (no %)', () => { + expect( + isProgressLine('[download] Destination: /path/to/file.mp4') + ).toBe(false); + }); + + it('returns false for info line', () => { + expect(isProgressLine('[info] Available formats for abc123')).toBe(false); + }); + + it('returns false for empty line', () => { + expect(isProgressLine('')).toBe(false); + }); + + it('returns false for filepath output', () => { + expect(isProgressLine('/media/youtube/channel/video.mp4')).toBe(false); + }); +}); diff --git a/src/frontend/src/components/DownloadProgressBar.tsx b/src/frontend/src/components/DownloadProgressBar.tsx new file mode 100644 index 0000000..74d185f --- /dev/null +++ b/src/frontend/src/components/DownloadProgressBar.tsx @@ -0,0 +1,56 @@ +import type { ProgressInfo } from '../contexts/DownloadProgressContext'; + +interface DownloadProgressBarProps { + progress: ProgressInfo; +} + +/** + * Compact progress bar for downloading content items. + * Shows percentage fill with speed/ETA text below. + */ +export function DownloadProgressBar({ progress }: DownloadProgressBarProps) { + const { percent, speed, eta } = progress; + + return ( +
+ {/* Bar container */} +
+
+
+ + {/* Text line */} +
+ {percent.toFixed(1)}% + + {speed && eta ? `${speed} · ${eta}` : speed || eta || ''} + +
+
+ ); +} diff --git a/src/frontend/src/contexts/DownloadProgressContext.tsx b/src/frontend/src/contexts/DownloadProgressContext.tsx new file mode 100644 index 0000000..5683b66 --- /dev/null +++ b/src/frontend/src/contexts/DownloadProgressContext.tsx @@ -0,0 +1,155 @@ +import { createContext, useContext, useCallback, useRef, type ReactNode } from 'react'; +import { useQueryClient } from '@tanstack/react-query'; +import { useSyncExternalStore } from 'react'; +import { useWebSocket } from '../hooks/useWebSocket'; + +// ── Types ── + +export interface ProgressInfo { + percent: number; + speed: string; + eta: string; +} + +interface DownloadProgressEvent { + type: 'download:progress'; + contentItemId: number; + percent: number; + speed: string; + eta: string; +} + +interface DownloadCompleteEvent { + type: 'download:complete'; + contentItemId: number; +} + +interface DownloadFailedEvent { + type: 'download:failed'; + contentItemId: number; + error: string; +} + +type DownloadEvent = DownloadProgressEvent | DownloadCompleteEvent | DownloadFailedEvent; + +// ── Store (external to React for zero unnecessary re-renders) ── + +class ProgressStore { + private _map = new Map(); + private _listeners = new Set<() => void>(); + + subscribe = (listener: () => void) => { + this._listeners.add(listener); + return () => this._listeners.delete(listener); + }; + + getSnapshot = () => this._map; + + set(id: number, info: ProgressInfo) { + this._map = new Map(this._map); + this._map.set(id, info); + this._notify(); + } + + delete(id: number) { + if (!this._map.has(id)) return; + this._map = new Map(this._map); + this._map.delete(id); + this._notify(); + } + + private _notify() { + for (const listener of this._listeners) listener(); + } +} + +// ── Context ── + +interface DownloadProgressContextValue { + /** Get progress for a specific content item. Returns undefined if not downloading. */ + getProgress: (contentItemId: number) => ProgressInfo | undefined; + /** Whether the WebSocket is connected */ + isConnected: boolean; +} + +const DownloadProgressContext = createContext(null); + +// ── Provider ── + +export function DownloadProgressProvider({ children }: { children: ReactNode }) { + const queryClient = useQueryClient(); + const storeRef = useRef(new ProgressStore()); + const store = storeRef.current; + + // Subscribe to the store with useSyncExternalStore for optimal re-renders + const progressMap = useSyncExternalStore(store.subscribe, store.getSnapshot); + + const handleMessage = useCallback( + (data: unknown) => { + const event = data as DownloadEvent; + if (!event?.type) return; + + switch (event.type) { + case 'download:progress': + store.set(event.contentItemId, { + percent: event.percent, + speed: event.speed, + eta: event.eta, + }); + break; + + case 'download:complete': + store.delete(event.contentItemId); + // Invalidate content queries so the UI refreshes with updated status + queryClient.invalidateQueries({ queryKey: ['content'] }); + queryClient.invalidateQueries({ queryKey: ['queue'] }); + break; + + case 'download:failed': + store.delete(event.contentItemId); + // Invalidate to show updated status (failed) + queryClient.invalidateQueries({ queryKey: ['content'] }); + queryClient.invalidateQueries({ queryKey: ['queue'] }); + break; + } + }, + [store, queryClient], + ); + + const { isConnected } = useWebSocket({ onMessage: handleMessage }); + + const getProgress = useCallback( + (contentItemId: number): ProgressInfo | undefined => { + return progressMap.get(contentItemId); + }, + [progressMap], + ); + + return ( + + {children} + + ); +} + +// ── Hook ── + +/** + * Get download progress for a specific content item. + * Returns undefined when the item is not actively downloading via WebSocket. + */ +export function useDownloadProgress(contentItemId: number): ProgressInfo | undefined { + const context = useContext(DownloadProgressContext); + if (!context) { + throw new Error('useDownloadProgress must be used within a DownloadProgressProvider'); + } + return context.getProgress(contentItemId); +} + +/** + * Get the WebSocket connection status. + */ +export function useDownloadProgressConnection(): boolean { + const context = useContext(DownloadProgressContext); + return context?.isConnected ?? false; +} diff --git a/src/frontend/src/hooks/useWebSocket.ts b/src/frontend/src/hooks/useWebSocket.ts new file mode 100644 index 0000000..b5c8ba1 --- /dev/null +++ b/src/frontend/src/hooks/useWebSocket.ts @@ -0,0 +1,91 @@ +import { useEffect, useRef, useState, useCallback } from 'react'; + +interface UseWebSocketOptions { + /** Called for each incoming message */ + onMessage?: (data: unknown) => void; + /** Maximum reconnect delay in ms (default: 30000) */ + maxReconnectDelay?: number; +} + +/** + * Auto-reconnecting WebSocket hook. + * + * Connects to the server's /ws endpoint. Reconnects with exponential backoff + * on disconnection (1s → 2s → 4s → ... → maxReconnectDelay). + * + * The WebSocket URL is derived from the current page location: + * - `wss://host/ws` for HTTPS + * - `ws://host/ws` for HTTP + */ +export function useWebSocket(options: UseWebSocketOptions = {}) { + const { onMessage, maxReconnectDelay = 30_000 } = options; + const [isConnected, setIsConnected] = useState(false); + + // Use refs to avoid stale closures in the reconnect loop + const onMessageRef = useRef(onMessage); + onMessageRef.current = onMessage; + + const wsRef = useRef(null); + const reconnectDelayRef = useRef(1000); + const reconnectTimerRef = useRef>(); + const unmountedRef = useRef(false); + + const connect = useCallback(() => { + if (unmountedRef.current) return; + + const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; + const url = `${protocol}//${window.location.host}/ws`; + + const ws = new WebSocket(url); + wsRef.current = ws; + + ws.onopen = () => { + if (unmountedRef.current) { + ws.close(); + return; + } + setIsConnected(true); + reconnectDelayRef.current = 1000; // Reset backoff on success + }; + + ws.onmessage = (event) => { + try { + const data = JSON.parse(event.data as string); + onMessageRef.current?.(data); + } catch { + // Ignore non-JSON messages + } + }; + + ws.onclose = () => { + setIsConnected(false); + wsRef.current = null; + if (!unmountedRef.current) { + // Reconnect with exponential backoff + const delay = reconnectDelayRef.current; + reconnectTimerRef.current = setTimeout(connect, delay); + reconnectDelayRef.current = Math.min(delay * 2, maxReconnectDelay); + } + }; + + ws.onerror = () => { + // onclose will fire after onerror — reconnect handled there + }; + }, [maxReconnectDelay]); + + useEffect(() => { + unmountedRef.current = false; + connect(); + + return () => { + unmountedRef.current = true; + clearTimeout(reconnectTimerRef.current); + if (wsRef.current) { + wsRef.current.close(); + wsRef.current = null; + } + }; + }, [connect]); + + return { isConnected }; +} diff --git a/src/frontend/src/main.tsx b/src/frontend/src/main.tsx index 96d4939..b737def 100644 --- a/src/frontend/src/main.tsx +++ b/src/frontend/src/main.tsx @@ -2,6 +2,7 @@ import { StrictMode } from 'react'; import { createRoot } from 'react-dom/client'; import { BrowserRouter } from 'react-router-dom'; import { QueryClient, QueryClientProvider } from '@tanstack/react-query'; +import { DownloadProgressProvider } from './contexts/DownloadProgressContext'; import { App } from './App'; import './styles/global.css'; @@ -21,9 +22,11 @@ if (!root) throw new Error('Root element not found'); createRoot(root).render( - - - + + + + + , ); diff --git a/src/frontend/src/pages/ChannelDetail.tsx b/src/frontend/src/pages/ChannelDetail.tsx index 3255dc0..6c85291 100644 --- a/src/frontend/src/pages/ChannelDetail.tsx +++ b/src/frontend/src/pages/ChannelDetail.tsx @@ -27,11 +27,24 @@ import { Table, type Column } from '../components/Table'; import { PlatformBadge } from '../components/PlatformBadge'; import { StatusBadge } from '../components/StatusBadge'; import { QualityLabel } from '../components/QualityLabel'; +import { DownloadProgressBar } from '../components/DownloadProgressBar'; import { Modal } from '../components/Modal'; +import { useDownloadProgress } from '../contexts/DownloadProgressContext'; import type { ContentItem, MonitoringMode } from '@shared/types/index'; // ── Helpers ── +/** Status cell that shows progress bar for downloading items */ +function ContentStatusCell({ item }: { item: ContentItem }) { + const progress = useDownloadProgress(item.id); + + if (item.status === 'downloading' && progress) { + return ; + } + + return ; +} + function formatDuration(seconds: number | null): string { if (seconds == null) return '—'; const h = Math.floor(seconds / 3600); @@ -544,7 +557,7 @@ export function ChannelDetail() { label: 'Status', width: '120px', sortable: true, - render: (item) => , + render: (item) => , }, { key: 'quality', diff --git a/src/frontend/vite.config.ts b/src/frontend/vite.config.ts index bfba457..1afb973 100644 --- a/src/frontend/vite.config.ts +++ b/src/frontend/vite.config.ts @@ -14,11 +14,4 @@ export default defineConfig({ '@shared': resolve(__dirname, '../types'), }, }, - server: { - port: 3000, - proxy: { - '/api': 'http://localhost:8989', - '/ping': 'http://localhost:8989', - }, - }, }); diff --git a/src/index.ts b/src/index.ts index b5a031b..db4d5af 100644 --- a/src/index.ts +++ b/src/index.ts @@ -15,6 +15,7 @@ import { FileOrganizer } from './services/file-organizer'; import { CookieManager } from './services/cookie-manager'; import { QualityAnalyzer } from './services/quality-analyzer'; import { DownloadService } from './services/download'; +import { DownloadEventBus } from './services/event-bus'; import { QueueService } from './services/queue'; import { NotificationService } from './services/notification'; import { HealthService } from './services/health'; @@ -22,6 +23,7 @@ import { PlatformRegistry } from './sources/platform-source'; import { YouTubeSource } from './sources/youtube'; import { SoundCloudSource } from './sources/soundcloud'; import { Platform } from './types/index'; +import type { ViteDevServer } from 'vite'; const APP_NAME = 'Tubearr'; @@ -43,7 +45,21 @@ async function main(): Promise { console.log(`[${APP_NAME}] App settings seeded`); // 3. Build and configure Fastify server - const server = await buildServer({ db }); + // In dev mode, embed Vite for HMR — single port, no separate frontend process + let vite: ViteDevServer | undefined; + if (appConfig.nodeEnv !== 'production') { + const { createServer: createViteServer } = await import('vite'); + const { resolve } = await import('node:path'); + vite = await createViteServer({ + configFile: resolve(process.cwd(), 'src/frontend/vite.config.ts'), + server: { middlewareMode: true }, + appType: 'custom', + }); + console.log(`[${APP_NAME}] Vite dev server embedded (HMR active)`); + } + + const eventBus = new DownloadEventBus(); + const server = await buildServer({ db, eventBus, vite }); // 4. Set up shared services const rateLimiter = new RateLimiter({ @@ -59,7 +75,8 @@ async function main(): Promise { rateLimiter, fileOrganizer, qualityAnalyzer, - cookieManager + cookieManager, + eventBus ); // Attach download service to server for route access @@ -135,6 +152,7 @@ async function main(): Promise { scheduler.stop(); } await server.close(); + if (vite) await vite.close(); console.log(`[${APP_NAME}] Server closed.`); } catch (err) { console.error(`[${APP_NAME}] Error closing server:`, err); diff --git a/src/server/index.ts b/src/server/index.ts index 5d3c295..6ca1c68 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -1,7 +1,8 @@ import Fastify, { type FastifyInstance } from 'fastify'; import cors from '@fastify/cors'; import fastifyStatic from '@fastify/static'; -import { existsSync } from 'node:fs'; +import middie from '@fastify/middie'; +import { existsSync, readFileSync } from 'node:fs'; import { join } from 'node:path'; import { type LibSQLDatabase } from 'drizzle-orm/libsql'; import type * as schema from '../db/schema/index'; @@ -21,10 +22,13 @@ import { platformSettingsRoutes } from './routes/platform-settings'; import { scanRoutes } from './routes/scan'; import { collectRoutes } from './routes/collect'; import { playlistRoutes } from './routes/playlist'; +import { websocketRoutes } from './routes/websocket'; import type { SchedulerService } from '../services/scheduler'; import type { DownloadService } from '../services/download'; import type { QueueService } from '../services/queue'; import type { HealthService } from '../services/health'; +import type { DownloadEventBus } from '../services/event-bus'; +import type { ViteDevServer } from 'vite'; // Extend Fastify's type system so routes can access the database and scheduler declare module 'fastify' { @@ -39,6 +43,9 @@ declare module 'fastify' { export interface BuildServerOptions { db: LibSQLDatabase; + eventBus?: DownloadEventBus; + /** Vite dev server instance for HMR in development — omit in production */ + vite?: ViteDevServer; } /** @@ -103,24 +110,39 @@ export async function buildServer(opts: BuildServerOptions): Promise { if ( request.method === 'GET' && !request.url.startsWith('/api/') && request.url !== '/ping' ) { - return reply.sendFile('index.html'); + try { + const indexPath = join(vite.config.root, 'index.html'); + let html = readFileSync(indexPath, 'utf-8'); + html = await vite.transformIndexHtml(request.url, html); + return reply.type('text/html').send(html); + } catch (err) { + // Let Vite fix the stack trace for better DX + if (err instanceof Error) vite.ssrFixStacktrace(err); + throw err; + } } return reply.status(404).send({ @@ -130,14 +152,42 @@ export async function buildServer(opts: BuildServerOptions): Promise { - return reply.status(404).send({ - statusCode: 404, - error: 'Not Found', - message: 'Route not found', + // Production: serve pre-built static frontend + const frontendDir = join(process.cwd(), 'dist', 'frontend'); + if (existsSync(frontendDir)) { + await server.register(fastifyStatic, { + root: frontendDir, + prefix: '/', + wildcard: false, }); - }); + + // SPA catch-all: serve index.html for any GET request that isn't an API route, + // /ping, or a static file. API and non-GET requests get a standard 404 JSON. + server.setNotFoundHandler(async (request, reply) => { + if ( + request.method === 'GET' && + !request.url.startsWith('/api/') && + request.url !== '/ping' + ) { + return reply.sendFile('index.html'); + } + + return reply.status(404).send({ + statusCode: 404, + error: 'Not Found', + message: `Route ${request.method}:${request.url} not found`, + }); + }); + } else { + // No frontend build — standard 404 for all unknown routes + server.setNotFoundHandler(async (_request, reply) => { + return reply.status(404).send({ + statusCode: 404, + error: 'Not Found', + message: 'Route not found', + }); + }); + } } return server; diff --git a/src/server/routes/websocket.ts b/src/server/routes/websocket.ts new file mode 100644 index 0000000..d3fc0a9 --- /dev/null +++ b/src/server/routes/websocket.ts @@ -0,0 +1,75 @@ +import websocket from '@fastify/websocket'; +import type { FastifyInstance } from 'fastify'; +import type { WebSocket } from 'ws'; +import type { DownloadEventBus, DownloadProgressPayload, DownloadCompletePayload, DownloadFailedPayload } from '../../services/event-bus'; + +/** + * WebSocket route plugin. + * + * Registers @fastify/websocket and a GET /ws route that broadcasts + * download events (progress, complete, failed) to all connected clients. + * + * The event bus is passed via plugin options. Each connected client + * gets its own set of event bus listeners, which are cleaned up on disconnect. + * + * Auth: The /ws route bypasses API key auth (not under /api/*). + * This is intentional — WebSocket only broadcasts read-only progress data. + */ +export async function websocketRoutes( + fastify: FastifyInstance, + opts: { eventBus: DownloadEventBus } +): Promise { + const { eventBus } = opts; + + await fastify.register(websocket); + + fastify.get('/ws', { websocket: true }, (socket: WebSocket) => { + console.log('[websocket] client connected'); + + // Create listeners for each event type + const onProgress = (data: DownloadProgressPayload) => { + sendJson(socket, { type: 'download:progress', ...data }); + }; + + const onComplete = (data: DownloadCompletePayload) => { + sendJson(socket, { type: 'download:complete', ...data }); + }; + + const onFailed = (data: DownloadFailedPayload) => { + sendJson(socket, { type: 'download:failed', ...data }); + }; + + // Subscribe to event bus + eventBus.onDownload('download:progress', onProgress); + eventBus.onDownload('download:complete', onComplete); + eventBus.onDownload('download:failed', onFailed); + + // Cleanup on disconnect + const cleanup = () => { + eventBus.offDownload('download:progress', onProgress); + eventBus.offDownload('download:complete', onComplete); + eventBus.offDownload('download:failed', onFailed); + console.log('[websocket] client disconnected'); + }; + + socket.on('close', cleanup); + socket.on('error', (err) => { + console.log(`[websocket] client error: ${err.message}`); + cleanup(); + }); + }); +} + +/** + * Send a JSON message to a WebSocket client. + * Silently catches send errors (client may have disconnected). + */ +function sendJson(socket: WebSocket, data: Record): void { + try { + if (socket.readyState === socket.OPEN) { + socket.send(JSON.stringify(data)); + } + } catch { + // Client disconnected — swallow + } +} diff --git a/src/services/download.ts b/src/services/download.ts index 4a737c6..e0642f4 100644 --- a/src/services/download.ts +++ b/src/services/download.ts @@ -1,9 +1,12 @@ import { stat } from 'node:fs/promises'; import { extname } from 'node:path'; +import { createInterface } from 'node:readline'; import type { LibSQLDatabase } from 'drizzle-orm/libsql'; import type * as schema from '../db/schema/index'; -import { execYtDlp, YtDlpError } from '../sources/yt-dlp'; +import { execYtDlp, spawnYtDlp, YtDlpError } from '../sources/yt-dlp'; import { updateContentItem } from '../db/repositories/content-repository'; +import { parseProgressLine } from './progress-parser'; +import type { DownloadEventBus } from './event-bus'; import type { RateLimiter } from './rate-limiter'; import type { FileOrganizer } from './file-organizer'; import type { QualityAnalyzer } from './quality-analyzer'; @@ -28,13 +31,18 @@ type Db = LibSQLDatabase; * run quality analysis → update content item. */ export class DownloadService { + private readonly eventBus?: DownloadEventBus; + constructor( private readonly db: Db, private readonly rateLimiter: RateLimiter, private readonly fileOrganizer: FileOrganizer, private readonly qualityAnalyzer: QualityAnalyzer, - private readonly cookieManager: CookieManager - ) {} + private readonly cookieManager: CookieManager, + eventBus?: DownloadEventBus + ) { + this.eventBus = eventBus; + } /** * Download a content item and update its record in the database. @@ -77,14 +85,20 @@ export class DownloadService { ); const startTime = Date.now(); - // Execute download — 30 minute timeout - const result = await execYtDlp(args, { timeout: 1_800_000 }); + // Execute download — streaming spawn when event bus is available, buffered exec otherwise + let stdout: string; + if (this.eventBus) { + stdout = await this.spawnDownload(args, contentItem.id, 1_800_000); + } else { + const result = await execYtDlp(args, { timeout: 1_800_000 }); + stdout = result.stdout; + } const duration = Date.now() - startTime; console.log(`${logPrefix} yt-dlp completed in ${duration}ms`); // Parse final file path from --print after_move:filepath output - const finalPath = this.parseFinalPath(result.stdout, outputTemplate); + const finalPath = this.parseFinalPath(stdout, outputTemplate); // Ensure directories exist and resolve duplicate filenames await this.fileOrganizer.ensureDirectory(finalPath); @@ -111,6 +125,9 @@ export class DownloadService { this.rateLimiter.reportSuccess(channel.platform as Platform); + // Emit download:complete event + this.eventBus?.emitDownload('download:complete', { contentItemId: contentItem.id }); + console.log( `${logPrefix} status=downloaded path="${finalPath}" size=${fileSize} format=${format}` ); @@ -126,12 +143,107 @@ export class DownloadService { const errorMsg = err instanceof Error ? err.message : String(err); console.log(`${logPrefix} status=failed error="${errorMsg.slice(0, 200)}"`); + // Emit download:failed event + this.eventBus?.emitDownload('download:failed', { + contentItemId: contentItem.id, + error: errorMsg.slice(0, 200), + }); + throw err; } } // ── Internal ── + /** + * Spawn yt-dlp and stream progress events via the event bus. + * Returns collected stdout (non-progress lines) for final path parsing. + */ + private spawnDownload( + args: string[], + contentItemId: number, + timeoutMs: number + ): Promise { + return new Promise((resolve, reject) => { + // Add --newline to ensure progress updates are separate lines + const spawnArgs = ['--newline', '--progress', ...args]; + const child = spawnYtDlp(spawnArgs); + + const stdoutLines: string[] = []; + const stderrChunks: string[] = []; + let killed = false; + + // Timeout — kill child after timeoutMs + const timer = setTimeout(() => { + killed = true; + child.kill('SIGTERM'); + }, timeoutMs); + + // Parse stdout line-by-line + if (child.stdout) { + const rl = createInterface({ input: child.stdout, crlfDelay: Infinity }); + rl.on('line', (line: string) => { + const progress = parseProgressLine(line); + if (progress) { + this.eventBus!.emitDownload('download:progress', { + contentItemId, + percent: progress.percent, + speed: progress.speed, + eta: progress.eta, + }); + } else { + // Non-progress lines — collect for final path parsing + stdoutLines.push(line); + } + }); + } + + // Collect stderr + if (child.stderr) { + child.stderr.on('data', (chunk: Buffer) => { + stderrChunks.push(chunk.toString()); + }); + } + + // Handle process exit + child.on('close', (code: number | null) => { + clearTimeout(timer); + const stdout = stdoutLines.join('\n'); + const stderr = stderrChunks.join(''); + + if (killed) { + reject(new YtDlpError( + `yt-dlp timed out after ${timeoutMs}ms`, + stderr, + -1 + )); + return; + } + + if (code !== 0 && code !== null) { + reject(new YtDlpError( + `yt-dlp exited with code ${code}: ${stderr.slice(0, 200)}`, + stderr, + code + )); + return; + } + + resolve(stdout); + }); + + // Handle spawn errors (e.g., yt-dlp not found) + child.on('error', (err: Error) => { + clearTimeout(timer); + reject(new YtDlpError( + `Failed to spawn yt-dlp: ${err.message}`, + '', + -1 + )); + }); + }); + } + /** * Build the yt-dlp command-line args based on content type and format profile. */ diff --git a/src/services/event-bus.ts b/src/services/event-bus.ts new file mode 100644 index 0000000..e529738 --- /dev/null +++ b/src/services/event-bus.ts @@ -0,0 +1,66 @@ +import { EventEmitter } from 'node:events'; + +// ── Event Payload Types ── + +export interface DownloadProgressPayload { + contentItemId: number; + percent: number; + speed: string; + eta: string; +} + +export interface DownloadCompletePayload { + contentItemId: number; +} + +export interface DownloadFailedPayload { + contentItemId: number; + error: string; +} + +// ── Event Map ── + +export interface DownloadEventMap { + 'download:progress': [DownloadProgressPayload]; + 'download:complete': [DownloadCompletePayload]; + 'download:failed': [DownloadFailedPayload]; +} + +// ── Typed Event Bus ── + +/** + * Typed EventEmitter for download events. + * Decouples download progress producers (DownloadService) from + * consumers (WebSocket route, logging, etc). + */ +export class DownloadEventBus extends EventEmitter { + /** + * Emit a typed download event. + */ + emitDownload( + event: K, + ...args: DownloadEventMap[K] + ): boolean { + return this.emit(event, ...args); + } + + /** + * Subscribe to a typed download event. + */ + onDownload( + event: K, + listener: (...args: DownloadEventMap[K]) => void + ): this { + return this.on(event, listener as (...args: unknown[]) => void); + } + + /** + * Unsubscribe from a typed download event. + */ + offDownload( + event: K, + listener: (...args: DownloadEventMap[K]) => void + ): this { + return this.off(event, listener as (...args: unknown[]) => void); + } +} diff --git a/src/services/progress-parser.ts b/src/services/progress-parser.ts new file mode 100644 index 0000000..56a427b --- /dev/null +++ b/src/services/progress-parser.ts @@ -0,0 +1,71 @@ +// ── Types ── + +export interface ProgressInfo { + /** Download percentage (0–100). */ + percent: number; + /** Human-readable speed string, e.g. "1.23MiB/s". Empty if unknown. */ + speed: string; + /** Human-readable ETA string, e.g. "00:42". Empty if unknown. */ + eta: string; + /** Human-readable total size string, e.g. "~150.00MiB". Empty if unknown. */ + totalSize: string; +} + +// ── Parser ── + +/** + * Regex for the standard yt-dlp progress line format: + * [download] 45.2% of ~150.00MiB at 1.23MiB/s ETA 00:42 + * [download] 100% of 150.00MiB at 1.23MiB/s ETA 00:00 + * [download] 12.3% of ~150.00MiB at Unknown speed ETA Unknown + * + * The `--newline` flag ensures each progress update is a separate line. + */ +const PROGRESS_REGEX = + /\[download\]\s+([\d.]+)%\s+of\s+(~?[\d.]+\S+)\s+at\s+(.+?)\s+ETA\s+(.+)/; + +/** + * Parse a single line of yt-dlp stdout into structured progress info. + * + * Returns `null` for non-progress lines (info messages, merge output, + * postprocessor lines, etc). Only parses `[download] XX.X% of ...` lines. + * + * @param line - A single line from yt-dlp stdout (may include trailing whitespace) + */ +export function parseProgressLine(line: string): ProgressInfo | null { + const trimmed = line.trim(); + if (!trimmed) return null; + + const match = trimmed.match(PROGRESS_REGEX); + if (!match) return null; + + const [, percentStr, totalSize, speedRaw, etaRaw] = match; + + const percent = parseFloat(percentStr); + if (isNaN(percent)) return null; + + // Normalize speed: "Unknown speed" → "", trim whitespace + const speed = speedRaw.trim().toLowerCase().startsWith('unknown') + ? '' + : speedRaw.trim(); + + // Normalize ETA: "Unknown" → "", trim whitespace + const eta = etaRaw.trim().toLowerCase().startsWith('unknown') + ? '' + : etaRaw.trim(); + + return { + percent: Math.min(100, Math.max(0, percent)), + speed, + eta, + totalSize: totalSize.trim(), + }; +} + +/** + * Check if a line is a yt-dlp progress line (starts with [download] and contains %). + * Faster than running the full regex for filtering purposes. + */ +export function isProgressLine(line: string): boolean { + return line.includes('[download]') && line.includes('%'); +} diff --git a/src/sources/yt-dlp.ts b/src/sources/yt-dlp.ts index a94e548..23aeb2a 100644 --- a/src/sources/yt-dlp.ts +++ b/src/sources/yt-dlp.ts @@ -1,4 +1,4 @@ -import { execFile as execFileCb } from 'node:child_process'; +import { execFile as execFileCb, spawn, type ChildProcess } from 'node:child_process'; import { promisify } from 'node:util'; const execFileAsync = promisify(execFileCb); @@ -159,6 +159,25 @@ export function parseSingleJson(stdout: string): unknown { } } +// ── Spawn (streaming) ── + +/** + * Spawn yt-dlp as a child process for streaming stdout. + * Unlike `execYtDlp`, this returns the `ChildProcess` immediately so the caller + * can read stdout/stderr line-by-line in real time (for progress parsing). + * + * The caller is responsible for: + * - Reading stdout/stderr + * - Handling the 'close' event + * - Implementing timeout (via setTimeout + process.kill) + */ +export function spawnYtDlp(args: string[]): ChildProcess { + return spawn('yt-dlp', args, { + windowsHide: true, + stdio: ['ignore', 'pipe', 'pipe'], + }); +} + // ── Health Check ── /**