
outbox-pattern
by yonatangross
The Complete AI Development Toolkit for Claude Code — 159 skills, 34 agents, 20 commands, 144 hooks. Production-ready patterns for FastAPI, React 19, LangGraph, security, and testing.
SKILL.md
name: outbox-pattern
description: Transactional outbox pattern for reliable event publishing. Use when implementing atomic writes with event delivery, ensuring exactly-once semantics, or building event-driven microservices.
context: fork
agent: event-driven-architect
version: 2.0.0
tags: [event-driven, outbox, transactions, reliability, microservices, cdc, idempotency, 2026]
allowed-tools: [Read, Write, Grep, Glob, Bash]
author: OrchestKit
user-invocable: false
Outbox Pattern (2026)
Make state changes and event publishing atomic by writing both the state change and the event to the database in a single transaction, then publishing the event to the message broker asynchronously.
Overview
- Ensuring database writes and event publishing are atomic
- Building reliable event-driven microservices
- Implementing effectively exactly-once processing (at-least-once delivery plus idempotent consumers)
- Avoiding dual-write problems (DB + message broker)
- Decoupling domain logic from message infrastructure
- High-throughput systems needing CDC-based publishing
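To make the atomicity point concrete, here is a minimal sketch that uses Python's built-in `sqlite3` in place of PostgreSQL. The table layout and the `make_db`, `create_order`, and `count` helpers are illustrative assumptions, not part of this skill. It shows that when the order row and the outbox row share one transaction, a failure before commit leaves neither behind.

```python
import json
import sqlite3
import uuid


def make_db() -> sqlite3.Connection:
    """Create an in-memory database with a hypothetical orders table and an outbox."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id TEXT PRIMARY KEY, total REAL NOT NULL)")
    conn.execute(
        "CREATE TABLE outbox (id TEXT PRIMARY KEY, event_type TEXT NOT NULL, "
        "payload TEXT NOT NULL, published_at TEXT)"
    )
    return conn


def create_order(conn: sqlite3.Connection, total: float, fail: bool = False) -> str:
    """Insert the order and its event in one transaction: both commit or neither does."""
    order_id = str(uuid.uuid4())
    with conn:  # commits on success, rolls back if an exception escapes the block
        conn.execute("INSERT INTO orders (id, total) VALUES (?, ?)", (order_id, total))
        conn.execute(
            "INSERT INTO outbox (id, event_type, payload) VALUES (?, ?, ?)",
            (
                str(uuid.uuid4()),
                "OrderCreated",
                json.dumps({"order_id": order_id, "total": total}),
            ),
        )
        if fail:
            raise RuntimeError("simulated failure before commit")
    return order_id


def count(conn: sqlite3.Connection, table: str) -> int:
    """Return the number of rows in a table (table name is trusted here)."""
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
```

Calling `create_order(conn, 10.0, fail=True)` raises, and afterwards both `orders` and `outbox` still hold only the rows from earlier successful calls. A dual write to a database and a broker cannot give that guarantee.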
Quick Reference
Outbox Table Schema
CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id UUID NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    payload JSONB NOT NULL,
    idempotency_key VARCHAR(255) UNIQUE,  -- For consumer deduplication
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at TIMESTAMPTZ,
    retry_count INT DEFAULT 0,
    last_error TEXT
);
-- Index for polling unpublished messages
CREATE INDEX idx_outbox_unpublished ON outbox(created_at)
    WHERE published_at IS NULL;
-- Index for aggregate ordering
CREATE INDEX idx_outbox_aggregate ON outbox(aggregate_id, created_at);
-- Index for idempotency key lookups
-- (optional: the UNIQUE constraint above already creates an index on this column)
CREATE INDEX idx_outbox_idempotency ON outbox(idempotency_key)
    WHERE idempotency_key IS NOT NULL;
SQLAlchemy Model
import hashlib
import json
import uuid
from datetime import datetime, timezone

from sqlalchemy import Column, DateTime, Integer, String, Text
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import declarative_base

Base = declarative_base()  # Assumption: replace with your project's shared Base

class OutboxMessage(Base):
    __tablename__ = "outbox"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    aggregate_type = Column(String(100), nullable=False)
    aggregate_id = Column(UUID(as_uuid=True), nullable=False, index=True)
    event_type = Column(String(100), nullable=False)
    payload = Column(JSONB, nullable=False)
    idempotency_key = Column(String(255), unique=True, nullable=True)
    # timezone=True maps to TIMESTAMPTZ, matching the table schema
    created_at = Column(DateTime(timezone=True), nullable=False, default=lambda: datetime.now(timezone.utc))
    published_at = Column(DateTime(timezone=True), nullable=True)
    retry_count = Column(Integer, default=0)
    last_error = Column(Text, nullable=True)

    @staticmethod
    def generate_idempotency_key(aggregate_id: str, event_type: str, payload: dict) -> str:
        """Generate deterministic idempotency key for deduplication."""
        content = f"{aggregate_id}:{event_type}:{json.dumps(payload, sort_keys=True)}"
        return hashlib.sha256(content.encode()).hexdigest()[:32]
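As a quick check of what "deterministic" means here, the sketch below reproduces the key function standalone, outside the model class, and compares a few inputs. The sample aggregate ID and payload values are made up for illustration.

```python
import hashlib
import json


def generate_idempotency_key(aggregate_id: str, event_type: str, payload: dict) -> str:
    """Same logic as the model's static method, reproduced standalone."""
    content = f"{aggregate_id}:{event_type}:{json.dumps(payload, sort_keys=True)}"
    return hashlib.sha256(content.encode()).hexdigest()[:32]


# Same content, different dict ordering: sort_keys=True makes the keys identical.
key_a = generate_idempotency_key("order-1", "OrderCreated", {"total": 10, "currency": "USD"})
key_b = generate_idempotency_key("order-1", "OrderCreated", {"currency": "USD", "total": 10})

# Different content: the key changes.
key_c = generate_idempotency_key("order-1", "OrderCreated", {"total": 11, "currency": "USD"})
```

Here `key_a == key_b` and `key_a != key_c`. One consequence worth keeping in mind: two legitimately distinct events with identical aggregate, type, and payload would share a key, so a payload that can repeat may need a version or sequence field included.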
Write to Outbox in Transaction
from sqlalchemy.ext.asyncio import AsyncSession

# Assumption: Order (ORM model) and OrderCreate (Pydantic schema) are defined elsewhere.

class OrderService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def create_order(self, order_data: OrderCreate) -> Order:
        """Create order AND outbox message in a single transaction."""
        # Create the order, then flush so generated defaults such as order.id are populated
        order = Order(**order_data.model_dump())
        self.db.add(order)
        await self.db.flush()
        # Build the payload once so the idempotency key covers exactly what is published
        payload = {
            "order_id": str(order.id),
            "customer_id": str(order.customer_id),
            "total": order.total,
        }
        # Create outbox message in the SAME transaction
        outbox_msg = OutboxMessage(
            aggregate_type="Order",
            aggregate_id=order.id,
            event_type="OrderCreated",
            payload=payload,
            idempotency_key=OutboxMessage.generate_idempotency_key(
                str(order.id), "OrderCreated", payload
            ),
        )
        self.db.add(outbox_msg)
        # flush() sends both INSERTs; they become durable together when the caller commits
        await self.db.flush()
        return order
Polling Publisher
from datetime import datetime, timezone

from sqlalchemy import select

class OutboxPublisher:
    """Polls outbox and publishes to message broker (at-least-once delivery)."""

    def __init__(self, session_factory, producer):
        # Assumption: session_factory yields AsyncSession objects (e.g. async_sessionmaker)
        # and producer exposes an async publish(topic=..., key=..., value=...) method.
        self.session_factory = session_factory
        self.producer = producer

    async def publish_pending(self, batch_size: int = 100) -> int:
        async with self.session_factory() as session:
            stmt = (
                select(OutboxMessage)
                .where(OutboxMessage.published_at.is_(None))
                .order_by(OutboxMessage.created_at)
                .limit(batch_size)
                .with_for_update(skip_locked=True)  # Prevent duplicate processing
            )
            result = await session.execute(stmt)
            messages = result.scalars().all()
            published = 0
            for msg in messages:
                try:
                    await self.producer.publish(
                        topic=f"{msg.aggregate_type.lower()}-events",
                        key=str(msg.aggregate_id),
                        value={
                            "type": msg.event_type,
                            "idempotency_key": msg.idempotency_key,
                            **msg.payload,
                        },
                    )
                    msg.published_at = datetime.now(timezone.utc)
                    published += 1
                except Exception as e:
                    # Leave the row unpublished so a later poll retries it
                    msg.retry_count += 1
                    msg.last_error = str(e)
            # A crash between publish() and commit() re-sends these rows on the next poll,
            # which is why consumers must deduplicate
            await session.commit()
            return published
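The publisher above processes a single batch, so something has to call it repeatedly. One possible driver is sketched below. `run_polling_loop`, its `stop` event, and the `idle_delay` default are hypothetical names and values chosen for illustration. The loop drains back-to-back while there is work and sleeps only when a poll comes back empty.

```python
import asyncio
from typing import Awaitable, Callable


async def run_polling_loop(
    publish_pending: Callable[[], Awaitable[int]],
    stop: asyncio.Event,
    idle_delay: float = 1.0,
) -> int:
    """Call publish_pending until stop is set; return the total number published."""
    total = 0
    while not stop.is_set():
        published = await publish_pending()
        total += published
        if published == 0:
            # Outbox was empty (or every publish failed): wait for idle_delay,
            # but wake up immediately if a shutdown is requested.
            try:
                await asyncio.wait_for(stop.wait(), timeout=idle_delay)
            except asyncio.TimeoutError:
                pass
    return total
```

With the class above, this would be started as `await run_polling_loop(publisher.publish_pending, stop_event)`. Setting `stop_event` from a signal handler lets the loop finish its current batch before exiting.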
CDC with Debezium (High-Throughput)
# docker-compose.yml - Debezium connector
version: '3.8'
services:
  debezium:
    image: debezium/connect:2.5
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: outbox-connector
      CONFIG_STORAGE_TOPIC: connect-configs
      OFFSET_STORAGE_TOPIC: connect-offsets
# Register outbox connector
# POST http://debezium:8083/connectors
# Notes: route.by.field points the router at this schema's aggregate_type column
# (the router's default column name is aggregatetype). The topic.prefix value "app"
# is a placeholder; Debezium 2.x requires the property to be set.
{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "app",
    "topic.prefix": "app",
    "table.include.list": "public.outbox",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.route.topic.replacement": "${routedByValue}-events"
  }
}
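The registration step is a plain HTTP POST of that JSON document to the Kafka Connect REST endpoint. As a rough sketch using only the standard library, the request could be built and sent as follows. `build_connector_request` and `register_connector` are hypothetical helper names, and the base URL is taken from the comment above.

```python
import json
import urllib.request


def build_connector_request(base_url: str, connector: dict) -> urllib.request.Request:
    """Build (but do not send) the POST request that registers a connector."""
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


def register_connector(base_url: str, connector: dict, timeout: float = 10.0) -> int:
    """Send the registration request and return the HTTP status code."""
    request = build_connector_request(base_url, connector)
    # urlopen raises urllib.error.HTTPError for 4xx/5xx responses,
    # for example when a connector with the same name already exists.
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

Usage would look like `register_connector("http://debezium:8083", connector_config)`, where `connector_config` is the dictionary form of the JSON document shown above.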
Idempotent Consumer
from datetime import datetime, timezone

from redis.asyncio import Redis
from sqlalchemy import Column, DateTime, String, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

class IdempotentConsumer:
    """Consumer with deduplication using idempotency keys."""

    def __init__(self, db: AsyncSession, redis: Redis):
        self.db = db
        self.redis = redis

    async def process(self, event: dict, handler) -> bool:
        """Process event idempotently - returns False if duplicate."""
        idempotency_key = event.get("idempotency_key")
        if not idempotency_key:
            # No key = always process (but risky)
            await handler(event)
            return True
        cache_key = f"processed:{idempotency_key}"
        # Check Redis cache first (fast path)
        if await self.redis.exists(cache_key):
            return False  # Already processed
        # Check database (slow path, but durable)
        result = await self.db.execute(
            select(ProcessedEvent)
            .where(ProcessedEvent.idempotency_key == idempotency_key)
        )
        if result.scalar_one_or_none():
            # Cache for future fast lookups
            await self.redis.setex(cache_key, 86400, "1")
            return False
        # Process and record in one transaction. The SELECT above has already begun a
        # transaction on this session, so commit explicitly instead of calling begin().
        # Assumption: handler writes through self.db so its changes commit with the key.
        try:
            await handler(event)
            self.db.add(ProcessedEvent(idempotency_key=idempotency_key))
            await self.db.commit()
        except IntegrityError:
            # A concurrent consumer recorded the same key first: undo and treat as duplicate
            await self.db.rollback()
            return False
        # Cache the processed key
        await self.redis.setex(cache_key, 86400, "1")
        return True

class ProcessedEvent(Base):
    """Track processed events for idempotency."""
    __tablename__ = "processed_events"

    idempotency_key = Column(String(255), primary_key=True)
    processed_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
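The durable part of the deduplication is the primary key on `processed_events`. The sketch below isolates that idea using `sqlite3` so it runs without PostgreSQL or Redis. `make_consumer_db`, `process_once`, `ship_order`, and the `shipments` table are hypothetical names used only for illustration. The key is inserted first, and the handler's writes share the same transaction, so a failed handler also releases the key.

```python
import sqlite3
from typing import Callable


def make_consumer_db() -> sqlite3.Connection:
    """In-memory database with a dedup table and a hypothetical side-effect table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE processed_events (idempotency_key TEXT PRIMARY KEY)")
    conn.execute("CREATE TABLE shipments (order_id TEXT NOT NULL)")
    return conn


def process_once(
    conn: sqlite3.Connection,
    event: dict,
    handler: Callable[[sqlite3.Connection, dict], None],
) -> bool:
    """Run handler at most once per idempotency key; return False for duplicates."""
    with conn:  # key insert and handler writes commit or roll back together
        try:
            conn.execute(
                "INSERT INTO processed_events (idempotency_key) VALUES (?)",
                (event["idempotency_key"],),
            )
        except sqlite3.IntegrityError:
            return False  # key already recorded, so this is a redelivery
        handler(conn, event)
    return True


def ship_order(conn: sqlite3.Connection, event: dict) -> None:
    """Example handler: record a shipment for the order."""
    conn.execute("INSERT INTO shipments (order_id) VALUES (?)", (event["order_id"],))
```

Delivering the same event twice through `process_once(conn, event, ship_order)` returns `True` and then `False`, and only one shipment row is written. If the handler raises, the transaction rolls back and the key stays free for the retry.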
Dapr Outbox Integration
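# Using Dapr's built-in outbox support.
# Assumption: the "statestore" component is a transactional state store whose component
# metadata enables the outbox (outboxPublishPubsub / outboxPublishTopic), and the outbox
# applies to writes sent through the state transaction API. Verify against the Dapr
# documentation for your runtime and SDK version.
import json

from dapr.aio.clients import DaprClient
from dapr.clients.grpc._request import (
    TransactionalStateOperation,
    TransactionOperationType,
)

async def create_order_with_dapr(order_data: dict):
    """Save state in a transaction so Dapr can publish the outbox event with it."""
    async with DaprClient() as client:
        # Single call - Dapr ensures atomicity of the state write and the event
        await client.execute_state_transaction(
            store_name="statestore",
            operations=[
                TransactionalStateOperation(
                    key=f"order-{order_data['id']}",
                    data=json.dumps(order_data),
                    operation_type=TransactionOperationType.upsert,
                )
            ],
        )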
Key Decisions
| Decision | Option A | Option B | Recommendation |
|---|---|---|---|
| Delivery | Polling | CDC (Debezium) | Polling for simplicity, CDC for > 10K msg/s |
| Batch size | Small (10-50) | Large (100-500) | Start with 100, tune based on latency |
| Retry | Fixed delay | Exponential backoff | Exponential with max 5 retries |
| Cleanup | Delete published | Archive to cold storage | Archive for audit, delete after 30 days |
| Ordering | Per-aggregate | Global | Per-aggregate via partition key |
| Idempotency | Consumer-side | Built-in key | Always include idempotency key |
| Tool | Custom | Dapr | Dapr if K8s, custom otherwise |
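The retry recommendation in the table (exponential backoff, at most 5 retries) could be computed along these lines. This is a sketch under assumptions: the base delay, the cap, and the full-jitter strategy are illustrative defaults that the table does not specify, and `backoff_delay` and `should_retry` are hypothetical helpers.

```python
import random

MAX_RETRIES = 5  # from the table's recommendation


def backoff_delay(retry_count: int, base: float = 1.0, cap: float = 60.0, jitter: bool = True) -> float:
    """Seconds to wait before retry number retry_count (1-based)."""
    delay = min(cap, base * 2 ** (retry_count - 1))
    if jitter:
        # Full jitter spreads retries out so many failed rows do not retry in lockstep.
        return random.uniform(0, delay)
    return delay


def should_retry(retry_count: int) -> bool:
    """True while the message has not exhausted its retry budget."""
    return retry_count < MAX_RETRIES
```

Without jitter the delays for retries 1 through 5 are 1, 2, 4, 8, and 16 seconds. A polling publisher could apply this by skipping rows whose last attempt is more recent than `backoff_delay(msg.retry_count)` and parking rows for which `should_retry` is false.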
Polling vs CDC Trade-offs
┌────────────────────────────────────────────────────────────────────────┐
│ POLLING vs CDC COMPARISON │
├────────────────────────────────────────────────────────────────────────┤
│ │
│ POLLING (OutboxPublisher) CDC (Debezium) │
│ ────────────────────────── ──────────────── │
│ ✓ Simple to implement ✓ Higher throughput (100K+ msg/s) │
│ ✓ No extra infrastructure ✓ Lower latency (sub-second) │
│ ✓ Easy to debug ✓ No polling overhead │
│ ✗ Polling overhead ✗ Complex infrastructure │
│ ✗ Higher latency (1-5s) ✗ Harder to debug │
│ ✗ Limited throughput (~10K/s) ✗ Requires Kafka Connect │
│ │
│ USE WHEN: USE WHEN: │
│ - Starting out - High throughput required │
│ - Simple architecture - Sub-second latency needed │
│ - < 10K events/second - Already using Kafka │
│ │
└────────────────────────────────────────────────────────────────────────┘
Anti-Patterns (FORBIDDEN)
# NEVER publish before commit - dual-write problem
await producer.publish(event) # May succeed but commit may fail!
await session.commit()
# NEVER delete without publishing - events lost
await session.execute(delete(OutboxMessage).where(...))
# NEVER process without locking - causes duplicates
messages = await session.execute(select(OutboxMessage)) # No lock!
# NEVER store large payloads - use URL references instead
OutboxMessage(payload={"file": large_binary_data})
# NEVER ignore ordering - use aggregate_id as partition key
await producer.publish(event_a) # May arrive out of order!
# NEVER skip idempotency keys
OutboxMessage(payload=event_data) # No idempotency_key = duplicate risk
# NEVER process without idempotency check on consumer
async def handle(event):
    await process(event)  # Duplicate processing on retry!
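For contrast with the "delete without publishing" anti-pattern, a safe cleanup only touches rows that already have `published_at` set and are older than the retention window. The sketch below uses `sqlite3` with ISO-8601 UTC timestamps stored as text. `purge_published` is a hypothetical helper, and the 30-day default mirrors the Key Decisions table.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def purge_published(conn: sqlite3.Connection, now: datetime, retention_days: int = 30) -> int:
    """Delete outbox rows published more than retention_days ago; return rows removed."""
    # Assumes published_at holds ISO-8601 strings in UTC, which sort chronologically.
    cutoff = (now - timedelta(days=retention_days)).isoformat()
    with conn:
        cursor = conn.execute(
            "DELETE FROM outbox WHERE published_at IS NOT NULL AND published_at < ?",
            (cutoff,),
        )
    return cursor.rowcount
```

Unpublished rows are never matched, regardless of age, so a stalled publisher cannot cause events to be purged before delivery. In PostgreSQL the same condition would compare the `TIMESTAMPTZ` column directly.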
Related Skills
- message-queues - Kafka, RabbitMQ, Redis Streams integration
- event-sourcing - Full event-sourced architecture patterns
- database-schema-designer - Schema design and migrations
- sqlalchemy-2-async - Async database session patterns
Capability Details
outbox-schema
Keywords: outbox table, transactional outbox, event table, schema, idempotency
Solves:
- How do I design an outbox table?
- What indexes are needed for outbox?
- Outbox table best practices
- Idempotency key generation
polling-publisher
Keywords: polling, publish outbox, background worker, relay
Solves:
- How do I publish from the outbox?
- Batch publishing patterns
- Handling publish failures
- FOR UPDATE SKIP LOCKED pattern
cdc-debezium
Keywords: cdc, debezium, change data capture, kafka connect
Solves:
- When to use CDC vs polling?
- Debezium connector configuration
- High-throughput event publishing
- Outbox event routing
idempotent-consumer
Keywords: idempotent, deduplication, exactly-once, processed events
Solves:
- How to handle duplicate messages?
- Idempotency key patterns
- Consumer deduplication strategies
- Redis + DB deduplication
atomic-operations
Keywords: atomic, transaction, dual-write, consistency
Solves:
- How to avoid dual-write problems?
- Ensuring atomicity between DB and events
- Exactly-once delivery patterns
