Back to list
yonatangross

saga-patterns

by yonatangross

The Complete AI Development Toolkit for Claude Code — 159 skills, 34 agents, 20 commands, 144 hooks. Production-ready patterns for FastAPI, React 19, LangGraph, security, and testing.

29🍴 4📅 Jan 23, 2026

SKILL.md


name: saga-patterns description: Saga patterns for distributed transactions with orchestration and choreography approaches. Use when implementing multi-service transactions, handling partial failures, or building systems requiring eventual consistency with compensation. context: fork agent: event-driven-architect version: 1.0.0 tags: [saga, distributed-transactions, orchestration, choreography, compensation, microservices, 2026] author: OrchestKit user-invocable: false

Saga Patterns for Distributed Transactions

Maintain consistency across microservices without distributed locks.

Overview

  • Multi-service business transactions (order -> payment -> inventory -> shipping)
  • Operations that must eventually succeed or roll back completely
  • Long-running business processes (minutes to days)
  • Microservices avoiding 2PC (two-phase commit)

When NOT to Use

  • Single database operations (use transactions)
  • Real-time consistency requirements (use synchronous calls)
  • When eventual consistency is unacceptable

Orchestration vs Choreography

AspectOrchestrationChoreography
ControlCentral orchestratorDistributed events
CouplingServices depend on orchestratorLoosely coupled
VisibilitySingle point of observationRequires distributed tracing
Best forComplex, ordered workflowsSimple, parallel flows

Orchestration Pattern

from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, Any
from datetime import datetime, timezone

class SagaStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    COMPENSATED = "compensated"
    FAILED = "failed"

@dataclass
class SagaStep:
    name: str
    action: Callable
    compensation: Callable
    status: SagaStatus = SagaStatus.PENDING
    result: Any = None

@dataclass
class SagaContext:
    saga_id: str
    data: dict = field(default_factory=dict)
    steps: list[SagaStep] = field(default_factory=list)
    status: SagaStatus = SagaStatus.PENDING
    current_step: int = 0

class SagaOrchestrator:
    def __init__(self, saga_repository, event_publisher):
        self.repo = saga_repository
        self.publisher = event_publisher

    async def execute(self, saga: SagaContext) -> SagaContext:
        saga.status = SagaStatus.RUNNING
        await self.repo.save(saga)

        for i, step in enumerate(saga.steps):
            saga.current_step = i
            try:
                step.result = await step.action(saga.data)
                saga.data.update(step.result or {})
                step.status = SagaStatus.COMPLETED
            except Exception:
                step.status = SagaStatus.FAILED
                await self._compensate(saga, i)
                return saga

        saga.status = SagaStatus.COMPLETED
        await self.repo.save(saga)
        return saga

    async def _compensate(self, saga: SagaContext, failed_step: int):
        saga.status = SagaStatus.COMPENSATING
        for i in range(failed_step - 1, -1, -1):
            step = saga.steps[i]
            if step.status == SagaStatus.COMPLETED:
                try:
                    await step.compensation(saga.data)
                    step.status = SagaStatus.COMPENSATED
                except Exception as e:
                    step.error = f"Compensation failed: {e}"
        saga.status = SagaStatus.COMPENSATED
        await self.repo.save(saga)

Order Saga Example

class OrderSaga:
    def __init__(self, payment_service, inventory_service, shipping_service):
        self.payment = payment_service
        self.inventory = inventory_service
        self.shipping = shipping_service

    def create_saga(self, order: Order) -> SagaContext:
        return SagaContext(
            saga_id=f"order-{order.id}",
            data={"order": order.dict()},
            steps=[
                SagaStep("reserve_inventory", self._reserve_inventory, self._release_inventory),
                SagaStep("process_payment", self._process_payment, self._refund_payment),
                SagaStep("create_shipment", self._create_shipment, self._cancel_shipment),
            ],
        )

    async def _reserve_inventory(self, data: dict) -> dict:
        reservation = await self.inventory.reserve(items=data["order"]["items"])
        return {"reservation_id": reservation.id}

    async def _release_inventory(self, data: dict):
        await self.inventory.release(data["reservation_id"])

    async def _process_payment(self, data: dict) -> dict:
        payment = await self.payment.charge(amount=data["order"]["total"])
        return {"payment_id": payment.id}

    async def _refund_payment(self, data: dict):
        await self.payment.refund(data["payment_id"])

    async def _create_shipment(self, data: dict) -> dict:
        shipment = await self.shipping.create(order_id=data["order"]["id"])
        return {"shipment_id": shipment.id}

    async def _cancel_shipment(self, data: dict):
        if "shipment_id" in data:
            await self.shipping.cancel(data["shipment_id"])

