
event-store-design

event sourcing, event store, architecture, database, distributed systems, microservices, data persistence, system design
36.8k stars · MIT license · Updated 2026-06-16

Install this skill

npx skills add wshobson/agents

Works across Claude Code, Cursor, Codex, Copilot & Antigravity

Event store design creates a specialized persistence layer for event-sourced systems, where state is derived from an immutable sequence of domain events rather than from current-state snapshots. The pattern treats the event log as the system of record, so every state transition is recorded and replayable. Because event storage is decoupled from read models, developers can project data into multiple queryable formats, support audit trails, and guard against conflicting writes with optimistic concurrency. Implementing a working store requires careful decisions about stream organization, global ordering, and indexing for the main read paths. This skill provides structural blueprints and storage strategies for managing stream contention, enabling event subscriptions, and handling event versioning, so the system keeps a complete history of operations while still serving complex analytical or operational reads.

When to Use This Skill

  • Building audit-heavy systems like financial ledgers
  • Implementing CQRS architectures requiring complex read models
  • Designing domain-driven microservices with complex aggregate state
  • Creating systems requiring time-travel debugging capabilities

How to Invoke This Skill

Example prompts that trigger this skill in Claude Code, Cursor, or Antigravity:

  • "Design an event store for an event-sourced aggregate"
  • "Create a PostgreSQL schema for event sourcing"
  • "How to handle optimistic concurrency in an event store"
  • "Implement an event store pattern in Python"
  • "Compare database options for event sourcing"

Pro Tips

  • Prioritize immutability: Ensure that events, once stored, can never be modified or deleted. This is fundamental for auditing, consistency, and the integrity of your event-sourced system.
  • Design complementary read models (projections): Event stores are write-optimized. Build specific read models tailored for various query needs, rather than querying the event store directly for application state.
  • Implement robust error handling and idempotency for event append operations: Guarantee atomicity for writes and handle potential duplicates to ensure data consistency in distributed environments.
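
The idempotency tip can be illustrated with a minimal in-memory sketch. It assumes the client generates the event ID once and reuses it on every retry; `StoredEvent` and `IdempotentLog` are hypothetical names for illustration, and a real store would enforce the same rule with a unique key on the event ID.

```python
from dataclasses import dataclass, field
from typing import Dict, List
from uuid import UUID, uuid4


@dataclass(frozen=True)
class StoredEvent:
    event_id: UUID
    stream_id: str
    event_type: str
    data: dict


@dataclass
class IdempotentLog:
    """Hypothetical in-memory log that stores each event ID at most once."""
    events: List[StoredEvent] = field(default_factory=list)
    seen: Dict[UUID, int] = field(default_factory=dict)  # event_id -> index in log

    def append(self, event: StoredEvent) -> bool:
        """Return True if the event was stored, False if it was a duplicate."""
        if event.event_id in self.seen:
            return False  # retry of an already-stored event: safe no-op
        self.seen[event.event_id] = len(self.events)
        self.events.append(event)
        return True


log = IdempotentLog()
deposit = StoredEvent(uuid4(), "Account-1", "Deposited", {"amount": 50})
first = log.append(deposit)
retry = log.append(deposit)  # e.g. the client retried after a timeout
```

The retry is acknowledged without writing a second copy, so the caller can resend safely when it does not know whether the first attempt succeeded.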

What this skill does

  • Implementation of append-only, immutable event logs
  • Optimistic concurrency control using stream versioning
  • Global and stream-specific event sequencing
  • Snapshot management to optimize aggregate hydration performance
  • Subscription-based notification for downstream event projection

When not to use it

  • Simple CRUD applications where state history is irrelevant
  • High-velocity real-time processing tasks where state latency must be near-zero

Example workflow

  1. Define the aggregate event schema and metadata structure
  2. Configure an append-only table structure with versioning constraints
  3. Write the append logic to enforce optimistic concurrency on writes
  4. Implement a projection listener to populate read-only views
  5. Optimize recovery by periodically capturing aggregate snapshots
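
Steps 1 through 4 of the workflow can be sketched in memory as follows. This is an illustration under stated assumptions rather than the skill's implementation: `InMemoryEventStore`, `project_balance`, and the account events are hypothetical, and the projection listener is called synchronously for simplicity.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass(frozen=True)
class Event:
    stream_id: str
    version: int
    event_type: str
    data: dict


class ConcurrencyError(Exception):
    """Raised when the caller's expected version is stale."""


class InMemoryEventStore:
    def __init__(self) -> None:
        self._streams: Dict[str, List[Event]] = defaultdict(list)
        self._listeners: List[Callable[[Event], None]] = []

    def subscribe(self, listener: Callable[[Event], None]) -> None:
        self._listeners.append(listener)

    def append(self, stream_id: str, event_type: str, data: dict,
               expected_version: int) -> Event:
        current = len(self._streams[stream_id])  # versions start at 1
        if current != expected_version:
            raise ConcurrencyError(f"Expected {expected_version}, got {current}")
        event = Event(stream_id, current + 1, event_type, data)
        self._streams[stream_id].append(event)
        for listener in self._listeners:  # feed projections
            listener(event)
        return event

    def read_stream(self, stream_id: str) -> List[Event]:
        return list(self._streams[stream_id])


balances: Dict[str, int] = defaultdict(int)  # read model


def project_balance(event: Event) -> None:
    if event.event_type == "Deposited":
        balances[event.stream_id] += event.data["amount"]
    elif event.event_type == "Withdrawn":
        balances[event.stream_id] -= event.data["amount"]


store = InMemoryEventStore()
store.subscribe(project_balance)
store.append("Account-1", "Deposited", {"amount": 100}, expected_version=0)
store.append("Account-1", "Withdrawn", {"amount": 30}, expected_version=1)

try:
    # A writer that still believes the stream is at version 1 is rejected.
    store.append("Account-1", "Withdrawn", {"amount": 10}, expected_version=1)
    conflict = False
except ConcurrencyError:
    conflict = True
```

The rejected writer is expected to reload the stream, re-run its business rule against the new state, and retry with the fresh version.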

Prerequisites

  • –Solid understanding of Event Sourcing and CQRS concepts
  • –Database modeling skills for relational or document stores
  • –Working knowledge of transactional consistency models

Pitfalls & limitations

  • Underestimating the complexity of event schema migrations over time
  • Performance bottlenecks caused by large event streams requiring frequent hydration
  • Ignoring the cost of eventual consistency in projected read models
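
The hydration bottleneck is usually addressed with snapshots. The sketch below assumes a hypothetical account aggregate whose events are `(version, amount_delta)` pairs; it only illustrates how a snapshot shortens the replay, not how snapshots are stored.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Snapshot:
    version: int  # stream version the snapshot was taken at
    balance: int  # aggregate state at that version


def hydrate(events: List[Tuple[int, int]],
            snapshot: Optional[Snapshot] = None) -> Tuple[int, int, int]:
    """Rebuild state; returns (balance, version, events_replayed)."""
    balance = snapshot.balance if snapshot else 0
    version = snapshot.version if snapshot else 0
    replayed = 0
    for event_version, delta in events:
        if event_version <= version:
            continue  # already folded into the snapshot
        balance += delta
        version = event_version
        replayed += 1
    return balance, version, replayed


events = [(v, 10) for v in range(1, 1001)]  # 1000 deposits of 10
full = hydrate(events)
fast = hydrate(events, Snapshot(version=900, balance=9000))
```

Both calls reach the same state, but the second replays 100 events instead of 1000. A real store would go further and only read events with a version greater than the snapshot's, so the skipped rows are never fetched.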

FAQ

Why is snapshotting necessary?
Snapshotting prevents the performance penalty of replaying long event histories every time an aggregate needs to be reloaded into memory.
Can I use a standard RDBMS as an event store?
Yes, PostgreSQL is common, provided you model your table to handle append-only semantics and enforce concurrency through unique versioning constraints.
How do I handle schema changes in event sourcing?
You typically use upcasting logic within your event store reader to transform legacy event formats into current application models at runtime.
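
A minimal sketch of upcasting, assuming each stored event carries a schema version alongside its payload. The `CustomerRegistered` event, its two schema versions, and the `UPCASTERS` registry are hypothetical examples.

```python
from typing import Callable, Dict, Tuple

Upcaster = Callable[[dict], dict]


def _customer_registered_v1_to_v2(data: dict) -> dict:
    # v1 stored a single "name"; v2 splits it into first and last name.
    first, _, last = data["name"].partition(" ")
    return {"first_name": first, "last_name": last}


# (event_type, schema_version) -> function producing the next version's payload
UPCASTERS: Dict[Tuple[str, int], Upcaster] = {
    ("CustomerRegistered", 1): _customer_registered_v1_to_v2,
}
CURRENT_VERSION: Dict[str, int] = {"CustomerRegistered": 2}


def upcast(event_type: str, schema_version: int, data: dict) -> Tuple[int, dict]:
    """Apply upcasters step by step until the payload matches the current schema."""
    while schema_version < CURRENT_VERSION.get(event_type, schema_version):
        data = UPCASTERS[(event_type, schema_version)](data)
        schema_version += 1
    return schema_version, data


legacy = upcast("CustomerRegistered", 1, {"name": "Ada Lovelace"})
current = upcast("CustomerRegistered", 2,
                 {"first_name": "Ada", "last_name": "Lovelace"})
```

The stored event is never rewritten; the transformation runs on read, so application code only ever sees the current shape.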

How it compares

Generic database design focuses on the current state; event store design forces a shift toward tracking history as the primary data source, requiring specific logic for versioning and projection.

Full skill instructions (original source: wshobson/agents)
# Event Store Design

Comprehensive guide to designing event stores for event-sourced applications.

## When to Use This Skill

- Designing event sourcing infrastructure
- Choosing between event store technologies
- Implementing custom event stores
- Optimizing event storage and retrieval
- Setting up event store schemas
- Planning for event store scaling

## Core Concepts

### 1. Event Store Architecture

┌─────────────────────────────────────────────────────┐
│                     Event Store                     │
├─────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │ Stream 1    │  │ Stream 2    │  │ Stream 3    │  │
│  │ (Aggregate) │  │ (Aggregate) │  │ (Aggregate) │  │
│  ├─────────────┤  ├─────────────┤  ├─────────────┤  │
│  │ Event 1     │  │ Event 1     │  │ Event 1     │  │
│  │ Event 2     │  │ Event 2     │  │ Event 2     │  │
│  │ Event 3     │  │ ...         │  │ Event 3     │  │
│  │ ...         │  │             │  │ Event 4     │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
├─────────────────────────────────────────────────────┤
│  Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ...       │
└─────────────────────────────────────────────────────┘


### 2. Event Store Requirements

| Requirement | Description |
| ----------------- | ---------------------------------- |
| **Append-only** | Events are immutable, only appends |
| **Ordered** | Per-stream and global ordering |
| **Versioned** | Optimistic concurrency control |
| **Subscriptions** | Real-time event notifications |
| **Idempotent** | Handle duplicate writes safely |

## Technology Comparison

| Technology | Best For | Limitations |
| ---------------- | ------------------------- | -------------------------------- |
| **EventStoreDB** | Pure event sourcing | Single-purpose |
| **PostgreSQL** | Existing Postgres stack | Manual implementation |
| **Kafka** | High-throughput streaming | Not ideal for per-stream queries |
| **DynamoDB** | Serverless, AWS-native | Query limitations |
| **Marten** | .NET ecosystems | .NET specific |

## Templates

### Template 1: PostgreSQL Event Store Schema

-- Events table
CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    stream_id VARCHAR(255) NOT NULL,
    stream_type VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    metadata JSONB DEFAULT '{}',
    version BIGINT NOT NULL,
    global_position BIGSERIAL,
    created_at TIMESTAMPTZ DEFAULT NOW(),

    -- Enforces optimistic concurrency: one event per (stream, version)
    CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
);

-- Stream queries use the index that backs unique_stream_version,
-- so no separate (stream_id, version) index is needed.

-- Index for global subscription
CREATE INDEX idx_events_global_position ON events(global_position);

-- Index for event type queries
CREATE INDEX idx_events_event_type ON events(event_type);

-- Index for time-based queries
CREATE INDEX idx_events_created_at ON events(created_at);

-- Snapshots table (one latest snapshot per stream)
CREATE TABLE snapshots (
    stream_id VARCHAR(255) PRIMARY KEY,
    stream_type VARCHAR(255) NOT NULL,
    snapshot_data JSONB NOT NULL,
    version BIGINT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Subscriptions checkpoint table
CREATE TABLE subscription_checkpoints (
    subscription_id VARCHAR(255) PRIMARY KEY,
    last_position BIGINT NOT NULL DEFAULT 0,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);
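
To see how the `UNIQUE (stream_id, version)` constraint enforces optimistic concurrency, the sketch below runs the same idea against an in-memory SQLite database. SQLite is only a stand-in chosen so the example runs without a server; the trimmed-down table and the `try_append` helper are assumptions for illustration, not part of the PostgreSQL schema above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE events (
        global_position INTEGER PRIMARY KEY AUTOINCREMENT,
        stream_id TEXT NOT NULL,
        event_type TEXT NOT NULL,
        version INTEGER NOT NULL,
        UNIQUE (stream_id, version)
    )
    """
)


def try_append(stream_id: str, event_type: str, version: int) -> bool:
    """Insert one event; return False if that stream version is already taken."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO events (stream_id, event_type, version) VALUES (?, ?, ?)",
                (stream_id, event_type, version),
            )
        return True
    except sqlite3.IntegrityError:
        return False


# Two writers both loaded the stream at version 0 and try to write version 1.
writer_a = try_append("Order-1", "OrderPlaced", 1)
writer_b = try_append("Order-1", "OrderCancelled", 1)  # loses the race
next_ok = try_append("Order-1", "OrderShipped", 2)
count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
```

The database rejects the second writer without any explicit locking, which is why the constraint matters more than the version check performed in application code.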


### Template 2: Python Event Store Implementation

from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional
from uuid import UUID, uuid4
import asyncio
import json

import asyncpg


@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    metadata: dict = field(default_factory=dict)
    event_id: UUID = field(default_factory=uuid4)
    version: Optional[int] = None
    global_position: Optional[int] = None
    # Timezone-aware UTC timestamp (datetime.utcnow() is naive and deprecated)
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class EventStore:
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def append_events(
        self,
        stream_id: str,
        stream_type: str,
        events: List[Event],
        expected_version: Optional[int] = None
    ) -> List[Event]:
        """Append events to a stream with optimistic concurrency."""
        async with self.pool.acquire() as conn:
            try:
                async with conn.transaction():
                    # Current stream version (0 for a new stream)
                    current = await conn.fetchval(
                        "SELECT COALESCE(MAX(version), 0) FROM events WHERE stream_id = $1",
                        stream_id
                    )

                    # Check expected version
                    if expected_version is not None and current != expected_version:
                        raise ConcurrencyError(
                            f"Expected version {expected_version}, got {current}"
                        )

                    # Insert events
                    saved_events = []
                    for i, event in enumerate(events, start=1):
                        event.version = current + i
                        row = await conn.fetchrow(
                            """
                            INSERT INTO events (id, stream_id, stream_type, event_type,
                                                event_data, metadata, version, created_at)
                            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                            RETURNING global_position
                            """,
                            event.event_id,
                            stream_id,
                            stream_type,
                            event.event_type,
                            json.dumps(event.data),
                            json.dumps(event.metadata),
                            event.version,
                            event.created_at
                        )
                        event.global_position = row['global_position']
                        saved_events.append(event)

                    return saved_events
            except asyncpg.UniqueViolationError as exc:
                # Most likely a concurrent writer claimed the same
                # (stream_id, version) between our read and our insert.
                raise ConcurrencyError(
                    f"Conflicting write to stream {stream_id}"
                ) from exc

    async def read_stream(
        self,
        stream_id: str,
        from_version: int = 0,
        limit: int = 1000
    ) -> List[Event]:
        """Read events from a stream."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT id, stream_id, event_type, event_data, metadata,
                       version, global_position, created_at
                FROM events
                WHERE stream_id = $1 AND version >= $2
                ORDER BY version
                LIMIT $3
                """,
                stream_id, from_version, limit
            )
            return [self._row_to_event(row) for row in rows]

    async def read_all(
        self,
        from_position: int = 0,
        limit: int = 1000
    ) -> List[Event]:
        """Read all events globally, strictly after from_position."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT id, stream_id, event_type, event_data, metadata,
                       version, global_position, created_at
                FROM events
                WHERE global_position > $1
                ORDER BY global_position
                LIMIT $2
                """,
                from_position, limit
            )
            return [self._row_to_event(row) for row in rows]

    async def subscribe(
        self,
        subscription_id: str,
        handler,
        from_position: int = 0,
        batch_size: int = 100
    ):
        """Subscribe to all events from a position (polling, runs forever)."""
        # Get checkpoint
        async with self.pool.acquire() as conn:
            checkpoint = await conn.fetchval(
                """
                SELECT last_position FROM subscription_checkpoints
                WHERE subscription_id = $1
                """,
                subscription_id
            )
        position = checkpoint if checkpoint is not None else from_position

        while True:
            events = await self.read_all(position, batch_size)
            if not events:
                await asyncio.sleep(1)  # Poll interval
                continue

            for event in events:
                await handler(event)
                position = event.global_position

            # Save checkpoint once per batch; after a crash the handler may
            # see the last batch again, so handlers should be idempotent.
            async with self.pool.acquire() as conn:
                await conn.execute(
                    """
                    INSERT INTO subscription_checkpoints (subscription_id, last_position)
                    VALUES ($1, $2)
                    ON CONFLICT (subscription_id)
                    DO UPDATE SET last_position = $2, updated_at = NOW()
                    """,
                    subscription_id, position
                )

    def _row_to_event(self, row) -> Event:
        return Event(
            event_id=row['id'],
            stream_id=row['stream_id'],
            event_type=row['event_type'],
            data=json.loads(row['event_data']),
            metadata=json.loads(row['metadata']),
            version=row['version'],
            global_position=row['global_position'],
            created_at=row['created_at']
        )


class ConcurrencyError(Exception):
    """Raised when optimistic concurrency check fails."""
    pass


### Template 3: EventStoreDB Usage

from esdbclient import EventStoreDBClient, NewEvent, StreamState
import json

# Connect
client = EventStoreDBClient(uri="esdb://localhost:2113?tls=false")

# Append events
def append_events(stream_name: str, events: list, expected_revision=None):
    new_events = [
        NewEvent(
            type=event['type'],
            data=json.dumps(event['data']).encode(),
            metadata=json.dumps(event.get('metadata', {})).encode()
        )
        for event in events
    ]

    # Map the caller's expectation onto the client's concurrency states
    if expected_revision is None:
        state = StreamState.ANY          # no concurrency check
    elif expected_revision == -1:
        state = StreamState.NO_STREAM    # stream must not exist yet
    else:
        state = expected_revision        # stream must be at this revision

    return client.append_to_stream(
        stream_name=stream_name,
        events=new_events,
        current_version=state
    )

# Read stream
def read_stream(stream_name: str, from_revision: int = 0):
    events = client.get_stream(
        stream_name=stream_name,
        stream_position=from_revision
    )
    return [
        {
            'type': event.type,
            'data': json.loads(event.data),
            'metadata': json.loads(event.metadata) if event.metadata else {},
            'stream_position': event.stream_position,
            'commit_position': event.commit_position
        }
        for event in events
    ]

# Subscribe to all
def subscribe_to_all(handler, from_position: int = 0):
    # The synchronous client returns a blocking iterator, so use a plain
    # for loop and a synchronous handler. Use the package's asyncio client
    # if an async for loop is required.
    subscription = client.subscribe_to_all(commit_position=from_position)
    for event in subscription:
        handler({
            'type': event.type,
            'data': json.loads(event.data),
            'stream_id': event.stream_name,
            'position': event.commit_position
        })

# Category projection ($ce-Category)
def read_category(category: str):
    """Read all events for a category using system projection."""
    # Requires the by-category system projection to be enabled on the server
    return read_stream(f"$ce-{category}")


### Template 4: DynamoDB Event Store

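import boto3
from boto3.dynamodb.conditions import Key
from datetime import datetime, timezone
import json
import uuid


class ConcurrencyError(Exception):
    """Raised when another writer has already claimed a stream version."""


class DynamoEventStore:
    def __init__(self, table_name: str):
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(table_name)

    def append_events(self, stream_id: str, events: list, expected_version: int = None):
        """Append events with conditional writes for concurrency."""
        # batch_writer() cannot carry condition expressions, so the events are
        # written in one transaction (DynamoDB allows up to 100 items per call).
        client = self.table.meta.client
        puts = []
        for i, event in enumerate(events):
            version = (expected_version or 0) + i + 1
            created_at = datetime.now(timezone.utc).isoformat()
            puts.append({
                'Put': {
                    'TableName': self.table.name,
                    'Item': {
                        'PK': f"STREAM#{stream_id}",
                        'SK': f"VERSION#{version:020d}",
                        'GSI1PK': 'EVENTS',
                        'GSI1SK': created_at,
                        'event_id': str(uuid.uuid4()),
                        'stream_id': stream_id,
                        'event_type': event['type'],
                        'event_data': json.dumps(event['data']),
                        'version': version,
                        'created_at': created_at
                    },
                    # Fail if this stream version has already been written
                    'ConditionExpression': 'attribute_not_exists(PK)'
                }
            })
        try:
            client.transact_write_items(TransactItems=puts)
        except client.exceptions.TransactionCanceledException as exc:
            # Typically a failed condition: the version already exists
            raise ConcurrencyError(
                f"Conflicting write to stream {stream_id}"
            ) from exc
        return events

    def read_stream(self, stream_id: str, from_version: int = 0):
        """Read events from a stream, following pagination."""
        query_args = {
            'KeyConditionExpression': Key('PK').eq(f"STREAM#{stream_id}") &
                                      Key('SK').gte(f"VERSION#{from_version:020d}")
        }
        items = []
        while True:
            response = self.table.query(**query_args)
            items.extend(response['Items'])
            # A single query returns at most 1 MB; continue from the last key
            if 'LastEvaluatedKey' not in response:
                break
            query_args['ExclusiveStartKey'] = response['LastEvaluatedKey']
        return [
            {
                'event_type': item['event_type'],
                'data': json.loads(item['event_data']),
                'version': int(item['version'])  # stored numbers come back as Decimal
            }
            for item in items
        ]

# Table definition (CloudFormation/Terraform)
"""
DynamoDB Table:
- PK (Partition Key): String
- SK (Sort Key): String
- GSI1PK, GSI1SK for global ordering

Capacity: On-demand or provisioned based on throughput needs
"""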


## Best Practices

### Do's

- **Use stream IDs that include aggregate type** - For example, `Order-{uuid}`
- **Include correlation/causation IDs** - For tracing
- **Version events from day one** - Plan for schema evolution
- **Implement idempotency** - Use event IDs for deduplication
- **Index appropriately** - For your query patterns
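
One common way to carry correlation and causation IDs is in the event metadata. The sketch below assumes a convention in which the first message of a flow uses its own ID as the correlation ID; the `new_metadata` helper and the field names are hypothetical.

```python
from typing import Optional
from uuid import uuid4


def new_metadata(caused_by: Optional[dict] = None) -> dict:
    """Build tracing metadata for a message, optionally caused by another one."""
    message_id = str(uuid4())
    if caused_by is None:
        # Start of a flow: the message correlates with itself and has no cause.
        return {
            "message_id": message_id,
            "correlation_id": message_id,
            "causation_id": None,
        }
    return {
        "message_id": message_id,
        "correlation_id": caused_by["correlation_id"],  # same for the whole flow
        "causation_id": caused_by["message_id"],        # direct parent only
    }


command = new_metadata()
order_placed = new_metadata(caused_by=command)
payment_requested = new_metadata(caused_by=order_placed)
```

Filtering by `correlation_id` returns everything that belongs to one flow, while following `causation_id` links reconstructs the order in which messages triggered each other.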

### Don'ts

- **Don't update or delete events** - They're immutable facts
- **Don't store large payloads** - Keep events small
- **Don't skip optimistic concurrency** - Prevents data corruption
- **Don't ignore backpressure** - Handle slow consumers
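
Backpressure can be sketched with a bounded `asyncio.Queue` between the event reader and a slow consumer. The queue size, the simulated delay, and the use of `None` as an end-of-stream marker are assumptions made for illustration.

```python
import asyncio
from typing import List, Tuple


async def producer(queue: asyncio.Queue, count: int) -> None:
    for position in range(1, count + 1):
        # put() suspends while the queue is full, slowing the reader down
        await queue.put(position)
    await queue.put(None)  # end-of-stream marker


async def slow_consumer(queue: asyncio.Queue, handled: List[int],
                        depths: List[int]) -> None:
    while True:
        depths.append(queue.qsize())  # record how much work is buffered
        position = await queue.get()
        if position is None:
            break
        await asyncio.sleep(0.001)  # simulate slow projection work
        handled.append(position)


async def main() -> Tuple[List[int], List[int]]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=5)  # bounded buffer
    handled: List[int] = []
    depths: List[int] = []
    await asyncio.gather(producer(queue, 20), slow_consumer(queue, handled, depths))
    return handled, depths


handled, depths = asyncio.run(main())
```

Because the buffer is bounded, memory use stays flat no matter how far the consumer falls behind; the cost is that the reader waits instead of racing ahead.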

## Resources

- [EventStoreDB](https://www.eventstore.com/)
- [Marten Events](https://martendb.io/events/)
- [Event Sourcing Pattern](https://docs.microsoft.com/en-us/azure/architecture/patterns/event-sourcing)

How to Use This Skill Unit

Option A: Project-Specific (Recommended)

  1. Click "Download" above
  2. In your project, create the directory: .agent/skills/event-store-design/
  3. Save the file as SKILL.md
  4. The agent will automatically discover the skill based on its description.

Option B: Global Installation (All Agents)

Save the file to these locations to make it available across all projects:

  • Claude Code: ~/.claude/skills/wshobson/agents/event-store-design/SKILL.md
  • Cursor: ~/.cursor/skills/wshobson/agents/event-store-design/SKILL.md
  • Antigravity: ~/.gemini/antigravity/skills/wshobson/agents/event-store-design/SKILL.md


How to use this Skill in Claude Code & Cursor

For Claude Code (CLI)

To use this skill in Claude Code, copy the rule content into your project's custom instructions or follow our Add-Skill CLI guide. This ensures Claude follows your standards during every code generation.

For Cursor & Windsurf

For Cursor or Windsurf, individual skills are best added in the "Rules for AI" section. This unit helps the agent avoid common architecture and design-pattern mistakes, leading to cleaner, more efficient code.

Why the skill format matters: the standardized Agent Skills format lets your AI agent load detailed instructions only when they are relevant, keeping your prompt clean while improving results.

Source & attribution

This skill is categorized under Architecture & Design Patterns and is published by W. Shobson, maintained in wshobson/agents.
