
celery-advanced

by yonatangross

The Complete AI Development Toolkit for Claude Code — 159 skills, 34 agents, 20 commands, 144 hooks. Production-ready patterns for FastAPI, React 19, LangGraph, security, and testing.


SKILL.md


name: celery-advanced
description: Advanced Celery patterns including canvas workflows, priority queues, rate limiting, multi-queue routing, and production monitoring. Use when implementing complex task orchestration, task prioritization, or enterprise-grade background processing.
context: fork
agent: python-performance-engineer
version: 1.0.0
tags: [celery, canvas, workflow, priority-queue, rate-limiting, task-routing, flower, 2026]
author: OrchestKit
user-invocable: false

Advanced Celery Patterns

Enterprise-grade task orchestration beyond basic background jobs.

Overview

Use this skill when you need:
  • Complex multi-step task workflows (ETL pipelines, order processing)
  • Priority-based task processing (premium vs standard users)
  • Rate-limited external API calls (API quotas, throttling)
  • Multi-queue routing (dedicated workers per task type)
  • Production monitoring and observability
  • Task result aggregation and fan-out patterns
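
All snippets below assume a Celery application object named celery_app roughly like the following; the broker/backend URLs and module name are illustrative assumptions, not part of the original skill.

# celery_app.py: minimal application the snippets assume (illustrative values)
from celery import Celery

celery_app = Celery(
    "app",
    broker="redis://localhost:6379/0",   # assumption: Redis broker
    backend="redis://localhost:6379/1",  # a result backend is required for .get()
    include=["tasks"],                   # module holding the task definitions
)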

Canvas Workflows

Signatures (Task Invocation)

from celery import signature, chain, group, chord

# Create a reusable task signature
sig = signature("tasks.process_order", args=[order_id], kwargs={"priority": "high"})

# Immutable signature (won't receive results from previous task)
sig = process_order.si(order_id)

# Partial signature (curry arguments)
partial_sig = send_email.s(subject="Order Update")
# Later: partial_sig.delay(to="user@example.com", body="...")

Chains (Sequential Execution)

from celery import chain

# Tasks execute sequentially, passing results
workflow = chain(
    extract_data.s(source_id),      # Returns raw_data
    transform_data.s(),              # Receives raw_data, returns clean_data
    load_data.s(destination_id),     # Receives clean_data
)
result = workflow.apply_async()

# Access intermediate results
chain_result = result.get()  # Final result
parent_result = result.parent.get()  # Previous task result

# Error handling in chains
@celery_app.task(bind=True)
def transform_data(self, raw_data):
    try:
        return do_transform(raw_data)
    except TransformError as exc:
        # Chain stops here, no subsequent tasks run
        raise self.retry(exc=exc, countdown=60)
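
To react to a failure anywhere in the chain without blocking on results, an error callback can be attached at dispatch time. A minimal sketch; log_error is a hypothetical task, and Celery invokes error callbacks with the failing task's request, exception, and traceback.

import logging

logger = logging.getLogger(__name__)

# Error callback: receives (request, exc, traceback) from the failing task
@celery_app.task
def log_error(request, exc, traceback):
    logger.error("Task %s failed: %s", request.id, exc)

# Attach the callback to the whole workflow when dispatching it
workflow.apply_async(link_error=log_error.s())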

Groups (Parallel Execution)

from celery import group

# Execute tasks in parallel
parallel = group(
    process_chunk.s(chunk) for chunk in chunks
)
group_result = parallel.apply_async()

# Wait for all to complete
results = group_result.get()  # List of results

# Check completion status
group_result.ready()      # All completed?
group_result.successful() # All succeeded?
group_result.failed()     # Any failed?

# Iterate as they complete
for result in group_result:
    if result.ready():
        print(f"Completed: {result.get()}")

Chords (Parallel + Callback)

from celery import chord

# Parallel execution with callback when all complete
workflow = chord(
    [process_chunk.s(chunk) for chunk in chunks],
    aggregate_results.s()  # Receives list of all results
)
result = workflow.apply_async()

# Chord with header and body
header = group(fetch_data.s(url) for url in urls)
body = combine_data.s()
workflow = chord(header, body)

# Error handling: if any header task fails, body won't run
@celery_app.task(bind=True)
def aggregate_results(self, results):
    # results = [result1, result2, ...]
    return sum(results)
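
To be notified when the body is skipped because a header task failed, attach an error callback to the body signature. on_chord_error is a hypothetical task; Celery calls chord error callbacks with the failing task's request, exception, and traceback.

@celery_app.task
def on_chord_error(request, exc, traceback):
    print(f"Chord task {request.id} failed: {exc}")

# Error callback on the body: runs when a header failure prevents the callback
workflow = chord(header, body.on_error(on_chord_error.s()))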

Map and Starmap

# Map: apply same task to each item
workflow = process_item.map([item1, item2, item3])

# Starmap: unpack args for each call
workflow = send_email.starmap([
    ("user1@example.com", "Subject 1"),
    ("user2@example.com", "Subject 2"),
])

# Chunks: split large list into batches
workflow = process_item.chunks(items, 100)  # second argument n is the chunk size
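
A chunks instance can be dispatched directly, or converted to a group first when a GroupResult is needed for inspection (a small sketch).

# Dispatch directly, or convert to a group to get a GroupResult back
workflow.apply_async()
group_result = process_item.chunks(items, 100).group().apply_async()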

Priority Queues

Queue Configuration

# celery_config.py
from kombu import Queue

celery_app.conf.task_queues = (
    Queue("high", routing_key="high"),
    Queue("default", routing_key="default"),
    Queue("low", routing_key="low"),
)

celery_app.conf.task_default_queue = "default"
celery_app.conf.task_default_routing_key = "default"

# Priority within a queue (on the Redis broker this is emulated with separate lists per priority step)
celery_app.conf.broker_transport_options = {
    "priority_steps": list(range(10)),  # 0-9 priority levels
    "sep": ":",
    "queue_order_strategy": "priority",
}

Task Routing

# Route by task name
celery_app.conf.task_routes = {
    "tasks.critical_task": {"queue": "high"},
    "tasks.bulk_*": {"queue": "low"},
    "tasks.default_*": {"queue": "default"},
}

# Route dynamically at call time
critical_task.apply_async(args=[data], queue="high", priority=9)
bulk_task.apply_async(args=[data], queue="low", priority=1)

# Route by task attribute
@celery_app.task(queue="high", priority=8)
def premium_user_task(user_id):
    pass

Worker Configuration

# Start workers for specific queues
celery -A app worker -Q high -c 4 --prefetch-multiplier=1
celery -A app worker -Q default -c 8
celery -A app worker -Q low -c 2 --prefetch-multiplier=4
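
Workers can also scale their process pool between a maximum and minimum based on load; the flag takes max,min (values here are illustrative).

# Autoscale the high-priority worker between 2 and 8 processes
celery -A app worker -Q high --autoscale=8,2 --prefetch-multiplier=1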

Rate Limiting

Per-Task Rate Limits

@celery_app.task(rate_limit="100/m")  # 100 per minute
def call_external_api(endpoint):
    return requests.get(endpoint, timeout=10)

@celery_app.task(rate_limit="10/s")   # 10 per second
def send_notification(user_id):
    pass

@celery_app.task(rate_limit="1000/h") # 1000 per hour
def bulk_email(batch):
    pass

Dynamic Rate Limiting

from celery import current_app

# Change rate limit at runtime
current_app.control.rate_limit(
    "tasks.call_external_api",
    "50/m",  # Reduce during high load
    destination=["worker1@hostname"],
)

# Custom rate limiter using a Redis key as a simple fixed-window token
import redis

redis_client = redis.Redis()

class RateLimitedTask(celery_app.Task):
    _rate_limit_key = "api:rate_limit"

    def __call__(self, *args, **kwargs):
        if not self._acquire_token():
            # No token available: requeue with a short delay instead of blocking the worker
            raise self.retry(countdown=1)
        return super().__call__(*args, **kwargs)

    def _acquire_token(self):
        # SET NX succeeds only once per window, so at most one execution per second
        return redis_client.set(
            self._rate_limit_key,
            "1",
            nx=True,
            ex=1,  # 1 second window
        )
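
Tasks opt in to the limiter by declaring it as their base class; a short usage sketch (call_partner_api is a hypothetical task).

@celery_app.task(base=RateLimitedTask)
def call_partner_api(endpoint):
    return requests.get(endpoint, timeout=10).json()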

Multi-Queue Routing

Router Classes

class TaskRouter:
    def route_for_task(self, task, args=None, kwargs=None):
        if task.startswith("tasks.premium"):
            return {"queue": "premium", "priority": 8}
        elif task.startswith("tasks.analytics"):
            return {"queue": "analytics"}
        elif kwargs and kwargs.get("urgent"):
            return {"queue": "high"}
        return {"queue": "default"}

celery_app.conf.task_routes = (TaskRouter(),)
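
The same routing can also be written as a plain function, the form Celery's routing documentation uses; returning None falls through to the next router or the default queue (a sketch).

# Function-based router: Celery calls it with the task name and call arguments
def route_task(name, args, kwargs, options, task=None, **kw):
    if name.startswith("tasks.premium"):
        return {"queue": "premium", "priority": 8}
    if kwargs and kwargs.get("urgent"):
        return {"queue": "high"}
    return None  # fall through to task_routes defaults

celery_app.conf.task_routes = (route_task,)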

Content-Based Routing

@celery_app.task(bind=True)
def process_order(self, order):
    # Route based on order value
    if order["total"] > 1000:
        self.update_state(state="ROUTING", meta={"queue": "premium"})
        return chain(
            verify_inventory.s(order).set(queue="high"),
            process_payment.s().set(queue="high"),
            notify_customer.s().set(queue="notifications"),
        ).apply_async()
    else:
        return standard_workflow(order)

Production Monitoring

Flower Dashboard

# Install and run Flower
pip install flower
celery -A app flower --port=5555 --basic_auth=admin:password

# With persistent storage
celery -A app flower --persistent=True --db=flower.db

Custom Events

from celery import signals

@signals.task_prerun.connect
def on_task_start(sender, task_id, task, args, kwargs, **_):
    metrics.counter("task_started", tags={"task": task.name})

@signals.task_postrun.connect
def on_task_complete(sender, task_id, task, args, kwargs, retval, state, **_):
    metrics.counter("task_completed", tags={"task": task.name, "state": state})

@signals.task_failure.connect
def on_task_failure(sender, task_id, exception, args, kwargs, traceback, einfo, **_):
    alerting.send_alert(
        f"Task {sender.name} failed: {exception}",
        severity="error"
    )

Health Checks

from celery import current_app

def celery_health_check():
    try:
        # Check broker connection
        conn = current_app.connection()
        conn.ensure_connection(max_retries=3)

        # Check workers responding
        inspector = current_app.control.inspect()
        active_workers = inspector.active()

        if not active_workers:
            return {"status": "unhealthy", "reason": "No active workers"}

        return {
            "status": "healthy",
            "workers": list(active_workers.keys()),
            "active_tasks": sum(len(tasks) for tasks in active_workers.values()),
        }
    except Exception as e:
        return {"status": "unhealthy", "reason": str(e)}

Custom Task States

from celery import states

# Define custom states
VALIDATING = "VALIDATING"
PROCESSING = "PROCESSING"
UPLOADING = "UPLOADING"

@celery_app.task(bind=True)
def long_running_task(self, data):
    self.update_state(state=VALIDATING, meta={"step": 1, "total": 3})
    validate(data)

    self.update_state(state=PROCESSING, meta={"step": 2, "total": 3})
    result = process(data)

    self.update_state(state=UPLOADING, meta={"step": 3, "total": 3})
    upload(result)

    return {"status": "complete", "url": result.url}

# Query task progress
from celery.result import AsyncResult

result = AsyncResult(task_id)
if result.state == PROCESSING:
    print(f"Step {result.info['step']}/{result.info['total']}")

Base Tasks and Inheritance

from celery import Task

class DatabaseTask(Task):
    """Base task with database session management."""
    _db = None

    @property
    def db(self):
        if self._db is None:
            self._db = create_session()
        return self._db

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        if self._db:
            self._db.close()
            self._db = None

class RetryableTask(Task):
    """Base task with exponential backoff retry."""
    autoretry_for = (ConnectionError, TimeoutError)
    max_retries = 5
    retry_backoff = True
    retry_backoff_max = 600
    retry_jitter = True

@celery_app.task(base=DatabaseTask)
def query_database(query):
    return query_database.db.execute(query)

@celery_app.task(base=RetryableTask)
def call_flaky_api(endpoint):
    return requests.get(endpoint, timeout=30)

Key Decisions

  • Workflow type: Chain for sequential, Group for parallel, Chord for fan-in
  • Priority queues: 3 queues (high/default/low) for most use cases
  • Rate limiting: Per-task rate_limit for simple cases, token bucket for complex ones
  • Monitoring: Flower + custom signals for production
  • Task routing: Content-based router for dynamic routing needs
  • Worker scaling: Separate workers per queue, autoscale with HPA
  • Error handling: Base task with retry + dead letter queue (see the sketch below)
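
The dead letter queue recommendation is not shown elsewhere in this skill; one common approach, sketched here with illustrative task and model names, is to park the payload on a dedicated queue once retries are exhausted.

# Sketch: after retries are exhausted, hand the payload to a dead-letter task
@celery_app.task(bind=True, max_retries=3)
def process_event(self, event):
    try:
        handle(event)  # illustrative business logic
    except Exception as exc:
        if self.request.retries >= self.max_retries:
            dead_letter.apply_async(args=[self.name, event, str(exc)], queue="dead_letter")
            raise
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)

@celery_app.task
def dead_letter(task_name, payload, error):
    DeadLetter.create(task=task_name, payload=payload, error=error)  # illustrative model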

Anti-Patterns (FORBIDDEN)

# NEVER block on results in tasks
@celery_app.task
def bad_task():
    result = other_task.delay()
    return result.get()  # Blocks worker, causes deadlock!

# NEVER use synchronous I/O without timeout
requests.get(url)  # Can hang forever

# NEVER rely on early acknowledgment for critical tasks
celery_app.conf.task_acks_late = False  # the default: acked before execution, so a worker crash mid-task loses the job

# NEVER skip idempotency for retried tasks
@celery_app.task(max_retries=3)
def create_order(order):
    Order.create(order)  # Creates duplicates on retry!

# Use immutable signatures (.si()) for chord callbacks that should NOT receive the header
# results; use .s() (as in the examples above) when the callback aggregates them
chord([task.s(x) for x in items], callback.si())  # si(): callback ignores the results list
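
For contrast, an idempotent variant of the create_order example above, sketched with a hypothetical natural-key lookup so retries reuse the existing row instead of duplicating it.

# Idempotent variant: safe to retry
@celery_app.task(bind=True, max_retries=3)
def create_order(self, order):
    existing = Order.find_by_key(order["idempotency_key"])  # hypothetical lookup
    if existing:
        return existing.id
    return Order.create(order).id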

References

For detailed implementation patterns, see:

  • references/canvas-workflows.md - Deep dive on chain/group/chord with error handling
  • references/priority-queue-setup.md - Redis priority queue configuration
  • references/rate-limiting-patterns.md - Per-task and dynamic rate limiting
  • references/celery-beat-scheduling.md - Periodic task configuration

Templates

Production-ready code templates:

  • scripts/celery-config-template.py - Complete production Celery configuration
  • scripts/canvas-workflow-template.py - ETL pipeline using canvas patterns
  • scripts/priority-worker-template.py - Multi-queue worker with per-user rate limiting

Checklists

  • checklists/celery-production-checklist.md - Production deployment verification

Examples

  • examples/order-processing-pipeline.md - Real-world e-commerce order processing

Related skills

  • background-jobs - Basic Celery and ARQ patterns
  • message-queues - RabbitMQ/Kafka integration
  • resilience-patterns - Circuit breakers, retries
  • observability-monitoring - Metrics and alerting

Capability Details

canvas-workflows

Keywords: chain, group, chord, signature, canvas, workflow
Solves:

  • Complex multi-step task pipelines
  • Parallel task execution with aggregation
  • Sequential task dependencies

priority-queues

Keywords: priority, queue, routing, high priority, low priority
Solves:

  • Premium user task prioritization
  • Urgent vs batch task handling
  • Multi-queue worker deployment

rate-limiting

Keywords: rate limit, throttle, quota, api limit
Solves:

  • External API rate limiting
  • Per-task execution limits
  • Dynamic rate adjustment

task-monitoring

Keywords: flower, monitoring, health check, task state
Solves:

  • Production task monitoring
  • Worker health checks
  • Custom task state tracking
