
asyncio-advanced
by yonatangross
The Complete AI Development Toolkit for Claude Code — 159 skills, 34 agents, 20 commands, 144 hooks. Production-ready patterns for FastAPI, React 19, LangGraph, security, and testing.
SKILL.md
name: asyncio-advanced
description: Python asyncio patterns with TaskGroup, structured concurrency, and modern 3.11+ features. Use when implementing concurrent operations, async context managers, or high-performance async services.
context: fork
agent: backend-system-architect
version: 1.0.0
tags: [asyncio, python, concurrency, taskgroup, structured-concurrency, 2026]
allowed-tools: [Read, Write, Edit, Bash, Grep, Glob]
author: OrchestKit
user-invocable: false
Asyncio Advanced Patterns (2026)
Modern Python asyncio patterns using structured concurrency, TaskGroup, and Python 3.11+ features.
Overview
Use this skill when:
- Implementing concurrent HTTP requests or database queries
- Building async services with proper cancellation handling
- Managing multiple concurrent tasks with error propagation
- Rate limiting async operations with semaphores
- Bridging sync code to async contexts
Quick Reference
TaskGroup (Replaces gather)
import asyncio

async def fetch_user_data(user_id: str) -> dict:
    """Fetch user data concurrently - all tasks complete or all are cancelled."""
    async with asyncio.TaskGroup() as tg:
        user_task = tg.create_task(fetch_user(user_id))
        orders_task = tg.create_task(fetch_orders(user_id))
        preferences_task = tg.create_task(fetch_preferences(user_id))
    # All tasks guaranteed complete here
    return {
        "user": user_task.result(),
        "orders": orders_task.result(),
        "preferences": preferences_task.result(),
    }
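If any child task fails, the TaskGroup cancels its siblings and re-raises the failures as an ExceptionGroup once every child has finished. A minimal sketch of that behaviour (slow_fetch and failing_fetch are hypothetical stand-ins, not part of the skill):

import asyncio

async def slow_fetch() -> str:
    await asyncio.sleep(10)
    return "never reached"

async def failing_fetch() -> str:
    raise ValueError("upstream returned 500")

async def demo() -> None:
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(slow_fetch())     # cancelled when its sibling fails
            tg.create_task(failing_fetch())  # raises ValueError
    except* ValueError as eg:
        print(f"caught {len(eg.exceptions)} failure(s)")  # -> caught 1 failure(s)

asyncio.run(demo())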
TaskGroup with Timeout
async def fetch_with_timeout(urls: list[str], timeout_sec: float = 30) -> list[dict]:
    """Fetch all URLs with an overall timeout - structured concurrency."""
    async with asyncio.timeout(timeout_sec):
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(fetch_url(url)) for url in urls]
        return [t.result() for t in tasks]
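When the deadline expires, asyncio.timeout() cancels the body and raises TimeoutError as the block exits, so callers handle the deadline like any other exception. A hedged usage sketch (the empty-list fallback is an assumption, not part of the original example):

async def fetch_or_fallback(urls: list[str]) -> list[dict]:
    try:
        return await fetch_with_timeout(urls, timeout_sec=5)
    except TimeoutError:
        # The TaskGroup already cancelled every in-flight task
        # before TimeoutError propagated here.
        return []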
Semaphore for Concurrency Limiting
import aiohttp

class RateLimitedClient:
    """HTTP client with concurrency limiting."""

    def __init__(self, max_concurrent: int = 10):
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._session: aiohttp.ClientSession | None = None

    async def __aenter__(self) -> "RateLimitedClient":
        self._session = aiohttp.ClientSession()  # Open the shared session
        return self

    async def __aexit__(self, *exc) -> None:
        await self._session.close()

    async def fetch(self, url: str) -> dict:
        async with self._semaphore:  # Limit concurrent requests
            async with self._session.get(url) as response:
                return await response.json()

    async def fetch_many(self, urls: list[str]) -> list[dict]:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(self.fetch(url)) for url in urls]
        return [t.result() for t in tasks]
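A minimal usage sketch, assuming the session is opened and closed via the async-with lifecycle shown above (crawl is an illustrative name):

async def crawl(urls: list[str]) -> list[dict]:
    # At most 10 requests are in flight at once; the rest wait on the
    # semaphore instead of exhausting the connection pool.
    async with RateLimitedClient(max_concurrent=10) as client:
        return await client.fetch_many(urls)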
Exception Group Handling
async def process_batch(items: list[dict]) -> tuple[list[dict], list[Exception]]:
    """Process batch, collecting both successes and failures."""
    results = []
    errors = []
    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(process_item(item)) for item in items]
    except* ValueError as eg:
        # Handle specific exception types from ExceptionGroup
        errors.extend(eg.exceptions)
    except* Exception as eg:
        errors.extend(eg.exceptions)
    else:
        results = [t.result() for t in tasks]
    return results, errors
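On the caller side, the returned pair makes failures visible; note that because the TaskGroup cancels remaining work on the first failure, results stays empty whenever errors is non-empty. A small illustrative sketch (ingest and the print calls are assumptions):

async def ingest(items: list[dict]) -> None:
    results, errors = await process_batch(items)
    print(f"processed {len(results)} items, {len(errors)} failures")
    for err in errors:
        # Each entry is one exception raised by a child task.
        print(f"  failed: {err!r}")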
Sync-to-Async Bridge
import asyncio
from concurrent.futures import ThreadPoolExecutor

# For CPU-bound or blocking sync code
async def run_blocking_operation(data: bytes) -> dict:
    """Run blocking sync code in a thread pool."""
    return await asyncio.to_thread(cpu_intensive_parse, data)

# For sync code that needs an async entry point
def sync_caller():
    """Call async code from a sync context (not inside a running loop)."""
    return asyncio.run(async_main())

# For sync code within an existing async context
async def wrapper_for_sync_lib():
    """Bridge a sync library to async - use with care."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, sync_blocking_call)
    return result
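asyncio.to_thread() and a ThreadPoolExecutor keep the event loop responsive, but threads still share the GIL, so truly CPU-bound work usually belongs in a process pool. A hedged sketch reusing the hypothetical cpu_intensive_parse from above:

from concurrent.futures import ProcessPoolExecutor

async def run_cpu_bound(data: bytes) -> dict:
    """Offload CPU-bound work to a separate process to sidestep the GIL."""
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Arguments must be picklable when crossing the process boundary.
        return await loop.run_in_executor(pool, cpu_intensive_parse, data)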
Cancellation Handling
async def cancellable_operation(resource_id: str) -> dict:
    """Properly handle cancellation - NEVER swallow CancelledError."""
    resource = await acquire_resource(resource_id)
    try:
        return await process_resource(resource)
    except asyncio.CancelledError:
        # Clean up but RE-RAISE - this is critical!
        await cleanup_resource(resource)
        raise  # ALWAYS re-raise CancelledError
    finally:
        await release_resource(resource)
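In practice, cancellation usually arrives from an enclosing asyncio.timeout() or TaskGroup rather than an explicit task.cancel(). A sketch of the cleanup path firing under a deadline (the 5-second budget and resource id are arbitrary assumptions):

async def caller() -> dict | None:
    try:
        async with asyncio.timeout(5):
            return await cancellable_operation("res-42")
    except TimeoutError:
        # cancellable_operation saw CancelledError, cleaned up, re-raised,
        # and asyncio.timeout() converted it into TimeoutError here.
        return None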
Key Decisions
| Decision | 2026 Recommendation | Rationale |
|---|---|---|
| Task spawning | TaskGroup not gather() | Structured concurrency, auto-cancellation |
| Timeouts | asyncio.timeout() context manager | Composable, cancels on exit |
| Concurrency limit | asyncio.Semaphore | Prevents resource exhaustion |
| Sync bridge | asyncio.to_thread() | Clean API, manages thread pool |
| Exception handling | except* with ExceptionGroup | Handle multiple failures properly |
| Cancellation | Always re-raise CancelledError | Breaking this breaks TaskGroup/timeout |
Anti-Patterns (FORBIDDEN)
# NEVER use gather() for new code - no structured concurrency
results = await asyncio.gather(task1(), task2())  # LEGACY

# NEVER swallow CancelledError - breaks structured concurrency
except asyncio.CancelledError:
    return None  # BREAKS TaskGroup and timeout!

# NEVER use create_task() without TaskGroup - tasks leak
asyncio.create_task(background_work())  # Fire and forget = leaked task

# NEVER yield inside async context managers (PEP 789)
async with asyncio.timeout(10):
    yield item  # DANGEROUS - cancellation bugs!

# NEVER use asyncio.run() inside an existing event loop
async def handler():
    asyncio.run(other_async())  # CRASHES - loop already running

# NEVER block the event loop with sync calls
async def bad_handler():
    time.sleep(1)      # BLOCKS ALL TASKS
    requests.get(url)  # BLOCKS ALL TASKS
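For reference, the non-blocking equivalents of the last anti-pattern, sketched under the same assumptions (requests and url are carried over from the snippet above; a native async client such as aiohttp or httpx is usually the better fix):

async def good_handler() -> None:
    await asyncio.sleep(1)                                  # Yields to other tasks
    response = await asyncio.to_thread(requests.get, url)  # Blocking call moved off the loop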
Related Skills
- sqlalchemy-2-async - Async database sessions with SQLAlchemy 2.0
- fastapi-advanced - Async FastAPI patterns
- background-jobs - Celery/ARQ for heavy async work
- streaming-api-patterns - SSE/WebSocket async patterns
Capability Details
taskgroup-patterns
Keywords: taskgroup, structured concurrency, concurrent tasks, parallel execution
Solves:
- How do I run multiple async tasks concurrently?
- Replace asyncio.gather with TaskGroup
- Handle exceptions from multiple tasks
timeout-patterns
Keywords: timeout, asyncio.timeout, cancel, deadline
Solves:
- How do I add timeouts to async operations?
- Timeout multiple concurrent operations
- Cancel tasks after deadline
semaphore-limiting
Keywords: semaphore, rate limit, concurrency limit, throttle
Solves:
- How do I limit concurrent async operations?
- Rate limit HTTP requests
- Prevent connection pool exhaustion
exception-groups
Keywords: ExceptionGroup, except*, multiple exceptions, error handling
Solves:
- How do I handle multiple task failures?
- Collect errors from concurrent operations
- Python 3.11+ exception group patterns
sync-async-bridge
Keywords: to_thread, run_in_executor, sync to async, blocking code
Solves:
- How do I call sync code from async?
- Run CPU-bound code without blocking
- Bridge sync libraries to async context
