Back to list
yonatangross

cqrs-patterns

by yonatangross

The Complete AI Development Toolkit for Claude Code — 159 skills, 34 agents, 20 commands, 144 hooks. Production-ready patterns for FastAPI, React 19, LangGraph, security, and testing.

29🍴 4📅 Jan 23, 2026

SKILL.md


name: cqrs-patterns description: CQRS (Command Query Responsibility Segregation) patterns for separating read and write models. Use when optimizing read-heavy systems, implementing event sourcing, or building systems with different read/write scaling requirements. context: fork agent: event-driven-architect version: 1.0.0 tags: [cqrs, command-query, read-model, write-model, projection, event-sourcing, 2026] author: OrchestKit user-invocable: false

CQRS Patterns

Separate read and write concerns for optimized data access.

Overview

  • Read-heavy workloads with complex queries
  • Different scaling requirements for reads vs writes
  • Event sourcing implementations
  • Multiple read model representations of same data
  • Complex domain models with simple read requirements

When NOT to Use

  • Simple CRUD applications
  • Strong consistency requirements everywhere
  • Small datasets with simple queries

Architecture Overview

┌─────────────────┐         ┌─────────────────┐
│   Write Side    │         │   Read Side     │
├─────────────────┤         ├─────────────────┤
│  ┌───────────┐  │         │  ┌───────────┐  │
│  │ Commands  │  │         │  │  Queries  │  │
│  └─────┬─────┘  │         │  └─────┬─────┘  │
│  ┌─────▼─────┐  │         │  ┌─────▼─────┐  │
│  │ Aggregate │  │         │  │Read Model │  │
│  └─────┬─────┘  │         │  └───────────┘  │
│  ┌─────▼─────┐  │         │        ▲        │
│  │  Events   │──┼─────────┼────────┘        │
│  └───────────┘  │ Publish │   Project       │
└─────────────────┘         └─────────────────┘

Command Side (Write Model)

Command and Handler

from pydantic import BaseModel, Field
from uuid import UUID, uuid4
from datetime import datetime, timezone
from abc import ABC, abstractmethod

class Command(BaseModel):
    """Base command with metadata."""
    command_id: UUID = Field(default_factory=uuid4)
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    user_id: UUID | None = None

class CreateOrder(Command):
    customer_id: UUID
    items: list[OrderItem]
    shipping_address: Address

class CommandHandler(ABC):
    @abstractmethod
    async def handle(self, command: Command) -> list["DomainEvent"]:
        pass

class CreateOrderHandler(CommandHandler):
    def __init__(self, order_repo, inventory_service):
        self.order_repo = order_repo
        self.inventory = inventory_service

    async def handle(self, command: CreateOrder) -> list[DomainEvent]:
        for item in command.items:
            if not await self.inventory.check_availability(item.product_id, item.quantity):
                raise InsufficientInventoryError(item.product_id)

        order = Order.create(
            customer_id=command.customer_id,
            items=command.items,
            shipping_address=command.shipping_address,
        )
        await self.order_repo.save(order)
        return order.pending_events

class CommandBus:
    def __init__(self):
        self._handlers: dict[type, CommandHandler] = {}

    def register(self, command_type: type, handler: CommandHandler):
        self._handlers[command_type] = handler

    async def dispatch(self, command: Command) -> list[DomainEvent]:
        handler = self._handlers.get(type(command))
        if not handler:
            raise NoHandlerFoundError(type(command))
        events = await handler.handle(command)
        for event in events:
            await self.event_publisher.publish(event)
        return events

Write Model (Aggregate)

from dataclasses import dataclass, field

@dataclass
class Order:
    id: UUID
    customer_id: UUID
    items: list[OrderItem]
    status: OrderStatus
    _pending_events: list[DomainEvent] = field(default_factory=list)

    @classmethod
    def create(cls, customer_id: UUID, items: list, shipping_address: Address) -> "Order":
        order = cls(id=uuid4(), customer_id=customer_id, items=[], status=OrderStatus.PENDING)
        for item in items:
            order.items.append(item)
            order._raise_event(OrderItemAdded(order_id=order.id, product_id=item.product_id))
        order._raise_event(OrderCreated(order_id=order.id, customer_id=customer_id))
        return order

    def cancel(self, reason: str):
        if self.status == OrderStatus.SHIPPED:
            raise InvalidOperationError("Cannot cancel shipped order")
        self.status = OrderStatus.CANCELLED
        self._raise_event(OrderCancelled(order_id=self.id, reason=reason))

    def _raise_event(self, event: DomainEvent):
        self._pending_events.append(event)

    @property
    def pending_events(self) -> list[DomainEvent]:
        events = self._pending_events.copy()
        self._pending_events.clear()
        return events

