Back to Architecture & Design Patterns

cqrs-implementation

CQRSarchitecture patternsevent sourcingbackendscalabilitydistributed systemsmicroservicesdata modeling
⭐ 36.8kπŸ“„ MITπŸ•’ 2026-06-16Source β†—

Install this skill

npx skills add wshobson/agents

Works across Claude Code, Cursor, Codex, Copilot & Antigravity

The CQRS (Command Query Responsibility Segregation) implementation skill provides a structured framework to split read and write logic within an application. By treating state mutations (commands) as distinct from state retrieval (queries), this pattern avoids the performance bottlenecks inherent in monolithic models where the same object handles both operational tasks and data fetching. The implementation requires defining distinct command buses, specific handlers for state transitions, and a projection mechanism to keep read models synchronized with underlying event stores or write databases. This separation allows developers to scale read throughput independently from write operations and provides the flexibility to create read-optimized schemas for complex reporting or UI views, ultimately decoupling the data model from the application's intent.

When to Use This Skill

  • β€’High-traffic e-commerce systems where read volume significantly exceeds write volume
  • β€’Complex domain models requiring multiple, customized views of the same data
  • β€’Applications implementing audit logging or event-driven state history
  • β€’Systems requiring specialized reporting databases separated from operational transaction stores

How to Invoke This Skill

Example prompts that trigger this skill in Claude Code, Cursor, or Antigravity:

  • β€œImplement CQRS architecture for my service
  • β€œCreate a command handler pattern for handling state changes
  • β€œSet up a command bus and query dispatcher
  • β€œGenerate code for event sourcing and projection
  • β€œRefactor my monolithic service to follow command and query separation

Pro Tips

  • πŸ’‘Start with a bounded context: Apply CQRS to specific, complex domains rather than the entire application to manage complexity effectively.
  • πŸ’‘Consider eventual consistency: Understand that read models might lag slightly behind write models and design your UI/UX accordingly to handle this.
  • πŸ’‘Choose your persistence wisely: Use appropriate data stores for your read models (e.g., NoSQL for speed) and write models (e.g., relational for transactions).

What this skill does

  • β€’Defines a structured Command Bus for routing state change requests
  • β€’Implements asynchronous projection patterns to update read models from write-side events
  • β€’Separates business validation logic from retrieval logic using specific handler classes
  • β€’Supports event sourcing to maintain a granular history of system changes
  • β€’Decouples data schemas into specialized read-optimized structures

When not to use it

  • βœ•Simple CRUD applications with low complexity
  • βœ•Small-scale projects where extra architectural layers increase development overhead without performance benefits

Example workflow

  1. Define a concrete Command class for the desired state change
  2. Implement a CommandHandler that validates the input and modifies the aggregate
  3. Publish an event confirming the state change to the event store
  4. Configure a Projector to consume the event and update the query-optimized read model
  5. Dispatch the command via the CommandBus to initiate the flow

Prerequisites

  • –Knowledge of domain-driven design principles
  • –Basic understanding of event-driven architecture
  • –A messaging or event bus implementation

Pitfalls & limitations

  • !Eventual consistency issues between the write model and read models
  • !Increased architectural complexity and code volume
  • !Debugging difficulty when tracing logic across async boundaries

FAQ

Why is there a delay between a command and the corresponding update in the query model?
CQRS often uses asynchronous projection to update read models, leading to eventual consistency where the data takes a few milliseconds to catch up to the state change.
Can I use CQRS without event sourcing?
Yes. While CQRS and event sourcing are frequently paired, you can implement CQRS by simply segregating your command and query databases without storing events.
Does CQRS require a specific database for reads?
Not strictly, but it excels when using different storage technologies, such as a relational database for commands and a search-optimized or NoSQL store for queries.

How it compares

Unlike manual CRUD, which couples read and write models, this skill forces a rigid separation that enables independent scaling and specialized schema optimization for read-heavy operations.

Source & trust

⭐ 37k starsπŸ“„ MITπŸ•’ Updated 2026-06-16
πŸ“„ Full skill instructions β€” original source: wshobson/agents
# CQRS Implementation

Comprehensive guide to implementing CQRS (Command Query Responsibility Segregation) patterns.

## When to Use This Skill

- Separating read and write concerns
- Scaling reads independently from writes
- Building event-sourced systems
- Optimizing complex query scenarios
- Different read/write data models needed
- High-performance reporting requirements

## Core Concepts

