Back to list
aiskillstore

py-async-patterns

by aiskillstore

Security-audited skills for Claude, Codex & Claude Code. One-click install, quality verified.

102🍴 3📅 Jan 23, 2026

SKILL.md


name: py-async-patterns description: Async/await patterns for FastAPI and SQLAlchemy. Use when working with async code, database sessions, concurrent operations, or debugging async issues in Python.

Python Async Patterns

Problem Statement

Async Python is powerful but error-prone. Race conditions, session leaks, and connection pool issues are common pitfalls in async codebases.


Pattern: AsyncSession Lifecycle

Problem: Session must be scoped to request. Leaking sessions causes stale data and connection exhaustion.

# ✅ CORRECT: Session scoped to request via dependency
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        yield session
        # Session automatically closed after request

# Usage in endpoint
@router.get("/users/{user_id}")
async def get_user(
    user_id: UUID,
    session: AsyncSession = Depends(get_session),
) -> UserRead:
    result = await session.execute(select(User).where(User.id == user_id))
    return result.scalar_one()

# ❌ WRONG: Global session (stale data, connection leaks)
_global_session = None  # NEVER do this

async def get_user(user_id: UUID):
    result = await _global_session.execute(...)  # Stale, shared state

Why it matters: Each request needs isolated database state. Shared sessions see stale data and can't be safely committed.


Pattern: Concurrent vs Sequential Queries

Problem: Running independent queries sequentially wastes time. But dependent queries must be sequential.

# ✅ CORRECT: Concurrent independent queries
async def get_dashboard_data(user_id: UUID, session: AsyncSession):
    # These don't depend on each other - run in parallel
    user_result, stats_result, recent_result = await asyncio.gather(
        session.execute(select(User).where(User.id == user_id)),
        session.execute(select(UserStats).where(UserStats.user_id == user_id)),
        session.execute(
            select(Activity)
            .where(Activity.user_id == user_id)
            .order_by(Activity.created_at.desc())
            .limit(10)
        ),
    )
    
    return {
        "user": user_result.scalar_one(),
        "stats": stats_result.scalar_one_or_none(),
        "recent": recent_result.scalars().all(),
    }

# ❌ WRONG: Sequential when parallel is safe
async def get_dashboard_data_slow(user_id: UUID, session: AsyncSession):
    user = await session.execute(...)      # Wait...
    stats = await session.execute(...)     # Wait more...
    recent = await session.execute(...)    # Even more waiting
    # Total time = sum of all queries

# ✅ CORRECT: Sequential when queries depend on each other
async def get_user_with_team(user_id: UUID, session: AsyncSession):
    # Must get user first to know team_id
    user_result = await session.execute(
        select(User).where(User.id == user_id)
    )
    user = user_result.scalar_one()
    
    # Now we can query team
    team_result = await session.execute(
        select(Team).where(Team.id == user.team_id)
    )
    return user, team_result.scalar_one()

Decision framework:

Queries share data?Use
No (independent)asyncio.gather()
Yes (dependent)Sequential await

Pattern: Transaction Boundaries

Problem: Knowing when to commit, rollback, and refresh.

# ✅ CORRECT: Explicit transaction for multi-step operations
async def transfer_player(
    player_id: UUID,
    from_team_id: UUID,
    to_team_id: UUID,
    session: AsyncSession,
):
    try:
        # All operations in one transaction
        player = await session.get(Player, player_id)
        player.team_id = to_team_id
        
        from_team = await session.get(Team, from_team_id)
        from_team.player_count -= 1
        
        to_team = await session.get(Team, to_team_id)
        to_team.player_count += 1
        
        await session.commit()
    except Exception:
        await session.rollback()
        raise

# ✅ CORRECT: Using context manager
async with session.begin():
    # All operations here are in a transaction
    # Auto-commits on success, auto-rollbacks on exception
    player.team_id = to_team_id
    from_team.player_count -= 1
    to_team.player_count += 1

# ✅ CORRECT: Refresh after commit to get DB-generated values
await session.commit()
await session.refresh(new_entity)  # Get id, created_at, etc.
return new_entity

When to use what:

ScenarioPattern
Single create/updatesession.add() + commit() at request end
Multi-step operationExplicit begin() / commit() / rollback()
Need DB-generated valuesrefresh() after commit
Read-only queryNo commit needed

Pattern: Connection Pool Management

Problem: Exhausting connection pool causes requests to hang.

# This codebase uses NullPool for async - understand why
engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool,  # No connection pooling
)

# NullPool: Each request gets new connection, closes after
# Why: Avoids issues with asyncpg + connection reuse
# Tradeoff: Slightly more connection overhead

# ✅ CORRECT: Always close sessions (handled by Depends)
async with async_session() as session:
    # Work with session
    pass  # Session closed here

# ❌ WRONG: Forgetting to close
session = async_session()
result = await session.execute(query)
# Session never closed - connection leak!

Pattern: Background Tasks

Problem: Long-running work shouldn't block the response.

from fastapi import BackgroundTasks

# ✅ CORRECT: FastAPI BackgroundTasks for request-scoped work
@router.post("/assessments/{id}/submit")
async def submit_assessment(
    id: UUID,
    session: AsyncSession = Depends(get_session),
    background_tasks: BackgroundTasks,
) -> AssessmentResult:
    # Quick work - return response
    result = await process_submission(id, session)
    
    # Slow work - do after response
    background_tasks.add_task(send_completion_email, result.user_email)
    background_tasks.add_task(update_analytics, result)
    
    return result

# ✅ CORRECT: asyncio.create_task for fire-and-forget
async def process_with_side_effect():
    result = await main_operation()
    
    # Fire and forget - don't await
    asyncio.create_task(log_to_external_service(result))
    
    return result

# ❌ WRONG: Awaiting non-critical slow operations
async def slow_endpoint():
    result = await main_operation()
    await send_email(result)           # User waits for email...
    await update_analytics(result)     # User still waiting...
    return result

When to use what:

ScenarioPattern
Post-response cleanupBackgroundTasks
Fire-and-forget loggingasyncio.create_task()
Must complete before responseDirect await

Pattern: Avoiding Deadlocks

Problem: Concurrent operations acquiring locks in different order.

# ❌ WRONG: Potential deadlock
async def transfer_both_ways():
    # Task 1: Lock A, then B
    # Task 2: Lock B, then A
    # = Deadlock if interleaved
    pass

# ✅ CORRECT: Consistent lock ordering
async def transfer_credits(
    from_id: UUID,
    to_id: UUID,
    amount: int,
    session: AsyncSession,
):
    # Always lock in consistent order (e.g., by UUID)
    first_id, second_id = sorted([from_id, to_id])
    
    # Lock in consistent order
    first = await session.get(Account, first_id, with_for_update=True)
    second = await session.get(Account, second_id, with_for_update=True)
    
    # Now safe to modify
    if from_id == first_id:
        first.balance -= amount
        second.balance += amount
    else:
        second.balance -= amount
        first.balance += amount
    
    await session.commit()

Pattern: Post-Condition Validation

Same principle as frontend - verify async operations succeeded:

# ✅ CORRECT: Validate after async operations
async def create_assessment(data: AssessmentCreate, session: AsyncSession):
    assessment = Assessment(**data.model_dump())
    session.add(assessment)
    await session.commit()
    await session.refresh(assessment)
    
    # Validate post-condition
    if assessment.id is None:
        raise RuntimeError("Assessment creation failed - no ID assigned")
    
    return assessment

# ✅ CORRECT: Validate data was actually loaded
async def get_user_or_fail(user_id: UUID, session: AsyncSession) -> User:
    result = await session.execute(
        select(User).where(User.id == user_id)
    )
    user = result.scalar_one_or_none()
    
    if user is None:
        raise HTTPException(404, f"User {user_id} not found")
    
    return user

Pattern: Logging Async Operations

import structlog

logger = structlog.get_logger()

async def complex_operation(user_id: UUID, session: AsyncSession):
    logger.info("complex_operation.start", user_id=str(user_id))
    
    try:
        result = await step_one(session)
        logger.debug("complex_operation.step_one_complete", result_count=len(result))
        
        await step_two(result, session)
        logger.debug("complex_operation.step_two_complete")
        
        await session.commit()
        logger.info("complex_operation.success", user_id=str(user_id))
        
    except Exception as e:
        logger.error("complex_operation.failed", 
            user_id=str(user_id), 
            error=str(e),
            step="unknown"
        )
        raise

Common Issues

IssueLikely CauseSolution
"Session is closed"Using session after request endsKeep session in request scope
Connection timeoutPool exhaustedCheck for session leaks
Stale dataShared session or missing refreshScope session to request, refresh after commit
DeadlockInconsistent lock orderingAlways acquire locks in same order
Slow endpointSequential queries that could be parallelUse asyncio.gather()

Detection Commands

# Find potential session leaks (global sessions)
grep -rn "async_session()" --include="*.py" | grep -v "async with\|Depends"

# Find sequential queries that might be parallelizable
grep -rn "await session.execute" --include="*.py" -A2 | grep -B1 "await session.execute"

# Find missing awaits
ruff check --select=RUF006  # asyncio dangling task

Score

Total Score

60/100

Based on repository quality metrics

SKILL.md

SKILL.mdファイルが含まれている

+20
LICENSE

ライセンスが設定されている

0/10
説明文

100文字以上の説明がある

0/10
人気

GitHub Stars 100以上

+5
最近の活動

1ヶ月以内に更新

+10
フォーク

10回以上フォークされている

0/5
Issue管理

オープンIssueが50未満

+5
言語

プログラミング言語が設定されている

+5
タグ

1つ以上のタグが設定されている

+5

Reviews

💬

Reviews coming soon