WIP: in-progress WebSocket download progress & event bus

Snapshot of active development by separate Claude instance.
Includes: event bus, progress parser, WebSocket route,
download progress bar component, SSE contexts/hooks.
Not tested or validated — commit for migration to dev01.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
John Lightner 2026-03-25 11:34:26 -05:00
parent dbe163bdbb
commit 0541a5f1d1
19 changed files with 1658 additions and 40 deletions

View file

@ -14,7 +14,12 @@
"Bash(MSYS_NO_PATHCONV=1 docker inspect --format='{{json .State.Health}}' tubearr 2>&1)",
"Bash(curl:*)",
"Bash(python -c \"import sys,json; d=json.load\\(sys.stdin\\); print\\(f''''Channels: {len\\(d\\)}''''\\); [print\\(f'''' - {c[name]} \\({c[platform]}\\) monitoring={c.get\\(monitoringEnabled,?\\)} mode={c.get\\(monitoringMode,?\\)}''''\\) for c in d]\")",
"Bash(python -c \":*)"
"Bash(python -c \":*)",
"Bash(gh auth:*)",
"Bash(gh repo:*)",
"Bash(git remote:*)",
"Bash(git add:*)",
"Bash(git commit:*)"
]
}
}

147
package-lock.json generated
View file

@ -9,8 +9,10 @@
"version": "0.1.0",
"dependencies": {
"@fastify/cors": "^11.0.0",
"@fastify/middie": "^9.3.1",
"@fastify/rate-limit": "^10.2.1",
"@fastify/static": "^9.0.0",
"@fastify/websocket": "^11.2.0",
"@libsql/client": "^0.14.0",
"@tanstack/react-query": "^5.95.0",
"croner": "^10.0.1",
@ -1369,6 +1371,29 @@
"dequal": "^2.0.3"
}
},
"node_modules/@fastify/middie": {
"version": "9.3.1",
"resolved": "https://registry.npmjs.org/@fastify/middie/-/middie-9.3.1.tgz",
"integrity": "sha512-5uvKKF5zkocgsSiTyBU7AW2LmQ1Fwn4MNJ/8bORAuFwsQ0hqHjtpYaPqO79BkP4aqH5T7P3F2gJ3b3kerAIk7A==",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/fastify"
},
{
"type": "opencollective",
"url": "https://opencollective.com/fastify"
}
],
"license": "MIT",
"dependencies": {
"@fastify/error": "^4.0.0",
"fastify-plugin": "^5.0.0",
"find-my-way": "^9.5.0",
"path-to-regexp": "^8.1.0",
"reusify": "^1.0.4"
}
},
"node_modules/@fastify/proxy-addr": {
"version": "5.1.0",
"resolved": "https://registry.npmjs.org/@fastify/proxy-addr/-/proxy-addr-5.1.0.tgz",
@ -1457,6 +1482,27 @@
"glob": "^13.0.0"
}
},
"node_modules/@fastify/websocket": {
"version": "11.2.0",
"resolved": "https://registry.npmjs.org/@fastify/websocket/-/websocket-11.2.0.tgz",
"integrity": "sha512-3HrDPbAG1CzUCqnslgJxppvzaAZffieOVbLp1DAy1huCSynUWPifSvfdEDUR8HlJLp3sp1A36uOM2tJogADS8w==",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/fastify"
},
{
"type": "opencollective",
"url": "https://opencollective.com/fastify"
}
],
"license": "MIT",
"dependencies": {
"duplexify": "^4.1.3",
"fastify-plugin": "^5.0.0",
"ws": "^8.16.0"
}
},
"node_modules/@jridgewell/gen-mapping": {
"version": "0.3.13",
"resolved": "https://registry.npmjs.org/@jridgewell/gen-mapping/-/gen-mapping-0.3.13.tgz",
@ -2794,6 +2840,18 @@
}
}
},
"node_modules/duplexify": {
"version": "4.1.3",
"resolved": "https://registry.npmjs.org/duplexify/-/duplexify-4.1.3.tgz",
"integrity": "sha512-M3BmBhwJRZsSx38lZyhE53Csddgzl5R7xGJNk7CVddZD6CcmwMCH8J+7AprIrQKH7TonKxaCjcv27Qmf+sQ+oA==",
"license": "MIT",
"dependencies": {
"end-of-stream": "^1.4.1",
"inherits": "^2.0.3",
"readable-stream": "^3.1.1",
"stream-shift": "^1.0.2"
}
},
"node_modules/electron-to-chromium": {
"version": "1.5.321",
"resolved": "https://registry.npmjs.org/electron-to-chromium/-/electron-to-chromium-1.5.321.tgz",
@ -2801,6 +2859,15 @@
"dev": true,
"license": "ISC"
},
"node_modules/end-of-stream": {
"version": "1.4.5",
"resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.5.tgz",
"integrity": "sha512-ooEGc6HP26xXq/N+GCGOT0JKCLDGrq2bQUZrQ7gyrJiZANJ/8YDTxTpQBXGMn+WbIQXNVpyWymm7KYVICQnyOg==",
"license": "MIT",
"dependencies": {
"once": "^1.4.0"
}
},
"node_modules/env-paths": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/env-paths/-/env-paths-3.0.0.tgz",
@ -3497,6 +3564,15 @@
"node": ">=14.0.0"
}
},
"node_modules/once": {
"version": "1.4.0",
"resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz",
"integrity": "sha512-lNaJgI+2Q5URQBkccEKHTQOPaXdUxnZZElQTZY0MFUAuaEqe1E+Nyvgdz/aIyNi6Z9MzO5dv1H8n58/GELp3+w==",
"license": "ISC",
"dependencies": {
"wrappy": "1"
}
},
"node_modules/path-scurry": {
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/path-scurry/-/path-scurry-2.0.2.tgz",
@ -3513,6 +3589,16 @@
"url": "https://github.com/sponsors/isaacs"
}
},
"node_modules/path-to-regexp": {
"version": "8.3.0",
"resolved": "https://registry.npmjs.org/path-to-regexp/-/path-to-regexp-8.3.0.tgz",
"integrity": "sha512-7jdwVIRtsP8MYpdXSwOS0YdD0Du+qOoF/AEPIt88PcCFrZCzx41oxku1jD88hZBwbNUIEfpqvuhjFaMAqMTWnA==",
"license": "MIT",
"funding": {
"type": "opencollective",
"url": "https://opencollective.com/express"
}
},
"node_modules/pathe": {
"version": "2.0.3",
"resolved": "https://registry.npmjs.org/pathe/-/pathe-2.0.3.tgz",
@ -3713,6 +3799,20 @@
"react-dom": ">=18"
}
},
"node_modules/readable-stream": {
"version": "3.6.2",
"resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.2.tgz",
"integrity": "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==",
"license": "MIT",
"dependencies": {
"inherits": "^2.0.3",
"string_decoder": "^1.1.1",
"util-deprecate": "^1.0.1"
},
"engines": {
"node": ">= 6"
}
},
"node_modules/real-require": {
"version": "0.2.0",
"resolved": "https://registry.npmjs.org/real-require/-/real-require-0.2.0.tgz",
@ -3811,6 +3911,26 @@
"fsevents": "~2.3.2"
}
},
"node_modules/safe-buffer": {
"version": "5.2.1",
"resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz",
"integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/feross"
},
{
"type": "patreon",
"url": "https://www.patreon.com/feross"
},
{
"type": "consulting",
"url": "https://feross.org/support"
}
],
"license": "MIT"
},
"node_modules/safe-regex2": {
"version": "5.1.0",
"resolved": "https://registry.npmjs.org/safe-regex2/-/safe-regex2-5.1.0.tgz",
@ -3980,6 +4100,21 @@
"dev": true,
"license": "MIT"
},
"node_modules/stream-shift": {
"version": "1.0.3",
"resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.3.tgz",
"integrity": "sha512-76ORR0DO1o1hlKwTbi/DM3EXWGf3ZJYO8cXX5RJwnul2DEg2oyoZyjLNoQM8WsvZiFKCRfC1O0J7iCvie3RZmQ==",
"license": "MIT"
},
"node_modules/string_decoder": {
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz",
"integrity": "sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA==",
"license": "MIT",
"dependencies": {
"safe-buffer": "~5.2.0"
}
},
"node_modules/strip-literal": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/strip-literal/-/strip-literal-3.1.0.tgz",
@ -4588,6 +4723,12 @@
"browserslist": ">= 4.21.0"
}
},
"node_modules/util-deprecate": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz",
"integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==",
"license": "MIT"
},
"node_modules/vite": {
"version": "7.3.1",
"resolved": "https://registry.npmjs.org/vite/-/vite-7.3.1.tgz",
@ -5234,6 +5375,12 @@
"node": ">=8"
}
},
"node_modules/wrappy": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz",
"integrity": "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==",
"license": "ISC"
},
"node_modules/ws": {
"version": "8.20.0",
"resolved": "https://registry.npmjs.org/ws/-/ws-8.20.0.tgz",

View file

@ -19,8 +19,10 @@
},
"dependencies": {
"@fastify/cors": "^11.0.0",
"@fastify/middie": "^9.3.1",
"@fastify/rate-limit": "^10.2.1",
"@fastify/static": "^9.0.0",
"@fastify/websocket": "^11.2.0",
"@libsql/client": "^0.14.0",
"@tanstack/react-query": "^5.95.0",
"croner": "^10.0.1",

View file

@ -0,0 +1,370 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { EventEmitter } from 'node:events';
import { mkdtempSync, rmSync, existsSync, writeFileSync, mkdirSync } from 'node:fs';
import { join } from 'node:path';
import { tmpdir } from 'node:os';
import { initDatabaseAsync, closeDatabase } from '../db/index';
import { runMigrations } from '../db/migrate';
import { createChannel } from '../db/repositories/channel-repository';
import { createContentItem } from '../db/repositories/content-repository';
import { DownloadService } from '../services/download';
import { DownloadEventBus, type DownloadProgressPayload } from '../services/event-bus';
import { QualityAnalyzer } from '../services/quality-analyzer';
import { FileOrganizer } from '../services/file-organizer';
import { CookieManager } from '../services/cookie-manager';
import { RateLimiter } from '../services/rate-limiter';
import type { ContentItem, Channel } from '../types/index';
// ── Mocks ──
// Mock spawnYtDlp from yt-dlp module to return a fake child process
const spawnYtDlpMock = vi.fn();
// Also mock execYtDlp for backward compat
const execYtDlpMock = vi.fn();
vi.mock('../sources/yt-dlp', async (importOriginal) => {
const actual = (await importOriginal()) as Record<string, unknown>;
return {
...actual,
execYtDlp: (...args: unknown[]) => execYtDlpMock(...args),
spawnYtDlp: (...args: unknown[]) => spawnYtDlpMock(...args),
};
});
// Mock fs.stat for file size
const statMock = vi.fn();
vi.mock('node:fs/promises', async (importOriginal) => {
const actual = (await importOriginal()) as Record<string, unknown>;
return {
...actual,
stat: (...args: unknown[]) => statMock(...args),
};
});
// ── Helpers ──
let tmpDir: string;
let db: Awaited<ReturnType<typeof initDatabaseAsync>>;
let testChannel: Channel;
let testContentItem: ContentItem;
/**
* Create a fake ChildProcess (EventEmitter) that mimics spawn behavior.
* stdout/stderr are passthrough EventEmitters with readable stream interface.
*/
function createFakeChild() {
const child = new EventEmitter() as EventEmitter & {
stdout: EventEmitter;
stderr: EventEmitter;
kill: ReturnType<typeof vi.fn>;
};
child.stdout = new EventEmitter();
child.stderr = new EventEmitter();
child.kill = vi.fn();
// readline.createInterface expects a readable stream with Symbol.asyncIterator
// and pipe/unpipe methods. Simulate the minimum needed.
const stdoutStream = child.stdout as EventEmitter & {
[Symbol.asyncIterator]?: () => AsyncIterableIterator<string>;
pipe?: (...args: unknown[]) => unknown;
unpipe?: (...args: unknown[]) => unknown;
resume?: () => void;
pause?: () => void;
setEncoding?: (enc: string) => void;
readable?: boolean;
};
stdoutStream.pipe = vi.fn().mockReturnThis();
stdoutStream.unpipe = vi.fn();
stdoutStream.resume = vi.fn();
stdoutStream.pause = vi.fn();
stdoutStream.setEncoding = vi.fn();
stdoutStream.readable = true;
return child;
}
async function setupDb(): Promise<void> {
tmpDir = mkdtempSync(join(tmpdir(), 'tubearr-spawn-test-'));
const dbPath = join(tmpDir, 'test.db');
db = await initDatabaseAsync(dbPath);
await runMigrations(dbPath);
testChannel = await createChannel(db, {
name: 'Test Channel',
platform: 'youtube',
platformId: 'UC_spawn_test',
url: 'https://www.youtube.com/channel/UC_spawn_test',
imageUrl: null,
formatProfileId: null,
monitoringEnabled: true,
checkInterval: 360,
metadata: null,
});
testContentItem = (await createContentItem(db, {
channelId: testChannel.id,
title: 'Test Video',
platformContentId: 'vid_spawn_test',
url: 'https://www.youtube.com/watch?v=spawn_test',
contentType: 'video',
duration: 600,
status: 'monitored',
}))!;
}
function cleanup(): void {
closeDatabase();
try {
if (tmpDir && existsSync(tmpDir)) {
rmSync(tmpDir, { recursive: true, force: true });
}
} catch {
// Windows cleanup best-effort
}
}
function createMockDeps() {
const mediaPath = join(tmpDir, 'media');
const cookiePath = join(tmpDir, 'cookies');
mkdirSync(mediaPath, { recursive: true });
mkdirSync(cookiePath, { recursive: true });
const rateLimiter = new RateLimiter({
youtube: { minIntervalMs: 0 },
soundcloud: { minIntervalMs: 0 },
});
const fileOrganizer = new FileOrganizer(mediaPath);
const qualityAnalyzer = new QualityAnalyzer();
const cookieManager = new CookieManager(cookiePath);
vi.spyOn(rateLimiter, 'acquire');
vi.spyOn(rateLimiter, 'reportSuccess');
vi.spyOn(rateLimiter, 'reportError');
vi.spyOn(qualityAnalyzer, 'analyze').mockResolvedValue({
actualResolution: '1920x1080',
actualCodec: 'h264',
actualBitrate: '5.0 Mbps',
containerFormat: 'mp4',
qualityWarnings: [],
});
return { rateLimiter, fileOrganizer, qualityAnalyzer, cookieManager };
}
// ── Tests ──
describe('DownloadService with EventBus (spawn path)', () => {
beforeEach(async () => {
vi.clearAllMocks();
await setupDb();
});
afterEach(cleanup);
it('emits download:progress events during download', async () => {
const deps = createMockDeps();
const eventBus = new DownloadEventBus();
const service = new DownloadService(
db,
deps.rateLimiter,
deps.fileOrganizer,
deps.qualityAnalyzer,
deps.cookieManager,
eventBus
);
const outputPath = join(tmpDir, 'media', 'youtube', 'Test Channel', 'Test Video.mp4');
mkdirSync(join(tmpDir, 'media', 'youtube', 'Test Channel'), { recursive: true });
writeFileSync(outputPath, 'fake video data');
const fakeChild = createFakeChild();
spawnYtDlpMock.mockReturnValueOnce(fakeChild);
statMock.mockResolvedValueOnce({ size: 15_000_000 });
// Track progress events
const progressEvents: DownloadProgressPayload[] = [];
eventBus.onDownload('download:progress', (data) => {
progressEvents.push(data);
});
// Start download (it's async — will resolve after child closes)
const downloadPromise = service.downloadItem(testContentItem, testChannel);
// Simulate yt-dlp progress output
await new Promise((r) => setTimeout(r, 10));
fakeChild.stdout.emit('data', Buffer.from('[download] 25.0% of ~100.00MiB at 2.00MiB/s ETA 00:37\n'));
fakeChild.stdout.emit('data', Buffer.from('[download] 50.0% of ~100.00MiB at 2.50MiB/s ETA 00:20\n'));
fakeChild.stdout.emit('data', Buffer.from('[download] 100% of 100.00MiB at 3.00MiB/s ETA 00:00\n'));
// Emit final path (non-progress line)
fakeChild.stdout.emit('data', Buffer.from(outputPath + '\n'));
// Signal process exit
fakeChild.stdout.emit('end');
fakeChild.emit('close', 0);
const result = await downloadPromise;
expect(result.status).toBe('downloaded');
expect(progressEvents.length).toBe(3);
expect(progressEvents[0]).toEqual({
contentItemId: testContentItem.id,
percent: 25.0,
speed: '2.00MiB/s',
eta: '00:37',
});
expect(progressEvents[1].percent).toBe(50.0);
expect(progressEvents[2].percent).toBe(100);
});
it('emits download:complete on successful download', async () => {
const deps = createMockDeps();
const eventBus = new DownloadEventBus();
const service = new DownloadService(
db,
deps.rateLimiter,
deps.fileOrganizer,
deps.qualityAnalyzer,
deps.cookieManager,
eventBus
);
const outputPath = join(tmpDir, 'media', 'youtube', 'Test Channel', 'Test Video.mp4');
mkdirSync(join(tmpDir, 'media', 'youtube', 'Test Channel'), { recursive: true });
writeFileSync(outputPath, 'fake video');
const fakeChild = createFakeChild();
spawnYtDlpMock.mockReturnValueOnce(fakeChild);
statMock.mockResolvedValueOnce({ size: 5_000_000 });
let completeEventReceived = false;
eventBus.onDownload('download:complete', (data) => {
expect(data.contentItemId).toBe(testContentItem.id);
completeEventReceived = true;
});
const downloadPromise = service.downloadItem(testContentItem, testChannel);
await new Promise((r) => setTimeout(r, 10));
fakeChild.stdout.emit('data', Buffer.from(outputPath + '\n'));
fakeChild.stdout.emit('end');
fakeChild.emit('close', 0);
await downloadPromise;
expect(completeEventReceived).toBe(true);
});
it('emits download:failed on yt-dlp error exit', async () => {
const deps = createMockDeps();
const eventBus = new DownloadEventBus();
const service = new DownloadService(
db,
deps.rateLimiter,
deps.fileOrganizer,
deps.qualityAnalyzer,
deps.cookieManager,
eventBus
);
const fakeChild = createFakeChild();
spawnYtDlpMock.mockReturnValueOnce(fakeChild);
let failedEvent: { contentItemId: number; error: string } | null = null;
eventBus.onDownload('download:failed', (data) => {
failedEvent = data;
});
const downloadPromise = service.downloadItem(testContentItem, testChannel);
await new Promise((r) => setTimeout(r, 10));
fakeChild.stderr.emit('data', Buffer.from('ERROR: Video not found'));
fakeChild.stdout.emit('end');
fakeChild.emit('close', 1);
await expect(downloadPromise).rejects.toThrow();
expect(failedEvent).not.toBeNull();
expect(failedEvent!.contentItemId).toBe(testContentItem.id);
expect(failedEvent!.error).toContain('Video not found');
});
it('uses spawn (not exec) when event bus is provided', async () => {
const deps = createMockDeps();
const eventBus = new DownloadEventBus();
const service = new DownloadService(
db,
deps.rateLimiter,
deps.fileOrganizer,
deps.qualityAnalyzer,
deps.cookieManager,
eventBus
);
const outputPath = join(tmpDir, 'media', 'youtube', 'Test Channel', 'Test Video.mp4');
mkdirSync(join(tmpDir, 'media', 'youtube', 'Test Channel'), { recursive: true });
writeFileSync(outputPath, 'data');
const fakeChild = createFakeChild();
spawnYtDlpMock.mockReturnValueOnce(fakeChild);
statMock.mockResolvedValueOnce({ size: 1000 });
const downloadPromise = service.downloadItem(testContentItem, testChannel);
await new Promise((r) => setTimeout(r, 10));
fakeChild.stdout.emit('data', Buffer.from(outputPath + '\n'));
fakeChild.stdout.emit('end');
fakeChild.emit('close', 0);
await downloadPromise;
// spawnYtDlp was called, execYtDlp was not
expect(spawnYtDlpMock).toHaveBeenCalledOnce();
expect(execYtDlpMock).not.toHaveBeenCalled();
// Verify --newline and --progress were added to spawn args
const args = spawnYtDlpMock.mock.calls[0][0] as string[];
expect(args).toContain('--newline');
expect(args).toContain('--progress');
});
it('preserves final path parsing from non-progress stdout lines', async () => {
const deps = createMockDeps();
const eventBus = new DownloadEventBus();
const service = new DownloadService(
db,
deps.rateLimiter,
deps.fileOrganizer,
deps.qualityAnalyzer,
deps.cookieManager,
eventBus
);
const outputPath = join(tmpDir, 'media', 'youtube', 'Test Channel', 'Test Video.mkv');
mkdirSync(join(tmpDir, 'media', 'youtube', 'Test Channel'), { recursive: true });
writeFileSync(outputPath, 'data');
const fakeChild = createFakeChild();
spawnYtDlpMock.mockReturnValueOnce(fakeChild);
statMock.mockResolvedValueOnce({ size: 2000 });
const downloadPromise = service.downloadItem(testContentItem, testChannel);
await new Promise((r) => setTimeout(r, 10));
// Emit mixed output: progress lines + info lines + final path
fakeChild.stdout.emit(
'data',
Buffer.from(
'[download] 50.0% of ~200.00MiB at 5.00MiB/s ETA 00:20\n' +
'[download] 100% of 200.00MiB at 5.00MiB/s ETA 00:00\n' +
'[Merger] Merging formats into "Test Video.mkv"\n' +
outputPath +
'\n'
)
);
fakeChild.stdout.emit('end');
fakeChild.emit('close', 0);
const result = await downloadPromise;
expect(result.filePath).toBe(outputPath);
expect(result.format).toBe('mkv');
});
});

View file

@ -0,0 +1,143 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import {
DownloadEventBus,
type DownloadProgressPayload,
type DownloadCompletePayload,
type DownloadFailedPayload,
} from '../services/event-bus';
describe('DownloadEventBus', () => {
let bus: DownloadEventBus;
beforeEach(() => {
bus = new DownloadEventBus();
});
describe('emitDownload / onDownload', () => {
it('delivers download:progress events to subscribers', () => {
const handler = vi.fn();
bus.onDownload('download:progress', handler);
const payload: DownloadProgressPayload = {
contentItemId: 42,
percent: 55.3,
speed: '1.23MiB/s',
eta: '00:42',
};
bus.emitDownload('download:progress', payload);
expect(handler).toHaveBeenCalledOnce();
expect(handler).toHaveBeenCalledWith(payload);
});
it('delivers download:complete events to subscribers', () => {
const handler = vi.fn();
bus.onDownload('download:complete', handler);
const payload: DownloadCompletePayload = { contentItemId: 7 };
bus.emitDownload('download:complete', payload);
expect(handler).toHaveBeenCalledOnce();
expect(handler).toHaveBeenCalledWith(payload);
});
it('delivers download:failed events to subscribers', () => {
const handler = vi.fn();
bus.onDownload('download:failed', handler);
const payload: DownloadFailedPayload = {
contentItemId: 99,
error: 'Video not found',
};
bus.emitDownload('download:failed', payload);
expect(handler).toHaveBeenCalledOnce();
expect(handler).toHaveBeenCalledWith(payload);
});
});
describe('multiple listeners', () => {
it('notifies all subscribers for the same event', () => {
const handler1 = vi.fn();
const handler2 = vi.fn();
bus.onDownload('download:progress', handler1);
bus.onDownload('download:progress', handler2);
const payload: DownloadProgressPayload = {
contentItemId: 1,
percent: 10,
speed: '500KiB/s',
eta: '01:30',
};
bus.emitDownload('download:progress', payload);
expect(handler1).toHaveBeenCalledOnce();
expect(handler2).toHaveBeenCalledOnce();
});
it('does not deliver events across different event types', () => {
const progressHandler = vi.fn();
const completeHandler = vi.fn();
bus.onDownload('download:progress', progressHandler);
bus.onDownload('download:complete', completeHandler);
bus.emitDownload('download:complete', { contentItemId: 1 });
expect(progressHandler).not.toHaveBeenCalled();
expect(completeHandler).toHaveBeenCalledOnce();
});
});
describe('offDownload', () => {
it('unsubscribes a listener so it no longer receives events', () => {
const handler = vi.fn();
bus.onDownload('download:progress', handler);
// Emit once — should fire
bus.emitDownload('download:progress', {
contentItemId: 1,
percent: 50,
speed: '1MiB/s',
eta: '00:30',
});
expect(handler).toHaveBeenCalledOnce();
// Unsubscribe
bus.offDownload('download:progress', handler);
// Emit again — should NOT fire
bus.emitDownload('download:progress', {
contentItemId: 1,
percent: 75,
speed: '1MiB/s',
eta: '00:15',
});
expect(handler).toHaveBeenCalledOnce(); // still 1
});
});
describe('emitDownload return value', () => {
it('returns true when there are listeners', () => {
bus.onDownload('download:progress', vi.fn());
const result = bus.emitDownload('download:progress', {
contentItemId: 1,
percent: 0,
speed: '',
eta: '',
});
expect(result).toBe(true);
});
it('returns false when there are no listeners', () => {
const result = bus.emitDownload('download:progress', {
contentItemId: 1,
percent: 0,
speed: '',
eta: '',
});
expect(result).toBe(false);
});
});
});

View file

@ -0,0 +1,229 @@
import { describe, it, expect } from 'vitest';
import { parseProgressLine, isProgressLine } from '../services/progress-parser';
describe('parseProgressLine', () => {
describe('standard progress lines', () => {
it('parses a normal progress line', () => {
const result = parseProgressLine(
'[download] 45.2% of ~150.00MiB at 1.23MiB/s ETA 00:42'
);
expect(result).toEqual({
percent: 45.2,
speed: '1.23MiB/s',
eta: '00:42',
totalSize: '~150.00MiB',
});
});
it('parses 100% completion', () => {
const result = parseProgressLine(
'[download] 100% of 150.00MiB at 2.50MiB/s ETA 00:00'
);
expect(result).toEqual({
percent: 100,
speed: '2.50MiB/s',
eta: '00:00',
totalSize: '150.00MiB',
});
});
it('parses a line with 0% progress', () => {
const result = parseProgressLine(
'[download] 0.0% of ~500.00MiB at 0.50MiB/s ETA 16:40'
);
expect(result).toEqual({
percent: 0,
speed: '0.50MiB/s',
eta: '16:40',
totalSize: '~500.00MiB',
});
});
it('parses a line with GiB total size', () => {
const result = parseProgressLine(
'[download] 12.3% of ~2.50GiB at 10.00MiB/s ETA 03:45'
);
expect(result).toEqual({
percent: 12.3,
speed: '10.00MiB/s',
eta: '03:45',
totalSize: '~2.50GiB',
});
});
it('parses a line with KiB speed', () => {
const result = parseProgressLine(
'[download] 5.0% of ~80.00MiB at 512.00KiB/s ETA 02:37'
);
expect(result).toEqual({
percent: 5.0,
speed: '512.00KiB/s',
eta: '02:37',
totalSize: '~80.00MiB',
});
});
});
describe('unknown values', () => {
it('handles Unknown speed', () => {
const result = parseProgressLine(
'[download] 10.0% of ~150.00MiB at Unknown speed ETA Unknown'
);
expect(result).toEqual({
percent: 10.0,
speed: '',
eta: '',
totalSize: '~150.00MiB',
});
});
it('handles Unknown ETA with known speed', () => {
const result = parseProgressLine(
'[download] 25.0% of ~300.00MiB at 5.00MiB/s ETA Unknown'
);
expect(result).toEqual({
percent: 25.0,
speed: '5.00MiB/s',
eta: '',
totalSize: '~300.00MiB',
});
});
});
describe('non-progress lines (returns null)', () => {
it('returns null for empty string', () => {
expect(parseProgressLine('')).toBeNull();
});
it('returns null for whitespace', () => {
expect(parseProgressLine(' ')).toBeNull();
});
it('returns null for info lines', () => {
expect(
parseProgressLine('[info] Writing video metadata as JSON to: file.json')
).toBeNull();
});
it('returns null for merge lines', () => {
expect(
parseProgressLine('[Merger] Merging formats into "video.mp4"')
).toBeNull();
});
it('returns null for postprocessor lines', () => {
expect(
parseProgressLine('[EmbedSubtitle] Embedding subtitles in "video.mp4"')
).toBeNull();
});
it('returns null for destination lines', () => {
expect(
parseProgressLine('[download] Destination: /path/to/file.mp4')
).toBeNull();
});
it('returns null for "has already been downloaded" lines', () => {
expect(
parseProgressLine('[download] Video abc123 has already been downloaded')
).toBeNull();
});
it('returns null for plain text output', () => {
expect(parseProgressLine('/path/to/downloaded/file.mp4')).toBeNull();
});
it('returns null for Deleting lines', () => {
expect(
parseProgressLine('Deleting original file /tmp/video.webm')
).toBeNull();
});
});
describe('edge cases', () => {
it('handles lines with leading whitespace', () => {
const result = parseProgressLine(
' [download] 50.0% of ~200.00MiB at 3.00MiB/s ETA 00:33 '
);
expect(result).toEqual({
percent: 50.0,
speed: '3.00MiB/s',
eta: '00:33',
totalSize: '~200.00MiB',
});
});
it('clamps percent to 100 maximum', () => {
// Shouldn't happen in practice, but defensive
const result = parseProgressLine(
'[download] 105.0% of ~100.00MiB at 1.00MiB/s ETA 00:00'
);
expect(result).not.toBeNull();
expect(result!.percent).toBe(100);
});
it('handles no tilde prefix on total size', () => {
const result = parseProgressLine(
'[download] 75.0% of 120.00MiB at 4.00MiB/s ETA 00:10'
);
expect(result).toEqual({
percent: 75.0,
speed: '4.00MiB/s',
eta: '00:10',
totalSize: '120.00MiB',
});
});
});
describe('multi-stream download lines', () => {
it('parses the first stream progress normally', () => {
// When downloading video+audio separately, yt-dlp shows progress for each
const result = parseProgressLine(
'[download] 30.0% of ~80.00MiB at 2.00MiB/s ETA 00:28'
);
expect(result).not.toBeNull();
expect(result!.percent).toBe(30.0);
});
it('parses the second stream progress (resets to 0%)', () => {
// The second stream starts at 0% again
const result = parseProgressLine(
'[download] 5.0% of ~20.00MiB at 1.50MiB/s ETA 00:13'
);
expect(result).not.toBeNull();
expect(result!.percent).toBe(5.0);
});
});
});
describe('isProgressLine', () => {
it('returns true for a progress line', () => {
expect(
isProgressLine('[download] 45.2% of ~150.00MiB at 1.23MiB/s ETA 00:42')
).toBe(true);
});
it('returns true for 100% line', () => {
expect(
isProgressLine('[download] 100% of 150.00MiB at 2.50MiB/s ETA 00:00')
).toBe(true);
});
it('returns false for destination line (no %)', () => {
expect(
isProgressLine('[download] Destination: /path/to/file.mp4')
).toBe(false);
});
it('returns false for info line', () => {
expect(isProgressLine('[info] Available formats for abc123')).toBe(false);
});
it('returns false for empty line', () => {
expect(isProgressLine('')).toBe(false);
});
it('returns false for filepath output', () => {
expect(isProgressLine('/media/youtube/channel/video.mp4')).toBe(false);
});
});

View file

@ -0,0 +1,56 @@
import type { ProgressInfo } from '../contexts/DownloadProgressContext';
interface DownloadProgressBarProps {
progress: ProgressInfo;
}
/**
* Compact progress bar for downloading content items.
* Shows percentage fill with speed/ETA text below.
*/
export function DownloadProgressBar({ progress }: DownloadProgressBarProps) {
const { percent, speed, eta } = progress;
return (
<div style={{ width: '100%', minWidth: 80 }}>
{/* Bar container */}
<div
style={{
width: '100%',
height: 6,
backgroundColor: 'var(--bg-hover)',
borderRadius: 3,
overflow: 'hidden',
}}
>
<div
style={{
width: `${Math.min(100, Math.max(0, percent))}%`,
height: '100%',
backgroundColor: 'var(--accent)',
borderRadius: 3,
transition: 'width 0.3s ease-out',
}}
/>
</div>
{/* Text line */}
<div
style={{
display: 'flex',
justifyContent: 'space-between',
fontSize: 'var(--font-size-xs)',
color: 'var(--text-muted)',
marginTop: 2,
fontVariantNumeric: 'tabular-nums',
lineHeight: 1.2,
}}
>
<span>{percent.toFixed(1)}%</span>
<span>
{speed && eta ? `${speed} · ${eta}` : speed || eta || ''}
</span>
</div>
</div>
);
}

View file

@ -0,0 +1,155 @@
import { createContext, useContext, useCallback, useRef, type ReactNode } from 'react';
import { useQueryClient } from '@tanstack/react-query';
import { useSyncExternalStore } from 'react';
import { useWebSocket } from '../hooks/useWebSocket';
// ── Types ──
export interface ProgressInfo {
percent: number;
speed: string;
eta: string;
}
interface DownloadProgressEvent {
type: 'download:progress';
contentItemId: number;
percent: number;
speed: string;
eta: string;
}
interface DownloadCompleteEvent {
type: 'download:complete';
contentItemId: number;
}
interface DownloadFailedEvent {
type: 'download:failed';
contentItemId: number;
error: string;
}
type DownloadEvent = DownloadProgressEvent | DownloadCompleteEvent | DownloadFailedEvent;
// ── Store (external to React for zero unnecessary re-renders) ──
class ProgressStore {
private _map = new Map<number, ProgressInfo>();
private _listeners = new Set<() => void>();
subscribe = (listener: () => void) => {
this._listeners.add(listener);
return () => this._listeners.delete(listener);
};
getSnapshot = () => this._map;
set(id: number, info: ProgressInfo) {
this._map = new Map(this._map);
this._map.set(id, info);
this._notify();
}
delete(id: number) {
if (!this._map.has(id)) return;
this._map = new Map(this._map);
this._map.delete(id);
this._notify();
}
private _notify() {
for (const listener of this._listeners) listener();
}
}
// ── Context ──
interface DownloadProgressContextValue {
/** Get progress for a specific content item. Returns undefined if not downloading. */
getProgress: (contentItemId: number) => ProgressInfo | undefined;
/** Whether the WebSocket is connected */
isConnected: boolean;
}
const DownloadProgressContext = createContext<DownloadProgressContextValue | null>(null);
// ── Provider ──
export function DownloadProgressProvider({ children }: { children: ReactNode }) {
const queryClient = useQueryClient();
const storeRef = useRef(new ProgressStore());
const store = storeRef.current;
// Subscribe to the store with useSyncExternalStore for optimal re-renders
const progressMap = useSyncExternalStore(store.subscribe, store.getSnapshot);
const handleMessage = useCallback(
(data: unknown) => {
const event = data as DownloadEvent;
if (!event?.type) return;
switch (event.type) {
case 'download:progress':
store.set(event.contentItemId, {
percent: event.percent,
speed: event.speed,
eta: event.eta,
});
break;
case 'download:complete':
store.delete(event.contentItemId);
// Invalidate content queries so the UI refreshes with updated status
queryClient.invalidateQueries({ queryKey: ['content'] });
queryClient.invalidateQueries({ queryKey: ['queue'] });
break;
case 'download:failed':
store.delete(event.contentItemId);
// Invalidate to show updated status (failed)
queryClient.invalidateQueries({ queryKey: ['content'] });
queryClient.invalidateQueries({ queryKey: ['queue'] });
break;
}
},
[store, queryClient],
);
const { isConnected } = useWebSocket({ onMessage: handleMessage });
const getProgress = useCallback(
(contentItemId: number): ProgressInfo | undefined => {
return progressMap.get(contentItemId);
},
[progressMap],
);
return (
<DownloadProgressContext.Provider value={{ getProgress, isConnected }}>
{children}
</DownloadProgressContext.Provider>
);
}
// ── Hook ──
/**
* Get download progress for a specific content item.
* Returns undefined when the item is not actively downloading via WebSocket.
*/
export function useDownloadProgress(contentItemId: number): ProgressInfo | undefined {
const context = useContext(DownloadProgressContext);
if (!context) {
throw new Error('useDownloadProgress must be used within a DownloadProgressProvider');
}
return context.getProgress(contentItemId);
}
/**
* Get the WebSocket connection status.
*/
export function useDownloadProgressConnection(): boolean {
const context = useContext(DownloadProgressContext);
return context?.isConnected ?? false;
}

View file

@ -0,0 +1,91 @@
import { useEffect, useRef, useState, useCallback } from 'react';
interface UseWebSocketOptions {
/** Called for each incoming message */
onMessage?: (data: unknown) => void;
/** Maximum reconnect delay in ms (default: 30000) */
maxReconnectDelay?: number;
}
/**
* Auto-reconnecting WebSocket hook.
*
* Connects to the server's /ws endpoint. Reconnects with exponential backoff
* on disconnection (1s 2s 4s ... maxReconnectDelay).
*
* The WebSocket URL is derived from the current page location:
* - `wss://host/ws` for HTTPS
* - `ws://host/ws` for HTTP
*/
export function useWebSocket(options: UseWebSocketOptions = {}) {
const { onMessage, maxReconnectDelay = 30_000 } = options;
const [isConnected, setIsConnected] = useState(false);
// Use refs to avoid stale closures in the reconnect loop
const onMessageRef = useRef(onMessage);
onMessageRef.current = onMessage;
const wsRef = useRef<WebSocket | null>(null);
const reconnectDelayRef = useRef(1000);
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout>>();
const unmountedRef = useRef(false);
const connect = useCallback(() => {
if (unmountedRef.current) return;
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const url = `${protocol}//${window.location.host}/ws`;
const ws = new WebSocket(url);
wsRef.current = ws;
ws.onopen = () => {
if (unmountedRef.current) {
ws.close();
return;
}
setIsConnected(true);
reconnectDelayRef.current = 1000; // Reset backoff on success
};
ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data as string);
onMessageRef.current?.(data);
} catch {
// Ignore non-JSON messages
}
};
ws.onclose = () => {
setIsConnected(false);
wsRef.current = null;
if (!unmountedRef.current) {
// Reconnect with exponential backoff
const delay = reconnectDelayRef.current;
reconnectTimerRef.current = setTimeout(connect, delay);
reconnectDelayRef.current = Math.min(delay * 2, maxReconnectDelay);
}
};
ws.onerror = () => {
// onclose will fire after onerror — reconnect handled there
};
}, [maxReconnectDelay]);
useEffect(() => {
unmountedRef.current = false;
connect();
return () => {
unmountedRef.current = true;
clearTimeout(reconnectTimerRef.current);
if (wsRef.current) {
wsRef.current.close();
wsRef.current = null;
}
};
}, [connect]);
return { isConnected };
}

