feat: Add pause/resume buttons, paused status badge, and Paused filter…
- "src/frontend/src/pages/Queue.tsx" - "src/frontend/src/api/hooks/useQueue.ts" - "src/frontend/src/components/StatusBadge.tsx" GSD-Task: S07/T04
This commit is contained in:
parent
bd9e07f878
commit
daf892edad
8 changed files with 381 additions and 15 deletions
|
|
@ -518,6 +518,129 @@ describe('QueueService', () => {
|
|||
});
|
||||
});
|
||||
|
||||
// ── Pause ──
|
||||
|
||||
describe('pauseItem', () => {
|
||||
it('pauses a pending item', async () => {
|
||||
const qs = new QueueService(db, mockDownloadService as any, 0);
|
||||
qs.stop();
|
||||
|
||||
await qs.enqueue(contentItems[0].id);
|
||||
|
||||
const paused = await qs.pauseItem(1);
|
||||
expect(paused.status).toBe('paused');
|
||||
});
|
||||
|
||||
it('pauses a downloading item by aborting the active download', async () => {
|
||||
// Use a deferred so we can control when the download completes
|
||||
let rejectFn: (err: Error) => void;
|
||||
mockDownloadService.downloadItem.mockImplementationOnce(() => {
|
||||
return new Promise<void>((_resolve, reject) => {
|
||||
rejectFn = reject;
|
||||
});
|
||||
});
|
||||
|
||||
const qs = new QueueService(db, mockDownloadService as any, 1);
|
||||
|
||||
await qs.enqueue(contentItems[0].id);
|
||||
await tick(50); // Let it transition to downloading
|
||||
|
||||
// Item should be downloading
|
||||
let item = await getQueueItemById(db, 1);
|
||||
expect(item!.status).toBe('downloading');
|
||||
|
||||
// Pause it — this should abort the download
|
||||
const paused = await qs.pauseItem(1);
|
||||
expect(paused.status).toBe('paused');
|
||||
|
||||
// Simulate the abort rejection (in real code, the AbortController signal kills yt-dlp)
|
||||
rejectFn!(new Error('aborted'));
|
||||
await tick(50);
|
||||
|
||||
// Item should remain paused (not retried as failed)
|
||||
item = await getQueueItemById(db, 1);
|
||||
expect(item!.status).toBe('paused');
|
||||
});
|
||||
|
||||
it('throws for completed item', async () => {
|
||||
const qs = new QueueService(db, mockDownloadService as any, 0);
|
||||
qs.stop();
|
||||
|
||||
await qs.enqueue(contentItems[0].id);
|
||||
await updateQueueItemStatus(db, 1, 'completed');
|
||||
|
||||
await expect(qs.pauseItem(1)).rejects.toThrow(/Cannot pause/);
|
||||
});
|
||||
|
||||
it('throws for cancelled item', async () => {
|
||||
const qs = new QueueService(db, mockDownloadService as any, 0);
|
||||
qs.stop();
|
||||
|
||||
await qs.enqueue(contentItems[0].id);
|
||||
await updateQueueItemStatus(db, 1, 'cancelled');
|
||||
|
||||
await expect(qs.pauseItem(1)).rejects.toThrow(/Cannot pause/);
|
||||
});
|
||||
|
||||
it('throws for non-existent item', async () => {
|
||||
const qs = new QueueService(db, mockDownloadService as any, 0);
|
||||
await expect(qs.pauseItem(99999)).rejects.toThrow(/not found/);
|
||||
});
|
||||
});
|
||||
|
||||
// ── Resume ──
|
||||
|
||||
describe('resumeItem', () => {
|
||||
it('resumes a paused item back to pending', async () => {
|
||||
const qs = new QueueService(db, mockDownloadService as any, 0);
|
||||
qs.stop();
|
||||
|
||||
await qs.enqueue(contentItems[0].id);
|
||||
await qs.pauseItem(1);
|
||||
|
||||
const resumed = await qs.resumeItem(1);
|
||||
expect(resumed.status).toBe('pending');
|
||||
expect(resumed.error).toBeNull();
|
||||
|
||||
// Content status should be reset to queued
|
||||
const contentItem = await getContentItemById(db, contentItems[0].id);
|
||||
expect(contentItem!.status).toBe('queued');
|
||||
});
|
||||
|
||||
it('throws for non-paused item', async () => {
|
||||
const qs = new QueueService(db, mockDownloadService as any, 0);
|
||||
qs.stop();
|
||||
|
||||
await qs.enqueue(contentItems[0].id);
|
||||
|
||||
await expect(qs.resumeItem(1)).rejects.toThrow(/expected 'paused'/);
|
||||
});
|
||||
|
||||
it('throws for non-existent item', async () => {
|
||||
const qs = new QueueService(db, mockDownloadService as any, 0);
|
||||
await expect(qs.resumeItem(99999)).rejects.toThrow(/not found/);
|
||||
});
|
||||
|
||||
it('triggers processNext after resume', async () => {
|
||||
// After resuming, the item should get picked up and processed
|
||||
const qs = new QueueService(db, mockDownloadService as any, 1);
|
||||
|
||||
// Enqueue and pause
|
||||
qs.stop();
|
||||
await qs.enqueue(contentItems[0].id);
|
||||
await qs.pauseItem(1);
|
||||
|
||||
// Resume — processNext should fire and download
|
||||
qs.start();
|
||||
await qs.resumeItem(1);
|
||||
await tick(100);
|
||||
|
||||
const item = await getQueueItemById(db, 1);
|
||||
expect(item!.status).toBe('completed');
|
||||
expect(mockDownloadService.downloadItem).toHaveBeenCalledOnce();
|
||||
});
|
||||
});
|
||||
|
||||
// ── getState ──
|
||||
|
||||
describe('getState', () => {
|
||||
|
|
@ -539,6 +662,7 @@ describe('QueueService', () => {
|
|||
expect(state.failed).toBe(1);
|
||||
expect(state.downloading).toBe(0);
|
||||
expect(state.cancelled).toBe(0);
|
||||
expect(state.paused).toBe(0);
|
||||
});
|
||||
|
||||
it('returns all zeros when queue is empty', async () => {
|
||||
|
|
@ -551,6 +675,7 @@ describe('QueueService', () => {
|
|||
completed: 0,
|
||||
failed: 0,
|
||||
cancelled: 0,
|
||||
paused: 0,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -192,6 +192,7 @@ export async function countQueueItemsByStatus(
|
|||
completed: 0,
|
||||
failed: 0,
|
||||
cancelled: 0,
|
||||
paused: 0,
|
||||
};
|
||||
|
||||
for (const row of rows) {
|
||||
|
|
|
|||
|
|
@ -57,3 +57,33 @@ export function useCancelQueueItem() {
|
|||
},
|
||||
});
|
||||
}
|
||||
|
||||
/** Pause a pending or downloading queue item. */
|
||||
export function usePauseQueueItem() {
|
||||
const queryClient = useQueryClient();
|
||||
|
||||
return useMutation({
|
||||
mutationFn: (id: number) =>
|
||||
apiClient.put<{ success: boolean; data: QueueItem }>(
|
||||
`/api/v1/queue/${id}/pause`,
|
||||
),
|
||||
onSuccess: () => {
|
||||
queryClient.invalidateQueries({ queryKey: queueKeys.all });
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/** Resume a paused queue item. */
|
||||
export function useResumeQueueItem() {
|
||||
const queryClient = useQueryClient();
|
||||
|
||||
return useMutation({
|
||||
mutationFn: (id: number) =>
|
||||
apiClient.put<{ success: boolean; data: QueueItem }>(
|
||||
`/api/v1/queue/${id}/resume`,
|
||||
),
|
||||
onSuccess: () => {
|
||||
queryClient.invalidateQueries({ queryKey: queueKeys.all });
|
||||
},
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ const STATUS_STYLES: Record<string, BadgeStyle> = {
|
|||
// Queue statuses
|
||||
pending: { color: 'var(--warning)', backgroundColor: 'var(--warning-bg)' },
|
||||
completed: { color: 'var(--success)', backgroundColor: 'var(--success-bg)' },
|
||||
paused: { color: 'var(--info)', backgroundColor: 'var(--info-bg)' },
|
||||
cancelled: { color: 'var(--text-muted)', backgroundColor: 'var(--bg-hover)' },
|
||||
// Check statuses
|
||||
success: { color: 'var(--success)', backgroundColor: 'var(--success-bg)' },
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ const STATUS_TABS: { value: QueueStatus | ''; label: string }[] = [
|
|||
{ value: '', label: 'All' },
|
||||
{ value: 'pending', label: 'Pending' },
|
||||
{ value: 'downloading', label: 'Downloading' },
|
||||
{ value: 'paused', label: 'Paused' },
|
||||
{ value: 'completed', label: 'Completed' },
|
||||
{ value: 'failed', label: 'Failed' },
|
||||
];
|
||||
|
|
@ -161,9 +162,39 @@ export function Queue() {
|
|||
{
|
||||
key: 'actions',
|
||||
label: 'Actions',
|
||||
width: '100px',
|
||||
width: '120px',
|
||||
render: (item) => (
|
||||
<div style={{ display: 'flex', gap: 'var(--space-1)' }}>
|
||||
{(item.status === 'pending' || item.status === 'downloading') && (
|
||||
<button
|
||||
onClick={(e) => {
|
||||
e.stopPropagation();
|
||||
pauseMutation.mutate(item.id);
|
||||
}}
|
||||
disabled={pauseMutation.isPending}
|
||||
title="Pause"
|
||||
aria-label="Pause item"
|
||||
className="btn-icon"
|
||||
style={{ color: 'var(--info)' }}
|
||||
>
|
||||
<Pause size={14} />
|
||||
</button>
|
||||
)}
|
||||
{item.status === 'paused' && (
|
||||
<button
|
||||
onClick={(e) => {
|
||||
e.stopPropagation();
|
||||
resumeMutation.mutate(item.id);
|
||||
}}
|
||||
disabled={resumeMutation.isPending}
|
||||
title="Resume"
|
||||
aria-label="Resume paused item"
|
||||
className="btn-icon"
|
||||
style={{ color: 'var(--success)' }}
|
||||
>
|
||||
<Play size={14} />
|
||||
</button>
|
||||
)}
|
||||
{item.status === 'failed' && (
|
||||
<button
|
||||
onClick={(e) => {
|
||||
|
|
@ -179,7 +210,7 @@ export function Queue() {
|
|||
<RotateCcw size={14} />
|
||||
</button>
|
||||
)}
|
||||
{item.status === 'pending' && (
|
||||
{(item.status === 'pending' || item.status === 'paused') && (
|
||||
<button
|
||||
onClick={(e) => {
|
||||
e.stopPropagation();
|
||||
|
|
@ -187,7 +218,7 @@ export function Queue() {
|
|||
}}
|
||||
disabled={cancelMutation.isPending}
|
||||
title="Cancel"
|
||||
aria-label="Cancel pending item"
|
||||
aria-label="Cancel item"
|
||||
className="btn-icon"
|
||||
style={{ color: 'var(--danger)' }}
|
||||
>
|
||||
|
|
@ -289,7 +320,7 @@ export function Queue() {
|
|||
)}
|
||||
|
||||
{/* Mutation errors */}
|
||||
{(retryMutation.error || cancelMutation.error) && (
|
||||
{(retryMutation.error || cancelMutation.error || pauseMutation.error || resumeMutation.error) && (
|
||||
<div
|
||||
style={{
|
||||
padding: 'var(--space-3)',
|
||||
|
|
@ -306,7 +337,11 @@ export function Queue() {
|
|||
? retryMutation.error.message
|
||||
: cancelMutation.error instanceof Error
|
||||
? cancelMutation.error.message
|
||||
: 'Action failed'}
|
||||
: pauseMutation.error instanceof Error
|
||||
? pauseMutation.error.message
|
||||
: resumeMutation.error instanceof Error
|
||||
? resumeMutation.error.message
|
||||
: 'Action failed'}
|
||||
</div>
|
||||
)}
|
||||
|
||||
|
|
|
|||
|
|
@ -14,11 +14,13 @@ import type { QueueStatus } from '../../types/index';
|
|||
* Queue management route plugin.
|
||||
*
|
||||
* Registers:
|
||||
* GET /api/v1/queue — list queue items (optional ?status= filter)
|
||||
* GET /api/v1/queue/:id — get a single queue item
|
||||
* POST /api/v1/queue — enqueue a content item for download
|
||||
* DELETE /api/v1/queue/:id — cancel a queue item
|
||||
* POST /api/v1/queue/:id/retry — retry a failed queue item
|
||||
* GET /api/v1/queue — list queue items (optional ?status= filter)
|
||||
* GET /api/v1/queue/:id — get a single queue item
|
||||
* POST /api/v1/queue — enqueue a content item for download
|
||||
* DELETE /api/v1/queue/:id — cancel a queue item
|
||||
* POST /api/v1/queue/:id/retry — retry a failed queue item
|
||||
* PUT /api/v1/queue/:id/pause — pause a pending or downloading queue item
|
||||
* PUT /api/v1/queue/:id/resume — resume a paused queue item
|
||||
*/
|
||||
export async function queueRoutes(fastify: FastifyInstance): Promise<void> {
|
||||
// ── GET /api/v1/queue ──
|
||||
|
|
@ -35,6 +37,7 @@ export async function queueRoutes(fastify: FastifyInstance): Promise<void> {
|
|||
'completed',
|
||||
'failed',
|
||||
'cancelled',
|
||||
'paused',
|
||||
];
|
||||
if (!validStatuses.includes(status as QueueStatus)) {
|
||||
return _reply.status(400).send({
|
||||
|
|
@ -216,4 +219,90 @@ export async function queueRoutes(fastify: FastifyInstance): Promise<void> {
|
|||
}
|
||||
}
|
||||
);
|
||||
|
||||
// ── PUT /api/v1/queue/:id/pause ──
|
||||
|
||||
fastify.put<{ Params: { id: string } }>(
|
||||
'/api/v1/queue/:id/pause',
|
||||
async (request, reply) => {
|
||||
const id = parseIdParam(request.params.id, reply, 'Queue item ID');
|
||||
if (id === null) return;
|
||||
|
||||
if (!fastify.queueService) {
|
||||
return reply.status(503).send({
|
||||
statusCode: 503,
|
||||
error: 'Service Unavailable',
|
||||
message: 'Queue service is not initialized',
|
||||
});
|
||||
}
|
||||
|
||||
try {
|
||||
const paused = await fastify.queueService.pauseItem(id);
|
||||
return reply.status(200).send({ success: true, data: paused });
|
||||
} catch (err: unknown) {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
|
||||
if (message.includes('not found')) {
|
||||
return reply.status(404).send({
|
||||
statusCode: 404,
|
||||
error: 'Not Found',
|
||||
message,
|
||||
});
|
||||
}
|
||||
|
||||
if (message.includes('Cannot pause')) {
|
||||
return reply.status(409).send({
|
||||
statusCode: 409,
|
||||
error: 'Conflict',
|
||||
message,
|
||||
});
|
||||
}
|
||||
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
// ── PUT /api/v1/queue/:id/resume ──
|
||||
|
||||
fastify.put<{ Params: { id: string } }>(
|
||||
'/api/v1/queue/:id/resume',
|
||||
async (request, reply) => {
|
||||
const id = parseIdParam(request.params.id, reply, 'Queue item ID');
|
||||
if (id === null) return;
|
||||
|
||||
if (!fastify.queueService) {
|
||||
return reply.status(503).send({
|
||||
statusCode: 503,
|
||||
error: 'Service Unavailable',
|
||||
message: 'Queue service is not initialized',
|
||||
});
|
||||
}
|
||||
|
||||
try {
|
||||
const resumed = await fastify.queueService.resumeItem(id);
|
||||
return reply.status(200).send({ success: true, data: resumed });
|
||||
} catch (err: unknown) {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
|
||||
if (message.includes('not found')) {
|
||||
return reply.status(404).send({
|
||||
statusCode: 404,
|
||||
error: 'Not Found',
|
||||
message,
|
||||
});
|
||||
}
|
||||
|
||||
if (message.includes('Cannot resume')) {
|
||||
return reply.status(409).send({
|
||||
statusCode: 409,
|
||||
error: 'Conflict',
|
||||
message,
|
||||
});
|
||||
}
|
||||
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -34,18 +34,21 @@ export interface QueueState {
|
|||
completed: number;
|
||||
failed: number;
|
||||
cancelled: number;
|
||||
paused: number;
|
||||
}
|
||||
|
||||
// ── QueueService ──
|
||||
|
||||
/**
|
||||
* Orchestrates the download queue lifecycle: enqueue, process with concurrency
|
||||
* control, retry on failure, cancel, and recover interrupted items on startup.
|
||||
* control, retry on failure, cancel, pause/resume, and recover interrupted items on startup.
|
||||
*
|
||||
* Status transitions:
|
||||
* pending → downloading → completed | failed
|
||||
* failed → pending (retry) or failed (max attempts exhausted)
|
||||
* pending | failed → cancelled
|
||||
* pending | downloading → paused
|
||||
* paused → pending (resume)
|
||||
*
|
||||
* Concurrency is managed via an in-memory counter — Node's single-threaded
|
||||
* event loop ensures processNext() is not re-entrant within a single tick.
|
||||
|
|
@ -56,6 +59,8 @@ export class QueueService {
|
|||
private concurrency: number;
|
||||
private readonly onDownloadComplete?: (event: NotificationEvent) => void;
|
||||
private readonly onDownloadFailed?: (event: NotificationEvent) => void;
|
||||
/** Maps queueItemId → AbortController for in-flight downloads (used by pause to cancel yt-dlp). */
|
||||
private readonly activeAbortControllers = new Map<number, AbortController>();
|
||||
|
||||
constructor(
|
||||
private readonly db: Db,
|
||||
|
|
@ -231,6 +236,76 @@ export class QueueService {
|
|||
return updated!;
|
||||
}
|
||||
|
||||
/**
|
||||
* Pause a queue item. Pending items are set to 'paused' immediately.
|
||||
* Downloading items have their yt-dlp process killed and are set to 'paused'.
|
||||
*
|
||||
* @throws Error if item not found or not in a pausable status.
|
||||
*/
|
||||
async pauseItem(queueItemId: number): Promise<QueueItem> {
|
||||
const item = await getQueueItemById(this.db, queueItemId);
|
||||
if (!item) {
|
||||
throw new Error(`Queue item ${queueItemId} not found`);
|
||||
}
|
||||
|
||||
const pausable: QueueStatus[] = ['pending', 'downloading'];
|
||||
if (!pausable.includes(item.status)) {
|
||||
throw new Error(
|
||||
`Cannot pause queue item ${queueItemId} — status is '${item.status}', must be 'pending' or 'downloading'`
|
||||
);
|
||||
}
|
||||
|
||||
// If downloading, abort the yt-dlp process
|
||||
if (item.status === 'downloading') {
|
||||
const controller = this.activeAbortControllers.get(queueItemId);
|
||||
if (controller) {
|
||||
controller.abort('paused');
|
||||
}
|
||||
}
|
||||
|
||||
const updated = await updateQueueItemStatus(this.db, queueItemId, 'paused');
|
||||
|
||||
console.log(
|
||||
`[queue] pause queueId=${queueItemId} contentId=${item.contentItemId} previousStatus=${item.status} status=paused`
|
||||
);
|
||||
|
||||
return updated!;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resume a paused queue item. Sets it back to 'pending' and triggers processing.
|
||||
*
|
||||
* @throws Error if item not found or not paused.
|
||||
*/
|
||||
async resumeItem(queueItemId: number): Promise<QueueItem> {
|
||||
const item = await getQueueItemById(this.db, queueItemId);
|
||||
if (!item) {
|
||||
throw new Error(`Queue item ${queueItemId} not found`);
|
||||
}
|
||||
|
||||
if (item.status !== 'paused') {
|
||||
throw new Error(
|
||||
`Cannot resume queue item ${queueItemId} — status is '${item.status}', expected 'paused'`
|
||||
);
|
||||
}
|
||||
|
||||
const updated = await updateQueueItemStatus(this.db, queueItemId, 'pending', {
|
||||
error: null,
|
||||
startedAt: null,
|
||||
});
|
||||
|
||||
// Reset content status to queued
|
||||
await updateContentItem(this.db, item.contentItemId, { status: 'queued' });
|
||||
|
||||
console.log(
|
||||
`[queue] resume queueId=${queueItemId} contentId=${item.contentItemId} status=pending`
|
||||
);
|
||||
|
||||
this.processNext();
|
||||
|
||||
return updated!;
|
||||
}
|
||||
|
||||
/**
|
||||
* Recover items that were stuck in 'downloading' status after a crash/restart.
|
||||
* Resets them to 'pending' so they'll be picked up again.
|
||||
|
|
@ -298,6 +373,8 @@ export class QueueService {
|
|||
*/
|
||||
private async processItem(queueItem: QueueItem): Promise<void> {
|
||||
const logPrefix = `[queue] process queueId=${queueItem.id} contentId=${queueItem.contentItemId}`;
|
||||
const abortController = new AbortController();
|
||||
this.activeAbortControllers.set(queueItem.id, abortController);
|
||||
|
||||
try {
|
||||
// Transition to downloading
|
||||
|
|
@ -378,10 +455,15 @@ export class QueueService {
|
|||
}
|
||||
}
|
||||
} catch (err: unknown) {
|
||||
const errorMsg = err instanceof Error ? err.message : String(err);
|
||||
const newAttempts = queueItem.attempts + 1;
|
||||
const exhausted = newAttempts >= queueItem.maxAttempts;
|
||||
const newStatus: QueueStatus = exhausted ? 'failed' : 'pending';
|
||||
// If aborted due to pause, don't treat as a failure — status is already set by pauseItem
|
||||
if (abortController.signal.aborted) {
|
||||
console.log(`${logPrefix} aborted (paused)`);
|
||||
// Don't increment attempts or record failure — the item was paused by the user
|
||||
} else {
|
||||
const errorMsg = err instanceof Error ? err.message : String(err);
|
||||
const newAttempts = queueItem.attempts + 1;
|
||||
const exhausted = newAttempts >= queueItem.maxAttempts;
|
||||
const newStatus: QueueStatus = exhausted ? 'failed' : 'pending';
|
||||
|
||||
await updateQueueItemStatus(this.db, queueItem.id, newStatus, {
|
||||
attempts: newAttempts,
|
||||
|
|
@ -441,8 +523,10 @@ export class QueueService {
|
|||
`[queue] notification callback error: ${notifyErr instanceof Error ? notifyErr.message : String(notifyErr)}`
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
this.activeAbortControllers.delete(queueItem.id);
|
||||
this.activeCount--;
|
||||
this.processNext();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ export const QueueStatus = {
|
|||
Completed: 'completed',
|
||||
Failed: 'failed',
|
||||
Cancelled: 'cancelled',
|
||||
Paused: 'paused',
|
||||
} as const;
|
||||
export type QueueStatus = (typeof QueueStatus)[keyof typeof QueueStatus];
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue