
rate-limiting

by yonatangross

The Complete AI Development Toolkit for Claude Code — 159 skills, 34 agents, 20 commands, 144 hooks. Production-ready patterns for FastAPI, React 19, LangGraph, security, and testing.

⭐ 29 · 🍴 4 · 📅 Jan 23, 2026

SKILL.md


---
name: rate-limiting
description: API rate limiting with token bucket, sliding window, and Redis distributed patterns. Use when implementing rate limits, throttling requests, handling 429 Too Many Requests, protecting against API abuse, or configuring SlowAPI with Redis.
context: fork
agent: backend-system-architect
version: 1.0.0
tags: [rate-limiting, redis, token-bucket, fastapi, security, 2026]
author: OrchestKit
user-invocable: false
---

Rate Limiting Patterns

Protect APIs with distributed rate limiting using Redis and modern algorithms.

Overview

Use this skill when:

  • Protecting public APIs from abuse
  • Implementing tiered rate limits (free/pro/enterprise)
  • Scaling rate limiting across multiple instances
  • Preventing brute force attacks on auth endpoints
  • Managing third-party API consumption

Algorithm Selection

| Algorithm | Use Case | Burst Handling |
| --- | --- | --- |
| Token Bucket | General API, allows bursts | Excellent |
| Sliding Window | Precise, no burst spikes | Good |
| Leaky Bucket | Steady rate, queue excess | None |
| Fixed Window | Simple, some edge issues | Moderate |
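Leaky bucket is the only algorithm in the table not implemented later in this skill. A minimal single-process sketch (illustrative only, not distributed-safe):

```python
import time


class LeakyBucket:
    """Queue-style limiter: requests 'fill' the bucket, which drains at a fixed rate."""

    def __init__(self, rate_per_sec: float, capacity: int):
        self.rate = rate_per_sec      # drain rate (requests/second let through)
        self.capacity = capacity      # max queued requests before rejecting
        self.level = 0.0              # current fill level
        self.last = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Drain whatever leaked out since the last call
        self.level = max(0.0, self.level - (now - self.last) * self.rate)
        self.last = now
        if self.level + 1 <= self.capacity:
            self.level += 1
            return True
        return False
```

Because the bucket drains at a constant rate, downstream traffic is perfectly smooth, at the cost of rejecting (or queuing) all bursts.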

SlowAPI + Redis (FastAPI)

Basic Setup

from fastapi import FastAPI
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from slowapi.util import get_remote_address

limiter = Limiter(
    key_func=get_remote_address,
    storage_uri="redis://localhost:6379",
    strategy="moving-window",  # sliding window
)

app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)

Endpoint Limits

from fastapi import Request  # SlowAPI decorators require a `request` parameter

@router.post("/api/v1/auth/login")
@limiter.limit("10/minute")  # Strict for auth
async def login(request: Request, credentials: LoginRequest):
    ...

@router.get("/api/v1/analyses")
@limiter.limit("100/minute")  # Normal for reads
async def list_analyses(request: Request):
    ...

@router.post("/api/v1/analyses")
@limiter.limit("20/minute")  # Moderate for writes
async def create_analysis(request: Request, data: AnalysisCreate):
    ...

User-Based Limits

def get_user_identifier(request: Request) -> str:
    """Rate limit by user ID if authenticated, else by client IP."""
    user = getattr(request.state, "user", None)
    if user is not None:
        return f"user:{user.id}"
    return f"ip:{get_remote_address(request)}"

limiter = Limiter(key_func=get_user_identifier)

Token Bucket with Redis (Custom)

import redis.asyncio as redis
from datetime import datetime, timezone

class TokenBucketLimiter:
    def __init__(
        self,
        redis_client: redis.Redis,
        capacity: int = 100,
        refill_rate: float = 10.0,  # tokens per second
    ):
        self.redis = redis_client
        self.capacity = capacity
        self.refill_rate = refill_rate

    async def is_allowed(self, key: str, tokens: int = 1) -> bool:
        """Check if request is allowed, consume tokens atomically."""
        lua_script = """
        local key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local refill_rate = tonumber(ARGV[2])
        local tokens_requested = tonumber(ARGV[3])
        local now = tonumber(ARGV[4])

        local bucket = redis.call('HMGET', key, 'tokens', 'last_update')
        local current_tokens = tonumber(bucket[1]) or capacity
        local last_update = tonumber(bucket[2]) or now

        -- Calculate refill
        local elapsed = now - last_update
        local refill = elapsed * refill_rate
        current_tokens = math.min(capacity, current_tokens + refill)

        -- Check and consume
        if current_tokens >= tokens_requested then
            current_tokens = current_tokens - tokens_requested
            redis.call('HMSET', key, 'tokens', current_tokens, 'last_update', now)
            redis.call('EXPIRE', key, 3600)
            return 1
        else
            return 0
        end
        """
        now = datetime.now(timezone.utc).timestamp()
        result = await self.redis.eval(
            lua_script, 1, key,
            self.capacity, self.refill_rate, tokens, now
        )
        return result == 1
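The refill-and-consume arithmetic in the Lua script can be sanity-checked in plain Python. This sketch mirrors the script against a local dict instead of a Redis hash (illustrative only; it is neither atomic nor distributed):

```python
def token_bucket_allow(state: dict, capacity: float, refill_rate: float,
                       tokens_requested: int, now: float) -> bool:
    """Mirror of the Lua script: refill by elapsed time, then try to consume."""
    current = state.get("tokens", capacity)
    last = state.get("last_update", now)
    # Refill, capped at capacity (same math as the script)
    current = min(capacity, current + (now - last) * refill_rate)
    if current >= tokens_requested:
        # Only persist state on success, as the script does
        state["tokens"] = current - tokens_requested
        state["last_update"] = now
        return True
    return False
```

With capacity 2 and a refill rate of 1 token/second, two immediate requests drain the bucket, a third is denied, and a request 1.5 s later succeeds on refilled tokens.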

Sliding Window Counter

class SlidingWindowLimiter:
    def __init__(self, redis_client: redis.Redis, window_seconds: int = 60):
        self.redis = redis_client
        self.window = window_seconds

    async def is_allowed(self, key: str, limit: int) -> tuple[bool, int]:
        """Returns (allowed, remaining)."""
        now = datetime.now(timezone.utc).timestamp()
        window_start = now - self.window

        pipe = self.redis.pipeline()
        # Remove old entries
        pipe.zremrangebyscore(key, 0, window_start)
        # Count current window
        pipe.zcard(key)
        # Add this request
        pipe.zadd(key, {str(now): now})
        # Set expiry
        pipe.expire(key, self.window * 2)

        results = await pipe.execute()
        current_count = results[1]

        if current_count < limit:
            return True, limit - current_count - 1
        return False, 0

Tiered Rate Limits

from enum import Enum

class UserTier(Enum):
    FREE = "free"
    PRO = "pro"
    ENTERPRISE = "enterprise"

TIER_LIMITS = {
    UserTier.FREE: {"requests": 100, "window": 3600},       # 100/hour
    UserTier.PRO: {"requests": 1000, "window": 3600},       # 1000/hour
    UserTier.ENTERPRISE: {"requests": 10000, "window": 3600}, # 10000/hour
}

from fastapi import Depends, HTTPException

# SlowAPI's callable limits are not passed the authenticated user, so tiered
# limits are simplest to enforce in a dependency that reuses the
# SlidingWindowLimiter defined above (redis_client assumed configured).
tier_limiter = SlidingWindowLimiter(redis_client, window_seconds=3600)

async def enforce_tier_limit(request: Request, user: User = Depends(get_current_user)) -> None:
    limits = TIER_LIMITS[user.tier]
    allowed, _ = await tier_limiter.is_allowed(f"tier:{user.id}", limits["requests"])
    if not allowed:
        raise HTTPException(status_code=429, detail="Rate limit exceeded for your tier")

@router.get("/api/v1/data")
async def get_data(request: Request, _: None = Depends(enforce_tier_limit)):
    ...
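The tier table itself is easy to sanity-check standalone (re-declared locally so the snippet runs on its own):

```python
from enum import Enum


class UserTier(Enum):
    FREE = "free"
    PRO = "pro"
    ENTERPRISE = "enterprise"


# Mirrors the TIER_LIMITS table above
TIER_LIMITS = {
    UserTier.FREE: {"requests": 100, "window": 3600},
    UserTier.PRO: {"requests": 1000, "window": 3600},
    UserTier.ENTERPRISE: {"requests": 10000, "window": 3600},
}


def hourly_allowance(tier: UserTier) -> int:
    """Requests permitted per window for a given tier."""
    return TIER_LIMITS[tier]["requests"]
```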

Response Headers

RFC 6585 defines the 429 status itself; the X-RateLimit-* names below are a widely used de facto convention (the IETF httpapi working group is standardizing successor RateLimit-* fields).

from datetime import datetime, timezone
from fastapi import Response

async def add_rate_limit_headers(
    response: Response,
    limit: int,
    remaining: int,
    reset_at: datetime,
):
    response.headers["X-RateLimit-Limit"] = str(limit)
    response.headers["X-RateLimit-Remaining"] = str(remaining)
    response.headers["X-RateLimit-Reset"] = str(int(reset_at.timestamp()))
    response.headers["Retry-After"] = str(max(0, int((reset_at - datetime.now(timezone.utc)).total_seconds())))
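A pitfall worth isolating: `timedelta.seconds` discards days and behaves surprisingly for negative deltas, so a clamped helper based on `total_seconds()` avoids sending nonsense Retry-After values:

```python
from datetime import datetime, timedelta, timezone


def retry_after(reset_at: datetime, now: datetime) -> int:
    """Seconds until the window resets, clamped to 0 once it has passed."""
    return max(0, int((reset_at - now).total_seconds()))
```

By contrast, `(reset_at - now).seconds` on a reset 5 seconds in the past yields 86395, telling clients to back off for nearly a day.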

Error Response (429)

from fastapi import Request
from fastapi.responses import JSONResponse
from slowapi.errors import RateLimitExceeded

def rate_limit_exceeded_handler(request: Request, exc: Exception) -> JSONResponse:
    return JSONResponse(
        status_code=429,
        content={
            "type": "https://api.example.com/errors/rate-limit-exceeded",
            "title": "Too Many Requests",
            "status": 429,
            "detail": "Rate limit exceeded. Please retry after the reset time.",
            "instance": str(request.url),
        },
        headers={
            "Retry-After": "60",
            "X-RateLimit-Limit": "100",
            "X-RateLimit-Remaining": "0",
        },
        media_type="application/problem+json",  # RFC 9457 problem details
    )

app.add_exception_handler(RateLimitExceeded, rate_limit_exceeded_handler)

Anti-Patterns (FORBIDDEN)

# NEVER use in-memory counters in distributed systems
request_counts = {}  # Lost on restart, not shared across instances

# NEVER skip rate limiting on internal APIs (defense in depth)
@router.get("/internal/admin")
async def admin_endpoint():  # No rate limit = vulnerable
    ...

# NEVER use fixed window without considering edge spikes
# A user can hit 100 at 0:59 and 100 at 1:01 = 200 in 2 seconds
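The fixed-window edge spike is easy to reproduce with a small simulation (illustrative helper, not production code):

```python
def fixed_window_allowed(counts: dict, t: float, window: int, limit: int) -> bool:
    """Fixed-window counter: bucket each request by its window index."""
    w = int(t // window)
    counts[w] = counts.get(w, 0) + 1
    return counts[w] <= limit


counts: dict = {}
# 100 requests at t=59s land in window 0; 100 more at t=61s land in window 1.
# Every one of the 200 is allowed, despite arriving within 2 seconds.
burst = [fixed_window_allowed(counts, t, 60, 100)
         for t in [59.0] * 100 + [61.0] * 100]
```

A sliding window would have rejected the second batch, since all 200 timestamps fall inside one 60-second span.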

Key Decisions

| Decision | Recommendation |
| --- | --- |
| Storage | Redis (distributed, atomic) |
| Algorithm | Token bucket for most APIs |
| Key | User ID if auth, else IP + fingerprint |
| Auth endpoints | 10/min (strict) |
| Read endpoints | 100-1000/min (based on tier) |
| Write endpoints | 20-100/min (moderate) |
Related Skills

  • auth-patterns - Authentication integration
  • resilience-patterns - Circuit breakers
  • observability-monitoring - Rate limit metrics

Capability Details

token-bucket

Keywords: token bucket, rate limit, burst, capacity

Solves:

  • How do I implement token bucket rate limiting?
  • Allow bursts while limiting rate

sliding-window

Keywords: sliding window, moving window, rate limit

Solves:

  • How to implement precise rate limiting?
  • Avoid fixed window edge cases

slowapi-redis

Keywords: slowapi, fastapi rate limit, redis limiter

Solves:

  • How to add rate limiting to FastAPI?
  • Distributed rate limiting

tiered-limits

Keywords: tiered, user tier, free pro enterprise

Solves:

  • Different rate limits per subscription tier
  • User-based rate limiting