Choreography Pattern

class OrderChoreography:
    """Event handlers for order saga choreography."""

    def __init__(self, event_bus, order_repo):
        self.bus = event_bus
        self.repo = order_repo

    async def handle_order_created(self, event):
        await self.bus.publish("inventory.reserve.requested", {
            "saga_id": event.saga_id,
            "items": event.payload["order"]["items"],
        })

    async def handle_inventory_reserved(self, event):
        await self.bus.publish("payment.charge.requested", {
            "saga_id": event.saga_id,
            "amount": event.payload["amount"],
        })

    async def handle_payment_failed(self, event):
        # Compensation: release inventory
        await self.bus.publish("inventory.release.requested", {
            "saga_id": event.saga_id,
            "reservation_id": event.payload["reservation_id"],
        })

    async def handle_shipment_created(self, event):
        order = await self.repo.get(event.payload["order_id"])
        order.status = "shipped"
        await self.repo.save(order)

Timeout and Recovery

from datetime import timedelta
import asyncio

class SagaRecovery:
    def __init__(self, saga_repo, orchestrator):
        self.repo = saga_repo
        self.orchestrator = orchestrator

    async def recover_stuck_sagas(self, timeout: timedelta = timedelta(hours=1)):
        cutoff = datetime.now(timezone.utc) - timeout
        stuck_sagas = await self.repo.find_by_status_and_age(SagaStatus.RUNNING, cutoff)

        for saga in stuck_sagas:
            try:
                await self.orchestrator.resume(saga)
            except Exception:
                await self.orchestrator._compensate(saga, saga.current_step)

    async def retry_failed_step(self, saga_id: str, max_retries: int = 3):
        saga = await self.repo.get(saga_id)
        failed_step = saga.steps[saga.current_step]

        for attempt in range(max_retries):
            try:
                failed_step.result = await failed_step.action(saga.data)
                failed_step.status = SagaStatus.COMPLETED
                await self.orchestrator.resume(saga, from_step=saga.current_step + 1)
                return
            except Exception:
                await asyncio.sleep(2 ** attempt)

        await self.orchestrator._compensate(saga, saga.current_step)

Idempotency

class IdempotentSagaStep:
    def __init__(self, step_name: str, idempotency_store):
        self.step_name = step_name
        self.store = idempotency_store

    async def execute(self, saga_id: str, action: Callable, *args, **kwargs):
        idempotency_key = f"{saga_id}:{self.step_name}"
        existing = await self.store.get(idempotency_key)
        if existing:
            return existing["result"]

        result = await action(*args, **kwargs)
        await self.store.set(idempotency_key, {"result": result}, ttl=timedelta(days=7))
        return result

Key Decisions

DecisionRecommendation
Pattern choiceOrchestration for complex flows, Choreography for simple
State storagePersistent store (PostgreSQL) for saga state
IdempotencyRequired for all saga steps
TimeoutsPer-step timeouts with recovery
CompensationAlways implement, test thoroughly
ObservabilityTrace saga ID across all services

Anti-Patterns (FORBIDDEN)

# NEVER skip compensation logic
async def _process_payment(self, data: dict):
    return await self.payment.charge(data)  # Missing compensation!

# NEVER rely on synchronous calls across services
async def _reserve_and_pay(self, data: dict):
    await self.inventory.reserve(data)
    await self.payment.charge(data)  # If fails, inventory stuck!

# NEVER ignore idempotency
async def _create_order(self, data: dict):
    return await self.db.insert(Order(**data))  # Duplicate on retry!

# NEVER use in-memory saga state
sagas = {}  # Lost on restart!

# ALWAYS persist saga state and test compensation paths
  • temporal-io - Durable workflow execution
  • event-sourcing - Event-driven state management
  • idempotency-patterns - Idempotent operations

Score

Total Score

75/100

Based on repository quality metrics

SKILL.md

SKILL.mdファイルが含まれている

+20
LICENSE

ライセンスが設定されている

+10
説明文

100文字以上の説明がある

+10
人気

GitHub Stars 100以上

0/15
最近の活動

1ヶ月以内に更新

+10
フォーク

10回以上フォークされている

0/5
Issue管理

オープンIssueが50未満

+5
言語

プログラミング言語が設定されている

+5
タグ

1つ以上のタグが設定されている

+5

Reviews

💬

Reviews coming soon