
api-dev
by aiskillstore
Security-audited skills for Claude, Codex & Claude Code. One-click install, quality verified.
SKILL.md
name: api-dev
description: Modern API development patterns for building high-performance, scalable web services. Expert in async/await patterns, REST/GraphQL APIs, middleware, error handling, rate limiting, OpenAPI documentation, testing, and production optimizations. Framework-agnostic patterns that work with Python, Node.js, Go, and other languages.
license: MIT
API Development Patterns & Best Practices
This skill provides comprehensive patterns for building modern APIs in 2025, focusing on async/await patterns, performance optimization, security, testing, and production-ready configurations that work across different frameworks and languages.
When to Use This Skill
Use this skill when you need to:
- Design RESTful or GraphQL APIs
- Implement async/await patterns for high performance
- Add middleware for authentication, logging, and validation
- Handle errors gracefully with proper HTTP status codes
- Implement rate limiting and throttling
- Generate OpenAPI/Swagger documentation
- Set up comprehensive testing strategies
- Optimize API performance with caching and connection pooling
- Implement API versioning and backward compatibility
- Set up monitoring and observability
Core API Design Principles
1. Async/Await Patterns for Performance
# patterns/async_patterns.py
import asyncio
import aiohttp
import aioredis
from typing import AsyncGenerator, Optional, List, Dict, Any
from contextlib import asynccontextmanager
from dataclasses import dataclass
from functools import wraps
import time
@dataclass
class RequestMetrics:
    """Record describing one handled request, as passed to the metrics sink."""

    # Elapsed time of the call, as measured by whoever builds the record.
    duration: float
    # Status code recorded for the call.
    status_code: int
    # Label identifying which endpoint/function handled the call.
    endpoint: str
    # HTTP method label recorded for the call.
    method: str
    # Optional user identifier; None when the caller does not supply one.
    user_id: Optional[str] = None
def with_metrics(func):
    """Decorate an async callable so each invocation reports a RequestMetrics.

    The wrapper times the call, derives a status code from the result or the
    raised exception, and hands a ``RequestMetrics`` to ``send_metrics``.

    Args:
        func: The coroutine function to instrument.

    Returns:
        An async wrapper with the same call signature as ``func``.

    Notes:
        * Reporting is best-effort: a failure inside ``send_metrics`` is logged
          and never replaces the wrapped call's own result or exception.
        * The method label comes from a ``method`` keyword argument when given;
          otherwise it is inferred from ``func.__name__`` when that is an HTTP
          verb (``post`` -> ``"POST"``), falling back to ``"GET"``.
    """
    import logging

    logger = logging.getLogger(__name__)
    http_verbs = {"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"}
    inferred_method = func.__name__.upper()
    default_method = inferred_method if inferred_method in http_verbs else "GET"

    def _status_of(exc):
        # Frameworks disagree on the attribute name: e.g. FastAPI/Starlette use
        # ``status_code`` while aiohttp's ClientResponseError uses ``status``.
        for attr in ("status_code", "status"):
            value = getattr(exc, attr, None)
            if isinstance(value, int):
                return value
        return 500

    @wraps(func)
    async def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so durations survive wall-clock changes.
        started = time.perf_counter()
        status_code = 200
        try:
            result = await func(*args, **kwargs)
            if hasattr(result, "status_code"):
                status_code = result.status_code
            return result
        except Exception as exc:
            status_code = _status_of(exc)
            raise
        finally:
            metrics = RequestMetrics(
                duration=time.perf_counter() - started,
                status_code=status_code,
                endpoint=func.__name__,
                method=kwargs.get("method") or default_method,
            )
            try:
                await send_metrics(metrics)
            except Exception:
                # Never let a monitoring outage mask the real outcome.
                logger.exception("Failed to send request metrics")

    return wrapper
async def send_metrics(metrics: RequestMetrics):
    """Deliver ``metrics`` to the monitoring backend.

    Placeholder implementation: it currently does nothing. Fill it in with
    whatever sink the deployment uses (for example Prometheus, Datadog, or a
    custom analytics pipeline).
    """
    return None
