diff --git a/backend/pipeline/stages.py b/backend/pipeline/stages.py
index db9d62f..e148cd6 100644
--- a/backend/pipeline/stages.py
+++ b/backend/pipeline/stages.py
@@ -18,7 +18,7 @@ from collections import defaultdict
 from pathlib import Path
 
 import yaml
-from celery import chain as celery_chain
+
 from pydantic import ValidationError
 from sqlalchemy import create_engine, func, select
 from sqlalchemy.orm import Session, sessionmaker
@@ -1745,18 +1745,12 @@ def run_pipeline(video_id: str, trigger: str = "manual") -> str:
         video_id, stages_to_run, resume_from_idx,
     )
 
-    # Build the Celery chain — first stage gets video_id as arg,
-    # subsequent stages receive it from the previous stage's return value
-    celery_sigs = []
-    for i, stage_name in enumerate(stages_to_run):
-        task_func = _STAGE_TASKS[stage_name]
-        if i == 0:
-            celery_sigs.append(task_func.s(video_id, run_id=run_id))
-        else:
-            celery_sigs.append(task_func.s(run_id=run_id))
-
-    if celery_sigs:
-        # Mark as processing before dispatching
+    # Run stages inline (synchronously) so each video completes fully
+    # before the worker picks up the next queued video.
+    # This replaces the previous celery_chain dispatch which caused
+    # interleaved execution when multiple videos were queued.
+    if stages_to_run:
+        # Mark as processing before starting
         session = _get_sync_session()
         try:
             video = session.execute(
@@ -1767,12 +1761,26 @@ def run_pipeline(video_id: str, trigger: str = "manual") -> str:
         finally:
             session.close()
 
-        pipeline = celery_chain(*celery_sigs)
-        error_cb = mark_pipeline_error.s(video_id, run_id=run_id)
-        pipeline.apply_async(link_error=error_cb)
         logger.info(
-            "run_pipeline: dispatched %d stages for video_id=%s (run_id=%s, starting at %s)",
-            len(celery_sigs), video_id, run_id, stages_to_run[0],
+            "run_pipeline: executing %d stages inline for video_id=%s (run_id=%s, starting at %s)",
+            len(stages_to_run), video_id, run_id, stages_to_run[0],
         )
+
+        try:
+            for stage_name in stages_to_run:
+                task_func = _STAGE_TASKS[stage_name]
+                # Call the task directly — runs synchronously in this worker
+                # process. bind=True tasks receive the task instance as self
+                # automatically when called this way.
+                task_func(video_id, run_id=run_id)
+        except Exception as exc:
+            logger.error(
+                "run_pipeline: stage %s failed for video_id=%s: %s",
+                stage_name, video_id, exc,
+            )
+            _set_error_status(video_id, stage_name, exc)
+            if run_id:
+                _finish_run(run_id, "error", error_stage=stage_name)
+            raise
 
     return video_id