---
name: using-celery
description: Celery 5.3+ distributed task queue with Beat scheduler, Redis/RabbitMQ brokers, workflow patterns, and FastAPI integration. Use for background jobs, periodic tasks, and async processing.
---

# Celery & Beat Development Skill

## Quick Reference

Celery 5.3+ distributed task queue with Beat scheduler for Python applications. Background job processing, periodic scheduling, workflow patterns, and FastAPI integration.


## Table of Contents

  1. Quick Reference
  2. When to Use
  3. Project Structure
  4. Celery Application Setup
  5. Task Definitions
  6. Queue Routing
  7. Beat Scheduler
  8. Workflow Patterns
  9. FastAPI Integration
  10. Testing
  11. CLI Commands
  12. Essential Configuration
  13. Anti-Patterns to Avoid
  14. Integration Checklist
  15. See Also

## When to Use

This skill is loaded by `backend-developer` when:

- `celery` or `celery[redis]` in dependencies
- `celeryconfig.py` or `celery.py` present
- Beat schedule configuration detected
- User mentions "background tasks", "job queue", or "periodic tasks"
- Task decorator patterns (`@app.task`) found

**Minimum Detection Confidence:** 0.8 (80%)

**Prerequisite:** The Python skill should be loaded for core patterns.


## Project Structure

```text
my_project/
├── src/my_app/
│   ├── celery_app.py        # Celery application
│   ├── config.py            # Settings
│   ├── tasks/               # Task modules
│   │   ├── email.py
│   │   ├── reports.py
│   │   └── cleanup.py
│   └── workers/queues.py    # Queue definitions
├── tests/
│   ├── conftest.py          # Celery fixtures
│   └── tasks/
├── docker-compose.yml       # Redis + workers
└── pyproject.toml
```
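The `docker-compose.yml` above pairs a Redis broker with worker and Beat containers. A minimal sketch of what that file might look like (service layout and environment variable names are assumptions, not part of the original):

```yaml
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  worker:
    build: .
    command: celery -A my_app.celery_app worker --loglevel=info
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
      CELERY_RESULT_BACKEND: redis://redis:6379/1
    depends_on:
      - redis

  beat:
    build: .
    command: celery -A my_app.celery_app beat --loglevel=info
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
    depends_on:
      - redis
```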

## Celery Application Setup

```python
from celery import Celery
from kombu import Queue

from .config import settings

app = Celery(
    "my_app",
    broker=settings.celery_broker_url,
    backend=settings.celery_result_backend,
    include=["my_app.tasks.email", "my_app.tasks.reports"],
)

app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=300,
    task_soft_time_limit=240,
    worker_prefetch_multiplier=1,
    task_acks_late=True,
    task_reject_on_worker_lost=True,
)

# Queue routing
app.conf.task_queues = (
    Queue("default", routing_key="default"),
    Queue("high_priority", routing_key="high"),
    Queue("low_priority", routing_key="low"),
)
```
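The app above imports `settings` from `config.py`. A minimal sketch of that module, assuming pydantic-settings (field defaults are illustrative):

```python
# config.py -- sketch assuming pydantic-settings; values can be
# overridden via environment variables (CELERY_BROKER_URL, ...)
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    celery_broker_url: str = "redis://localhost:6379/0"
    celery_result_backend: str = "redis://localhost:6379/1"


settings = Settings()
```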

## Task Definitions

### Basic Task

```python
from celery import shared_task

from my_app.celery_app import app

@shared_task(name="tasks.add")
def add(x: int, y: int) -> int:
    return x + y

@app.task(bind=True, name="tasks.send_email")
def send_email(self, to: str, subject: str, body: str) -> dict:
    task_id = self.request.id
    return {"task_id": task_id, "status": "sent"}
```

### Task with Retry Logic

```python
import httpx

@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(httpx.TimeoutException, httpx.ConnectError),
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True,
)
def call_external_api(self, endpoint: str, payload: dict) -> dict:
    with httpx.Client(timeout=30) as client:
        response = client.post(endpoint, json=payload)
        response.raise_for_status()
        return response.json()
```

### Task with Rate Limiting

```python
@shared_task(bind=True, rate_limit="10/m", name="tasks.send_sms")
def send_sms(self, phone: str, message: str) -> dict:
    return sms_service.send(phone, message)
```

### Task with Time Limits

```python
from celery.exceptions import SoftTimeLimitExceeded

@shared_task(bind=True, soft_time_limit=300, time_limit=360)
def generate_report(self, report_id: int) -> dict:
    try:
        return build_report(report_id)
    except SoftTimeLimitExceeded:
        # Save what we have before the hard limit kills the task
        partial_save(report_id)
        raise
```

See REFERENCE.md for manual retry, progress tracking, and custom retry backoff patterns.


## Queue Routing

### Route by Task

```python
app.conf.task_routes = {
    "tasks.send_email": {"queue": "high_priority"},
    "tasks.generate_report": {"queue": "low_priority"},
    "tasks.process_payment": {"queue": "payments"},
    "tasks.*": {"queue": "default"},
}
```

### Route Dynamically

```python
process_order.apply_async(args=[123], queue="high_priority")
process_order.apply_async(args=[456], routing_key="payments")
```
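`apply_async` also accepts scheduling and expiry options; `countdown`, `expires`, and `eta` are standard Celery keyword arguments, shown here on the same illustrative task:

```python
from datetime import datetime, timedelta, timezone

# Delay execution by 60 seconds; drop the task if it hasn't started within 10 minutes
process_order.apply_async(args=[789], countdown=60, expires=600)

# Run at a specific point in time
process_order.apply_async(args=[790], eta=datetime.now(timezone.utc) + timedelta(hours=1))
```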

### Worker Queue Assignment

```bash
# High priority only
celery -A my_app.celery_app worker -Q high_priority -c 4

# Multiple queues
celery -A my_app.celery_app worker -Q default,low_priority -c 2
```

## Beat Scheduler

### Basic Schedule

```python
from celery.schedules import crontab

app.conf.beat_schedule = {
    "health-check": {
        "task": "tasks.health_check",
        "schedule": 30.0,  # Every 30 seconds
    },
    "daily-report": {
        "task": "tasks.generate_daily_report",
        "schedule": crontab(hour=2, minute=0),  # Daily at 2 AM
    },
    "weekly-summary": {
        "task": "tasks.send_weekly_summary",
        "schedule": crontab(hour=9, minute=0, day_of_week=1),  # Monday 9 AM
    },
}
```
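Schedule entries can also carry arguments and routing options; `args`, `kwargs`, and `options` are standard `beat_schedule` keys (the task shown is illustrative):

```python
app.conf.beat_schedule["nightly-cleanup"] = {
    "task": "tasks.cleanup_old_records",
    "schedule": crontab(hour=3, minute=0),
    "args": (30,),                         # e.g. delete records older than 30 days
    "options": {"queue": "low_priority"},  # route the periodic run off the default queue
}
```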

### Crontab Quick Reference

| Pattern | Expression |
|---|---|
| Every minute | `crontab()` |
| Every 15 min | `crontab(minute="*/15")` |
| Daily midnight | `crontab(hour=0, minute=0)` |
| Weekdays 9 AM | `crontab(hour=9, minute=0, day_of_week="1-5")` |
| Monthly 1st | `crontab(hour=0, minute=0, day_of_month=1)` |

### Running Beat

```bash
# Standalone
celery -A my_app.celery_app beat --loglevel=info

# With worker (dev only)
celery -A my_app.celery_app worker --beat --loglevel=info
```

See REFERENCE.md for dynamic database schedules and advanced crontab patterns.


## Workflow Patterns

### Chain (Sequential)

```python
from celery import chain

# Each task's return value is passed as the first argument of the next task
workflow = chain(
    fetch_data.s(url),
    process_data.s(),
    save_results.s(destination),
)
result = workflow.apply_async()
```

### Group (Parallel)

```python
from celery import group

workflow = group(process_image.s(image_id) for image_id in image_ids)
result = workflow.apply_async()
all_results = result.get()  # blocks until every task in the group finishes
```

### Chord (Parallel + Callback)

```python
from celery import chord

# Header tasks run in parallel; the callback receives the list of their results
workflow = chord(
    (process_chunk.s(chunk) for chunk in chunks),
    aggregate_results.s(),
)
result = workflow.apply_async()
```

See REFERENCE.md for complex multi-step workflows and error handling in chains.


## FastAPI Integration

### Triggering Tasks

```python
from celery.result import AsyncResult
from fastapi import APIRouter

from .celery_app import app as celery_app  # the Celery instance defined above
from .tasks.email import send_email

router = APIRouter()

@router.post("/emails/send")
async def queue_email(to: str, subject: str, body: str) -> dict:
    task = send_email.delay(to, subject, body)
    return {"task_id": task.id, "status": "queued"}

@router.get("/tasks/{task_id}/status")
async def get_task_status(task_id: str) -> dict:
    result = AsyncResult(task_id, app=celery_app)
    response = {"task_id": task_id, "status": result.status, "ready": result.ready()}
    if result.ready():
        response["result"] = result.get() if result.successful() else str(result.result)
    return response
```

### Progress Tracking

```python
@shared_task(bind=True)
def process_large_file(self, file_id: int) -> dict:
    file_data = load_file(file_id)
    for i, chunk in enumerate(file_data):
        process_chunk(chunk)
        self.update_state(state="PROGRESS", meta={"current": i + 1, "total": len(file_data)})
    return {"processed": len(file_data)}
```

See REFERENCE.md for polling patterns, revocation, and lifespan management.


## Testing

### pytest Configuration

```python
import pytest

@pytest.fixture(scope="session")
def celery_config():
    return {
        "broker_url": "memory://",
        "result_backend": "cache+memory://",
        "task_always_eager": True,
        "task_eager_propagates": True,
    }
```
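For this fixture to be picked up, Celery's bundled pytest plugin must be enabled; one way to do that in `conftest.py`:

```python
# conftest.py -- enables the celery_config / celery_app fixtures used below
pytest_plugins = ("celery.contrib.pytest",)
```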

### Unit Testing (Eager Mode)

```python
from unittest.mock import patch

from my_app.tasks.email import send_email

def test_send_email_success(celery_app):
    with patch("my_app.tasks.email.email_client") as mock:
        mock.send.return_value = {"id": "msg_123"}
        result = send_email.delay("user@example.com", "Test", "Hello")
        assert result.successful()
        assert result.get()["status"] == "sent"
```

See REFERENCE.md for integration tests with real workers and Beat schedule testing.


## CLI Commands

### Worker Management

```bash
celery -A my_app.celery_app worker --loglevel=info
celery -A my_app.celery_app worker -c 4 -Q high,default
celery -A my_app.celery_app worker --pool=gevent -c 100
celery -A my_app.celery_app worker --autoscale=10,3
```

### Inspection

```bash
celery -A my_app.celery_app inspect active
celery -A my_app.celery_app inspect registered
celery -A my_app.celery_app inspect scheduled
celery -A my_app.celery_app inspect ping
```

### Control

```bash
celery -A my_app.celery_app control shutdown
celery -A my_app.celery_app purge
celery -A my_app.celery_app control revoke <task_id>
celery -A my_app.celery_app control rate_limit tasks.send_email 10/m
```

## Essential Configuration

```python
# Broker & backend
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
result_expires = 3600

# Serialization
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]

# Execution
task_time_limit = 300
task_soft_time_limit = 240
task_acks_late = True
task_reject_on_worker_lost = True

# Worker
worker_prefetch_multiplier = 1
worker_concurrency = 4
```

See REFERENCE.md for full configuration reference and environment-based settings.


## Anti-Patterns to Avoid

| Anti-Pattern | Problem | Solution |
|---|---|---|
| Blocking in tasks | `time.sleep()` blocks the worker | Use `countdown` or async |
| Large arguments | Megabytes through the broker | Pass an ID, fetch in the task (see the sketch below) |
| Not idempotent | Duplicate charges on retry | Use idempotency keys |
| Ignoring results | Memory leaks in the backend | Set `ignore_result=True` or configure `result_expires` |
| DB in task module | Import-time connections | Import inside the task function |
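The "pass an ID, fetch in the task" rule in miniature (`load_order` is a hypothetical loader):

```python
# Bad: serializes the whole object through the broker
# process_order.delay(order.to_dict())

# Good: pass the primary key and load fresh state inside the worker
@shared_task
def process_order(order_id: int) -> None:
    order = load_order(order_id)  # hypothetical loader, runs in the worker
    ...
```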

See REFERENCE.md for detailed examples and solutions.


## Integration Checklist

- Celery app configured with broker/backend
- Tasks defined with proper retry logic
- Queues defined and routed appropriately
- Beat schedule configured for periodic tasks
- Tests use eager mode with a memory broker
- Health check endpoint monitors workers (see the sketch below)
- Docker Compose includes Redis + workers
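For the health check item, the control API is sufficient; `app.control.ping` is standard Celery, while the endpoint path here is an assumption:

```python
@router.get("/health/workers")
async def worker_health() -> dict:
    # ping() returns one reply dict per responding worker, or [] if none answer
    replies = celery_app.control.ping(timeout=1.0)
    return {"workers_online": len(replies), "healthy": bool(replies)}
```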

## See Also

- `REFERENCE.md` – extended patterns and the full configuration reference
