# rate-limiting

by yonatangross

The Complete AI Development Toolkit for Claude Code — 159 skills, 34 agents, 20 commands, 144 hooks. Production-ready patterns for FastAPI, React 19, LangGraph, security, and testing.

⭐ 29 · 🍴 4 · 📅 2026-01-23

## SKILL.md

```yaml
name: rate-limiting
description: >-
  API rate limiting with token bucket, sliding window, and Redis distributed
  patterns. Use when implementing rate limits, throttling requests, handling
  429 Too Many Requests, protecting against API abuse, or configuring SlowAPI
  with Redis.
context: fork
agent: backend-system-architect
version: 1.0.0
tags: [rate-limiting, redis, token-bucket, fastapi, security, 2026]
author: OrchestKit
user-invocable: false
```
# Rate Limiting Patterns

Protect APIs with distributed rate limiting using Redis and modern algorithms.
## Overview

- Protecting public APIs from abuse
- Implementing tiered rate limits (free/pro/enterprise)
- Scaling rate limiting across multiple instances
- Preventing brute force attacks on auth endpoints
- Managing third-party API consumption
## Algorithm Selection

| Algorithm | Use Case | Burst Handling |
|---|---|---|
| Token Bucket | General API, allows bursts | Excellent |
| Sliding Window | Precise, no burst spikes | Good |
| Leaky Bucket | Steady rate, queue excess | None |
| Fixed Window | Simple, some edge issues | Moderate |
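The token bucket and sliding window are implemented later in this skill; the leaky bucket is not, so here is a minimal in-process sketch of the queue variant (names and numbers are illustrative, not part of the skill's API). Requests wait in a bounded queue and drain at a fixed rate, so the downstream sees a perfectly steady flow and excess traffic overflows.

```python
from collections import deque

# Leaky bucket as a queue: requests are admitted into a bounded queue and
# drained at a fixed rate. Single-threaded sketch with an explicit clock;
# no concurrency handling.
class LeakyBucket:
    def __init__(self, capacity: int, leak_rate: float):
        self.capacity = capacity    # max queued requests
        self.leak_rate = leak_rate  # requests drained per second
        self.queue: deque[str] = deque()
        self.last_leak = 0.0

    def _leak(self, now: float) -> list[str]:
        """Drain as many requests as the fixed rate permits since the last drain."""
        n = int((now - self.last_leak) * self.leak_rate)
        if n:
            self.last_leak = now
        drained = []
        for _ in range(min(n, len(self.queue))):
            drained.append(self.queue.popleft())
        return drained

    def submit(self, request_id: str, now: float) -> bool:
        """Queue a request; False means the bucket overflowed (reject)."""
        self._leak(now)
        if len(self.queue) >= self.capacity:
            return False
        self.queue.append(request_id)
        return True
```

Unlike the token bucket, nothing ever leaves faster than `leak_rate`, which is why the table above lists its burst handling as "None".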
## SlowAPI + Redis (FastAPI)

### Basic Setup

```python
from fastapi import FastAPI
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.middleware import SlowAPIMiddleware

limiter = Limiter(
    key_func=get_remote_address,
    storage_uri="redis://localhost:6379",
    strategy="moving-window",  # sliding window
)

app = FastAPI()
app.state.limiter = limiter
app.add_middleware(SlowAPIMiddleware)
```
### Endpoint Limits

```python
from fastapi import APIRouter, Request

router = APIRouter()

@router.post("/api/v1/auth/login")
@limiter.limit("10/minute")  # Strict for auth
async def login(request: Request, credentials: LoginRequest):
    ...

@router.get("/api/v1/analyses")
@limiter.limit("100/minute")  # Normal for reads
async def list_analyses(request: Request):
    ...

@router.post("/api/v1/analyses")
@limiter.limit("20/minute")  # Moderate for writes
async def create_analysis(request: Request, data: AnalysisCreate):
    ...
```
### User-Based Limits

```python
def get_user_identifier(request: Request) -> str:
    """Rate limit by user ID if authenticated, else IP."""
    if hasattr(request.state, "user"):
        return f"user:{request.state.user.id}"
    return f"ip:{get_remote_address(request)}"

limiter = Limiter(key_func=get_user_identifier)
```
## Token Bucket with Redis (Custom)

```python
import redis.asyncio as redis
from datetime import datetime, timezone

class TokenBucketLimiter:
    def __init__(
        self,
        redis_client: redis.Redis,
        capacity: int = 100,
        refill_rate: float = 10.0,  # tokens per second
    ):
        self.redis = redis_client
        self.capacity = capacity
        self.refill_rate = refill_rate

    async def is_allowed(self, key: str, tokens: int = 1) -> bool:
        """Check if request is allowed, consume tokens atomically."""
        lua_script = """
        local key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local refill_rate = tonumber(ARGV[2])
        local tokens_requested = tonumber(ARGV[3])
        local now = tonumber(ARGV[4])
        local bucket = redis.call('HMGET', key, 'tokens', 'last_update')
        local current_tokens = tonumber(bucket[1]) or capacity
        local last_update = tonumber(bucket[2]) or now
        -- Calculate refill
        local elapsed = now - last_update
        local refill = elapsed * refill_rate
        current_tokens = math.min(capacity, current_tokens + refill)
        -- Check and consume
        if current_tokens >= tokens_requested then
            current_tokens = current_tokens - tokens_requested
            redis.call('HMSET', key, 'tokens', current_tokens, 'last_update', now)
            redis.call('EXPIRE', key, 3600)
            return 1
        else
            return 0
        end
        """
        now = datetime.now(timezone.utc).timestamp()
        result = await self.redis.eval(
            lua_script, 1, key,
            self.capacity, self.refill_rate, tokens, now,
        )
        return result == 1
```
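Because the consume-and-refill arithmetic lives inside the Lua script, it can be sanity-checked with a pure-Python mirror of the same logic. This is a sketch for unit tests only (the class name and explicit `now` parameter are illustrative); the Redis script above remains the atomic source of truth in production.

```python
# Pure-Python mirror of the Lua token-bucket logic: refill proportional to
# elapsed time, capped at capacity, then check-and-consume. No Redis needed.
class LocalTokenBucket:
    def __init__(self, capacity: int = 100, refill_rate: float = 10.0):
        self.capacity = capacity
        self.refill_rate = refill_rate  # tokens per second
        self.tokens = float(capacity)   # bucket starts full, as in the Lua script
        self.last_update: float | None = None

    def is_allowed(self, now: float, tokens: int = 1) -> bool:
        if self.last_update is not None:
            elapsed = now - self.last_update
            self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_update = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False
```

Passing the clock in explicitly makes the refill math deterministic to test: fill the bucket, drain it with a burst, then advance time and confirm tokens come back at `refill_rate`.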
## Sliding Window Counter

```python
class SlidingWindowLimiter:
    def __init__(self, redis_client: redis.Redis, window_seconds: int = 60):
        self.redis = redis_client
        self.window = window_seconds

    async def is_allowed(self, key: str, limit: int) -> tuple[bool, int]:
        """Returns (allowed, remaining)."""
        now = datetime.now(timezone.utc).timestamp()
        window_start = now - self.window
        pipe = self.redis.pipeline()
        # Remove old entries
        pipe.zremrangebyscore(key, 0, window_start)
        # Count current window
        pipe.zcard(key)
        # Add this request (note: denied requests are also recorded,
        # so they count against the window too)
        pipe.zadd(key, {str(now): now})
        # Set expiry
        pipe.expire(key, self.window * 2)
        results = await pipe.execute()
        current_count = results[1]
        if current_count < limit:
            return True, limit - current_count - 1
        return False, 0
```
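The same window semantics can be mirrored in-process with a deque of timestamps, which is handy for unit tests and single-worker deployments (a sketch; the class name is illustrative, and it is not shared across instances the way the Redis version is).

```python
from collections import deque

# In-process mirror of the sliding-window counter: keep one timestamp per
# request, expire those older than the window, then check the count.
class LocalSlidingWindow:
    def __init__(self, window_seconds: float = 60.0):
        self.window = window_seconds
        self.hits: deque[float] = deque()

    def is_allowed(self, now: float, limit: int) -> tuple[bool, int]:
        """Returns (allowed, remaining), matching the Redis version."""
        # Drop timestamps that have fallen out of the window
        while self.hits and self.hits[0] <= now - self.window:
            self.hits.popleft()
        count = len(self.hits)
        self.hits.append(now)  # like the Redis version, denied requests are recorded
        if count < limit:
            return True, limit - count - 1
        return False, 0
```

Because every request carries its own timestamp, there is no window boundary at which the count resets, which is exactly the property the fixed-window algorithm lacks.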
## Tiered Rate Limits

```python
from enum import Enum

class UserTier(Enum):
    FREE = "free"
    PRO = "pro"
    ENTERPRISE = "enterprise"

TIER_LIMITS = {
    UserTier.FREE: {"requests": 100, "window": 3600},          # 100/hour
    UserTier.PRO: {"requests": 1000, "window": 3600},          # 1000/hour
    UserTier.ENTERPRISE: {"requests": 10000, "window": 3600},  # 10000/hour
}

def get_rate_limit(user: User) -> str:
    limits = TIER_LIMITS[user.tier]
    # `limits`-library notation, e.g. "100 per 3600 seconds"
    return f"{limits['requests']} per {limits['window']} seconds"

@router.get("/api/v1/data")
@limiter.limit(get_rate_limit)  # SlowAPI also accepts a callable limit
async def get_data(request: Request, user: User = Depends(get_current_user)):
    ...
```
## Rate Limit Response Headers

RFC 6585 defines the 429 status code itself; the `X-RateLimit-*` headers below are a widely used de facto convention (an IETF draft standardizes them as `RateLimit-*` fields).
```python
from datetime import datetime, timezone
from fastapi import Response

async def add_rate_limit_headers(
    response: Response,
    limit: int,
    remaining: int,
    reset_at: datetime,
):
    response.headers["X-RateLimit-Limit"] = str(limit)
    response.headers["X-RateLimit-Remaining"] = str(remaining)
    response.headers["X-RateLimit-Reset"] = str(int(reset_at.timestamp()))
    # Use total_seconds(), not .seconds: .seconds discards whole days and
    # misbehaves for negative deltas, so clamp to zero as well.
    retry_after = max(0, int((reset_at - datetime.now(timezone.utc)).total_seconds()))
    response.headers["Retry-After"] = str(retry_after)
```
## Error Response (429)

```python
from fastapi import Request
from fastapi.responses import JSONResponse

def rate_limit_exceeded_handler(request: Request, exc: Exception) -> JSONResponse:
    # RFC 9457 problem-details body
    return JSONResponse(
        status_code=429,
        media_type="application/problem+json",
        content={
            "type": "https://api.example.com/errors/rate-limit-exceeded",
            "title": "Too Many Requests",
            "status": 429,
            "detail": "Rate limit exceeded. Please retry after the reset time.",
            "instance": str(request.url),
        },
        headers={
            "Retry-After": "60",
            "X-RateLimit-Limit": "100",
            "X-RateLimit-Remaining": "0",
        },
    )
```
## Anti-Patterns (FORBIDDEN)

```python
# NEVER use in-memory counters in distributed systems
request_counts = {}  # Lost on restart, not shared across instances

# NEVER skip rate limiting on internal APIs (defense in depth)
@router.get("/internal/admin")
async def admin_endpoint():  # No rate limit = vulnerable
    ...

# NEVER use fixed window without considering edge spikes
# A user can hit 100 at 0:59 and 100 at 1:01 = 200 in 2 seconds
```
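The fixed-window edge spike called out above is easy to demonstrate with a toy counter (a sketch for illustration, not production code; the class name is made up):

```python
# Toy fixed-window counter showing the boundary-spike problem: a client can
# spend its full budget at the end of one window and again at the start of
# the next, doubling the intended rate across the boundary.
class FixedWindowCounter:
    def __init__(self, limit: int, window_seconds: float = 60.0):
        self.limit = limit
        self.window = window_seconds
        self.counts: dict[int, int] = {}

    def is_allowed(self, now: float) -> bool:
        bucket = int(now // self.window)  # index of the current window
        self.counts[bucket] = self.counts.get(bucket, 0) + 1
        return self.counts[bucket] <= self.limit

fw = FixedWindowCounter(limit=100, window_seconds=60.0)
# 100 requests at t=59s (window 0) and 100 more at t=61s (window 1):
burst = [fw.is_allowed(59.0) for _ in range(100)] + \
        [fw.is_allowed(61.0) for _ in range(100)]
```

All 200 requests pass within a two-second span even though the nominal limit is 100 per minute; the sliding-window counter above does not have this failure mode.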
## Key Decisions

| Decision | Recommendation |
|---|---|
| Storage | Redis (distributed, atomic) |
| Algorithm | Token bucket for most APIs |
| Key | User ID if auth, else IP + fingerprint |
| Auth endpoints | 10/min (strict) |
| Read endpoints | 100-1000/min (based on tier) |
| Write endpoints | 20-100/min (moderate) |
## Related Skills

- auth-patterns: Authentication integration
- resilience-patterns: Circuit breakers
- observability-monitoring: Rate limit metrics
## Capability Details

### token-bucket

Keywords: token bucket, rate limit, burst, capacity

Solves:
- How do I implement token bucket rate limiting?
- Allow bursts while limiting rate

### sliding-window

Keywords: sliding window, moving window, rate limit

Solves:
- How to implement precise rate limiting?
- Avoid fixed window edge cases

### slowapi-redis

Keywords: slowapi, fastapi rate limit, redis limiter

Solves:
- How to add rate limiting to FastAPI?
- Distributed rate limiting

### tiered-limits

Keywords: tiered, user tier, free pro enterprise

Solves:
- Different rate limits per subscription tier
- User-based rate limiting