
cqrs-patterns
by yonatangross
The Complete AI Development Toolkit for Claude Code — 159 skills, 34 agents, 20 commands, 144 hooks. Production-ready patterns for FastAPI, React 19, LangGraph, security, and testing.
⭐ 29 · 🍴 4 · 📅 Jan 23, 2026
SKILL.md
name: cqrs-patterns
description: CQRS (Command Query Responsibility Segregation) patterns for separating read and write models. Use when optimizing read-heavy systems, implementing event sourcing, or building systems with different read/write scaling requirements.
context: fork
agent: event-driven-architect
version: 1.0.0
tags: [cqrs, command-query, read-model, write-model, projection, event-sourcing, 2026]
author: OrchestKit
user-invocable: false
CQRS Patterns
Separate read and write concerns for optimized data access.
Overview
CQRS is a good fit for:
- Read-heavy workloads with complex queries
- Different scaling requirements for reads vs writes
- Event sourcing implementations
- Multiple read model representations of same data
- Complex domain models with simple read requirements
When NOT to Use
- Simple CRUD applications
- Strong consistency requirements everywhere
- Small datasets with simple queries
Architecture Overview
┌─────────────────┐         ┌─────────────────┐
│   Write Side    │         │    Read Side    │
├─────────────────┤         ├─────────────────┤
│  ┌───────────┐  │         │  ┌───────────┐  │
│  │ Commands  │  │         │  │  Queries  │  │
│  └─────┬─────┘  │         │  └─────┬─────┘  │
│  ┌─────▼─────┐  │         │  ┌─────▼─────┐  │
│  │ Aggregate │  │         │  │Read Model │  │
│  └─────┬─────┘  │         │  └───────────┘  │
│  ┌─────▼─────┐  │         │        ▲        │
│  │  Events   │──┼─────────┼────────┘        │
│  └───────────┘  │ Publish │    Project      │
└─────────────────┘         └─────────────────┘
Command Side (Write Model)
Command and Handler
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from uuid import UUID, uuid4

from pydantic import BaseModel, Field

# OrderItem, Address, DomainEvent, Order, and the error types are assumed to be
# defined elsewhere in the domain layer.


class Command(BaseModel):
    """Base command with metadata."""
    command_id: UUID = Field(default_factory=uuid4)
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    user_id: UUID | None = None


class CreateOrder(Command):
    customer_id: UUID
    items: list[OrderItem]
    shipping_address: Address


class CommandHandler(ABC):
    @abstractmethod
    async def handle(self, command: Command) -> list["DomainEvent"]:
        pass


class CreateOrderHandler(CommandHandler):
    def __init__(self, order_repo, inventory_service):
        self.order_repo = order_repo
        self.inventory = inventory_service

    async def handle(self, command: CreateOrder) -> list[DomainEvent]:
        # Validate every line item before creating the aggregate.
        for item in command.items:
            if not await self.inventory.check_availability(item.product_id, item.quantity):
                raise InsufficientInventoryError(item.product_id)
        order = Order.create(
            customer_id=command.customer_id,
            items=command.items,
            shipping_address=command.shipping_address,
        )
        # save() must not read order.pending_events: reading drains the list.
        await self.order_repo.save(order)
        return order.pending_events


class CommandBus:
    def __init__(self, event_publisher):
        # dispatch() publishes through this object, so it has to be injected.
        self.event_publisher = event_publisher
        self._handlers: dict[type, CommandHandler] = {}

    def register(self, command_type: type, handler: CommandHandler):
        self._handlers[command_type] = handler

    async def dispatch(self, command: Command) -> list[DomainEvent]:
        handler = self._handlers.get(type(command))
        if not handler:
            raise NoHandlerFoundError(type(command))
        events = await handler.handle(command)
        # Publish after the handler succeeds so projections can update.
        for event in events:
            await self.event_publisher.publish(event)
        return events
Write Model (Aggregate)
from dataclasses import dataclass, field
from uuid import UUID, uuid4


@dataclass
class Order:
    id: UUID
    customer_id: UUID
    items: list[OrderItem]
    status: OrderStatus
    shipping_address: Address
    _pending_events: list[DomainEvent] = field(default_factory=list, repr=False)

    @classmethod
    def create(cls, customer_id: UUID, items: list[OrderItem], shipping_address: Address) -> "Order":
        order = cls(
            id=uuid4(),
            customer_id=customer_id,
            items=[],
            status=OrderStatus.PENDING,
            shipping_address=shipping_address,
        )
        # Raise OrderCreated first so the projection inserts the summary row
        # before any OrderItemAdded event tries to update it.
        order._raise_event(OrderCreated(order_id=order.id, customer_id=customer_id))
        for item in items:
            order.items.append(item)
            # The projection reads quantity and unit_price from this event;
            # unit_price is assumed to be a field on OrderItem.
            order._raise_event(OrderItemAdded(
                order_id=order.id,
                product_id=item.product_id,
                quantity=item.quantity,
                unit_price=item.unit_price,
            ))
        return order

    def cancel(self, reason: str):
        if self.status == OrderStatus.SHIPPED:
            raise InvalidOperationError("Cannot cancel shipped order")
        self.status = OrderStatus.CANCELLED
        self._raise_event(OrderCancelled(order_id=self.id, reason=reason))

    def _raise_event(self, event: DomainEvent):
        self._pending_events.append(event)

    @property
    def pending_events(self) -> list[DomainEvent]:
        # Reading drains the list, so each event is handed out only once.
        events = self._pending_events.copy()
        self._pending_events.clear()
        return events
Query Side (Read Model)
Query Handler
class Query(BaseModel):
    pass


class GetOrderById(Query):
    order_id: UUID


class GetOrdersByCustomer(Query):
    customer_id: UUID
    status: OrderStatus | None = None
    page: int = 1
    page_size: int = 20


# Defined before the handler because the handler's return annotation is
# evaluated when the method is defined.
class OrderView(BaseModel):
    """Denormalized read model for orders."""
    id: UUID
    customer_id: UUID
    customer_name: str  # Denormalized
    status: str
    total_amount: float
    item_count: int
    created_at: datetime


class GetOrderByIdHandler:
    def __init__(self, read_db):
        self.db = read_db

    async def handle(self, query: GetOrderById) -> OrderView | None:
        # Reads only the denormalized table; never loads the aggregate.
        row = await self.db.fetchrow(
            "SELECT * FROM order_summary WHERE id = $1", query.order_id
        )
        return OrderView(**row) if row else None
Projections
class OrderProjection:
    """Projects events to read models."""

    def __init__(self, read_db, customer_service):
        self.db = read_db
        self.customers = customer_service

    async def handle(self, event: DomainEvent):
        # Route on event type; unknown event types are ignored.
        match event:
            case OrderCreated():
                await self._on_order_created(event)
            case OrderItemAdded():
                await self._on_item_added(event)
            case OrderCancelled():
                await self._on_order_cancelled(event)

    async def _on_order_created(self, event: OrderCreated):
        # Denormalize the customer name into the summary row. The upsert makes
        # a redelivered OrderCreated event harmless.
        customer = await self.customers.get(event.customer_id)
        await self.db.execute(
            """INSERT INTO order_summary (id, customer_id, customer_name, status, total_amount, item_count, created_at)
            VALUES ($1, $2, $3, 'pending', 0.0, 0, $4)
            ON CONFLICT (id) DO UPDATE SET customer_name = $3""",
            event.order_id, event.customer_id, customer.name, event.timestamp,
        )

    async def _on_item_added(self, event: OrderItemAdded):
        # Requires the summary row to exist already. This increment is not
        # idempotent: a redelivered event would be counted twice.
        subtotal = event.quantity * event.unit_price
        await self.db.execute(
            "UPDATE order_summary SET total_amount = total_amount + $1, item_count = item_count + 1 WHERE id = $2",
            subtotal, event.order_id,
        )

    async def _on_order_cancelled(self, event: OrderCancelled):
        await self.db.execute(
            "UPDATE order_summary SET status = 'cancelled' WHERE id = $1", event.order_id
        )
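The `_on_item_added` update increments counters, so the same event delivered twice would be counted twice. A minimal sketch of one common guard follows: record each event's ID in the same transaction as the update and skip IDs already seen. It assumes every event carries a unique `event_id` (a hypothetical field, not shown in this document) and uses SQLite from the standard library in place of the read database.

```python
import sqlite3
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderItemAdded:
    """Hypothetical event shape; event_id is an assumed unique identifier."""
    event_id: str
    order_id: str
    quantity: int
    unit_price: float


def apply_item_added(db: sqlite3.Connection, event: OrderItemAdded) -> bool:
    """Apply the event at most once. Returns False when it was already applied."""
    with db:  # one transaction: the dedup marker and the update commit together
        cur = db.execute(
            "INSERT OR IGNORE INTO processed_events (event_id) VALUES (?)",
            (event.event_id,),
        )
        if cur.rowcount == 0:  # event_id already recorded, so this is a redelivery
            return False
        db.execute(
            "UPDATE order_summary SET total_amount = total_amount + ?, "
            "item_count = item_count + 1 WHERE id = ?",
            (event.quantity * event.unit_price, event.order_id),
        )
    return True


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE processed_events (event_id TEXT PRIMARY KEY)")
db.execute(
    "CREATE TABLE order_summary (id TEXT PRIMARY KEY, total_amount REAL, item_count INTEGER)"
)
db.execute("INSERT INTO order_summary VALUES ('order-1', 0.0, 0)")
db.commit()

event = OrderItemAdded(event_id="evt-1", order_id="order-1", quantity=2, unit_price=5.0)
first = apply_item_added(db, event)
second = apply_item_added(db, event)  # simulated redelivery
row = db.execute("SELECT total_amount, item_count FROM order_summary").fetchone()
```

Keeping the marker and the update in one transaction matters: if they committed separately, a crash between them could either lose the update or apply it twice.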
FastAPI Integration
from uuid import UUID

from fastapi import Depends, FastAPI, HTTPException

# CreateOrderRequest, QueryBus, get_command_bus, and get_query_bus are assumed
# to be defined elsewhere in the application.
app = FastAPI()


@app.post("/api/v1/orders", status_code=201)
async def create_order(request: CreateOrderRequest, bus: CommandBus = Depends(get_command_bus)):
    command = CreateOrder(
        customer_id=request.customer_id,
        items=request.items,
        shipping_address=request.shipping_address,
    )
    try:
        events = await bus.dispatch(command)
    except InsufficientInventoryError as e:
        raise HTTPException(status_code=400, detail=f"Insufficient inventory: {e}") from e
    # Every event raised by Order.create carries the new order's ID.
    return {"order_id": events[0].order_id}


@app.get("/api/v1/orders/{order_id}")
async def get_order(order_id: UUID, bus: QueryBus = Depends(get_query_bus)):
    order = await bus.dispatch(GetOrderById(order_id=order_id))
    if not order:
        raise HTTPException(status_code=404, detail="Order not found")
    return order
Key Decisions
| Decision | Recommendation |
|---|---|
| Consistency | Eventual consistency between write and read models |
| Event storage | Event store for write side, denormalized tables for read |
| Projection lag | Monitor and alert on projection delay |
| Read model count | Start with one, add more for specific query needs |
| Rebuild strategy | Ability to rebuild projections from events |
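
The "Projection lag" and "Rebuild strategy" rows can both be served by a checkpoint: the position of the last event a projection has applied. The sketch below is one possible shape under stated assumptions. `StoredEvent`, its `position` field, and `OrderStatusProjection` are hypothetical, and an in-memory list stands in for the event store.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class StoredEvent:
    """Hypothetical event-store record with a monotonically increasing position."""
    position: int
    kind: str
    order_id: str


@dataclass
class OrderStatusProjection:
    """Read model plus a checkpoint: the position of the last event applied."""
    statuses: dict[str, str] = field(default_factory=dict)
    checkpoint: int = 0

    def apply(self, event: StoredEvent) -> None:
        if event.position <= self.checkpoint:
            return  # already applied; keeps replays idempotent
        if event.kind == "OrderCreated":
            self.statuses[event.order_id] = "pending"
        elif event.kind == "OrderCancelled":
            self.statuses[event.order_id] = "cancelled"
        self.checkpoint = event.position

    def lag(self, latest_position: int) -> int:
        """Number of stored events not yet reflected in the read model."""
        return latest_position - self.checkpoint


def rebuild(event_log: list[StoredEvent]) -> OrderStatusProjection:
    """Start from an empty read model and replay the full log in order."""
    projection = OrderStatusProjection()
    for event in sorted(event_log, key=lambda e: e.position):
        projection.apply(event)
    return projection


event_log = [
    StoredEvent(1, "OrderCreated", "order-1"),
    StoredEvent(2, "OrderCreated", "order-2"),
    StoredEvent(3, "OrderCancelled", "order-1"),
]

live = OrderStatusProjection()
live.apply(event_log[0])
lag_before = live.lag(latest_position=event_log[-1].position)  # two events behind

rebuilt = rebuild(event_log)
lag_after = rebuilt.lag(latest_position=event_log[-1].position)
```

An alert would fire when `lag` stays above a chosen threshold; the threshold itself depends on how stale the read model is allowed to be.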
Anti-Patterns (FORBIDDEN)
# NEVER query write model for reads
order = await aggregate_repo.get(order_id) # WRONG
# CORRECT: Use read model
order = await query_bus.dispatch(GetOrderById(order_id=order_id))
# NEVER modify read model directly
await read_db.execute("UPDATE orders SET status = $1", status) # WRONG
# CORRECT: Dispatch command, let projection update
await bus.dispatch(UpdateOrderStatus(order_id=order_id, status=status))
# NEVER skip projection idempotency - use UPSERT
await self.db.execute("INSERT INTO ... ON CONFLICT (id) DO UPDATE SET ...")
Related Skills
- event-sourcing - Event-sourced write models
- saga-patterns - Cross-aggregate transactions
- database-schema-designer - Read model schema design
Score
Total Score
75/100
Based on repository quality metrics
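- ✓ SKILL.md: a SKILL.md file is included (+20)
- ✓ LICENSE: a license is set (+10)
- ✓ Description: the description is at least 100 characters long (+10)
- ○ Popularity: 100 or more GitHub stars (0/15)
- ✓ Recent activity: updated within the last month (+10)
- ○ Forks: forked 10 or more times (0/5)
- ✓ Issue management: fewer than 50 open issues (+5)
- ✓ Language: a programming language is set (+5)
- ✓ Tags: at least one tag is set (+5)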