View file

@ -2,6 +2,7 @@ import { StrictMode } from 'react';
import { createRoot } from 'react-dom/client';
import { BrowserRouter } from 'react-router-dom';
import { QueryClient, QueryClientProvider } from '@tanstack/react-query';
import { DownloadProgressProvider } from './contexts/DownloadProgressContext';
import { App } from './App';
import './styles/global.css';
@ -21,9 +22,11 @@ if (!root) throw new Error('Root element not found');
createRoot(root).render(
<StrictMode>
<QueryClientProvider client={queryClient}>
<BrowserRouter>
<App />
</BrowserRouter>
<DownloadProgressProvider>
<BrowserRouter>
<App />
</BrowserRouter>
</DownloadProgressProvider>
</QueryClientProvider>
</StrictMode>,
);

View file

@ -27,11 +27,24 @@ import { Table, type Column } from '../components/Table';
import { PlatformBadge } from '../components/PlatformBadge';
import { StatusBadge } from '../components/StatusBadge';
import { QualityLabel } from '../components/QualityLabel';
import { DownloadProgressBar } from '../components/DownloadProgressBar';
import { Modal } from '../components/Modal';
import { useDownloadProgress } from '../contexts/DownloadProgressContext';
import type { ContentItem, MonitoringMode } from '@shared/types/index';
// ── Helpers ──
/** Status cell that shows progress bar for downloading items */
function ContentStatusCell({ item }: { item: ContentItem }) {
const progress = useDownloadProgress(item.id);
if (item.status === 'downloading' && progress) {
return <DownloadProgressBar progress={progress} />;
}
return <StatusBadge status={item.status} />;
}
function formatDuration(seconds: number | null): string {
if (seconds == null) return '—';
const h = Math.floor(seconds / 3600);
@ -544,7 +557,7 @@ export function ChannelDetail() {
label: 'Status',
width: '120px',
sortable: true,
render: (item) => <StatusBadge status={item.status} />,
render: (item) => <ContentStatusCell item={item} />,
},
{
key: 'quality',

View file

@ -14,11 +14,4 @@ export default defineConfig({
'@shared': resolve(__dirname, '../types'),
},
},
server: {
port: 3000,
proxy: {
'/api': 'http://localhost:8989',
'/ping': 'http://localhost:8989',
},
},
});

