
projection-patterns

event sourcing, CQRS, read models, projections, materialized views, event-driven architecture, data optimization, backend development
⭐ 36.8k 📄 MIT 🕒 2026-06-16

Install this skill

npx skills add wshobson/agents

Works across Claude Code, Cursor, Codex, Copilot & Antigravity

Projection patterns transform raw event streams into query-optimized read models, decoupling write-heavy event stores from read-heavy interfaces. Projectors consume events and update relational or document-based views, and checkpoints that record the last processed global position keep each materialized view in step with the event log. Because the read model is separate from the write model, queries do not need to replay event streams, and a read model can be discarded and rebuilt as query or indexing requirements change. The read side is eventually consistent with the event store, and projections can be added or changed without modifying how events are written.

When to Use This Skill

  • Generating order summary dashboards from distributed event streams
  • Building indexed search views from complex event-sourced entities
  • Creating real-time reporting models across independent microservices
  • Optimizing data retrieval by denormalizing event attributes into relational views

How to Invoke This Skill

Example prompts that trigger this skill in Claude Code, Cursor, or Antigravity:

  • "Create a read model for my event-sourced system"
  • "Set up an event projector to handle order status updates"
  • "How to rebuild my projection table from event history"
  • "Implement checkpointing for an asynchronous event handler"
  • "Design a materialization pattern for raw events"

Pro Tips

  • Always design your projectors to be idempotent so events can be reprocessed safely; this is crucial when rebuilding read models or recovering from errors.
  • Version the schemas of your events and read models so both can evolve over time without breaking changes.
  • Index the read model database on frequently queried fields to keep user-facing queries fast.
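
One way to get the idempotency the first tip asks for is to store the last applied global position next to each read-model row and skip any event at or below it. The sketch below uses the standard-library `sqlite3` module; the `order_totals` table, its `last_position` column, and `apply_item_added` are hypothetical names chosen for illustration and are not part of the skill.

```python
import sqlite3


def apply_item_added(conn: sqlite3.Connection, order_id: str,
                     amount: int, position: int) -> None:
    """Add `amount` to the order total unless this event was already applied."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            """
            UPDATE order_totals
            SET total = total + ?, last_position = ?
            WHERE order_id = ? AND last_position < ?
            """,
            (amount, position, order_id, position),
        )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE order_totals ("
    "order_id TEXT PRIMARY KEY, "
    "total INTEGER NOT NULL, "
    "last_position INTEGER NOT NULL)"
)
conn.execute("INSERT INTO order_totals VALUES ('o-1', 0, 0)")

apply_item_added(conn, "o-1", 500, position=7)
apply_item_added(conn, "o-1", 500, position=7)  # replayed event: row unchanged

total = conn.execute(
    "SELECT total FROM order_totals WHERE order_id = 'o-1'"
).fetchone()[0]
```

The guard lives in the `WHERE` clause, so the check and the update happen in a single statement and a replayed event simply matches no rows.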

What this skill does

  • Materializes event streams into query-optimized database tables
  • Maintains checkpoint-based state tracking for resumable indexing
  • Supports live, catchup, and persistent projection modes
  • Separates read-side schema definitions from domain event structures
  • Facilitates the rebuilding of read models from historical event logs

When not to use it

  • Simple applications that do not require an event-sourced architecture
  • Scenarios requiring immediate, strict transactionality across the write and read models

Example workflow

  1. Define a concrete class inheriting from the base Projection interface
  2. Implement the apply method to handle specific domain event types
  3. Register the projection within the Projector runner instance
  4. Initialize the checkpoint store to track the last processed event position
  5. Start the runner loop to process events and update the read model tables
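
The steps above can be sketched in memory, without a database or a real event store. This is a minimal illustration under stated assumptions: `OrderCountProjection`, `run_once`, the plain-list event log, and the dict-based checkpoint store are hypothetical stand-ins, and the runner from step 3 is collapsed into a single function.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Event:
    event_type: str
    data: dict
    global_position: int


class OrderCountProjection:
    """Steps 1-2: a projection that counts orders per customer."""

    name = "order_count"
    handles = ["OrderCreated"]

    def __init__(self) -> None:
        self.counts: Dict[str, int] = {}  # stand-in for a read-model table

    async def apply(self, event: Event) -> None:
        customer = event.data["customer_id"]
        self.counts[customer] = self.counts.get(customer, 0) + 1


async def run_once(events: List[Event], projection: OrderCountProjection,
                   checkpoints: Dict[str, int]) -> None:
    """Steps 4-5: process events past the checkpoint, then advance it."""
    position = checkpoints.get(projection.name, 0)
    for event in events:
        if event.global_position <= position:
            continue  # already processed on an earlier pass
        if event.event_type in projection.handles:
            await projection.apply(event)
        checkpoints[projection.name] = event.global_position


log = [
    Event("OrderCreated", {"customer_id": "c-1"}, 1),
    Event("OrderShipped", {"customer_id": "c-1"}, 2),
    Event("OrderCreated", {"customer_id": "c-1"}, 3),
]
projection = OrderCountProjection()
checkpoints: Dict[str, int] = {}
asyncio.run(run_once(log, projection, checkpoints))
```

After one pass the projection has counted two orders for `c-1` and the checkpoint sits at position 3, so a second pass over the same log applies nothing.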

Prerequisites

  • Existing event store implementation
  • Persistence layer for read models and checkpoints
  • Asynchronous processing capabilities

Pitfalls & limitations

  • Eventual consistency delays between event emission and view updates
  • Increased operational complexity when managing multiple projection versions
  • Potential performance bottlenecks if the projection logic performs heavy external I/O

FAQ

How does this differ from standard CRUD updates?
Projections derive data from historical event logs rather than direct entity manipulation, allowing the read model to be recreated at any time.
Can I have multiple projections for one event stream?
Yes, you can register multiple independent projections that listen to the same stream to build different types of read models.
What happens if a projector fails?
Because projectors use checkpointing, they can resume from the last successful event processed once the service restarts.
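
The resume behaviour described in the last answer can be illustrated with a small synchronous sketch. Everything here is hypothetical (`process`, the tuple-based events, the simulated crash); the point is only that the checkpoint advances after each successfully handled event, so a restart picks up at the event that failed.

```python
from typing import Callable, Dict, List, Tuple


def process(events: List[Tuple[int, str]], checkpoints: Dict[str, int],
            name: str, handler: Callable[[str], None]) -> None:
    """Apply events after the stored checkpoint, saving progress per event."""
    last = checkpoints.get(name, 0)
    for position, payload in events:
        if position <= last:
            continue
        handler(payload)  # may raise; the checkpoint then stays at `last`
        checkpoints[name] = position
        last = position


events = [(1, "a"), (2, "b"), (3, "boom"), (4, "d")]
seen: List[str] = []
fail_once = {"armed": True}


def handler(payload: str) -> None:
    if payload == "boom" and fail_once["armed"]:
        fail_once["armed"] = False
        raise RuntimeError("simulated crash")
    seen.append(payload)


checkpoints: Dict[str, int] = {}
try:
    process(events, checkpoints, "demo", handler)
except RuntimeError:
    pass  # the service "crashes" with the checkpoint at position 2

process(events, checkpoints, "demo", handler)  # restart resumes at position 3
```

If the handler had partially written to the read model before failing, the retried event would be applied twice, which is why checkpointing is usually paired with idempotent handlers.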

How it compares

Unlike ad hoc SQL queries against an event table, this pattern maintains purpose-built, denormalized read models, so queries avoid complex joins at read time.

Full skill instructions (original source: wshobson/agents)
# Projection Patterns

Comprehensive guide to building projections and read models for event-sourced systems.

## When to Use This Skill

- Building CQRS read models
- Creating materialized views from events
- Optimizing query performance
- Implementing real-time dashboards
- Building search indexes from events
- Aggregating data across streams

## Core Concepts

### 1. Projection Architecture

```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ Event Store │────►│  Projector  │────►│ Read Model  │
│             │     │             │     │ (Database)  │
│ ┌─────────┐ │     │ ┌─────────┐ │     │ ┌─────────┐ │
│ │ Events  │ │     │ │ Handler │ │     │ │ Tables  │ │
│ └─────────┘ │     │ │ Logic   │ │     │ │ Views   │ │
│             │     │ └─────────┘ │     │ │ Cache   │ │
└─────────────┘     └─────────────┘     └─────────────┘
```


### 2. Projection Types

| Type | Description | Use Case |
| -------------- | --------------------------- | ---------------------- |
| **Live** | Real-time from subscription | Current state queries |
| **Catchup** | Process historical events | Rebuilding read models |
| **Persistent** | Stores checkpoint | Resume after restart |
| **Inline** | Same transaction as write | Strong consistency |
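
The inline type in the last row differs most from the others, so a small sketch may help. It assumes the event log and the read model live in the same database, here an in-memory SQLite database from the standard library; the `events` and `order_status` tables and `append_order_created` are hypothetical names.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE events (
        global_position INTEGER PRIMARY KEY AUTOINCREMENT,
        event_type TEXT NOT NULL,
        data TEXT NOT NULL
    );
    CREATE TABLE order_status (
        order_id TEXT PRIMARY KEY,
        status TEXT NOT NULL
    );
    """
)


def append_order_created(conn: sqlite3.Connection, order_id: str) -> None:
    """Append the event and update the read model in one transaction."""
    with conn:  # both statements commit together or roll back together
        conn.execute(
            "INSERT INTO events (event_type, data) VALUES (?, ?)",
            ("OrderCreated", json.dumps({"order_id": order_id})),
        )
        conn.execute(
            "INSERT INTO order_status (order_id, status) VALUES (?, 'pending')",
            (order_id,),
        )


append_order_created(conn, "o-1")
try:
    append_order_created(conn, "o-1")  # duplicate key: read-model insert fails
except sqlite3.IntegrityError:
    pass  # the event insert from the same transaction was rolled back too

event_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
status_rows = conn.execute("SELECT order_id, status FROM order_status").fetchall()
```

The trade-off is that the write path now pays for the read-model update and both must share a transactional store, which is why the other three types run asynchronously.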

## Templates

### Template 1: Basic Projector

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List

import asyncpg  # used by the database-backed projections in later templates


@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    version: int
    global_position: int


class Projection(ABC):
    """Base class for projections."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Unique projection name for checkpointing."""
        pass

    @abstractmethod
    def handles(self) -> List[str]:
        """List of event types this projection handles."""
        pass

    @abstractmethod
    async def apply(self, event: Event) -> None:
        """Apply event to the read model."""
        pass


class Projector:
    """Runs projections from event store."""

    def __init__(self, event_store, checkpoint_store):
        self.event_store = event_store
        self.checkpoint_store = checkpoint_store
        self.projections: List[Projection] = []

    def register(self, projection: Projection):
        self.projections.append(projection)

    async def run(self, batch_size: int = 100):
        """Run all projections continuously."""
        while True:
            for projection in self.projections:
                await self._run_projection(projection, batch_size)
            await asyncio.sleep(0.1)

    async def _run_projection(self, projection: Projection, batch_size: int) -> int:
        """Process one batch and return the number of events read."""
        checkpoint = await self.checkpoint_store.get(projection.name)
        position = checkpoint or 0

        # Assumes read_all returns events strictly after `position`.
        events = await self.event_store.read_all(position, batch_size)

        for event in events:
            if event.event_type in projection.handles():
                await projection.apply(event)

            # Advance the checkpoint for unhandled events too, so they are
            # not read again on the next pass.
            await self.checkpoint_store.save(
                projection.name,
                event.global_position
            )

        return len(events)

    async def rebuild(self, projection: Projection, batch_size: int = 1000):
        """Rebuild a projection from scratch."""
        await self.checkpoint_store.delete(projection.name)
        # Optionally clear read model tables
        # Replay batch by batch; a short batch means the log is exhausted.
        while await self._run_projection(projection, batch_size) == batch_size:
            pass
```

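The template leaves `event_store` and `checkpoint_store` untyped. For local experiments, in-memory stand-ins that expose the methods the `Projector` calls (`read_all`, `get`, `save`, `delete`) might look like the sketch below. `InMemoryEventStore`, `InMemoryCheckpointStore`, and `append` are hypothetical, and the sketch assumes global positions start at 1 and that `read_all` returns events strictly after the given position.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    version: int
    global_position: int


class InMemoryEventStore:
    """Hypothetical event store; global positions start at 1."""

    def __init__(self) -> None:
        self._events: List[Event] = []

    def append(self, stream_id: str, event_type: str, data: dict) -> Event:
        version = sum(1 for e in self._events if e.stream_id == stream_id) + 1
        event = Event(stream_id, event_type, data, version, len(self._events) + 1)
        self._events.append(event)
        return event

    async def read_all(self, position: int, batch_size: int) -> List[Event]:
        # Exclusive read: the event at index i has global_position i + 1,
        # so slicing from `position` skips everything already processed.
        return self._events[position:position + batch_size]


class InMemoryCheckpointStore:
    """Hypothetical checkpoint store keyed by projection name."""

    def __init__(self) -> None:
        self._positions: Dict[str, int] = {}

    async def get(self, name: str) -> Optional[int]:
        return self._positions.get(name)

    async def save(self, name: str, position: int) -> None:
        self._positions[name] = position

    async def delete(self, name: str) -> None:
        self._positions.pop(name, None)


async def demo() -> List[int]:
    store = InMemoryEventStore()
    checkpoints = InMemoryCheckpointStore()
    for i in range(5):
        store.append("order-1", "OrderItemAdded", {"n": i})
    first = await store.read_all(0, 3)
    await checkpoints.save("order_summary", first[-1].global_position)
    resume_from = await checkpoints.get("order_summary") or 0
    second = await store.read_all(resume_from, 3)
    return [e.global_position for e in first + second]


positions = asyncio.run(demo())
```

The exclusive-read convention matters: if `read_all` included the event at `position`, the projector would reapply the last checkpointed event on every pass.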

### Template 2: Order Summary Projection

```python
class OrderSummaryProjection(Projection):
    """Projects order events to a summary read model.

    Note: the INSERT and the increment/decrement handlers are not idempotent
    on their own; replaying an event applies its effect a second time.
    """

    def __init__(self, db_pool: asyncpg.Pool):
        self.pool = db_pool

    @property
    def name(self) -> str:
        return "order_summary"

    def handles(self) -> List[str]:
        return [
            "OrderCreated",
            "OrderItemAdded",
            "OrderItemRemoved",
            "OrderShipped",
            "OrderCompleted",
            "OrderCancelled"
        ]

    async def apply(self, event: Event) -> None:
        # Dispatch to the handler for this event type.
        handlers = {
            "OrderCreated": self._handle_created,
            "OrderItemAdded": self._handle_item_added,
            "OrderItemRemoved": self._handle_item_removed,
            "OrderShipped": self._handle_shipped,
            "OrderCompleted": self._handle_completed,
            "OrderCancelled": self._handle_cancelled,
        }

        handler = handlers.get(event.event_type)
        if handler:
            await handler(event)

    async def _handle_created(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO order_summaries
                (order_id, customer_id, status, total_amount, item_count, created_at)
                VALUES ($1, $2, $3, $4, $5, $6)
                """,
                event.data['order_id'],
                event.data['customer_id'],
                'pending',
                0,
                0,
                event.data['created_at']
            )

    async def _handle_item_added(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET total_amount = total_amount + $2,
                    item_count = item_count + 1,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['price'] * event.data['quantity']
            )

    async def _handle_item_removed(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET total_amount = total_amount - $2,
                    item_count = item_count - 1,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['price'] * event.data['quantity']
            )

    async def _handle_shipped(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET status = 'shipped',
                    shipped_at = $2,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['shipped_at']
            )

    async def _handle_completed(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET status = 'completed',
                    completed_at = $2,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['completed_at']
            )

    async def _handle_cancelled(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET status = 'cancelled',
                    cancelled_at = $2,
                    cancellation_reason = $3,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['cancelled_at'],
                event.data.get('reason')
            )
```


### Template 3: Elasticsearch Search Projection

```python
from elasticsearch import AsyncElasticsearch

class ProductSearchProjection(Projection):
    """Projects product events to Elasticsearch for full-text search."""

    def __init__(self, es_client: AsyncElasticsearch):
        self.es = es_client
        self.index = "products"

    @property
    def name(self) -> str:
        return "product_search"

    def handles(self) -> List[str]:
        return [
            "ProductCreated",
            "ProductUpdated",
            "ProductPriceChanged",
            "ProductDeleted"
        ]

    async def apply(self, event: Event) -> None:
        if event.event_type == "ProductCreated":
            await self.es.index(
                index=self.index,
                id=event.data['product_id'],
                document={
                    'name': event.data['name'],
                    'description': event.data['description'],
                    'category': event.data['category'],
                    'price': event.data['price'],
                    'tags': event.data.get('tags', []),
                    'created_at': event.data['created_at']
                }
            )

        elif event.event_type == "ProductUpdated":
            await self.es.update(
                index=self.index,
                id=event.data['product_id'],
                doc={
                    'name': event.data['name'],
                    'description': event.data['description'],
                    'category': event.data['category'],
                    'tags': event.data.get('tags', []),
                    'updated_at': event.data['updated_at']
                }
            )

        elif event.event_type == "ProductPriceChanged":
            await self.es.update(
                index=self.index,
                id=event.data['product_id'],
                doc={
                    'price': event.data['new_price'],
                    'price_updated_at': event.data['changed_at']
                }
            )

        elif event.event_type == "ProductDeleted":
            await self.es.delete(
                index=self.index,
                id=event.data['product_id']
            )
```


### Template 4: Aggregating Projection

```python
from datetime import date

class DailySalesProjection(Projection):
    """Aggregates sales data by day for reporting."""

    def __init__(self, db_pool: asyncpg.Pool):
        self.pool = db_pool

    @property
    def name(self) -> str:
        return "daily_sales"

    def handles(self) -> List[str]:
        return ["OrderCompleted", "OrderRefunded"]

    async def apply(self, event: Event) -> None:
        if event.event_type == "OrderCompleted":
            await self._increment_sales(event)
        elif event.event_type == "OrderRefunded":
            await self._decrement_sales(event)

    @staticmethod
    def _day(timestamp: str) -> date:
        # Assumes ISO 8601 timestamp strings and a DATE column in daily_sales;
        # asyncpg binds DATE parameters from datetime.date objects, not strings.
        return date.fromisoformat(timestamp[:10])  # YYYY-MM-DD

    async def _increment_sales(self, event: Event):
        sales_date = self._day(event.data['completed_at'])
        async with self.pool.acquire() as conn:
            # Upsert: create the day's row on first sale, otherwise add to it.
            await conn.execute(
                """
                INSERT INTO daily_sales (date, total_orders, total_revenue, total_items)
                VALUES ($1, 1, $2, $3)
                ON CONFLICT (date) DO UPDATE SET
                    total_orders = daily_sales.total_orders + 1,
                    total_revenue = daily_sales.total_revenue + $2,
                    total_items = daily_sales.total_items + $3,
                    updated_at = NOW()
                """,
                sales_date,
                event.data['total_amount'],
                event.data['item_count']
            )

    async def _decrement_sales(self, event: Event):
        # Refunds are booked against the day the order originally completed.
        sales_date = self._day(event.data['original_completed_at'])
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE daily_sales SET
                    total_orders = total_orders - 1,
                    total_revenue = total_revenue - $2,
                    total_refunds = total_refunds + $2,
                    updated_at = NOW()
                WHERE date = $1
                """,
                sales_date,
                event.data['refund_amount']
            )
```


### Template 5: Multi-Table Projection

```python
class CustomerActivityProjection(Projection):
    """Projects customer activity across multiple tables."""

    def __init__(self, db_pool: asyncpg.Pool):
        self.pool = db_pool

    @property
    def name(self) -> str:
        return "customer_activity"

    def handles(self) -> List[str]:
        return [
            "CustomerCreated",
            "OrderCompleted",
            "ReviewSubmitted",
            "CustomerTierChanged"
        ]

    async def apply(self, event: Event) -> None:
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                if event.event_type == "CustomerCreated":
                    # Insert into customers table
                    await conn.execute(
                        """
                        INSERT INTO customers (customer_id, email, name, tier, created_at)
                        VALUES ($1, $2, $3, 'bronze', $4)
                        """,
                        event.data['customer_id'],
                        event.data['email'],
                        event.data['name'],
                        event.data['created_at']
                    )
                    # Initialize activity summary
                    await conn.execute(
                        """
                        INSERT INTO customer_activity_summary
                        (customer_id, total_orders, total_spent, total_reviews)
                        VALUES ($1, 0, 0, 0)
                        """,
                        event.data['customer_id']
                    )

                elif event.event_type == "OrderCompleted":
                    # Update activity summary
                    await conn.execute(
                        """
                        UPDATE customer_activity_summary SET
                            total_orders = total_orders + 1,
                            total_spent = total_spent + $2,
                            last_order_at = $3
                        WHERE customer_id = $1
                        """,
                        event.data['customer_id'],
                        event.data['total_amount'],
                        event.data['completed_at']
                    )
                    # Insert into order history
                    await conn.execute(
                        """
                        INSERT INTO customer_order_history
                        (customer_id, order_id, amount, completed_at)
                        VALUES ($1, $2, $3, $4)
                        """,
                        event.data['customer_id'],
                        event.data['order_id'],
                        event.data['total_amount'],
                        event.data['completed_at']
                    )

                elif event.event_type == "ReviewSubmitted":
                    await conn.execute(
                        """
                        UPDATE customer_activity_summary SET
                            total_reviews = total_reviews + 1,
                            last_review_at = $2
                        WHERE customer_id = $1
                        """,
                        event.data['customer_id'],
                        event.data['submitted_at']
                    )

                elif event.event_type == "CustomerTierChanged":
                    await conn.execute(
                        """
                        UPDATE customers SET tier = $2, updated_at = NOW()
                        WHERE customer_id = $1
                        """,
                        event.data['customer_id'],
                        event.data['new_tier']
                    )
```


## Best Practices

### Do's

- **Make projections idempotent** - Safe to replay
- **Use transactions** - For multi-table updates
- **Store checkpoints** - Resume after failures
- **Monitor lag** - Alert on projection delays
- **Plan for rebuilds** - Design for reconstruction
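
For the "Monitor lag" item, lag can be approximated as the distance between the event store's latest global position and each projection's checkpoint. The helper names, the example positions, and the threshold below are hypothetical; how the head position is obtained depends on the event store in use.

```python
from typing import Dict, List


def projection_lag(head_position: int, checkpoints: Dict[str, int]) -> Dict[str, int]:
    """Return how many events each projection is behind the event store head."""
    return {name: head_position - pos for name, pos in checkpoints.items()}


def lagging(head_position: int, checkpoints: Dict[str, int],
            threshold: int) -> List[str]:
    """Names of projections whose lag exceeds `threshold`, sorted for stable output."""
    lag = projection_lag(head_position, checkpoints)
    return sorted(name for name, behind in lag.items() if behind > threshold)


checkpoints = {"order_summary": 9_950, "daily_sales": 4_000, "product_search": 10_000}
alerts = lagging(head_position=10_000, checkpoints=checkpoints, threshold=1_000)
```

Counting events is only a proxy; if events carry timestamps, comparing the timestamp of the last processed event with the current time gives lag in seconds instead.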

### Don'ts

- **Don't couple projections** - Each is independent
- **Don't skip error handling** - Log and alert on failures
- **Don't ignore ordering** - Events must be processed in order
- **Don't over-normalize** - Denormalize for query patterns

## Resources

- [CQRS Pattern](https://docs.microsoft.com/en-us/azure/architecture/patterns/cqrs)
- [Projection Building Blocks](https://zimarev.com/blog/event-sourcing/projections/)

How to Use This Skill Unit

Option A: Project-Specific (Recommended)

  1. Click "Download" above
  2. In your project, create the directory: .agent/skills/projection-patterns/
  3. Save the file as SKILL.md
  4. The agent will automatically discover the skill based on its description.

Option B: Global Installation (All Agents)

Save the file to these locations to make it available across all projects:

  • Claude Code: ~/.claude/skills/wshobson/agents/projection-patterns/SKILL.md
  • Cursor: ~/.cursor/skills/wshobson/agents/projection-patterns/SKILL.md
  • Antigravity: ~/.gemini/antigravity/skills/wshobson/agents/projection-patterns/SKILL.md

Source & attribution

This skill is categorized under Architecture & Design Patterns and is published by W. Shobson, maintained in wshobson/agents.