pipeline: run stages inline instead of Celery chain dispatch

Each video now completes all stages (2→6) before the worker picks up the
next queued video. Previously, dispatching celery_chain for multiple videos
caused interleaved execution — no video finished until every queued video
had passed through all stages. Now run_pipeline calls each stage function
synchronously within the same worker task, so videos complete sequentially.
This commit is contained in:
jlightner 2026-04-01 11:39:21 +00:00
parent 84e617ab64
commit 29f6e74b4f

View file

@ -18,7 +18,7 @@ from collections import defaultdict
from pathlib import Path
import yaml
from celery import chain as celery_chain
from pydantic import ValidationError
from sqlalchemy import create_engine, func, select
from sqlalchemy.orm import Session, sessionmaker
@ -1745,18 +1745,12 @@ def run_pipeline(video_id: str, trigger: str = "manual") -> str:
video_id, stages_to_run, resume_from_idx,
)
# Build the Celery chain — first stage gets video_id as arg,
# subsequent stages receive it from the previous stage's return value
celery_sigs = []
for i, stage_name in enumerate(stages_to_run):
task_func = _STAGE_TASKS[stage_name]
if i == 0:
celery_sigs.append(task_func.s(video_id, run_id=run_id))
else:
celery_sigs.append(task_func.s(run_id=run_id))
if celery_sigs:
# Mark as processing before dispatching
# Run stages inline (synchronously) so each video completes fully
# before the worker picks up the next queued video.
# This replaces the previous celery_chain dispatch which caused
# interleaved execution when multiple videos were queued.
if stages_to_run:
# Mark as processing before starting
session = _get_sync_session()
try:
video = session.execute(
@ -1767,12 +1761,26 @@ def run_pipeline(video_id: str, trigger: str = "manual") -> str:
finally:
session.close()
pipeline = celery_chain(*celery_sigs)
error_cb = mark_pipeline_error.s(video_id, run_id=run_id)
pipeline.apply_async(link_error=error_cb)
logger.info(
"run_pipeline: dispatched %d stages for video_id=%s (run_id=%s, starting at %s)",
len(celery_sigs), video_id, run_id, stages_to_run[0],
"run_pipeline: executing %d stages inline for video_id=%s (run_id=%s, starting at %s)",
len(stages_to_run), video_id, run_id, stages_to_run[0],
)
try:
for stage_name in stages_to_run:
task_func = _STAGE_TASKS[stage_name]
# Call the task directly — runs synchronously in this worker
# process. bind=True tasks receive the task instance as self
# automatically when called this way.
task_func(video_id, run_id=run_id)
except Exception as exc:
logger.error(
"run_pipeline: stage %s failed for video_id=%s: %s",
stage_name, video_id, exc,
)
_set_error_status(video_id, stage_name, exc)
if run_id:
_finish_run(run_id, "error", error_stage=stage_name)
raise
return video_id