View file

@ -15,6 +15,7 @@ import { FileOrganizer } from './services/file-organizer';
import { CookieManager } from './services/cookie-manager';
import { QualityAnalyzer } from './services/quality-analyzer';
import { DownloadService } from './services/download';
import { DownloadEventBus } from './services/event-bus';
import { QueueService } from './services/queue';
import { NotificationService } from './services/notification';
import { HealthService } from './services/health';
@ -22,6 +23,7 @@ import { PlatformRegistry } from './sources/platform-source';
import { YouTubeSource } from './sources/youtube';
import { SoundCloudSource } from './sources/soundcloud';
import { Platform } from './types/index';
import type { ViteDevServer } from 'vite';
const APP_NAME = 'Tubearr';
@ -43,7 +45,21 @@ async function main(): Promise<void> {
console.log(`[${APP_NAME}] App settings seeded`);
// 3. Build and configure Fastify server
const server = await buildServer({ db });
// In dev mode, embed Vite for HMR — single port, no separate frontend process
let vite: ViteDevServer | undefined;
if (appConfig.nodeEnv !== 'production') {
const { createServer: createViteServer } = await import('vite');
const { resolve } = await import('node:path');
vite = await createViteServer({
configFile: resolve(process.cwd(), 'src/frontend/vite.config.ts'),
server: { middlewareMode: true },
appType: 'custom',
});
console.log(`[${APP_NAME}] Vite dev server embedded (HMR active)`);
}
const eventBus = new DownloadEventBus();
const server = await buildServer({ db, eventBus, vite });
// 4. Set up shared services
const rateLimiter = new RateLimiter({
@ -59,7 +75,8 @@ async function main(): Promise<void> {
rateLimiter,
fileOrganizer,
qualityAnalyzer,
cookieManager
cookieManager,
eventBus
);
// Attach download service to server for route access
@ -135,6 +152,7 @@ async function main(): Promise<void> {
scheduler.stop();
}
await server.close();
if (vite) await vite.close();
console.log(`[${APP_NAME}] Server closed.`);
} catch (err) {
console.error(`[${APP_NAME}] Error closing server:`, err);

View file

@ -1,7 +1,8 @@
import Fastify, { type FastifyInstance } from 'fastify';
import cors from '@fastify/cors';
import fastifyStatic from '@fastify/static';
import { existsSync } from 'node:fs';
import middie from '@fastify/middie';
import { existsSync, readFileSync } from 'node:fs';
import { join } from 'node:path';
import { type LibSQLDatabase } from 'drizzle-orm/libsql';
import type * as schema from '../db/schema/index';
@ -21,10 +22,13 @@ import { platformSettingsRoutes } from './routes/platform-settings';
import { scanRoutes } from './routes/scan';
import { collectRoutes } from './routes/collect';
import { playlistRoutes } from './routes/playlist';
import { websocketRoutes } from './routes/websocket';
import type { SchedulerService } from '../services/scheduler';
import type { DownloadService } from '../services/download';
import type { QueueService } from '../services/queue';
import type { HealthService } from '../services/health';
import type { DownloadEventBus } from '../services/event-bus';
import type { ViteDevServer } from 'vite';
// Extend Fastify's type system so routes can access the database and scheduler
declare module 'fastify' {
@ -39,6 +43,9 @@ declare module 'fastify' {
export interface BuildServerOptions {
db: LibSQLDatabase<typeof schema>;
eventBus?: DownloadEventBus;
/** Vite dev server instance for HMR in development — omit in production */
vite?: ViteDevServer;
}
/**
@ -103,24 +110,39 @@ export async function buildServer(opts: BuildServerOptions): Promise<FastifyInst
await server.register(collectRoutes);
await server.register(playlistRoutes);
// ── Static file serving for the frontend SPA ──
const frontendDir = join(process.cwd(), 'dist', 'frontend');
if (existsSync(frontendDir)) {
await server.register(fastifyStatic, {
root: frontendDir,
prefix: '/',
wildcard: false,
});
// Register WebSocket route (before static file serving so /ws is handled)
if (opts.eventBus) {
await server.register(websocketRoutes, { eventBus: opts.eventBus });
}
// SPA catch-all: serve index.html for any GET request that isn't an API route,
// /ping, or a static file. API and non-GET requests get a standard 404 JSON.
// ── Frontend serving ──
// Dev mode: Vite middleware handles HMR, module transforms, and index.html
// Production: @fastify/static serves the built frontend from dist/frontend/
if (opts.vite) {
// Register @fastify/middie to support Connect-style middleware
await server.register(middie);
// Pipe Vite's dev middleware through Fastify (HMR websocket, module transforms, etc.)
server.use(opts.vite.middlewares);
// SPA catch-all: transform and serve index.html through Vite's pipeline
const vite = opts.vite;
server.setNotFoundHandler(async (request, reply) => {
if (
request.method === 'GET' &&
!request.url.startsWith('/api/') &&
request.url !== '/ping'
) {
return reply.sendFile('index.html');
try {
const indexPath = join(vite.config.root, 'index.html');
let html = readFileSync(indexPath, 'utf-8');
html = await vite.transformIndexHtml(request.url, html);
return reply.type('text/html').send(html);
} catch (err) {
// Let Vite fix the stack trace for better DX
if (err instanceof Error) vite.ssrFixStacktrace(err);
throw err;
}
}
return reply.status(404).send({
@ -130,14 +152,42 @@ export async function buildServer(opts: BuildServerOptions): Promise<FastifyInst
});
});
} else {
// No frontend build — standard 404 for all unknown routes
server.setNotFoundHandler(async (_request, reply) => {
return reply.status(404).send({
statusCode: 404,
error: 'Not Found',
message: 'Route not found',
// Production: serve pre-built static frontend
const frontendDir = join(process.cwd(), 'dist', 'frontend');
if (existsSync(frontendDir)) {
await server.register(fastifyStatic, {
root: frontendDir,
prefix: '/',
wildcard: false,
});
});
// SPA catch-all: serve index.html for any GET request that isn't an API route,
// /ping, or a static file. API and non-GET requests get a standard 404 JSON.
server.setNotFoundHandler(async (request, reply) => {
if (
request.method === 'GET' &&
!request.url.startsWith('/api/') &&
request.url !== '/ping'
) {
return reply.sendFile('index.html');
}
return reply.status(404).send({
statusCode: 404,
error: 'Not Found',
message: `Route ${request.method}:${request.url} not found`,
});
});
} else {
// No frontend build — standard 404 for all unknown routes
server.setNotFoundHandler(async (_request, reply) => {
return reply.status(404).send({
statusCode: 404,
error: 'Not Found',
message: 'Route not found',
});
});
}
}
return server;

View file

@ -0,0 +1,75 @@
import websocket from '@fastify/websocket';
import type { FastifyInstance } from 'fastify';
import type { WebSocket } from 'ws';
import type { DownloadEventBus, DownloadProgressPayload, DownloadCompletePayload, DownloadFailedPayload } from '../../services/event-bus';
/**
* WebSocket route plugin.
*
* Registers @fastify/websocket and a GET /ws route that broadcasts
* download events (progress, complete, failed) to all connected clients.
*
* The event bus is passed via plugin options. Each connected client
* gets its own set of event bus listeners, which are cleaned up on disconnect.
*
* Auth: The /ws route bypasses API key auth (not under /api/*).
* This is intentional WebSocket only broadcasts read-only progress data.
*/
export async function websocketRoutes(
fastify: FastifyInstance,
opts: { eventBus: DownloadEventBus }
): Promise<void> {
const { eventBus } = opts;
await fastify.register(websocket);
fastify.get('/ws', { websocket: true }, (socket: WebSocket) => {
console.log('[websocket] client connected');
// Create listeners for each event type
const onProgress = (data: DownloadProgressPayload) => {
sendJson(socket, { type: 'download:progress', ...data });
};
const onComplete = (data: DownloadCompletePayload) => {
sendJson(socket, { type: 'download:complete', ...data });
};
const onFailed = (data: DownloadFailedPayload) => {
sendJson(socket, { type: 'download:failed', ...data });
};
// Subscribe to event bus
eventBus.onDownload('download:progress', onProgress);
eventBus.onDownload('download:complete', onComplete);
eventBus.onDownload('download:failed', onFailed);
// Cleanup on disconnect
const cleanup = () => {
eventBus.offDownload('download:progress', onProgress);
eventBus.offDownload('download:complete', onComplete);
eventBus.offDownload('download:failed', onFailed);
console.log('[websocket] client disconnected');
};
socket.on('close', cleanup);
socket.on('error', (err) => {
console.log(`[websocket] client error: ${err.message}`);
cleanup();
});
});
}
/**
* Send a JSON message to a WebSocket client.
* Silently catches send errors (client may have disconnected).
*/
function sendJson(socket: WebSocket, data: Record<string, unknown>): void {
try {
if (socket.readyState === socket.OPEN) {
socket.send(JSON.stringify(data));
}
} catch {
// Client disconnected — swallow
}
}

View file

@ -1,9 +1,12 @@
import { stat } from 'node:fs/promises';
import { extname } from 'node:path';
import { createInterface } from 'node:readline';
import type { LibSQLDatabase } from 'drizzle-orm/libsql';
import type * as schema from '../db/schema/index';
import { execYtDlp, YtDlpError } from '../sources/yt-dlp';
import { execYtDlp, spawnYtDlp, YtDlpError } from '../sources/yt-dlp';
import { updateContentItem } from '../db/repositories/content-repository';
import { parseProgressLine } from './progress-parser';
import type { DownloadEventBus } from './event-bus';
import type { RateLimiter } from './rate-limiter';
import type { FileOrganizer } from './file-organizer';
import type { QualityAnalyzer } from './quality-analyzer';
@ -28,13 +31,18 @@ type Db = LibSQLDatabase<typeof schema>;
* run quality analysis update content item.
*/
export class DownloadService {
private readonly eventBus?: DownloadEventBus;
constructor(
private readonly db: Db,
private readonly rateLimiter: RateLimiter,
private readonly fileOrganizer: FileOrganizer,
private readonly qualityAnalyzer: QualityAnalyzer,
private readonly cookieManager: CookieManager
) {}
private readonly cookieManager: CookieManager,
eventBus?: DownloadEventBus
) {
this.eventBus = eventBus;
}
/**
* Download a content item and update its record in the database.
@ -77,14 +85,20 @@ export class DownloadService {
);
const startTime = Date.now();
// Execute download — 30 minute timeout
const result = await execYtDlp(args, { timeout: 1_800_000 });
// Execute download — streaming spawn when event bus is available, buffered exec otherwise
let stdout: string;
if (this.eventBus) {
stdout = await this.spawnDownload(args, contentItem.id, 1_800_000);
} else {
const result = await execYtDlp(args, { timeout: 1_800_000 });
stdout = result.stdout;
}
const duration = Date.now() - startTime;
console.log(`${logPrefix} yt-dlp completed in ${duration}ms`);
// Parse final file path from --print after_move:filepath output
const finalPath = this.parseFinalPath(result.stdout, outputTemplate);
const finalPath = this.parseFinalPath(stdout, outputTemplate);
// Ensure directories exist and resolve duplicate filenames
await this.fileOrganizer.ensureDirectory(finalPath);
@ -111,6 +125,9 @@ export class DownloadService {
this.rateLimiter.reportSuccess(channel.platform as Platform);
// Emit download:complete event
this.eventBus?.emitDownload('download:complete', { contentItemId: contentItem.id });
console.log(
`${logPrefix} status=downloaded path="${finalPath}" size=${fileSize} format=${format}`
);
@ -126,12 +143,107 @@ export class DownloadService {
const errorMsg = err instanceof Error ? err.message : String(err);
console.log(`${logPrefix} status=failed error="${errorMsg.slice(0, 200)}"`);
// Emit download:failed event
this.eventBus?.emitDownload('download:failed', {
contentItemId: contentItem.id,
error: errorMsg.slice(0, 200),
});
throw err;
}
}
// ── Internal ──
/**
* Spawn yt-dlp and stream progress events via the event bus.
* Returns collected stdout (non-progress lines) for final path parsing.
*/
private spawnDownload(
args: string[],
contentItemId: number,
timeoutMs: number
): Promise<string> {
return new Promise((resolve, reject) => {
// Add --newline to ensure progress updates are separate lines
const spawnArgs = ['--newline', '--progress', ...args];
const child = spawnYtDlp(spawnArgs);
const stdoutLines: string[] = [];
const stderrChunks: string[] = [];
let killed = false;
// Timeout — kill child after timeoutMs
const timer = setTimeout(() => {
killed = true;
child.kill('SIGTERM');
}, timeoutMs);
// Parse stdout line-by-line
if (child.stdout) {
const rl = createInterface({ input: child.stdout, crlfDelay: Infinity });
rl.on('line', (line: string) => {
const progress = parseProgressLine(line);
if (progress) {
this.eventBus!.emitDownload('download:progress', {
contentItemId,
percent: progress.percent,
speed: progress.speed,
eta: progress.eta,
});
} else {
// Non-progress lines — collect for final path parsing
stdoutLines.push(line);
}
});
}
// Collect stderr
if (child.stderr) {
child.stderr.on('data', (chunk: Buffer) => {
stderrChunks.push(chunk.toString());
});
}
// Handle process exit
child.on('close', (code: number | null) => {
clearTimeout(timer);
const stdout = stdoutLines.join('\n');
const stderr = stderrChunks.join('');
if (killed) {
reject(new YtDlpError(
`yt-dlp timed out after ${timeoutMs}ms`,
stderr,
-1
));
return;
}
if (code !== 0 && code !== null) {
reject(new YtDlpError(
`yt-dlp exited with code ${code}: ${stderr.slice(0, 200)}`,
stderr,
code
));
return;
}
resolve(stdout);
});
// Handle spawn errors (e.g., yt-dlp not found)
child.on('error', (err: Error) => {
clearTimeout(timer);
reject(new YtDlpError(
`Failed to spawn yt-dlp: ${err.message}`,
'',
-1
));
});
});
}
/**
* Build the yt-dlp command-line args based on content type and format profile.
*/

66
src/services/event-bus.ts Normal file
View file

@ -0,0 +1,66 @@
import { EventEmitter } from 'node:events';
// ── Event Payload Types ──
export interface DownloadProgressPayload {
contentItemId: number;
percent: number;
speed: string;
eta: string;
}
export interface DownloadCompletePayload {
contentItemId: number;
}
export interface DownloadFailedPayload {
contentItemId: number;
error: string;
}
// ── Event Map ──
export interface DownloadEventMap {
'download:progress': [DownloadProgressPayload];
'download:complete': [DownloadCompletePayload];
'download:failed': [DownloadFailedPayload];
}
// ── Typed Event Bus ──
/**
* Typed EventEmitter for download events.
* Decouples download progress producers (DownloadService) from
* consumers (WebSocket route, logging, etc).
*/
export class DownloadEventBus extends EventEmitter {
/**
* Emit a typed download event.
*/
emitDownload<K extends keyof DownloadEventMap>(
event: K,
...args: DownloadEventMap[K]
): boolean {
return this.emit(event, ...args);
}
/**
* Subscribe to a typed download event.
*/
onDownload<K extends keyof DownloadEventMap>(
event: K,
listener: (...args: DownloadEventMap[K]) => void
): this {
return this.on(event, listener as (...args: unknown[]) => void);
}
/**
* Unsubscribe from a typed download event.
*/
offDownload<K extends keyof DownloadEventMap>(
event: K,
listener: (...args: DownloadEventMap[K]) => void
): this {
return this.off(event, listener as (...args: unknown[]) => void);
}
}

View file

@ -0,0 +1,71 @@
// ── Types ──
export interface ProgressInfo {
/** Download percentage (0100). */
percent: number;
/** Human-readable speed string, e.g. "1.23MiB/s". Empty if unknown. */
speed: string;
/** Human-readable ETA string, e.g. "00:42". Empty if unknown. */
eta: string;
/** Human-readable total size string, e.g. "~150.00MiB". Empty if unknown. */
totalSize: string;
}
// ── Parser ──
/**
* Regex for the standard yt-dlp progress line format:
* [download] 45.2% of ~150.00MiB at 1.23MiB/s ETA 00:42
* [download] 100% of 150.00MiB at 1.23MiB/s ETA 00:00
* [download] 12.3% of ~150.00MiB at Unknown speed ETA Unknown
*
* The `--newline` flag ensures each progress update is a separate line.
*/
const PROGRESS_REGEX =
/\[download\]\s+([\d.]+)%\s+of\s+(~?[\d.]+\S+)\s+at\s+(.+?)\s+ETA\s+(.+)/;
/**
* Parse a single line of yt-dlp stdout into structured progress info.
*
* Returns `null` for non-progress lines (info messages, merge output,
* postprocessor lines, etc). Only parses `[download] XX.X% of ...` lines.
*
* @param line - A single line from yt-dlp stdout (may include trailing whitespace)
*/
export function parseProgressLine(line: string): ProgressInfo | null {
const trimmed = line.trim();
if (!trimmed) return null;
const match = trimmed.match(PROGRESS_REGEX);
if (!match) return null;
const [, percentStr, totalSize, speedRaw, etaRaw] = match;
const percent = parseFloat(percentStr);
if (isNaN(percent)) return null;
// Normalize speed: "Unknown speed" → "", trim whitespace
const speed = speedRaw.trim().toLowerCase().startsWith('unknown')
? ''
: speedRaw.trim();
// Normalize ETA: "Unknown" → "", trim whitespace
const eta = etaRaw.trim().toLowerCase().startsWith('unknown')
? ''
: etaRaw.trim();
return {
percent: Math.min(100, Math.max(0, percent)),
speed,
eta,
totalSize: totalSize.trim(),
};
}
/**
* Check if a line is a yt-dlp progress line (starts with [download] and contains %).
* Faster than running the full regex for filtering purposes.
*/
export function isProgressLine(line: string): boolean {
return line.includes('[download]') && line.includes('%');
}

View file

@ -1,4 +1,4 @@
import { execFile as execFileCb } from 'node:child_process';
import { execFile as execFileCb, spawn, type ChildProcess } from 'node:child_process';
import { promisify } from 'node:util';
const execFileAsync = promisify(execFileCb);
@ -159,6 +159,25 @@ export function parseSingleJson(stdout: string): unknown {
}
}
// ── Spawn (streaming) ──
/**
* Spawn yt-dlp as a child process for streaming stdout.
* Unlike `execYtDlp`, this returns the `ChildProcess` immediately so the caller
* can read stdout/stderr line-by-line in real time (for progress parsing).
*
* The caller is responsible for:
* - Reading stdout/stderr
* - Handling the 'close' event
* - Implementing timeout (via setTimeout + process.kill)
*/
export function spawnYtDlp(args: string[]): ChildProcess {
return spawn('yt-dlp', args, {
windowsHide: true,
stdio: ['ignore', 'pipe', 'pipe'],
});
}
// ── Health Check ──
/**