Query Side (Read Model)

Query Handler

class Query(BaseModel):
    pass

class GetOrderById(Query):
    order_id: UUID

class GetOrdersByCustomer(Query):
    customer_id: UUID
    status: OrderStatus | None = None
    page: int = 1
    page_size: int = 20

class GetOrderByIdHandler:
    def __init__(self, read_db):
        self.db = read_db

    async def handle(self, query: GetOrderById) -> OrderView | None:
        row = await self.db.fetchrow(
            "SELECT * FROM order_summary WHERE id = $1", query.order_id
        )
        return OrderView(**row) if row else None

class OrderView(BaseModel):
    """Denormalized read model for orders."""
    id: UUID
    customer_id: UUID
    customer_name: str  # Denormalized
    status: str
    total_amount: float
    item_count: int
    created_at: datetime

Projections

class OrderProjection:
    """Projects events to read models."""
    def __init__(self, read_db, customer_service):
        self.db = read_db
        self.customers = customer_service

    async def handle(self, event: DomainEvent):
        match event:
            case OrderCreated():
                await self._on_order_created(event)
            case OrderItemAdded():
                await self._on_item_added(event)
            case OrderCancelled():
                await self._on_order_cancelled(event)

    async def _on_order_created(self, event: OrderCreated):
        customer = await self.customers.get(event.customer_id)
        await self.db.execute(
            """INSERT INTO order_summary (id, customer_id, customer_name, status, total_amount, item_count, created_at)
               VALUES ($1, $2, $3, 'pending', 0.0, 0, $4)
               ON CONFLICT (id) DO UPDATE SET customer_name = $3""",
            event.order_id, event.customer_id, customer.name, event.timestamp,
        )

    async def _on_item_added(self, event: OrderItemAdded):
        subtotal = event.quantity * event.unit_price
        await self.db.execute(
            "UPDATE order_summary SET total_amount = total_amount + $1, item_count = item_count + 1 WHERE id = $2",
            subtotal, event.order_id,
        )

    async def _on_order_cancelled(self, event: OrderCancelled):
        await self.db.execute(
            "UPDATE order_summary SET status = 'cancelled' WHERE id = $1", event.order_id
        )

FastAPI Integration

from fastapi import FastAPI, Depends, HTTPException

app = FastAPI()

@app.post("/api/v1/orders", status_code=201)
async def create_order(request: CreateOrderRequest, bus: CommandBus = Depends(get_command_bus)):
    command = CreateOrder(
        customer_id=request.customer_id,
        items=request.items,
        shipping_address=request.shipping_address,
    )
    try:
        events = await bus.dispatch(command)
        return {"order_id": events[0].order_id}
    except InsufficientInventoryError as e:
        raise HTTPException(400, f"Insufficient inventory: {e}")

@app.get("/api/v1/orders/{order_id}")
async def get_order(order_id: UUID, bus: QueryBus = Depends(get_query_bus)):
    order = await bus.dispatch(GetOrderById(order_id=order_id))
    if not order:
        raise HTTPException(404, "Order not found")
    return order

Key Decisions

DecisionRecommendation
ConsistencyEventual consistency between write and read models
Event storageEvent store for write side, denormalized tables for read
Projection lagMonitor and alert on projection delay
Read model countStart with one, add more for specific query needs
Rebuild strategyAbility to rebuild projections from events

Anti-Patterns (FORBIDDEN)

# NEVER query write model for reads
order = await aggregate_repo.get(order_id)  # WRONG

# CORRECT: Use read model
order = await query_bus.dispatch(GetOrderById(order_id=order_id))

# NEVER modify read model directly
await read_db.execute("UPDATE orders SET status = $1", status)  # WRONG

# CORRECT: Dispatch command, let projection update
await bus.dispatch(UpdateOrderStatus(order_id=order_id, status=status))

# NEVER skip projection idempotency - use UPSERT
await self.db.execute("INSERT INTO ... ON CONFLICT (id) DO UPDATE SET ...")
  • event-sourcing - Event-sourced write models
  • saga-patterns - Cross-aggregate transactions
  • database-schema-designer - Read model schema design

Score

Total Score

75/100

Based on repository quality metrics

SKILL.md

SKILL.mdファイルが含まれている

+20
LICENSE

ライセンスが設定されている

+10
説明文

100文字以上の説明がある

+10
人気

GitHub Stars 100以上

0/15
最近の活動

1ヶ月以内に更新

+10
フォーク

10回以上フォークされている

0/5
Issue管理

オープンIssueが50未満

+5
言語

プログラミング言語が設定されている

+5
タグ

1つ以上のタグが設定されている

+5

Reviews

💬

Reviews coming soon