temporal-io

by yonatangross

The Complete AI Development Toolkit for Claude Code — 159 skills, 34 agents, 20 commands, 144 hooks. Production-ready patterns for FastAPI, React 19, LangGraph, security, and testing.

29 stars · 4 forks · Jan 23, 2026

SKILL.md


name: temporal-io
description: Temporal.io workflow orchestration for durable, fault-tolerant distributed applications. Use when implementing long-running workflows, saga patterns, microservice orchestration, or systems requiring exactly-once execution guarantees.
context: fork
agent: workflow-architect
version: 1.0.0
tags: [temporal, workflow, orchestration, durable-execution, saga, microservices, 2026]
author: OrchestKit
user-invocable: false

Temporal.io Workflow Orchestration

Durable execution engine for reliable distributed applications.

Overview

Use Temporal for:

  • Long-running business processes (days/weeks/months)
  • Saga patterns requiring compensation/rollback
  • Microservice orchestration with retries
  • Systems that need effectively-once workflow execution (activities run at least once, so they must be idempotent)
  • Complex state machines with human-in-the-loop
  • Scheduled and recurring workflows

Workflow Definition

import asyncio
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

# Assumed to be defined elsewhere in the application: the activities
# (create_order, process_payment, reserve_inventory, cancel_order_activity)
# and the input/result types (OrderInput, OrderResult, PaymentInput, ...).

@workflow.defn
class OrderWorkflow:
    def __init__(self) -> None:
        self._status = "pending"
        self._order_id: str | None = None

    @workflow.run
    async def run(self, order_data: OrderInput) -> OrderResult:
        # Create the order first; retry up to 3 times, starting at 1 second
        self._order_id = await workflow.execute_activity(
            create_order, order_data,
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3, initial_interval=timedelta(seconds=1)),
        )
        self._status = "processing"

        # Run payment and inventory reservation in parallel
        payment, inventory = await asyncio.gather(
            workflow.execute_activity(
                process_payment, PaymentInput(order_id=self._order_id),
                start_to_close_timeout=timedelta(minutes=5),
            ),
            workflow.execute_activity(
                reserve_inventory, InventoryInput(order_id=self._order_id),
                start_to_close_timeout=timedelta(minutes=2),
            ),
        )

        self._status = "completed"
        return OrderResult(order_id=self._order_id, payment_id=payment.id)

    @workflow.query
    def get_status(self) -> str:
        # Queries must be read-only: return state, never mutate it
        return self._status

    @workflow.signal
    async def cancel_order(self, reason: str) -> None:
        # Note: self._order_id is still None if the signal arrives before create_order completes
        self._status = "cancelling"
        await workflow.execute_activity(
            cancel_order_activity, CancelInput(order_id=self._order_id),
            start_to_close_timeout=timedelta(seconds=30),
        )
        self._status = "cancelled"
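The workflow above refers to `OrderInput`, `PaymentInput`, `PaymentResult`, and `OrderResult` without defining them. A minimal sketch of what they might look like as plain dataclasses follows. The field names are assumptions inferred from how the examples use them (`order.id`, `payment.id`, `total`, `items`), not definitions from the original skill.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class OrderInput:
    # Hypothetical fields, inferred from OrderInput(id="test-1", total=100)
    id: str
    total: float
    items: List[str] = field(default_factory=list)


@dataclass
class PaymentInput:
    order_id: str
    amount: float = 0.0


@dataclass
class PaymentResult:
    id: str
    status: str = "charged"


@dataclass
class OrderResult:
    order_id: str
    payment_id: str
    shipment_id: Optional[str] = None  # only set by the saga variant


order = OrderInput(id="test-1", total=100)
result = OrderResult(order_id=order.id, payment_id="pay-1")
```

Passing a single dataclass per workflow or activity lets fields be added later without changing the function signature.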

Activity Definition

import httpx
from temporalio import activity
from temporalio.exceptions import ApplicationError

@activity.defn
async def process_payment(input: PaymentInput) -> PaymentResult:
    activity.logger.info(f"Processing payment for order {input.order_id}")
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://payments.example.com/charge",
                json={"order_id": input.order_id, "amount": input.amount},
            )
            response.raise_for_status()
            return PaymentResult(**response.json())
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 402:
            # A declined payment will not succeed on retry, so mark it non-retryable
            raise ApplicationError("Payment declined", non_retryable=True, type="PaymentDeclined") from e
        raise  # Other HTTP errors stay retryable under the activity's retry policy

@activity.defn
async def send_notification(input: NotificationInput) -> None:
    for i, recipient in enumerate(input.recipients):
        # Heartbeat during long operations so the server can detect a stalled worker
        activity.heartbeat(f"Sending {i+1}/{len(input.recipients)}")
        await send_email(recipient, input.subject, input.body)
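Because a retry policy can run an activity more than once, a charge-style activity such as `process_payment` should be safe to repeat. The sketch below shows the idea with an in-memory store keyed by `order_id`. `InMemoryPaymentStore` and `charge_once` are hypothetical names used for illustration; a real system would use a database upsert or the payment provider's idempotency key.

```python
import asyncio
from typing import Dict, Tuple


class InMemoryPaymentStore:
    """Hypothetical stand-in for a payments table keyed by order_id."""

    def __init__(self) -> None:
        self._charges: Dict[str, dict] = {}
        self.charge_calls = 0  # counts real charges, not attempts

    async def charge_once(self, order_id: str, amount: float) -> dict:
        # Upsert semantics: a repeated attempt returns the stored charge
        # instead of charging the customer a second time.
        existing = self._charges.get(order_id)
        if existing is not None:
            return existing
        self.charge_calls += 1
        record = {"id": f"pay-{order_id}", "order_id": order_id, "amount": amount}
        self._charges[order_id] = record
        return record


async def demo() -> Tuple[dict, dict, int]:
    store = InMemoryPaymentStore()
    first = await store.charge_once("order-1", 100.0)
    retry = await store.charge_once("order-1", 100.0)  # simulated retry
    return first, retry, store.charge_calls


first, retry, calls = asyncio.run(demo())
```

The retried call returns the same record and `charge_calls` stays at 1, which is the property an activity needs under at-least-once execution.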

Worker and Client

import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

async def main():
    client = await Client.connect("localhost:7233")
    # The worker polls the task queue and runs the registered workflows and activities
    worker = Worker(
        client,
        task_queue="order-processing",
        workflows=[OrderWorkflow],
        activities=[create_order, process_payment, reserve_inventory, cancel_order_activity],
    )
    await worker.run()

async def start_order_workflow(order_data: OrderInput) -> str:
    client = await Client.connect("localhost:7233")
    handle = await client.start_workflow(
        OrderWorkflow.run, order_data,
        id=f"order-{order_data.id}",  # business-meaningful workflow ID
        task_queue="order-processing",
    )
    return handle.id

async def get_order_status(workflow_id: str) -> str:
    client = await Client.connect("localhost:7233")
    handle = client.get_workflow_handle(workflow_id)
    return await handle.query(OrderWorkflow.get_status)

if __name__ == "__main__":
    asyncio.run(main())

Saga Pattern with Compensation

from datetime import timedelta
from typing import Any, Callable

from temporalio import workflow

@workflow.defn
class OrderSagaWorkflow:
    @workflow.run
    async def run(self, order: OrderInput) -> OrderResult:
        # Each entry pairs a compensation activity with the argument it needs
        compensations: list[tuple[Callable, Any]] = []

        try:
            reservation = await workflow.execute_activity(
                reserve_inventory, order.items, start_to_close_timeout=timedelta(minutes=2)
            )
            compensations.append((release_inventory, reservation.id))

            payment = await workflow.execute_activity(
                charge_payment, PaymentInput(order_id=order.id), start_to_close_timeout=timedelta(minutes=5)
            )
            compensations.append((refund_payment, payment.id))

            shipment = await workflow.execute_activity(
                create_shipment, ShipmentInput(order_id=order.id), start_to_close_timeout=timedelta(minutes=3)
            )
            return OrderResult(order_id=order.id, payment_id=payment.id, shipment_id=shipment.id)

        except Exception:
            workflow.logger.warning(f"Saga failed, running {len(compensations)} compensations")
            # Undo completed steps in reverse order (last completed, first compensated)
            for compensate_fn, compensate_arg in reversed(compensations):
                try:
                    await workflow.execute_activity(
                        compensate_fn, compensate_arg, start_to_close_timeout=timedelta(minutes=2)
                    )
                except Exception as e:
                    # Keep going so one failed compensation does not block the rest
                    workflow.logger.error(f"Compensation failed: {e}")
            raise
Timers and Scheduling

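import asyncio
from datetime import timedelta

from temporalio import workflow

@workflow.defn
class TimeoutWorkflow:
    def __init__(self) -> None:
        # None means no decision yet; set by the approve signal
        self._approved: bool | None = None

    @workflow.run
    async def run(self, input: TaskInput) -> TaskResult:
        try:
            # Durable wait: survives worker restarts, wakes on signal or after 24 hours
            await workflow.wait_condition(lambda: self._approved is not None, timeout=timedelta(hours=24))
        except asyncio.TimeoutError:
            return TaskResult(status="auto_rejected")
        return TaskResult(status="approved" if self._approved else "rejected")

    @workflow.signal
    async def approve(self, approved: bool) -> None:
        self._approved = approved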
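The three outcomes of the approval workflow above (approved, rejected, auto-rejected on timeout) can be modeled in plain asyncio to make the branching explicit. This is an analogy only: `ApprovalGate` is a hypothetical class, and `asyncio.wait_for` is not durable the way `workflow.wait_condition` is.

```python
import asyncio
from typing import Optional, Tuple


class ApprovalGate:
    """In-process model of a signal-driven approval with a timeout."""

    def __init__(self) -> None:
        self._approved: Optional[bool] = None
        self._decided = asyncio.Event()

    def approve(self, approved: bool) -> None:
        # Plays the role of the workflow's approve signal
        self._approved = approved
        self._decided.set()

    async def wait(self, timeout: float) -> str:
        try:
            await asyncio.wait_for(self._decided.wait(), timeout=timeout)
        except asyncio.TimeoutError:
            return "auto_rejected"
        return "approved" if self._approved else "rejected"


async def demo() -> Tuple[str, str, str]:
    # No decision arrives, so the short timeout elapses
    timed_out = await ApprovalGate().wait(timeout=0.01)

    approved_gate = ApprovalGate()
    approved_gate.approve(True)
    approved = await approved_gate.wait(timeout=1.0)

    rejected_gate = ApprovalGate()
    rejected_gate.approve(False)
    rejected = await rejected_gate.wait(timeout=1.0)
    return timed_out, approved, rejected


outcomes = asyncio.run(demo())
```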

Testing

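import pytest
import pytest_asyncio
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

# pytest-asyncio in strict mode requires its own decorator for async fixtures
@pytest_asyncio.fixture
async def workflow_env():
    async with await WorkflowEnvironment.start_local() as env:
        yield env

@pytest.mark.asyncio
async def test_order_workflow(workflow_env):
    # Register every activity the workflow calls; an unregistered one is never picked up
    async with Worker(
        workflow_env.client, task_queue="test", workflows=[OrderWorkflow],
        activities=[create_order, process_payment, reserve_inventory],
    ):
        result = await workflow_env.client.execute_workflow(
            OrderWorkflow.run, OrderInput(id="test-1", total=100),
            id="test-order-1", task_queue="test",
        )
        # Assumes create_order returns the ID of the order it was given
        assert result.order_id == "test-1"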

Key Decisions

| Decision | Recommendation |
|----------|----------------|
| Workflow ID | Business-meaningful, idempotent (e.g., order-{order_id}) |
| Task queue | Per-service or per-workflow-type |
| Activity timeout | start_to_close for most cases |
| Retry policy | 3 attempts default, exponential backoff |
| Heartbeating | Required for activities > 60s |
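To make the "3 attempts, exponential backoff" recommendation concrete, the sketch below computes the waits between attempts. `retry_delays` is a hypothetical helper, not part of the Temporal SDK. It assumes the documented `RetryPolicy` defaults of a 1 second initial interval, a backoff coefficient of 2.0, and a maximum interval of 100 times the initial interval.

```python
from datetime import timedelta
from typing import List, Optional


def retry_delays(
    maximum_attempts: int,
    initial_interval: timedelta = timedelta(seconds=1),
    backoff_coefficient: float = 2.0,
    maximum_interval: Optional[timedelta] = None,
) -> List[timedelta]:
    """Return the wait before each retry (attempts 2..maximum_attempts)."""
    if maximum_attempts < 1:
        # Temporal treats 0 as "unlimited"; this sketch only handles finite counts
        raise ValueError("maximum_attempts must be at least 1")
    cap = maximum_interval if maximum_interval is not None else initial_interval * 100
    delays = []
    for retry in range(maximum_attempts - 1):
        delay = initial_interval * (backoff_coefficient ** retry)
        delays.append(min(delay, cap))
    return delays


three_attempts = retry_delays(3)
capped = retry_delays(5, maximum_interval=timedelta(seconds=4))
```

With three attempts there are two waits, of 1 and 2 seconds. With five attempts and a 4 second cap, the waits are 1, 2, 4, and 4 seconds.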

Anti-Patterns (FORBIDDEN)

# NEVER do non-deterministic operations in workflows
if random.random() > 0.5: ...  # Different on replay!
if datetime.now() > deadline: ...  # Different on replay!

# CORRECT: Use workflow APIs
if workflow.random().random() > 0.5: ...  # workflow.random() returns a deterministically seeded random.Random
if workflow.now() > deadline: ...

# NEVER make network calls directly in workflows
response = httpx.get("https://api.example.com")  # WRONG!

# CORRECT: Use activities for I/O
response = await workflow.execute_activity(fetch_data, ...)

# NEVER ignore activity idempotency: activities can be retried, so use an upsert with order_id as the key
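The "Different on replay!" comments above come down to one property: replaying a workflow must reproduce the same decisions. A seeded generator has that property, which is the idea behind `workflow.random()`. The sketch below uses only the standard library. `decide_discounts` and `run_with_seed` are hypothetical names, and the fixed seed stands in for the seed Temporal manages per workflow run.

```python
import random
from typing import List


def decide_discounts(rng: random.Random, orders: int) -> List[bool]:
    """Make one random decision per order using the supplied generator."""
    return [rng.random() > 0.5 for _ in range(orders)]


def run_with_seed(seed: int) -> List[bool]:
    # A fresh generator with the same seed models a replay of the same run
    return decide_discounts(random.Random(seed), orders=5)


original = run_with_seed(42)
replayed = run_with_seed(42)
```

Both calls produce the same list of decisions. The module-level `random.random()` draws from shared global state that a replay cannot reproduce.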

Related Skills

  • saga-patterns - Distributed transaction patterns
  • message-queues - Event-driven integration
  • resilience-patterns - Retry and circuit breaker patterns

Score

Total Score

75/100

Based on repository quality metrics

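SKILL.md: Includes a SKILL.md file (+20)
LICENSE: A license is set (+10)
Description: Description is at least 100 characters long (+10)
Popularity: 100 or more GitHub stars (0/15)
Recent activity: Updated within the last month (+10)
Forks: Forked 10 or more times (0/5)
Issue management: Fewer than 50 open issues (+5)
Language: A programming language is set (+5)
Tags: At least one tag is set (+5)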