### 1. CQRS Architecture

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Client β”‚
β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜
β”‚
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ β”‚
β–Ό β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Commands β”‚ β”‚ Queries β”‚
β”‚ API β”‚ β”‚ API β”‚
β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜
β”‚ β”‚
β–Ό β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Command β”‚ β”‚ Query β”‚
β”‚ Handlers β”‚ β”‚ Handlers β”‚
β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜
β”‚ β”‚
β–Ό β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Write │─────────►│ Read β”‚
β”‚ Model β”‚ Events β”‚ Model β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜


### 2. Key Components

| Component | Responsibility |
| ------------------- | ------------------------------- |
| **Command** | Intent to change state |
| **Command Handler** | Validates and executes commands |
| **Event** | Record of state change |
| **Query** | Request for data |
| **Query Handler** | Retrieves data from read model |
| **Projector** | Updates read model from events |

## Templates

### Template 1: Command Infrastructure

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TypeVar, Generic, Dict, Any, Type
from datetime import datetime
import uuid

# Command base
@dataclass
class Command:
command_id: str = None
timestamp: datetime = None

def __post_init__(self):
self.command_id = self.command_id or str(uuid.uuid4())
self.timestamp = self.timestamp or datetime.utcnow()


# Concrete commands
@dataclass
class CreateOrder(Command):
customer_id: str
items: list
shipping_address: dict


@dataclass
class AddOrderItem(Command):
order_id: str
product_id: str
quantity: int
price: float


@dataclass
class CancelOrder(Command):
order_id: str
reason: str


# Command handler base
T = TypeVar('T', bound=Command)

class CommandHandler(ABC, Generic[T]):
@abstractmethod
async def handle(self, command: T) -> Any:
pass


# Command bus
class CommandBus:
def __init__(self):
self._handlers: Dict[Type[Command], CommandHandler] = {}

def register(self, command_type: Type[Command], handler: CommandHandler):
self._handlers[command_type] = handler

async def dispatch(self, command: Command) -> Any:
handler = self._handlers.get(type(command))
if not handler:
raise ValueError(f"No handler for {type(command).__name__}")
return await handler.handle(command)


# Command handler implementation
class CreateOrderHandler(CommandHandler[CreateOrder]):
def __init__(self, order_repository, event_store):
self.order_repository = order_repository
self.event_store = event_store

async def handle(self, command: CreateOrder) -> str:
# Validate
if not command.items:
raise ValueError("Order must have at least one item")

# Create aggregate
order = Order.create(
customer_id=command.customer_id,
items=command.items,
shipping_address=command.shipping_address
)

# Persist events
await self.event_store.append_events(
stream_id=f"Order-{order.id}",
stream_type="Order",
events=order.uncommitted_events
)

return order.id


### Template 2: Query Infrastructure

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TypeVar, Generic, List, Optional

# Query base
@dataclass
class Query:
pass


# Concrete queries
@dataclass
class GetOrderById(Query):
order_id: str


@dataclass
class GetCustomerOrders(Query):
customer_id: str
status: Optional[str] = None
page: int = 1
page_size: int = 20


@dataclass
class SearchOrders(Query):
query: str
filters: dict = None
sort_by: str = "created_at"
sort_order: str = "desc"


# Query result types
@dataclass
class OrderView:
order_id: str
customer_id: str
status: str
total_amount: float
item_count: int
created_at: datetime
shipped_at: Optional[datetime] = None


@dataclass
class PaginatedResult(Generic[T]):
items: List[T]
total: int
page: int
page_size: int

@property
def total_pages(self) -> int:
return (self.total + self.page_size - 1) // self.page_size


# Query handler base
T = TypeVar('T', bound=Query)
R = TypeVar('R')

class QueryHandler(ABC, Generic[T, R]):
@abstractmethod
async def handle(self, query: T) -> R:
pass


# Query bus
class QueryBus:
def __init__(self):
self._handlers: Dict[Type[Query], QueryHandler] = {}

def register(self, query_type: Type[Query], handler: QueryHandler):
self._handlers[query_type] = handler

async def dispatch(self, query: Query) -> Any:
handler = self._handlers.get(type(query))
if not handler:
raise ValueError(f"No handler for {type(query).__name__}")
return await handler.handle(query)


# Query handler implementation
class GetOrderByIdHandler(QueryHandler[GetOrderById, Optional[OrderView]]):
def __init__(self, read_db):
self.read_db = read_db

