
---
name: django-celery-expert
description: Expert Django and Celery guidance for asynchronous task processing. Use when designing background tasks, configuring workers, handling retries and errors, optimizing task performance, implementing periodic tasks, or setting up production monitoring. Follows Celery best practices with Django integration patterns.
---
# Django Celery Expert

## Overview
This skill provides expert guidance for Django applications using Celery for asynchronous task processing. It covers task design patterns, worker configuration, error handling, monitoring, and production deployment strategies.
Key Capabilities:
- Task design and implementation patterns
- Worker configuration and scaling
- Error handling and retry strategies
- Periodic/scheduled task management
- Monitoring and observability
- Production deployment best practices
## When to Use
Invoke this skill when you encounter these triggers:
Task Design:
- "Create a Celery task for..."
- "Move this to a background job"
- "Process this asynchronously"
- "Handle this outside the request"
Configuration & Setup:
- "Configure Celery for Django"
- "Set up task queues"
- "Configure Celery workers"
- "Set up Celery Beat for scheduling"
Error Handling:
- "Handle task failures"
- "Implement retry logic"
- "Task keeps failing"
- "Set up dead letter queue"
Performance & Scaling:
- "Scale Celery workers"
- "Optimize task throughput"
- "Tasks are too slow"
- "Handle high task volume"
Monitoring:
- "Monitor Celery tasks"
- "Set up Flower"
- "Track task progress"
- "Debug stuck tasks"
## Instructions

Follow this workflow when handling Django Celery requests:

### 1. Analyze the Request
Identify the task type:
- Simple background task (fire-and-forget)
- Task with result tracking (need to poll for completion)
- Chained/grouped tasks (workflow orchestration)
- Periodic/scheduled tasks (cron-like behavior)
- Long-running tasks (need progress tracking)
Key questions:
- Does the caller need the result?
- Should failures be retried?
- Is idempotency required?
- What's the expected execution time?
- How critical is guaranteed execution?
### 2. Load Relevant Reference Documentation

Based on the task type, reference the appropriate bundled documentation:

- Django-specific patterns -> `references/django-integration.md`
- Task implementation -> `references/task-design-patterns.md`
- Configuration & setup -> `references/configuration-guide.md`
- Error handling & retries -> `references/error-handling.md`
- Periodic tasks -> `references/periodic-tasks.md`
- Monitoring & debugging -> `references/monitoring-observability.md`
- Production deployment -> `references/production-deployment.md`
### 3. Implement Following Best Practices
Task design principles:
- Keep tasks small and focused
- Design for idempotency when possible
- Use explicit task names
- Bind tasks for access to self
- Pass serializable arguments only (IDs, not objects)
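Taken together, these principles look roughly like the sketch below; the `orders` app, `Order` model, and its `synced`/`sync_to_external_system()` attributes are hypothetical:

```python
from celery import shared_task


@shared_task(bind=True, name="orders.tasks.sync_order")  # explicit name; bound for self.retry/self.request
def sync_order(self, order_id: int):
    """Small, focused, idempotent: takes an ID and re-fetches state itself."""
    from orders.models import Order  # hypothetical app/model

    order = Order.objects.filter(id=order_id).first()
    if order is None or order.synced:  # safe to re-run: no-op when already done
        return
    order.sync_to_external_system()
```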
Error handling:
- Configure appropriate retry behavior
- Use exponential backoff
- Set max retry limits
- Handle specific exceptions appropriately
- Log failures with context
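Celery's built-in autoretry options cover most of this declaratively; a sketch (`fetch_remote_report` and the option values are illustrative):

```python
from celery import shared_task


@shared_task(
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),  # retry only known-transient failures
    retry_backoff=True,          # exponential backoff between attempts
    retry_backoff_max=600,       # cap the delay at 10 minutes
    retry_jitter=True,           # randomize delays to avoid thundering herds
    retry_kwargs={"max_retries": 5},
)
def fetch_remote_report(self, report_id):
    ...  # call the flaky external service here
```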
Performance:
- Use appropriate serializers (JSON for safety, pickle for Python objects)
- Configure prefetch limits
- Use task routing for prioritization
- Batch operations when appropriate
- Monitor memory usage
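In a Django settings module (with the `CELERY_` namespace), these often translate to something like the following; queue names and values are illustrative:

```python
# settings.py
CELERY_TASK_SERIALIZER = "json"         # JSON is the safe default; avoid pickle unless required
CELERY_WORKER_PREFETCH_MULTIPLIER = 1   # limit prefetch so long tasks don't starve the queue
CELERY_TASK_ACKS_LATE = True            # acknowledge after completion; pair with idempotent tasks
CELERY_TASK_ROUTES = {
    "payments.tasks.*": {"queue": "high_priority"},
    "reports.tasks.*": {"queue": "low_priority"},
}
```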
### 4. Validate Implementation
Before presenting the solution:
- Verify task is idempotent if retries enabled
- Check serialization of arguments
- Ensure proper error handling
- Verify monitoring/logging is in place
- Consider failure scenarios
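One lightweight way to exercise a task during validation is to run it synchronously with `.apply()`, which needs no broker or worker. This pytest-django sketch reuses Example 1 below and assumes the task lives in `users/tasks.py`:

```python
import pytest

from users.models import User               # model used in Example 1
from users.tasks import send_welcome_email  # assumed module path for Example 1's task


@pytest.mark.django_db
def test_send_welcome_email(mailoutbox):
    user = User.objects.create(name="Ada", email="ada@example.com")
    # .apply() runs the task body in-process and synchronously
    send_welcome_email.apply(args=[user.id])
    assert len(mailoutbox) == 1
```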
## Bundled Resources

`references/` - Comprehensive Celery documentation loaded into context as needed:

- `references/django-integration.md`
  - transaction.on_commit() for safe task queuing
  - Database as source of truth with recovery tasks
  - Request-task correlation with django-guid
  - Testing Django Celery tasks
- `references/task-design-patterns.md`
  - Task signatures and calling patterns
  - Binding and accessing task properties
  - Task inheritance and base classes
  - Workflow patterns (chains, groups, chords)
  - Idempotency and exactly-once delivery
- `references/configuration-guide.md`
  - Django-Celery integration setup
  - Broker configuration (Redis, RabbitMQ)
  - Result backend options
  - Worker settings and concurrency
  - Task routing and queues
- `references/error-handling.md`
  - Retry strategies and backoff
  - Exception handling patterns
  - Dead letter queues
  - Task rejection and requeue
  - Timeout handling
- `references/periodic-tasks.md`
  - Celery Beat configuration
  - Crontab and interval schedules
  - Django database scheduler
  - Dynamic schedule management
  - Timezone considerations
- `references/monitoring-observability.md`
  - Flower setup and usage
  - Prometheus/Grafana integration
  - Task event monitoring
  - Logging best practices
  - Debugging stuck tasks
- `references/production-deployment.md`
  - Worker deployment patterns
  - Process supervision (systemd, supervisor)
  - Containerized deployments
  - Scaling strategies
  - Health checks and graceful shutdown
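For instance, the transaction.on_commit() pattern referenced in `django-integration.md` prevents a fast worker from picking up a task before the enqueuing transaction commits. A sketch reusing Example 1's registration view (module paths are assumptions):

```python
from django.db import transaction
from django.shortcuts import redirect

from users.models import User               # model from Example 1
from users.tasks import send_welcome_email  # assumed module path for Example 1's task


def register(request):
    with transaction.atomic():
        user = User.objects.create(name="Ada", email="ada@example.com")
        # Enqueue only after the surrounding transaction commits, so the worker
        # cannot query for a user row that isn't visible yet.
        transaction.on_commit(lambda: send_welcome_email.delay(user.id))
    return redirect("dashboard")
```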
## Examples

### Example 1: Basic Background Task

User Request:
"Send welcome emails in the background after user registration"

Implementation:

```python
# tasks.py
from celery import shared_task
from django.core.mail import send_mail


@shared_task(bind=True, max_retries=3)
def send_welcome_email(self, user_id):
    from users.models import User

    try:
        user = User.objects.get(id=user_id)
        send_mail(
            subject="Welcome!",
            message=f"Hi {user.name}, welcome to our platform!",
            from_email="noreply@example.com",
            recipient_list=[user.email],
        )
    except User.DoesNotExist:
        pass  # User deleted, skip
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))


# views.py
def register(request):
    user = User.objects.create(...)
    send_welcome_email.delay(user.id)  # Fire and forget
    return redirect('dashboard')
```
### Example 2: Task with Progress Tracking

User Request:
"Process a large CSV import with progress updates"

Implementation:

```python
import csv

from celery import shared_task


@shared_task(bind=True)
def import_csv(self, file_path, total_rows):
    from myapp.models import Record

    with open(file_path) as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader):
            Record.objects.create(**row)
            if i % 100 == 0:
                self.update_state(
                    state='PROGRESS',
                    meta={'current': i, 'total': total_rows}
                )
    return {'status': 'complete', 'processed': total_rows}


# Check progress from the caller's side
result = import_csv.AsyncResult(task_id)
if result.state == 'PROGRESS':
    progress = result.info.get('current', 0) / result.info.get('total', 1)
```
### Example 3: Workflow with Chains

User Request:
"Process an order: validate inventory, charge payment, then send confirmation"

Implementation:

```python
from celery import chain, shared_task

from myapp.models import Order  # illustrative import path for the Order model


@shared_task
def validate_inventory(order_id):
    # Returns order_id if valid, raises if not
    order = Order.objects.get(id=order_id)
    if not order.items_in_stock():
        raise ValueError("Items out of stock")
    return order_id


@shared_task
def charge_payment(order_id):
    order = Order.objects.get(id=order_id)
    order.charge()
    return order_id


@shared_task
def send_confirmation(order_id):
    order = Order.objects.get(id=order_id)
    order.send_confirmation_email()


def process_order(order_id):
    workflow = chain(
        validate_inventory.s(order_id),
        charge_payment.s(),
        send_confirmation.s(),
    )
    workflow.delay()
```
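A chain stops at the first failure, so if `charge_payment` raises, `send_confirmation` never runs. When an explicit failure path is needed, one option is an error callback via `link_error`; `compensate_order` below is hypothetical and reuses Example 3's tasks:

```python
import logging

from celery import chain, shared_task

logger = logging.getLogger(__name__)


@shared_task
def compensate_order(request, exc, traceback):
    # Error callbacks receive the failed task's request, the exception, and the traceback
    logger.error("Order workflow task %s failed: %r", request.id, exc)
    # ...release reserved inventory, notify support, etc.


def process_order(order_id):
    workflow = chain(
        validate_inventory.s(order_id),
        charge_payment.s(),
        send_confirmation.s(),
    )
    workflow.apply_async(link_error=compensate_order.s())
```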
## Additional Notes
Common Pitfalls:
- Passing Django model instances instead of IDs
- Not handling task idempotency with retries
- Missing timeout configuration for long tasks
- Not monitoring task queue depth
- Ignoring result backend cleanup
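Two of these pitfalls (missing timeouts and unbounded result storage) are usually addressed with time limits and result expiry; a sketch with illustrative values (`generate_report` is a placeholder):

```python
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded


@shared_task(soft_time_limit=300, time_limit=360)  # soft limit raises inside the task; hard limit kills it
def generate_report(report_id):
    try:
        ...  # long-running work
    except SoftTimeLimitExceeded:
        ...  # clean up partial work before the hard limit hits


# settings.py: expire stored results so the result backend doesn't grow without bound
CELERY_RESULT_EXPIRES = 60 * 60 * 24  # one day
```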
Django Integration:
- Use `django-celery-beat` for database-backed schedules
- Use `django-celery-results` for storing task results in Django
- Configure `CELERY_` settings in Django settings.py
- Use `@shared_task` for reusable apps
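A minimal bootstrap for that integration might look like this; the project name, broker URL, and setting values are placeholders:

```python
# myproject/celery.py
import os

from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")

app = Celery("myproject")
app.config_from_object("django.conf:settings", namespace="CELERY")  # reads CELERY_* settings
app.autodiscover_tasks()  # finds tasks.py in each installed app


# myproject/settings.py (excerpt)
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "django-db"  # provided by django-celery-results
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
```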
Security:
- Never pass sensitive data in task arguments
- Use signed serializers if pickle is required
- Restrict Flower access in production
- Validate task arguments