class AsyncAPIClient:
    """Async HTTP client that reuses a single pooled aiohttp session.

    Usable as an async context manager; leaving the ``async with`` block
    closes the underlying session.
    """

    def __init__(self, base_url: str, timeout: int = 30):
        """Store the base URL and total request timeout (seconds).

        No network resources are created here; the session is built lazily.
        """
        self.base_url = base_url
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self._session: Optional[aiohttp.ClientSession] = None
        self._session_lock = asyncio.Lock()

    def _session_unusable(self) -> bool:
        """Return True when no session exists yet or the current one is closed."""
        return self._session is None or self._session.closed

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared session, creating it on first use.

        The condition is checked again after acquiring the lock so that
        concurrent callers do not each build their own session.
        """
        if self._session_unusable():
            async with self._session_lock:
                if self._session_unusable():
                    pool = aiohttp.TCPConnector(
                        limit=100,  # Total connection pool size
                        limit_per_host=30,  # Connections per host
                        force_close=False,
                        enable_cleanup_closed=True,
                    )
                    self._session = aiohttp.ClientSession(
                        connector=pool,
                        timeout=self.timeout,
                    )
        return self._session

    @with_metrics
    async def get(
        self,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Send a GET to ``base_url + endpoint`` and return the decoded JSON body.

        ``raise_for_status()`` is called on the response before decoding.
        The request is attempted once; no retry is performed here.
        """
        http = await self._get_session()
        target = f"{self.base_url}{endpoint}"
        async with http.get(target, params=params, headers=headers) as reply:
            reply.raise_for_status()
            payload = await reply.json()
        return payload

    @with_metrics
    async def post(
        self,
        endpoint: str,
        data: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Send a POST to ``base_url + endpoint`` and return the decoded JSON body.

        ``data`` and ``json`` are forwarded unchanged to the session.
        ``raise_for_status()`` is called on the response before decoding.
        The request is attempted once; no retry is performed here.
        """
        http = await self._get_session()
        target = f"{self.base_url}{endpoint}"
        async with http.post(
            target, data=data, json=json, headers=headers
        ) as reply:
            reply.raise_for_status()
            payload = await reply.json()
        return payload

    async def close(self):
        """Close the session if one was ever created."""
        if not self._session:
            return
        await self._session.close()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()
2. Circuit Breaker Pattern
# patterns/circuit_breaker.py
import asyncio
import time
from enum import Enum
from typing import Callable, Any, Optional
from functools import wraps
class CircuitState(Enum):
    """States of the circuit breaker."""

    CLOSED = "closed"        # calls pass through normally
    OPEN = "open"            # calls are rejected without being attempted
    HALF_OPEN = "half_open"  # one trial call decides whether to close again


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is open.

    Subclasses ``Exception`` so existing ``except Exception`` handlers keep
    working, while new code can catch this type specifically.
    """


class CircuitBreaker:
    """Decorator object that stops calling a failing async function.

    After ``failure_threshold`` consecutive failures the circuit opens and
    calls are rejected. Once ``timeout`` seconds have passed since the last
    failure, one trial call is let through (half-open); success closes the
    circuit, failure re-opens it.

    Args:
        failure_threshold: Consecutive failures that open the circuit.
        timeout: Seconds to wait after the last failure before a trial call.
        expected_exception: Exception class (or tuple of classes) counted as
            a failure. Other exceptions propagate without being counted.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60,
        expected_exception: type[Exception] = Exception
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def __call__(self, func: Callable) -> Callable:
        """Wrap the async callable ``func`` with this breaker.

        Raises (from the wrapper):
            CircuitBreakerOpenError: The circuit is open and the timeout has
                not elapsed; ``func`` is not invoked.
        """
        @wraps(func)
        async def wrapper(*args, **kwargs):
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                else:
                    raise CircuitBreakerOpenError("Circuit breaker is OPEN")
            try:
                result = await func(*args, **kwargs)
            except self.expected_exception:
                self._record_failure()
                raise
            # Any success clears the failure streak. Without this, isolated
            # failures spread over a long time would add up and eventually
            # open the circuit even though the dependency is healthy.
            self._reset()
            return result
        return wrapper

    def _should_attempt_reset(self) -> bool:
        """Return True once ``timeout`` seconds have passed since the last failure."""
        if self.last_failure_time is None:
            # Opened without a recorded failure (e.g. state set externally).
            return True
        return time.time() - self.last_failure_time >= self.timeout

    def _record_failure(self):
        """Count a failure and open the circuit when the threshold is reached."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

    def _reset(self):
        """Clear the failure streak and close the circuit."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED
# Usage example
# The decorator must be an instance of the CircuitBreaker class defined above;
# there is no lowercase ``circuit_breaker`` name in this module.
@CircuitBreaker(failure_threshold=3, timeout=30)
async def external_api_call():
    """Fetch JSON from the example API, guarded by a circuit breaker.

    Opens a throwaway aiohttp session for the single request and returns the
    decoded JSON body.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com") as response:
            return await response.json()
3. Rate Limiting with Redis
# patterns/rate_limiting.py
import asyncio
import aioredis
import time
from typing import Optional
from fastapi import HTTPException, Request
from starlette.middleware.base import BaseHTTPMiddleware
class RateLimiter:
    """Sliding-window rate limiter backed by a Redis sorted set.

    Each allowed request is stored as a member of the sorted set at ``key``,
    scored by its timestamp; members older than the window are pruned before
    counting.
    """

    def __init__(
        self,
        redis_url: str,
        requests_per_minute: int = 100,
        window_size: int = 60
    ):
        """Store configuration; the Redis connection is created lazily.

        Args:
            redis_url: URL passed to ``aioredis.from_url``.
            requests_per_minute: Default number of requests allowed per window.
            window_size: Default window length in seconds.
        """
        self.redis = None
        self.redis_url = redis_url
        self.requests_per_minute = requests_per_minute
        self.window_size = window_size

    async def initialize(self):
        """Initialize Redis connection"""
        self.redis = await aioredis.from_url(self.redis_url)

    async def is_allowed(
        self,
        key: str,
        limit: Optional[int] = None,
        window: Optional[int] = None
    ) -> bool:
        """Record a request under ``key`` and report whether it is allowed.

        Args:
            key: Sorted-set key identifying the caller/bucket.
            limit: Max requests in the window; ``None`` uses the default.
                An explicit ``0`` is honoured and rejects every request.
            window: Window length in seconds; a falsy value uses the default.

        Returns:
            True if the request fits in the window (and was recorded),
            False if the limit is already reached.

        Note:
            The prune/count/add steps are separate Redis calls, so concurrent
            callers can briefly exceed the limit.
        """
        import uuid

        if self.redis is None:
            await self.initialize()
        # ``limit or default`` would silently turn limit=0 into the default.
        if limit is None:
            limit = self.requests_per_minute
        window = window or self.window_size
        current_time = time.time()

        # Remove old requests from the sliding window
        await self.redis.zremrangebyscore(key, 0, current_time - window)

        # Count current requests in window
        current_requests = await self.redis.zcard(key)
        if current_requests >= limit:
            return False

        # Sorted-set members are unique: two requests with the same timestamp
        # would otherwise collapse into one entry and be undercounted.
        member = f"{current_time}:{uuid.uuid4().hex}"
        await self.redis.zadd(key, {member: current_time})
        await self.redis.expire(key, window)
        return True
class RateLimitMiddleware(BaseHTTPMiddleware):
    """FastAPI/Starlette middleware that rejects over-limit requests with 429.

    Args:
        app: The ASGI application being wrapped.
        redis_url: Redis URL handed to the underlying ``RateLimiter``.
        requests_per_minute: Allowed requests per window for each identifier.
        identifier_func: Optional callable ``(request) -> str`` selecting the
            rate-limit bucket; defaults to the client's IP address.
    """

    def __init__(
        self,
        app,
        redis_url: str,
        requests_per_minute: int = 100,
        identifier_func=None
    ):
        super().__init__(app)
        self.limiter = RateLimiter(redis_url, requests_per_minute)
        self.identifier_func = identifier_func or self._default_identifier

    def _default_identifier(self, request: Request) -> str:
        """Return the client IP, or ``"unknown"`` when no client is attached."""
        # ``request.client`` can be None (e.g. some test clients/transports).
        client = request.client
        return client.host if client is not None else "unknown"

    async def dispatch(self, request: Request, call_next):
        """Let the request through, or answer 429 when the bucket is full."""
        from starlette.responses import JSONResponse

        identifier = self.identifier_func(request)
        key = f"rate_limit:{identifier}:{request.url.path}"
        if not await self.limiter.is_allowed(key):
            # Return the response directly: an HTTPException raised from
            # middleware is not handled by the app's exception handlers and
            # would surface to the client as a 500 instead of a 429.
            return JSONResponse(
                status_code=429,
                content={"detail": "Rate limit exceeded"},
            )
        return await call_next(request)
4. API Versioning Strategy
# patterns/versioning.py
from enum import Enum
from typing import Optional, Dict, Any, Type
from abc import ABC, abstractmethod
class APIVersion(str, Enum):
    """Supported API versions; each member's value is its wire string."""

    V1 = "v1"
    V2 = "v2"
    V3 = "v3"
class APIVersionStrategy(ABC):
    """Interface for objects that determine the API version of a request."""

    @abstractmethod
    def get_version_from_request(self, request) -> Optional[APIVersion]:
        """Return the version named by ``request``, or None when it names none."""
        return None
class HeaderVersionStrategy(APIVersionStrategy):
    """Read the API version from a request header.

    Args:
        header_name: Header to inspect (default ``"API-Version"``).
    """

    def __init__(self, header_name: str = "API-Version"):
        self.header_name = header_name

    def get_version_from_request(self, request) -> Optional[APIVersion]:
        """Return the version in the header, or None if absent/unrecognised."""
        raw = request.headers.get(self.header_name)
        # Look up by VALUE ("v1"). ``APIVersion.__members__`` is keyed by
        # member NAME ("V1"), so testing membership there rejects every valid
        # header and lets "V1" through to a ValueError.
        try:
            return APIVersion(raw)
        except ValueError:
            return None
class URLPathVersionStrategy(APIVersionStrategy):
    """Read the API version from the path segment right after a prefix.

    With the default prefix, ``/api/v2/users`` resolves to ``APIVersion.V2``.

    Args:
        prefix: Path prefix preceding the version segment (default ``"/api"``).
    """

    def __init__(self, prefix: str = "/api"):
        self.prefix = prefix

    def get_version_from_request(self, request) -> Optional[APIVersion]:
        """Return the version in the URL path, or None if there is none."""
        path = request.url.path
        prefix = self.prefix.rstrip("/")
        if not path.startswith(prefix):
            return None
        remainder = path[len(prefix):]
        # Require a segment boundary so "/apix/v1" does not match "/api", and
        # so a path equal to the prefix yields None instead of an IndexError.
        if not remainder.startswith("/"):
            return None
        version_part = remainder.split("/", 2)[1]
        # Look up by VALUE ("v1"); ``__members__`` is keyed by NAME ("V1").
        try:
            return APIVersion(version_part)
        except ValueError:
            return None
class QueryParamVersionStrategy(APIVersionStrategy):
    """Read the API version from a query-string parameter.

    Args:
        param_name: Query parameter to inspect (default ``"version"``).
    """

    def __init__(self, param_name: str = "version"):
        self.param_name = param_name

    def get_version_from_request(self, request) -> Optional[APIVersion]:
        """Return the version in the query string, or None if absent/unrecognised."""
        raw = request.query_params.get(self.param_name)
        # Look up by VALUE ("v1"). ``APIVersion.__members__`` is keyed by
        # member NAME ("V1"), so a membership test there never accepts "v1".
        try:
            return APIVersion(raw)
        except ValueError:
            return None
class CompositeVersionStrategy(APIVersionStrategy):
    """Ask several strategies in order and use the first version found.

    Falls back to ``default_version`` (``APIVersion.V1``) when none of the
    strategies yields a version.
    """

    def __init__(self, strategies: list[APIVersionStrategy]):
        self.strategies = strategies
        self.default_version = APIVersion.V1

    def get_version_from_request(self, request) -> Optional[APIVersion]:
        """Return the first truthy result from the strategies, else the default."""
        # The generator is lazy, so strategies after the first hit are not run.
        candidates = (
            strategy.get_version_from_request(request)
            for strategy in self.strategies
        )
        return next(
            (found for found in candidates if found),
            self.default_version,
        )
# Version-specific handlers
class APIHandlerRegistry:
    """Two-level lookup table: API version -> endpoint -> handler."""

    def __init__(self):
        """Start with an empty endpoint table for every known APIVersion."""
        table: Dict[APIVersion, Dict[str, Any]] = {}
        for known_version in APIVersion:
            table[known_version] = {}
        self.handlers = table

    def register_handler(
        self,
        version: APIVersion,
        endpoint: str,
        handler: Any
    ):
        """Store ``handler`` for ``endpoint`` under ``version``.

        A table for ``version`` is created on demand; an existing handler for
        the same version/endpoint pair is replaced.
        """
        self.handlers.setdefault(version, {})[endpoint] = handler

    def get_handler(self, version: APIVersion, endpoint: str) -> Optional[Any]:
        """Return the registered handler, or None if the pair is unknown."""
        try:
            per_version = self.handlers[version]
        except KeyError:
            return None
        return per_version.get(endpoint)
# Usage example
# Strategies are consulted in list order: header, then URL path, then query.
version_strategy = CompositeVersionStrategy(
    strategies=[
        HeaderVersionStrategy(),
        URLPathVersionStrategy(),
        QueryParamVersionStrategy(),
    ],
)
5. OpenAPI Documentation Enhancement
# patterns/openapi.py
from typing import Dict, Any, List, Optional
from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum
def _utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    from datetime import timezone

    return datetime.now(timezone.utc)


class ErrorSchema(BaseModel):
    """Standard error response schema.

    ``timestamp`` defaults to the creation time as an aware UTC datetime.
    ``datetime.utcnow`` is deprecated and returns a naive value that consumers
    cannot distinguish from local time.
    """

    error: str = Field(..., description="Error type")
    message: str = Field(..., description="Error message")
    details: Optional[Dict[str, Any]] = Field(
        None,
        description="Additional error details"
    )
    timestamp: datetime = Field(
        default_factory=_utc_now,
        description="Error timestamp"
    )
class PaginationLinks(BaseModel):
    """Navigation links for a paginated collection; each one is optional."""

    first: Optional[str] = Field(default=None, description="First page link")
    last: Optional[str] = Field(default=None, description="Last page link")
    next: Optional[str] = Field(default=None, description="Next page link")
    prev: Optional[str] = Field(default=None, description="Previous page link")
class PaginationMeta(BaseModel):
    """Counts describing where a page sits in the full result set.

    All four fields are required.
    """

    total: int = Field(description="Total number of items")
    page: int = Field(description="Current page number")
    per_page: int = Field(description="Items per page")
    pages: int = Field(description="Total number of pages")
class PaginatedResponse(BaseModel):
    """Envelope for a page of results: the items, counts, and navigation links.

    All three fields are required.
    """

    data: List[Any] = Field(description="Response data")
    meta: PaginationMeta = Field(description="Pagination metadata")
    links: PaginationLinks = Field(description="Pagination links")
# Enhanced OpenAPI configuration
# OpenAPI settings: metadata, documentation URLs, server list, and the shared
# schema/response components. Every named error response uses ErrorSchema.
OPENAPI_CONFIG = {
    "title": "Modern API",
    "description": "A modern REST API with async patterns",
    "version": "1.0.0",
    "docs_url": "/docs",
    "redoc_url": "/redoc",
    "openapi_url": "/openapi.json",
    "servers": [
        {"url": "https://api.example.com/v1", "description": "Production server"},
        {
            "url": "https://staging-api.example.com/v1",
            "description": "Staging server",
        },
    ],
    "components": {
        "schemas": {
            "Error": ErrorSchema.model_json_schema(),
            "PaginatedResponse": PaginatedResponse.model_json_schema(),
        },
        # One entry per (name, description) pair, in the order listed; each
        # entry gets its own freshly generated copy of the error schema.
        "responses": {
            response_name: {
                "description": summary,
                "content": {
                    "application/json": {
                        "schema": ErrorSchema.model_json_schema()
                    }
                },
            }
            for response_name, summary in (
                ("ValidationError", "Validation error"),
                ("UnauthorizedError", "Unauthorized error"),
                ("NotFoundError", "Resource not found"),
                ("RateLimitError", "Rate limit exceeded"),
            )
        },
    },
}
6. Testing Patterns for APIs
# patterns/testing.py
import pytest
import asyncio
from typing import AsyncGenerator
import httpx
from fastapi.testclient import TestClient
from unittest.mock import AsyncMock, patch
class AsyncAPITestCase:
    """Base class providing shared fixtures for async API tests.

    NOTE(review): ``self.app`` is not defined in this class; subclasses are
    presumably expected to provide the ASGI application — confirm.
    """

    @pytest.fixture(scope="class")
    async def async_client(self) -> AsyncGenerator[httpx.AsyncClient, None]:
        """Yield an httpx client that talks to the app in-process."""
        # The ``app=`` shortcut on AsyncClient was deprecated and later removed
        # from httpx; routing through ASGITransport is the supported form.
        transport = httpx.ASGITransport(app=self.app)
        async with httpx.AsyncClient(
            transport=transport,
            base_url="http://test"
        ) as client:
            yield client

    @pytest.fixture
    def mock_external_service(self):
        """Patch ExternalAPIClient and yield the instance the patch returns.

        The yielded mock's ``get`` returns ``{"status": "ok"}``.
        """
        with patch("external_api_client.ExternalAPIClient") as mock:
            client = mock.return_value
            client.get.return_value = {"status": "ok"}
            yield client
# Example test cases
@pytest.mark.asyncio
class TestUserAPI(AsyncAPITestCase):
    """End-to-end checks of the /api/users endpoints."""

    async def test_create_user(self, async_client: httpx.AsyncClient):
        """Creating a user returns 201 and echoes the profile without the password."""
        payload = {
            "email": "test@example.com",
            "username": "testuser",
            "password": "securepassword123"
        }
        response = await async_client.post("/api/users", json=payload)
        assert response.status_code == 201

        body = response.json()
        assert body["email"] == payload["email"]
        assert body["username"] == payload["username"]
        assert "password" not in body

    async def test_get_user(self, async_client: httpx.AsyncClient):
        """A freshly created user can be fetched back by id."""
        # Create the user that will be fetched.
        created = await async_client.post(
            "/api/users",
            json={
                "email": "test2@example.com",
                "username": "testuser2",
                "password": "securepassword123"
            }
        )
        user_id = created.json()["id"]

        # Fetch it back.
        fetched = await async_client.get(f"/api/users/{user_id}")
        assert fetched.status_code == 200
        assert fetched.json()["id"] == user_id

    async def test_list_users_with_pagination(
        self,
        async_client: httpx.AsyncClient
    ):
        """The listing carries data/meta/links and echoes the paging arguments."""
        response = await async_client.get("/api/users?page=1&per_page=10")
        assert response.status_code == 200

        body = response.json()
        for section in ("data", "meta", "links"):
            assert section in body
        meta = body["meta"]
        assert meta["page"] == 1
        assert meta["per_page"] == 10

    async def test_rate_limiting(
        self,
        async_client: httpx.AsyncClient
    ):
        """Sending 150 quick requests produces at least one 429."""
        # Assuming rate limit is 100/minute
        status_codes = [
            (await async_client.get("/api/users")).status_code
            for _ in range(150)
        ]
        # Check if rate limiting kicked in
        assert 429 in status_codes
# Integration tests
@pytest.mark.asyncio
async def test_full_user_flow():
    """Exercise create -> get -> update -> delete -> get(404) for one user.

    NOTE(review): ``app`` is not defined in this snippet; it is presumably the
    ASGI application imported by the real test module — confirm.
    """
    # The ``app=`` shortcut on AsyncClient was deprecated and later removed
    # from httpx; routing through ASGITransport is the supported form.
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(
        transport=transport,
        base_url="http://test"
    ) as client:
        # Create user
        user_data = {
            "email": "flowtest@example.com",
            "username": "flowtest",
            "password": "securepassword123"
        }
        create_resp = await client.post("/api/users", json=user_data)
        assert create_resp.status_code == 201
        user = create_resp.json()
        user_id = user["id"]

        # Get user
        get_resp = await client.get(f"/api/users/{user_id}")
        assert get_resp.status_code == 200
        assert get_resp.json()["id"] == user_id

        # Update user
        update_data = {"username": "updateduser"}
        update_resp = await client.patch(
            f"/api/users/{user_id}",
            json=update_data
        )
        assert update_resp.status_code == 200
        assert update_resp.json()["username"] == "updateduser"

        # Delete user
        delete_resp = await client.delete(f"/api/users/{user_id}")
        assert delete_resp.status_code == 204

        # Verify user is deleted
        get_resp = await client.get(f"/api/users/{user_id}")
        assert get_resp.status_code == 404
7. Performance Optimization Patterns
# patterns/performance.py
import asyncio
import asyncio.cache
import aioredis
from typing import Any, Optional, Callable
from functools import wraps
import hashlib
import json
import pickle
class ResponseCache:
    """Pickle-serialised response cache stored in Redis.

    Args:
        redis_url: URL passed to ``aioredis.from_url`` on first use.
        default_ttl: Expiry in seconds applied when ``set`` gets no ``ttl``.
    """

    def __init__(self, redis_url: str, default_ttl: int = 300):
        self.redis = None
        self.redis_url = redis_url
        self.default_ttl = default_ttl

    async def initialize(self):
        """Initialize Redis connection"""
        self.redis = await aioredis.from_url(self.redis_url)

    async def _ensure_connected(self):
        """Connect on first use so every operation works on a fresh instance."""
        if not self.redis:
            await self.initialize()

    def _make_cache_key(self, prefix: str, *args, **kwargs) -> str:
        """Return ``"<prefix>:<sha256>"`` derived from the call arguments.

        Arguments are JSON-encoded with sorted keys; values JSON cannot encode
        are converted with ``str``.
        """
        key_data = {
            "args": args,
            "kwargs": kwargs
        }
        key_str = json.dumps(key_data, sort_keys=True, default=str)
        key_hash = hashlib.sha256(key_str.encode()).hexdigest()
        return f"{prefix}:{key_hash}"

    async def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key``, or None when nothing is stored."""
        await self._ensure_connected()
        cached = await self.redis.get(key)
        if not cached:
            return None
        # SECURITY NOTE(review): pickle.loads executes arbitrary code from its
        # input. This is only safe while nobody untrusted can write to this
        # Redis instance; consider a data-only format such as JSON.
        return pickle.loads(cached)

    async def set(
        self,
        key: str,
        value: Any,
        ttl: Optional[int] = None
    ):
        """Store ``value`` under ``key`` for ``ttl`` seconds (default TTL if falsy)."""
        await self._ensure_connected()
        ttl = ttl or self.default_ttl
        serialized = pickle.dumps(value)
        await self.redis.setex(key, ttl, serialized)

    async def delete(self, key: str):
        """Delete ``key`` from the cache.

        Connects first if needed: the key may already exist in Redis (written
        by another instance or process) even if this instance never connected,
        so skipping the delete would leave a stale entry behind.
        """
        await self._ensure_connected()
        await self.redis.delete(key)
def cache_response(
    prefix: str,
    ttl: Optional[int] = None,
    cache: Optional[ResponseCache] = None
):
    """Decorator factory that caches an async function's result.

    Args:
        prefix: Namespace prepended to the generated cache key.
        ttl: Expiry in seconds passed to the cache's ``set``; None lets the
            cache apply its own default.
        cache: Cache to use. When omitted, one ``ResponseCache`` pointing at
            ``redis://localhost`` is created on first call and then reused.

    Returns:
        A decorator for coroutine functions.

    Note:
        A result of ``None`` is treated as a cache miss, so functions that
        return ``None`` are re-executed on every call.
    """
    def decorator(func: Callable) -> Callable:
        # Shared by all calls of this decorated function. Building a new
        # ResponseCache per call would open a new Redis connection each time.
        cache_instance = cache

        @wraps(func)
        async def wrapper(*args, **kwargs):
            nonlocal cache_instance
            if cache_instance is None:
                cache_instance = ResponseCache("redis://localhost")

            # Generate cache key
            cache_key = cache_instance._make_cache_key(prefix, *args, **kwargs)

            # Try to get from cache
            cached_result = await cache_instance.get(cache_key)
            if cached_result is not None:
                return cached_result

            # Execute function, then cache its result
            result = await func(*args, **kwargs)
            await cache_instance.set(cache_key, result, ttl)
            return result
        return wrapper
    return decorator
# Usage example
@cache_response(prefix="user_data", ttl=300)
async def get_user_data(user_id: int) -> dict[str, Any]:
    """Return the (sample) profile for ``user_id``, cached for 300 seconds.

    The return annotation uses the builtin ``dict`` because this snippet's
    import list does not bring in ``typing.Dict``.
    """
    # Expensive database operation or API call
    await asyncio.sleep(1)  # Simulate slow operation
    return {"id": user_id, "name": "John Doe", "email": "john@example.com"}
# Batch processing for performance
class BatchProcessor:
    """Collect queued items and hand them to a handler in batches.

    A background task is started on the first ``add``. It emits a batch when
    ``batch_size`` items are collected or ``flush_interval`` seconds have
    passed since it started collecting, whichever comes first.

    Args:
        batch_size: Maximum number of items per batch.
        flush_interval: Maximum seconds to wait while filling a batch.
    """

    def __init__(self, batch_size: int = 100, flush_interval: int = 5):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.queue = asyncio.Queue()
        self.processing = False
        # Strong reference to the worker: the event loop only keeps weak
        # references to tasks, so an unreferenced task may be collected
        # while it is still running.
        self._task: Optional[asyncio.Task] = None

    async def add(self, item: Any):
        """Queue ``item`` and start the background worker if it is not running."""
        await self.queue.put(item)
        if not self.processing:
            self.processing = True
            self._task = asyncio.create_task(self._process_batch())

    def stop(self) -> None:
        """Cancel the background worker; a later ``add`` starts a new one.

        Items still waiting in the queue are left there.
        """
        if self._task is not None:
            self._task.cancel()
            self._task = None
        self.processing = False

    async def _process_batch(self):
        """Worker loop: gather items until full or timed out, then process them."""
        loop = asyncio.get_running_loop()
        while True:
            batch = []
            # Collect batch items
            try:
                deadline = loop.time() + self.flush_interval
                while len(batch) < self.batch_size:
                    try:
                        timeout = max(0, deadline - loop.time())
                        item = await asyncio.wait_for(
                            self.queue.get(),
                            timeout=timeout
                        )
                        batch.append(item)
                    except asyncio.TimeoutError:
                        break
                if batch:
                    await self._process_batch_items(batch)
            except Exception as e:
                # Keep the worker alive; one failing batch must not stop it.
                print(f"Error processing batch: {e}")
                continue

    async def _process_batch_items(self, batch: list[Any]):
        """Handle one batch; the default only prints the batch size.

        Override in subclasses (e.g. batch database insert, batch API calls).
        """
        print(f"Processing batch of {len(batch)} items")
8. Production Configuration Checklist
# api/production_checklist.yaml
# Production readiness checklist.
# NOTE(review): this copy has no indentation, so it parses as one flat mapping.
# The keys following each section header below presumably nest under it in the
# original file — restore the nesting from the source before using this.

# --- Performance: connection handling, caching, response compression ---
performance:
async_patterns:
connection_pooling: true
max_connections: 100
connection_timeout: 30
keep_alive: true
caching:
redis_cache: true
default_ttl: 300
cache_headers: true
compression:
gzip: true
level: 6
threshold: 1024
# --- Rate limiting ---
# NOTE(review): unclear whether this is a top-level section or part of
# "performance" — confirm against the original file.
rate_limiting:
enabled: true
default_limit: 100/minute
burst_limit: 200
sliding_window: true
# --- Security: authentication, authorization, validation, headers ---
security:
authentication:
jwt_validation: true
token_refresh: true
revoke_tokens: true
authorization:
rbac: true
rate_limit_by_role: true
validation:
input_validation: true
sql_injection_protection: true
xss_protection: true
headers:
security_headers: true
cors: true
csrf_protection: true
# --- Monitoring: metrics, logging, health checks ---
monitoring:
metrics:
prometheus: true
request_duration: true
error_rate: true
throughput: true
logging:
structured_logging: true
correlation_ids: true
error_tracking: true
health_checks:
liveness_probe: true
readiness_probe: true
dependency_checks: true
# --- Documentation: OpenAPI generation and versioning policy ---
documentation:
openapi:
auto_generation: true
examples: true
schemas: true
versioning:
strategy: "header_path_query"
deprecation_warnings: true
backward_compatibility: true
# --- Testing: kinds of test suites expected ---
testing:
unit_tests: true
integration_tests: true
performance_tests: true
contract_tests: true
This comprehensive API development skill provides modern patterns for building high-performance APIs in 2025, including async/await patterns, circuit breakers, rate limiting, versioning strategies, comprehensive testing, and production optimization techniques that work across different frameworks.
Score
Total Score
Based on repository quality metrics
SKILL.mdファイルが含まれている
ライセンスが設定されている
100文字以上の説明がある
GitHub Stars 100以上
1ヶ月以内に更新
10回以上フォークされている
オープンIssueが50未満
プログラミング言語が設定されている
1つ以上のタグが設定されている
Reviews
Reviews coming soon
