
saga-orchestration

distributed systems · microservices · transaction management · event-driven architecture · workflow automation · resilience patterns · backend development
⭐ 36.8k · 📄 MIT · 🕒 Updated 2026-06-16 · Source ↗

Install this skill

npx skills add wshobson/agents

Works across Claude Code, Cursor, Codex, Copilot & Antigravity

Saga Orchestration provides a centralized way to manage distributed transactions that span multiple microservices. Instead of holding atomic locks across separate databases, which creates performance bottlenecks, the pattern runs a sequence of local transactions. Each local transaction updates a single service's state and publishes an event or command that triggers the next step. If a step fails midway, the orchestrator triggers predefined compensating actions that undo the steps that already succeeded, in reverse order, so the system returns to a consistent state (eventually, rather than atomically). Because each workflow is tracked as an explicit, persisted state machine, developers can monitor, audit, and debug long-running asynchronous processes without holding locks or connections open across service boundaries. The skill also provides a structured approach to partial failures, retries, and manual intervention steps in distributed architectures.
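To make that control flow concrete, here is a minimal in-process sketch: each step pairs an action with a compensation, and a failure runs the compensations of the already-completed steps in reverse order. `Step`, `run_saga`, and the `inventory` dict are hypothetical illustrations; a real saga sends these as messages to separate services and persists progress between steps.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class Step:
    name: str
    action: Callable[[Dict], None]        # local transaction owned by one service
    compensation: Callable[[Dict], None]  # semantic undo of that local transaction


def run_saga(steps: List[Step], ctx: Dict) -> bool:
    """Run steps in order; if one fails, compensate the completed ones in reverse."""
    completed: List[Step] = []
    for step in steps:
        try:
            step.action(ctx)
        except Exception:
            for done in reversed(completed):
                done.compensation(ctx)
            return False
        completed.append(step)
    return True


# Usage: payment fails, so the inventory reservation is released again.
inventory = {"sku-1": 5}


def reserve(ctx: Dict) -> None:
    inventory["sku-1"] -= ctx["qty"]


def release(ctx: Dict) -> None:
    inventory["sku-1"] += ctx["qty"]


def charge(ctx: Dict) -> None:
    raise RuntimeError("card declined")


def refund(ctx: Dict) -> None:
    pass  # never called here: only completed steps are compensated


ok = run_saga([Step("reserve_inventory", reserve, release),
               Step("process_payment", charge, refund)], {"qty": 2})
print(ok, inventory["sku-1"])  # False 5
```

The failed step itself is not compensated, because its local transaction is assumed not to have committed.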

When to Use This Skill

  • Coordinating multi-stage e-commerce order fulfillment processes
  • Managing financial transfer workflows requiring approval cycles
  • Synchronizing data state changes across distributed storage systems
  • Implementing automated reservation systems where service booking might fail
  • Handling multi-step user onboarding that requires verification of external credentials

How to Invoke This Skill

Example prompts that trigger this skill in Claude Code, Cursor, or Antigravity:

  • “Implement a saga orchestrator for this transaction flow”
  • “Generate a saga step compensation template”
  • “How should I handle distributed failure in my microservices?”
  • “Write a class for tracking multi-step business process state”
  • “Create a saga state machine for my order fulfillment service”

Pro Tips

  • 💡 Prioritize Idempotency: Design each step and its compensating transaction to be idempotent, so retries and duplicate message deliveries cannot cause unintended side effects.
  • 💡 Monitor Saga State Closely: Implement monitoring and alerting on saga states (Started, Pending, Compensating, Completed, Failed) to identify and address failures quickly, especially during compensation.
  • 💡 Choose Orchestration vs. Choreography Wisely: The skill covers both; favor orchestration for complex sagas with many steps or dynamic branching, because it centralizes control and is easier to debug than choreography.
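One common way to get the idempotency the first tip asks for is to record a dedupe key per applied command and skip repeats. The sketch below is hypothetical (`IdempotentInventory` is not part of the skill); it keys on `(saga_id, step_name, operation)` and also refuses to undo a reservation that never happened or to apply one that arrives after its compensation.

```python
from typing import Dict, Set, Tuple


class IdempotentInventory:
    """Hypothetical inventory handler that tolerates duplicate deliveries."""

    def __init__(self, stock: int) -> None:
        self.stock = stock
        # Dedupe keys of already-applied commands. In production this would be a
        # table with a unique constraint, updated in the same transaction as stock.
        self.processed: Set[Tuple[str, str, str]] = set()

    def reserve(self, cmd: Dict) -> None:
        key = (cmd["saga_id"], cmd["step_name"], "reserve")
        released = (cmd["saga_id"], cmd["step_name"], "release")
        if key in self.processed or released in self.processed:
            return  # duplicate, or compensation already ran: do not apply late
        self.stock -= cmd["qty"]
        self.processed.add(key)

    def release(self, cmd: Dict) -> None:
        key = (cmd["saga_id"], cmd["step_name"], "release")
        if key in self.processed:
            return  # redelivered compensation: already applied
        # Only undo a reservation that actually happened (e.g. not after a
        # timeout where the original command was never processed).
        if (cmd["saga_id"], cmd["step_name"], "reserve") in self.processed:
            self.stock += cmd["qty"]
        self.processed.add(key)


inv = IdempotentInventory(stock=10)
cmd = {"saga_id": "s-1", "step_name": "reserve_inventory", "qty": 3}
inv.reserve(cmd)
inv.reserve(cmd)   # duplicate delivery
print(inv.stock)   # 7
inv.release(cmd)
inv.release(cmd)   # duplicate compensation
print(inv.stock)   # 10
```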

What this skill does

  • Centralized state machine management for distributed workflows
  • Automated execution of compensation logic upon task failure
  • Tracking and persistence of transaction status across asynchronous steps
  • Decoupled, message-based communication between autonomous services
  • Ordered, step-by-step execution of complex business processes

When not to use it

  • ✕ Simple workflows that can be handled within a single atomic database transaction
  • ✕ Applications where low-latency synchronous response is the only requirement
  • ✕ Systems with high-frequency, short-lived operations where overhead outweighs benefit

Example workflow

  1. Define a set of saga steps containing the action and corresponding compensation
  2. Initialize an orchestrator with a persistence store and event bus
  3. Start the saga instance to trigger the first local transaction
  4. Handle step completion events to transition the state and trigger the next step
  5. If any step reports failure, execute compensation logic for the already-completed steps in reverse order
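The five workflow steps can be walked through end to end in a single process. This is a toy under stated assumptions: `InMemoryBus`, `Orchestrator`, and `fake_service` are hypothetical names, the "persistence store" is a dict, and compensation commands are fire-and-forget rather than acknowledged, but the order of operations follows the numbered list.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict, List, Tuple

Handler = Callable[[Dict[str, Any]], Awaitable[None]]


class InMemoryBus:
    """Stand-in for a message broker: delivers each message to its topic's handlers."""

    def __init__(self) -> None:
        self.handlers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self.handlers[topic].append(handler)

    async def publish(self, topic: str, payload: Dict[str, Any]) -> None:
        for handler in self.handlers[topic]:
            await handler(dict(payload))


class Orchestrator:
    """Workflow step 1: steps are (name, action_topic, compensation_topic) tuples."""

    def __init__(self, bus: InMemoryBus, steps: List[Tuple[str, str, str]]) -> None:
        self.bus, self.steps = bus, steps
        self.sagas: Dict[str, Dict[str, Any]] = {}  # workflow step 2: the store, in memory here
        bus.subscribe("StepCompleted", self.on_completed)
        bus.subscribe("StepFailed", self.on_failed)

    async def start(self, saga_id: str, data: Dict[str, Any]) -> None:  # workflow step 3
        self.sagas[saga_id] = {"state": "started", "current": 0, "data": data, "log": []}
        await self._dispatch(saga_id)

    async def _dispatch(self, saga_id: str) -> None:
        saga = self.sagas[saga_id]
        name, action, _ = self.steps[saga["current"]]
        await self.bus.publish(action, {"saga_id": saga_id, "step": name, **saga["data"]})

    async def on_completed(self, evt: Dict[str, Any]) -> None:  # workflow step 4
        saga = self.sagas[evt["saga_id"]]
        saga["log"].append(f"done:{evt['step']}")
        saga["current"] += 1
        if saga["current"] == len(self.steps):
            saga["state"] = "completed"
        else:
            saga["state"] = "pending"
            await self._dispatch(evt["saga_id"])

    async def on_failed(self, evt: Dict[str, Any]) -> None:  # workflow step 5
        saga = self.sagas[evt["saga_id"]]
        saga["state"] = "compensating"
        saga["log"].append(f"failed:{evt['step']}")
        for name, _, compensation in reversed(self.steps[: saga["current"]]):
            await self.bus.publish(compensation, {"saga_id": evt["saga_id"], "step": name})
            saga["log"].append(f"compensate:{name}")
        saga["state"] = "failed"  # simplified: does not wait for compensation replies


def fake_service(bus: InMemoryBus, topic: str, fail: bool = False) -> None:
    async def handle(cmd: Dict[str, Any]) -> None:
        reply = "StepFailed" if fail else "StepCompleted"
        await bus.publish(reply, {"saga_id": cmd["saga_id"], "step": cmd["step"]})
    bus.subscribe(topic, handle)


async def main() -> Dict[str, Any]:
    bus = InMemoryBus()
    orch = Orchestrator(bus, [
        ("reserve_inventory", "ReserveItems", "ReleaseReservation"),
        ("process_payment", "ProcessPayment", "RefundPayment"),
        ("create_shipment", "CreateShipment", "CancelShipment"),
    ])
    fake_service(bus, "ReserveItems")
    fake_service(bus, "ProcessPayment")
    fake_service(bus, "CreateShipment", fail=True)
    await orch.start("saga-1", {"order_id": "o-1"})
    return orch.sagas["saga-1"]


saga = asyncio.run(main())
print(saga["state"], saga["log"])
```

Shipment fails, so the payment and the reservation are compensated in that (reverse) order.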

Prerequisites

  • – An established event bus or message broker
  • – A persistent data storage layer for maintaining state
  • – Defined compensating actions for every primary business action
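Because replies for the same saga can be processed concurrently, the state store usually needs some guard against lost updates. A sketch of one option, optimistic concurrency with a version number, assuming a hypothetical `SagaRecord` that carries a `version` field (the templates' `Saga` class does not have one). The methods are `async` to match how the templates call `saga_store.get` and `saga_store.save`.

```python
import asyncio
import copy
from dataclasses import dataclass, field
from typing import Any, Dict


class ConcurrentUpdateError(Exception):
    """Raised when a saga changed in the store after the caller loaded it."""


@dataclass
class SagaRecord:
    saga_id: str
    state: str
    data: Dict[str, Any] = field(default_factory=dict)
    version: int = 0  # bumped on every successful save


class InMemorySagaStore:
    """Hypothetical store with optimistic concurrency (compare-and-set on version)."""

    def __init__(self) -> None:
        self._rows: Dict[str, SagaRecord] = {}

    async def get(self, saga_id: str) -> SagaRecord:
        return copy.deepcopy(self._rows[saga_id])  # callers never share the stored object

    async def save(self, record: SagaRecord) -> None:
        stored = self._rows.get(record.saga_id)
        expected = stored.version if stored else 0
        if record.version != expected:
            raise ConcurrentUpdateError(record.saga_id)
        record.version += 1
        self._rows[record.saga_id] = copy.deepcopy(record)


async def demo() -> str:
    store = InMemorySagaStore()
    await store.save(SagaRecord("s-1", "started"))
    a = await store.get("s-1")   # two handlers load the same version...
    b = await store.get("s-1")
    a.state = "pending"
    await store.save(a)          # ...the first write wins
    b.state = "compensating"
    try:
        await store.save(b)      # ...the second is rejected instead of overwriting
    except ConcurrentUpdateError:
        pass  # a real handler would reload the saga and re-apply its event
    return (await store.get("s-1")).state


print(asyncio.run(demo()))  # pending
```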

Pitfalls & limitations

  • !Increased complexity in testing and debugging partial failures
  • !Risk of inconsistent state if compensation logic is not idempotent
  • !Operational overhead required to manage the orchestrator's state database

FAQ

How is this different from a distributed transaction?
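Distributed transactions typically use two-phase commit protocols that lock resources across systems. Saga orchestration uses sequential local transactions, which avoids long-held locks and improves system scalability. The trade-off is weaker isolation: other transactions can observe intermediate states before the saga completes or finishes compensating.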
What happens if a compensation step fails?
Compensation failures usually require manual intervention or a specialized retry policy. The orchestrator records the failure, and the system must alert administrators to resolve the data discrepancy.
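A "specialized retry policy" could look like the sketch below: retry an idempotent compensation with exponential backoff, and hand it to a human only once retries are exhausted. `run_compensation` and the `on_give_up` callback are hypothetical names; a production version would typically add jitter and persist the escalation.

```python
import asyncio
import logging
from typing import Awaitable, Callable, List

log = logging.getLogger("saga.compensation")


async def run_compensation(
    compensate: Callable[[], Awaitable[None]],
    on_give_up: Callable[[Exception], Awaitable[None]],
    attempts: int = 5,
    base_delay: float = 0.5,
) -> bool:
    """Retry an idempotent compensation with exponential backoff, then escalate."""
    for attempt in range(1, attempts + 1):
        try:
            await compensate()
            return True
        except Exception as exc:
            if attempt == attempts:
                log.error("compensation gave up after %d attempts: %s", attempts, exc)
                await on_give_up(exc)  # e.g. mark the saga for manual intervention and alert
                return False
            delay = base_delay * 2 ** (attempt - 1)  # 0.5s, 1s, 2s, ...
            log.warning("compensation attempt %d failed (%s); retry in %.2fs", attempt, exc, delay)
            await asyncio.sleep(delay)
    return False  # only reached when attempts < 1


async def demo():
    calls = {"n": 0}
    escalated: List[str] = []

    async def flaky_refund() -> None:  # fails twice, then succeeds
        calls["n"] += 1
        if calls["n"] < 3:
            raise ConnectionError("payment service unavailable")

    async def rejected_refund() -> None:  # never succeeds
        raise ValueError("refund rejected")

    async def escalate(exc: Exception) -> None:
        escalated.append(str(exc))

    ok1 = await run_compensation(flaky_refund, escalate, base_delay=0.01)
    ok2 = await run_compensation(rejected_refund, escalate, attempts=2, base_delay=0.01)
    return ok1, calls["n"], ok2, escalated


print(asyncio.run(demo()))  # (True, 3, False, ['refund rejected'])
```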
Must all steps be asynchronous?
While usually implemented asynchronously, the orchestration pattern can support synchronous communication. However, asynchronous event-driven flows are preferred to prevent cascading timeouts across services.

How it compares

Unlike manual procedural code which often lacks centralized failure tracking, Saga Orchestration uses a formal state machine to ensure every action has an explicit recovery path.

Source & trust

📄 Full skill instructions (original source: wshobson/agents)
# Saga Orchestration

Patterns for managing distributed transactions and long-running business processes.

## When to Use This Skill

- Coordinating multi-service transactions
- Implementing compensating transactions
- Managing long-running business workflows
- Handling failures in distributed systems
- Building order fulfillment processes
- Implementing approval workflows

## Core Concepts

### 1. Saga Types

```
Choreography                       Orchestration
┌─────┐  ┌─────┐  ┌─────┐          ┌─────────────┐
│Svc A│─►│Svc B│─►│Svc C│          │ Orchestrator│
└─────┘  └─────┘  └─────┘          └──────┬──────┘
   │        │        │                    │
   ▼        ▼        ▼              ┌─────┼─────┐
 Event    Event    Event            ▼     ▼     ▼
                                  ┌────┐┌────┐┌────┐
                                  │Svc1││Svc2││Svc3│
                                  └────┘└────┘└────┘
```


### 2. Saga Execution States

| State | Description |
| ---------------- | ------------------------------ |
| **Started** | Saga initiated |
| **Pending** | Waiting for step completion |
| **Compensating** | Rolling back due to failure |
| **Completed** | All steps succeeded |
| **Failed** | Saga failed after compensation |
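These states form a small state machine, and rejecting illegal moves catches bugs such as a late failure event arriving after a saga already completed. The transition table below is one reading of the state table, not something the skill defines; `ALLOWED`, `transition`, and `IllegalTransition` are hypothetical names.

```python
from enum import Enum
from typing import Dict, FrozenSet


class SagaState(Enum):
    STARTED = "started"
    PENDING = "pending"
    COMPENSATING = "compensating"
    COMPLETED = "completed"
    FAILED = "failed"


# Forward progress, then either success or compensation ending in failure.
ALLOWED: Dict[SagaState, FrozenSet[SagaState]] = {
    SagaState.STARTED: frozenset({SagaState.PENDING, SagaState.COMPLETED, SagaState.COMPENSATING}),
    SagaState.PENDING: frozenset({SagaState.PENDING, SagaState.COMPLETED, SagaState.COMPENSATING}),
    SagaState.COMPENSATING: frozenset({SagaState.FAILED}),
    SagaState.COMPLETED: frozenset(),  # terminal
    SagaState.FAILED: frozenset(),     # terminal
}


class IllegalTransition(Exception):
    pass


def transition(current: SagaState, target: SagaState) -> SagaState:
    """Return target if the move is allowed; otherwise raise."""
    if target not in ALLOWED[current]:
        raise IllegalTransition(f"{current.value} -> {target.value}")
    return target


state = transition(SagaState.STARTED, SagaState.PENDING)
state = transition(state, SagaState.COMPENSATING)
try:
    transition(SagaState.COMPLETED, SagaState.COMPENSATING)  # late failure event
except IllegalTransition as exc:
    print("rejected:", exc)  # rejected: completed -> compensating
```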

## Templates

### Template 1: Saga Orchestrator Base

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional
import uuid


def utcnow() -> datetime:
    """Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)."""
    return datetime.now(timezone.utc)


class SagaState(Enum):
    STARTED = "started"
    PENDING = "pending"
    COMPENSATING = "compensating"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class SagaStep:
    name: str
    action: str        # command topic that performs the step
    compensation: str  # command topic that undoes the step
    # pending -> executing -> completed | failed; completed -> compensating -> compensated
    status: str = "pending"
    result: Optional[Dict] = None
    error: Optional[str] = None
    executed_at: Optional[datetime] = None
    compensated_at: Optional[datetime] = None
    timeout_at: Optional[datetime] = None  # used by the timeout variant (Template 4)


@dataclass
class Saga:
    saga_id: str
    saga_type: str
    state: SagaState
    data: Dict[str, Any]
    steps: List[SagaStep]
    current_step: int = 0
    created_at: datetime = field(default_factory=utcnow)
    updated_at: datetime = field(default_factory=utcnow)


class SagaOrchestrator(ABC):
    """Base class for saga orchestrators.

    saga_store must provide async get(saga_id) and save(saga);
    event_publisher must provide async publish(topic, payload).
    """

    def __init__(self, saga_store, event_publisher):
        self.saga_store = saga_store
        self.event_publisher = event_publisher

    @abstractmethod
    def define_steps(self, data: Dict) -> List[SagaStep]:
        """Define the saga steps."""

    @property
    @abstractmethod
    def saga_type(self) -> str:
        """Unique saga type identifier."""

    async def start(self, data: Dict) -> Saga:
        """Start a new saga."""
        saga = Saga(
            saga_id=str(uuid.uuid4()),
            saga_type=self.saga_type,
            state=SagaState.STARTED,
            data=data,
            steps=self.define_steps(data),
        )
        await self.saga_store.save(saga)
        await self._execute_next_step(saga)
        return saga

    async def handle_step_completed(self, saga_id: str, step_name: str, result: Dict):
        """Handle successful step completion."""
        saga = await self.saga_store.get(saga_id)
        if saga.state not in (SagaState.STARTED, SagaState.PENDING):
            return  # late reply after the saga left the forward path; log for reconciliation

        step = saga.steps[saga.current_step]
        if step.name != step_name or step.status != "executing":
            return  # duplicate or out-of-order reply: ignore to stay idempotent

        step.status = "completed"
        step.result = result
        step.executed_at = utcnow()
        saga.current_step += 1
        saga.updated_at = utcnow()

        # Check if saga is complete
        if saga.current_step >= len(saga.steps):
            saga.state = SagaState.COMPLETED
            await self.saga_store.save(saga)
            await self._on_saga_completed(saga)
        else:
            saga.state = SagaState.PENDING
            await self.saga_store.save(saga)
            await self._execute_next_step(saga)

    async def handle_step_failed(self, saga_id: str, step_name: str, error: str):
        """Handle step failure: start compensation."""
        saga = await self.saga_store.get(saga_id)
        if saga.state not in (SagaState.STARTED, SagaState.PENDING):
            return  # already compensating or finished

        # Mark step as failed
        for step in saga.steps:
            if step.name == step_name:
                step.status = "failed"
                step.error = error
                break

        saga.state = SagaState.COMPENSATING
        saga.updated_at = utcnow()
        await self.saga_store.save(saga)

        # Compensate completed steps, from the failed step backwards
        await self._compensate(saga)

    async def _execute_next_step(self, saga: Saga):
        """Publish the command for the current step."""
        if saga.current_step >= len(saga.steps):
            return

        step = saga.steps[saga.current_step]
        step.status = "executing"
        await self.saga_store.save(saga)

        await self.event_publisher.publish(
            step.action,
            {"saga_id": saga.saga_id, "step_name": step.name, **saga.data},
        )

    async def _compensate(self, saga: Saga):
        """Request compensation for completed steps, in reverse order."""
        to_compensate = [
            s for s in reversed(saga.steps[: saga.current_step]) if s.status == "completed"
        ]
        if not to_compensate:
            # Nothing succeeded (e.g. the first step failed): finish now, otherwise
            # the saga would wait forever for compensation replies.
            await self._finish_failed(saga)
            return

        # Mark and save once before publishing, so replies never see stale state
        for step in to_compensate:
            step.status = "compensating"
        await self.saga_store.save(saga)

        # Published in reverse order but not awaited one by one; for strictly
        # sequential undo, publish the next command from handle_compensation_completed.
        for step in to_compensate:
            await self.event_publisher.publish(
                step.compensation,
                {
                    "saga_id": saga.saga_id,
                    "step_name": step.name,
                    "original_result": step.result,
                    **saga.data,
                },
            )

    async def handle_compensation_completed(self, saga_id: str, step_name: str):
        """Handle compensation completion."""
        saga = await self.saga_store.get(saga_id)
        if saga.state != SagaState.COMPENSATING:
            return  # duplicate reply after the saga already failed

        for step in saga.steps:
            if step.name == step_name and step.status == "compensating":
                step.status = "compensated"
                step.compensated_at = utcnow()
                break

        saga.updated_at = utcnow()
        # Done once no step is still waiting for its compensation
        if all(s.status != "compensating" for s in saga.steps):
            await self._finish_failed(saga)
        else:
            await self.saga_store.save(saga)

    async def _finish_failed(self, saga: Saga):
        saga.state = SagaState.FAILED
        saga.updated_at = utcnow()
        await self.saga_store.save(saga)
        await self._on_saga_failed(saga)

    async def _on_saga_completed(self, saga: Saga):
        """Called when saga completes successfully."""
        await self.event_publisher.publish(
            f"{self.saga_type}Completed",
            {"saga_id": saga.saga_id, **saga.data},
        )

    async def _on_saga_failed(self, saga: Saga):
        """Called when saga fails after compensation."""
        failed = next((s for s in saga.steps if s.status == "failed"), None)
        await self.event_publisher.publish(
            f"{self.saga_type}Failed",
            {
                "saga_id": saga.saga_id,
                "error": failed.error if failed else "Saga failed",
                **saga.data,
            },
        )
```


### Template 2: Order Fulfillment Saga

```python
from typing import Dict, List

# Builds on SagaOrchestrator and SagaStep from Template 1.


class OrderFulfillmentSaga(SagaOrchestrator):
    """Orchestrates order fulfillment across services."""

    @property
    def saga_type(self) -> str:
        return "OrderFulfillment"

    def define_steps(self, data: Dict) -> List[SagaStep]:
        return [
            SagaStep(
                name="reserve_inventory",
                action="InventoryService.ReserveItems",
                compensation="InventoryService.ReleaseReservation",
            ),
            SagaStep(
                name="process_payment",
                action="PaymentService.ProcessPayment",
                compensation="PaymentService.RefundPayment",
            ),
            SagaStep(
                name="create_shipment",
                action="ShippingService.CreateShipment",
                compensation="ShippingService.CancelShipment",
            ),
            SagaStep(
                name="send_confirmation",
                action="NotificationService.SendOrderConfirmation",
                compensation="NotificationService.SendCancellationNotice",
            ),
        ]


# Usage: saga_store and event_publisher are supplied by the application
async def create_order(order_data: Dict, saga_store, event_publisher):
    saga = OrderFulfillmentSaga(saga_store, event_publisher)
    return await saga.start({
        "order_id": order_data["order_id"],
        "customer_id": order_data["customer_id"],
        "items": order_data["items"],
        "payment_method": order_data["payment_method"],
        "shipping_address": order_data["shipping_address"],
    })


class InsufficientInventoryError(Exception):
    """Business failure raised by InventoryService.reserve()."""


# Event handlers in each service
class InventoryService:
    """One participant; reserve(), release_reservation() and event_publisher
    are provided by the real service implementation."""

    async def handle_reserve_items(self, command: Dict):
        try:
            reservation = await self.reserve(command["items"], command["order_id"])
        except InsufficientInventoryError as e:
            # Report a business failure so the orchestrator starts compensating
            await self.event_publisher.publish(
                "SagaStepFailed",
                {
                    "saga_id": command["saga_id"],
                    "step_name": command["step_name"],
                    "error": str(e),
                },
            )
            return

        # Report success (outside the try, so a publish error is not misreported
        # as insufficient inventory)
        await self.event_publisher.publish(
            "SagaStepCompleted",
            {
                "saga_id": command["saga_id"],
                "step_name": command["step_name"],
                "result": {"reservation_id": reservation.id},
            },
        )

    async def handle_release_reservation(self, command: Dict):
        # Compensating action: must be idempotent, since the command may be redelivered
        await self.release_reservation(command["original_result"]["reservation_id"])
        await self.event_publisher.publish(
            "SagaCompensationCompleted",
            {
                "saga_id": command["saga_id"],
                "step_name": command["step_name"],
            },
        )
```
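The templates leave implicit how the `SagaStepCompleted`, `SagaStepFailed`, and `SagaCompensationCompleted` replies reach the orchestrator. One way to wire them is a small routing table, sketched below with a hypothetical `RecordingOrchestrator` that exposes the same handler names as `SagaOrchestrator`; `build_router` and `dispatch` are also illustrative names, and how your broker invokes `dispatch` is an assumption.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple

Event = Dict[str, Any]


class RecordingOrchestrator:
    """Hypothetical stand-in with the same handler names as SagaOrchestrator."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, tuple]] = []

    async def handle_step_completed(self, saga_id: str, step_name: str, result: Dict) -> None:
        self.calls.append(("completed", (saga_id, step_name, result)))

    async def handle_step_failed(self, saga_id: str, step_name: str, error: str) -> None:
        self.calls.append(("failed", (saga_id, step_name, error)))

    async def handle_compensation_completed(self, saga_id: str, step_name: str) -> None:
        self.calls.append(("compensated", (saga_id, step_name)))


def build_router(orch) -> Dict[str, Callable[[Event], Awaitable[None]]]:
    """Map the reply events that services publish to the orchestrator's handlers."""
    return {
        "SagaStepCompleted": lambda e: orch.handle_step_completed(
            e["saga_id"], e["step_name"], e.get("result", {})),
        "SagaStepFailed": lambda e: orch.handle_step_failed(
            e["saga_id"], e["step_name"], e["error"]),
        "SagaCompensationCompleted": lambda e: orch.handle_compensation_completed(
            e["saga_id"], e["step_name"]),
    }


async def dispatch(router, event_type: str, payload: Event) -> None:
    handler = router.get(event_type)
    if handler is not None:  # events that are not saga replies are ignored
        await handler(payload)


async def demo():
    orch = RecordingOrchestrator()
    router = build_router(orch)
    await dispatch(router, "SagaStepCompleted",
                   {"saga_id": "s-1", "step_name": "reserve_inventory",
                    "result": {"reservation_id": "r-9"}})
    await dispatch(router, "SagaStepFailed",
                   {"saga_id": "s-1", "step_name": "process_payment", "error": "card declined"})
    await dispatch(router, "OrderViewed", {"order_id": "o-1"})
    return orch.calls


print(asyncio.run(demo()))
```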


### Template 3: Choreography-Based Saga

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class SagaContext:
    """Passed through choreographed saga events."""
    saga_id: str
    step: int
    data: Dict[str, Any]
    completed_steps: List[str] = field(default_factory=list)


class OrderChoreographySaga:
    """Choreography-based saga using events.

    Assumes every service echoes the correlation fields it receives
    (saga_id, order_id, reservation_id, payment_id) in the events it emits,
    so later handlers have what they need to compensate.
    """

    def __init__(self, event_bus):
        self.event_bus = event_bus
        self._register_handlers()

    def _register_handlers(self):
        self.event_bus.subscribe("OrderCreated", self._on_order_created)
        self.event_bus.subscribe("InventoryReserved", self._on_inventory_reserved)
        self.event_bus.subscribe("PaymentProcessed", self._on_payment_processed)
        self.event_bus.subscribe("ShipmentCreated", self._on_shipment_created)

        # Compensation handlers
        self.event_bus.subscribe("PaymentFailed", self._on_payment_failed)
        self.event_bus.subscribe("ShipmentFailed", self._on_shipment_failed)

    async def _on_order_created(self, event: Dict):
        """Step 1: Order created, reserve inventory."""
        await self.event_bus.publish("ReserveInventory", {
            "saga_id": event["order_id"],  # the order ID doubles as the saga ID
            "order_id": event["order_id"],
            "items": event["items"],
            "total_amount": event["total_amount"],  # echoed back for the payment step
        })

    async def _on_inventory_reserved(self, event: Dict):
        """Step 2: Inventory reserved, process payment."""
        await self.event_bus.publish("ProcessPayment", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "amount": event["total_amount"],
            "reservation_id": event["reservation_id"],
        })

    async def _on_payment_processed(self, event: Dict):
        """Step 3: Payment done, create shipment."""
        await self.event_bus.publish("CreateShipment", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "payment_id": event["payment_id"],
            "reservation_id": event["reservation_id"],  # needed if shipment fails
        })

    async def _on_shipment_created(self, event: Dict):
        """Step 4: Shipment created, mark the order fulfilled."""
        await self.event_bus.publish("OrderFulfilled", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "tracking_number": event["tracking_number"],
        })

    # Compensation handlers
    async def _on_payment_failed(self, event: Dict):
        """Payment failed - release inventory."""
        await self.event_bus.publish("ReleaseInventory", {
            "saga_id": event["saga_id"],
            "reservation_id": event["reservation_id"],
        })
        await self.event_bus.publish("OrderFailed", {
            "order_id": event["order_id"],
            "reason": "Payment failed",
        })

    async def _on_shipment_failed(self, event: Dict):
        """Shipment failed - refund payment, then release inventory (reverse order)."""
        await self.event_bus.publish("RefundPayment", {
            "saga_id": event["saga_id"],
            "payment_id": event["payment_id"],
        })
        await self.event_bus.publish("ReleaseInventory", {
            "saga_id": event["saga_id"],
            "reservation_id": event["reservation_id"],
        })
        await self.event_bus.publish("OrderFailed", {
            "order_id": event["order_id"],
            "reason": "Shipment failed",
        })
```


### Template 4: Saga with Timeouts

```python
from datetime import datetime, timedelta, timezone
from typing import Dict

# Extends SagaOrchestrator and Saga from Template 1. `scheduler` is any object
# with an async schedule(job_id, callback, payload, run_at=...) method.


class TimeoutSagaOrchestrator(SagaOrchestrator):
    """Saga orchestrator with step timeouts."""

    step_timeout = timedelta(minutes=5)

    def __init__(self, saga_store, event_publisher, scheduler):
        super().__init__(saga_store, event_publisher)
        self.scheduler = scheduler

    async def _execute_next_step(self, saga: Saga):
        if saga.current_step >= len(saga.steps):
            return

        step = saga.steps[saga.current_step]
        step.status = "executing"
        step.timeout_at = datetime.now(timezone.utc) + self.step_timeout
        await self.saga_store.save(saga)

        # Schedule a timeout check for this step
        await self.scheduler.schedule(
            f"saga_timeout_{saga.saga_id}_{step.name}",
            self._check_timeout,
            {"saga_id": saga.saga_id, "step_name": step.name},
            run_at=step.timeout_at,
        )

        await self.event_publisher.publish(
            step.action,
            {"saga_id": saga.saga_id, "step_name": step.name, **saga.data},
        )

    async def _check_timeout(self, data: Dict):
        """Fail the step if it is still executing when its deadline passes."""
        saga = await self.saga_store.get(data["saga_id"])
        step = next((s for s in saga.steps if s.name == data["step_name"]), None)

        if step is not None and step.status == "executing":
            await self.handle_step_failed(
                data["saga_id"],
                data["step_name"],
                "Step timed out",
            )
```
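Template 4 assumes a `scheduler` but does not define one. For tests or a single-process prototype, a stand-in with the same call shape can be built on `asyncio` tasks; `AsyncioScheduler` is a hypothetical name, and because its timers live in memory they are lost on restart, so a production system would more likely persist deadlines and poll for them.

```python
import asyncio
from datetime import datetime, timedelta, timezone
from typing import Any, Awaitable, Callable, Dict

Callback = Callable[[Dict[str, Any]], Awaitable[None]]


class AsyncioScheduler:
    """Hypothetical in-process scheduler with the call shape Template 4 uses."""

    def __init__(self) -> None:
        self._tasks: Dict[str, "asyncio.Task[None]"] = {}  # keeps tasks referenced

    async def schedule(self, job_id: str, callback: Callback,
                       payload: Dict[str, Any], run_at: datetime) -> None:
        if run_at.tzinfo is None:  # treat naive timestamps as UTC
            run_at = run_at.replace(tzinfo=timezone.utc)
        delay = max(0.0, (run_at - datetime.now(timezone.utc)).total_seconds())
        self.cancel(job_id)  # re-scheduling an ID replaces its previous timer

        async def fire() -> None:
            await asyncio.sleep(delay)
            self._tasks.pop(job_id, None)
            await callback(payload)

        self._tasks[job_id] = asyncio.create_task(fire())

    def cancel(self, job_id: str) -> None:
        task = self._tasks.pop(job_id, None)
        if task is not None:
            task.cancel()


async def demo():
    scheduler = AsyncioScheduler()
    fired = []

    async def check_timeout(payload: Dict[str, Any]) -> None:
        fired.append(payload["step_name"])

    soon = datetime.now(timezone.utc) + timedelta(milliseconds=20)
    await scheduler.schedule("t-payment", check_timeout, {"step_name": "process_payment"}, run_at=soon)
    await scheduler.schedule("t-shipment", check_timeout, {"step_name": "create_shipment"}, run_at=soon)
    scheduler.cancel("t-shipment")  # e.g. the step replied before its deadline
    await asyncio.sleep(0.1)
    return fired


print(asyncio.run(demo()))  # ['process_payment']
```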


## Best Practices

### Do's

- **Make steps idempotent** - Safe to retry
- **Design compensations carefully** - They must eventually succeed, so make them idempotent and retryable
- **Use correlation IDs** - For tracing across services
- **Implement timeouts** - Don't wait forever
- **Log everything** - For debugging failures
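Correlation IDs are most useful when every log line carries them automatically. One sketch using only the standard library: a `contextvars.ContextVar` holds the saga ID of the message being handled, and a `logging.Filter` copies it onto each record. The logger name, `current_saga_id`, and `handle_reserve_items` here are hypothetical.

```python
import contextvars
import io
import logging
from typing import Any, Dict, Optional, TextIO

# Saga ID of the message currently being handled (isolated per asyncio task/thread).
current_saga_id: "contextvars.ContextVar[str]" = contextvars.ContextVar("saga_id", default="-")


class SagaIdFilter(logging.Filter):
    """Attach the current saga ID to every record passing through the handler."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.saga_id = current_saga_id.get()
        return True


def configure_logger(stream: Optional[TextIO] = None) -> logging.Logger:
    logger = logging.getLogger("saga.inventory")
    handler = logging.StreamHandler(stream)
    handler.addFilter(SagaIdFilter())
    handler.setFormatter(logging.Formatter("%(levelname)s saga=%(saga_id)s %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


def handle_reserve_items(logger: logging.Logger, command: Dict[str, Any]) -> None:
    token = current_saga_id.set(command["saga_id"])  # correlation ID from the message
    try:
        logger.info("reserving items for order %s", command["order_id"])
    finally:
        current_saga_id.reset(token)


buf = io.StringIO()
log = configure_logger(buf)
handle_reserve_items(log, {"saga_id": "s-42", "order_id": "o-7"})
print(buf.getvalue(), end="")  # INFO saga=s-42 reserving items for order o-7
```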

### Don'ts

- **Don't assume instant completion** - Sagas take time
- **Don't skip compensation testing** - Most critical part
- **Don't couple services** - Use async messaging
- **Don't ignore partial failures** - Handle gracefully

## Resources

- [Saga Pattern](https://microservices.io/patterns/data/saga.html)
- [Designing Data-Intensive Applications](https://dataintensive.net/)

How to Use This Skill Unit

Option A: Project-Specific (Recommended)

  1. Click "Download" above
  2. In your project, create the directory: .agent/skills/saga-orchestration/
  3. Save the file as SKILL.md
  4. The agent will automatically discover the skill based on its description.

Option B: Global Installation (All Agents)

Save the file to these locations to make it available across all projects:

  • Claude Code: ~/.claude/skills/wshobson/agents/saga-orchestration/SKILL.md
  • Cursor: ~/.cursor/skills/wshobson/agents/saga-orchestration/SKILL.md
  • Antigravity: ~/.gemini/antigravity/skills/wshobson/agents/saga-orchestration/SKILL.md


How to use this Skill in Claude Code & Cursor

For Claude Code (CLI)

To use this skill in Claude Code, copy the rule content into your project's custom instructions or follow our Add-Skill CLI guide. This ensures Claude follows your standards during every code generation.

For Cursor & Windsurf

For Cursor or Windsurf, individual skills are best used in the "Rules for AI" section. This skill gives the agent guidance on saga orchestration and distributed transaction design.

Why the skill format matters: the standardized Agent Skills format lets your AI agent load detailed instructions only when they are relevant, keeping your prompt clean while improving results.

Source & attribution

This skill is categorized under Architecture & Design Patterns and is published by W. Shobson, maintained in wshobson/agents.