async def handle(self, query: GetOrderById) -> Optional[OrderView]:
async with self.read_db.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT order_id, customer_id, status, total_amount,
item_count, created_at, shipped_at
FROM order_views
WHERE order_id = $1
""",
query.order_id
)
if row:
return OrderView(**dict(row))
return None


class GetCustomerOrdersHandler(QueryHandler[GetCustomerOrders, PaginatedResult[OrderView]]):
def __init__(self, read_db):
self.read_db = read_db

async def handle(self, query: GetCustomerOrders) -> PaginatedResult[OrderView]:
async with self.read_db.acquire() as conn:
# Build query with optional status filter
where_clause = "customer_id = $1"
params = [query.customer_id]

if query.status:
where_clause += " AND status = $2"
params.append(query.status)

# Get total count
total = await conn.fetchval(
f"SELECT COUNT(*) FROM order_views WHERE {where_clause}",
*params
)

# Get paginated results
offset = (query.page - 1) * query.page_size
rows = await conn.fetch(
f"""
SELECT order_id, customer_id, status, total_amount,
item_count, created_at, shipped_at
FROM order_views
WHERE {where_clause}
ORDER BY created_at DESC
LIMIT ${len(params) + 1} OFFSET ${len(params) + 2}
""",
*params, query.page_size, offset
)

return PaginatedResult(
items=[OrderView(**dict(row)) for row in rows],
total=total,
page=query.page,
page_size=query.page_size
)


### Template 3: FastAPI CQRS Application

from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import List, Optional

app = FastAPI()

# Request/Response models
class CreateOrderRequest(BaseModel):
customer_id: str
items: List[dict]
shipping_address: dict


class OrderResponse(BaseModel):
order_id: str
customer_id: str
status: str
total_amount: float
item_count: int
created_at: datetime


# Dependency injection
def get_command_bus() -> CommandBus:
return app.state.command_bus


def get_query_bus() -> QueryBus:
return app.state.query_bus


# Command endpoints (POST, PUT, DELETE)
@app.post("/orders", response_model=dict)
async def create_order(
request: CreateOrderRequest,
command_bus: CommandBus = Depends(get_command_bus)
):
command = CreateOrder(
customer_id=request.customer_id,
items=request.items,
shipping_address=request.shipping_address
)
order_id = await command_bus.dispatch(command)
return {"order_id": order_id}


@app.post("/orders/{order_id}/items")
async def add_item(
order_id: str,
product_id: str,
quantity: int,
price: float,
command_bus: CommandBus = Depends(get_command_bus)
):
command = AddOrderItem(
order_id=order_id,
product_id=product_id,
quantity=quantity,
price=price
)
await command_bus.dispatch(command)
return {"status": "item_added"}


@app.delete("/orders/{order_id}")
async def cancel_order(
order_id: str,
reason: str,
command_bus: CommandBus = Depends(get_command_bus)
):
command = CancelOrder(order_id=order_id, reason=reason)
await command_bus.dispatch(command)
return {"status": "cancelled"}


# Query endpoints (GET)
@app.get("/orders/{order_id}", response_model=OrderResponse)
async def get_order(
order_id: str,
query_bus: QueryBus = Depends(get_query_bus)
):
query = GetOrderById(order_id=order_id)
result = await query_bus.dispatch(query)
if not result:
raise HTTPException(status_code=404, detail="Order not found")
return result


@app.get("/customers/{customer_id}/orders")
async def get_customer_orders(
customer_id: str,
status: Optional[str] = None,
page: int = 1,
page_size: int = 20,
query_bus: QueryBus = Depends(get_query_bus)
):
query = GetCustomerOrders(
customer_id=customer_id,
status=status,
page=page,
page_size=page_size
)
return await query_bus.dispatch(query)


@app.get("/orders/search")
async def search_orders(
q: str,
sort_by: str = "created_at",
query_bus: QueryBus = Depends(get_query_bus)
):
query = SearchOrders(query=q, sort_by=sort_by)
return await query_bus.dispatch(query)


### Template 4: Read Model Synchronization

class ReadModelSynchronizer:
"""Keeps read models in sync with events."""

def __init__(self, event_store, read_db, projections: List[Projection]):
self.event_store = event_store
self.read_db = read_db
self.projections = {p.name: p for p in projections}

async def run(self):
"""Continuously sync read models."""
while True:
for name, projection in self.projections.items():
await self._sync_projection(projection)
await asyncio.sleep(0.1)

async def _sync_projection(self, projection: Projection):
checkpoint = await self._get_checkpoint(projection.name)

events = await self.event_store.read_all(
from_position=checkpoint,
limit=100
)

for event in events:
if event.event_type in projection.handles():
try:
await projection.apply(event)
except Exception as e:
# Log error, possibly retry or skip
logger.error(f"Projection error: {e}")
continue

await self._save_checkpoint(projection.name, event.global_position)

async def rebuild_projection(self, projection_name: str):
"""Rebuild a projection from scratch."""
projection = self.projections[projection_name]

# Clear existing data
await projection.clear()

# Reset checkpoint
await self._save_checkpoint(projection_name, 0)

# Rebuild
while True:
checkpoint = await self._get_checkpoint(projection_name)
events = await self.event_store.read_all(checkpoint, 1000)

if not events:
break

for event in events:
if event.event_type in projection.handles():
await projection.apply(event)

await self._save_checkpoint(
projection_name,
events[-1].global_position
)


### Template 5: Eventual Consistency Handling

class ConsistentQueryHandler:
"""Query handler that can wait for consistency."""

def __init__(self, read_db, event_store):
self.read_db = read_db
self.event_store = event_store

async def query_after_command(
self,
query: Query,
expected_version: int,
stream_id: str,
timeout: float = 5.0
):
"""
Execute query, ensuring read model is at expected version.
Used for read-your-writes consistency.
"""
start_time = time.time()

while time.time() - start_time < timeout:
# Check if read model is caught up
projection_version = await self._get_projection_version(stream_id)

if projection_version >= expected_version:
return await self.execute_query(query)

# Wait a bit and retry
await asyncio.sleep(0.1)

# Timeout - return stale data with warning
return {
"data": await self.execute_query(query),
"_warning": "Data may be stale"
}

async def _get_projection_version(self, stream_id: str) -> int:
"""Get the last processed event version for a stream."""
async with self.read_db.acquire() as conn:
return await conn.fetchval(
"SELECT last_event_version FROM projection_state WHERE stream_id = $1",
stream_id
) or 0


## Best Practices

### Do's

- **Separate command and query models** - Different needs
- **Use eventual consistency** - Accept propagation delay
- **Validate in command handlers** - Before state change
- **Denormalize read models** - Optimize for queries
- **Version your events** - For schema evolution

### Don'ts

- **Don't query in commands** - Use only for writes
- **Don't couple read/write schemas** - Independent evolution
- **Don't over-engineer** - Start simple
- **Don't ignore consistency SLAs** - Define acceptable lag

## Resources

- [CQRS Pattern](https://martinfowler.com/bliki/CQRS.html)
- [Microsoft CQRS Guidance](https://docs.microsoft.com/en-us/azure/architecture/patterns/cqrs)

How to Use This Skill Unit

Option A: Project-Specific (Recommended)

  1. Click "Download" above
  2. In your project, create the directory: .agent/skills/cqrs-implementation/
  3. Save the file as SKILL.md
  4. The agent will automatically discover the skill based on its description.

Option B: Global Installation (All Agents)

Save the file to these locations to make it available across all projects:

  • Claude Code: ~/.claude/skills/wshobson/agents/cqrs-implementation/SKILL.md
  • Cursor: ~/.cursor/skills/wshobson/agents/cqrs-implementation/SKILL.md
  • Antigravity: ~/.gemini/antigravity/skills/wshobson/agents/cqrs-implementation/SKILL.md

πŸš€ Install with CLI:
npx skills add wshobson/agents

Read the Master Guide: Mastering Agent Skills β†’

Recommended Rules

View more rules β†’

Recommended Workflows

View more workflows β†’

Recommended MCP Servers

View more MCP servers β†’

Take It Further

Maximize your productivity with these powerful resources

πŸ“‹

Define Your Standards

Set up coding standards to ensure this workflow produces consistent, high-quality results.

Browse Rules Library
πŸ“–

Master Workflows

Learn how to create custom workflows, use Turbo Mode, and build your automation library.

Complete Guide

How to use this Skill in Claude Code & Cursor

For Claude Code (CLI)

To use this skill in Claude Code, copy the rule content into your project's custom instructions or follow our Add-Skill CLI guide. This ensures Claude follows your standards during every code generation.

For Cursor & Windsurf

For Cursor or Windsurf, individual skills are best used in the "Rules for AI" section. This specific unit helps the agent avoid architecture & design patterns issues, leading to cleaner, more efficient code.

Why the skill format matters: the standardized Agent Skills format lets your AI agent load detailed instructions only when they are relevant, keeping your prompt clean while improving results.

Source & attribution

This skill is categorized under Architecture & Design Patterns and is published by W. Shobson, maintained in wshobson/agents.

← Browse All Agent Skills
Sponsored AI assistant. Recommendations may be paid.