Claude Agent Skill · by Wshobson

Event Store Design

Gets you from zero to a production event store, with PostgreSQL schemas, Python implementations, and the key infrastructure decisions sorted. Covers stream versioning and optimistic concurrency.

Install
Terminal · npx
$ npx skills add https://github.com/wshobson/agents --skill event-store-design
Works with Paperclip

How Event Store Design fits into a Paperclip company.

Event Store Design drops into any Paperclip agent that handles this kind of work. Assign it to a specialist inside a pre-configured PaperclipOrg company and the skill becomes available on every heartbeat — no prompt engineering, no tool wiring.

Source file
SKILL.md · 431 lines
---
name: event-store-design
description: Design and implement event stores for event-sourced systems. Use when building event sourcing infrastructure, choosing event store technologies, or implementing event persistence patterns.
---

# Event Store Design

Comprehensive guide to designing event stores for event-sourced applications.

## When to Use This Skill

- Designing event sourcing infrastructure
- Choosing between event store technologies
- Implementing custom event stores
- Optimizing event storage and retrieval
- Setting up event store schemas
- Planning for event store scaling

## Core Concepts

### 1. Event Store Architecture

```
┌─────────────────────────────────────────────────────┐
│                     Event Store                     │
├─────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │  Stream 1   │  │  Stream 2   │  │  Stream 3   │  │
│  │ (Aggregate) │  │ (Aggregate) │  │ (Aggregate) │  │
│  ├─────────────┤  ├─────────────┤  ├─────────────┤  │
│  │ Event 1     │  │ Event 1     │  │ Event 1     │  │
│  │ Event 2     │  │ Event 2     │  │ Event 2     │  │
│  │ Event 3     │  │ ...         │  │ Event 3     │  │
│  │ ...         │  │             │  │ Event 4     │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
├─────────────────────────────────────────────────────┤
│  Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ...       │
└─────────────────────────────────────────────────────┘
```

### 2. Event Store Requirements

| Requirement       | Description                        |
| ----------------- | ---------------------------------- |
| **Append-only**   | Events are immutable, only appends |
| **Ordered**       | Per-stream and global ordering     |
| **Versioned**     | Optimistic concurrency control     |
| **Subscriptions** | Real-time event notifications      |
| **Idempotent**    | Handle duplicate writes safely     |

## Technology Comparison

| Technology       | Best For                  | Limitations                      |
| ---------------- | ------------------------- | -------------------------------- |
| **EventStoreDB** | Pure event sourcing       | Single-purpose                   |
| **PostgreSQL**   | Existing Postgres stack   | Manual implementation            |
| **Kafka**        | High-throughput streaming | Not ideal for per-stream queries |
| **DynamoDB**     | Serverless, AWS-native    | Query limitations                |
| **Marten**       | .NET ecosystems           | .NET specific                    |

## Templates

### Template 1: PostgreSQL Event Store Schema

```sql
-- Events table
CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    stream_id VARCHAR(255) NOT NULL,
    stream_type VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    metadata JSONB DEFAULT '{}',
    version BIGINT NOT NULL,
    global_position BIGSERIAL,
    created_at TIMESTAMPTZ DEFAULT NOW(),

    CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
);

-- Index for stream queries
CREATE INDEX idx_events_stream_id ON events(stream_id, version);

-- Index for global subscription
CREATE INDEX idx_events_global_position ON events(global_position);

-- Index for event type queries
CREATE INDEX idx_events_event_type ON events(event_type);

-- Index for time-based queries
CREATE INDEX idx_events_created_at ON events(created_at);

-- Snapshots table
CREATE TABLE snapshots (
    stream_id VARCHAR(255) PRIMARY KEY,
    stream_type VARCHAR(255) NOT NULL,
    snapshot_data JSONB NOT NULL,
    version BIGINT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Subscriptions checkpoint table
CREATE TABLE subscription_checkpoints (
    subscription_id VARCHAR(255) PRIMARY KEY,
    last_position BIGINT NOT NULL DEFAULT 0,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);
```
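To make the concurrency guarantee concrete, here is a small hypothetical append against this schema. The stream ID and payload are illustrative; the point is that `unique_stream_version` turns a lost-update race into a hard error:

```sql
-- Illustrative append: a writer that read the stream at version 3
-- tries to claim version 4.
INSERT INTO events (stream_id, stream_type, event_type, event_data, version)
VALUES ('Order-123', 'Order', 'OrderShipped', '{"carrier": "UPS"}', 4);

-- If a concurrent writer already inserted version 4 for 'Order-123',
-- this statement fails with a unique_violation on unique_stream_version.
-- The application should catch that error, reload the stream, and retry.
```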
### Template 2: Python Event Store Implementation

```python
import asyncio
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional
from uuid import UUID, uuid4

import asyncpg


@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    metadata: dict = field(default_factory=dict)
    event_id: UUID = field(default_factory=uuid4)
    version: Optional[int] = None
    global_position: Optional[int] = None
    # Timezone-aware UTC timestamp; TIMESTAMPTZ columns expect aware datetimes
    created_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )


class EventStore:
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def append_events(
        self,
        stream_id: str,
        stream_type: str,
        events: List[Event],
        expected_version: Optional[int] = None
    ) -> List[Event]:
        """Append events to a stream with optimistic concurrency."""
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                # Check expected version
                if expected_version is not None:
                    current = await conn.fetchval(
                        "SELECT MAX(version) FROM events WHERE stream_id = $1",
                        stream_id
                    )
                    current = current or 0
                    if current != expected_version:
                        raise ConcurrencyError(
                            f"Expected version {expected_version}, got {current}"
                        )

                # Get starting version
                start_version = await conn.fetchval(
                    "SELECT COALESCE(MAX(version), 0) + 1 FROM events WHERE stream_id = $1",
                    stream_id
                )

                # Insert events; the unique (stream_id, version) constraint is
                # the backstop if two writers race past the check above
                saved_events = []
                for i, event in enumerate(events):
                    event.version = start_version + i
                    row = await conn.fetchrow(
                        """
                        INSERT INTO events (id, stream_id, stream_type, event_type,
                                            event_data, metadata, version, created_at)
                        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                        RETURNING global_position
                        """,
                        event.event_id,
                        stream_id,
                        stream_type,
                        event.event_type,
                        json.dumps(event.data),
                        json.dumps(event.metadata),
                        event.version,
                        event.created_at
                    )
                    event.global_position = row['global_position']
                    saved_events.append(event)

                return saved_events

    async def read_stream(
        self,
        stream_id: str,
        from_version: int = 0,
        limit: int = 1000
    ) -> List[Event]:
        """Read events from a stream."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT id, stream_id, event_type, event_data, metadata,
                       version, global_position, created_at
                FROM events
                WHERE stream_id = $1 AND version >= $2
                ORDER BY version
                LIMIT $3
                """,
                stream_id, from_version, limit
            )
            return [self._row_to_event(row) for row in rows]

    async def read_all(
        self,
        from_position: int = 0,
        limit: int = 1000
    ) -> List[Event]:
        """Read all events globally."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT id, stream_id, event_type, event_data, metadata,
                       version, global_position, created_at
                FROM events
                WHERE global_position > $1
                ORDER BY global_position
                LIMIT $2
                """,
                from_position, limit
            )
            return [self._row_to_event(row) for row in rows]

    async def subscribe(
        self,
        subscription_id: str,
        handler,
        from_position: int = 0,
        batch_size: int = 100
    ):
        """Subscribe to all events from a position."""
        # Resume from the stored checkpoint if one exists
        async with self.pool.acquire() as conn:
            checkpoint = await conn.fetchval(
                """
                SELECT last_position FROM subscription_checkpoints
                WHERE subscription_id = $1
                """,
                subscription_id
            )
            position = checkpoint or from_position

        while True:
            events = await self.read_all(position, batch_size)
            if not events:
                await asyncio.sleep(1)  # Poll interval
                continue

            for event in events:
                await handler(event)
                position = event.global_position

            # Save checkpoint
            async with self.pool.acquire() as conn:
                await conn.execute(
                    """
                    INSERT INTO subscription_checkpoints (subscription_id, last_position)
                    VALUES ($1, $2)
                    ON CONFLICT (subscription_id)
                    DO UPDATE SET last_position = $2, updated_at = NOW()
                    """,
                    subscription_id, position
                )

    def _row_to_event(self, row) -> Event:
        return Event(
            event_id=row['id'],
            stream_id=row['stream_id'],
            event_type=row['event_type'],
            data=json.loads(row['event_data']),
            metadata=json.loads(row['metadata']),
            version=row['version'],
            global_position=row['global_position'],
            created_at=row['created_at']
        )


class ConcurrencyError(Exception):
    """Raised when optimistic concurrency check fails."""
    pass
```
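A minimal usage sketch for the store above, assuming the Template 1 schema is already in place (the DSN, stream ID, and payloads are placeholders):

```python
import asyncio

import asyncpg


async def main():
    # Placeholder DSN; point at a database with the Template 1 schema.
    pool = await asyncpg.create_pool("postgresql://localhost/eventstore")
    store = EventStore(pool)

    # Append two events to a fresh stream; expected_version=0 asserts
    # that no events exist yet, so a concurrent creator would fail.
    await store.append_events(
        stream_id="Order-123",
        stream_type="Order",
        events=[
            Event(stream_id="Order-123", event_type="OrderPlaced",
                  data={"total": 99.50}),
            Event(stream_id="Order-123", event_type="OrderPaid",
                  data={"method": "card"}),
        ],
        expected_version=0,
    )

    # Rehydrate the stream in order.
    for event in await store.read_stream("Order-123"):
        print(event.version, event.event_type)

    await pool.close()


asyncio.run(main())
```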
 """,                stream_id, from_version, limit            )            return [self._row_to_event(row) for row in rows]     async def read_all(        self,        from_position: int = 0,        limit: int = 1000    ) -> List[Event]:        """Read all events globally."""        async with self.pool.acquire() as conn:            rows = await conn.fetch(                """                SELECT id, stream_id, event_type, event_data, metadata,                       version, global_position, created_at                FROM events                WHERE global_position > $1                ORDER BY global_position                LIMIT $2                """,                from_position, limit            )            return [self._row_to_event(row) for row in rows]     async def subscribe(        self,        subscription_id: str,        handler,        from_position: int = 0,        batch_size: int = 100    ):        """Subscribe to all events from a position."""        # Get checkpoint        async with self.pool.acquire() as conn:            checkpoint = await conn.fetchval(                """                SELECT last_position FROM subscription_checkpoints                WHERE subscription_id = $1                """,                subscription_id            )            position = checkpoint or from_position         while True:            events = await self.read_all(position, batch_size)            if not events:                await asyncio.sleep(1)  # Poll interval                continue             for event in events:                await handler(event)                position = event.global_position             # Save checkpoint            async with self.pool.acquire() as conn:                await conn.execute(                    """                    INSERT INTO subscription_checkpoints (subscription_id, last_position)                    VALUES ($1, $2)                    ON CONFLICT (subscription_id)                    DO UPDATE SET last_position = $2, updated_at = NOW()                    """,                    subscription_id, position                )     def _row_to_event(self, row) -> Event:        return Event(            event_id=row['id'],            stream_id=row['stream_id'],            event_type=row['event_type'],            data=json.loads(row['event_data']),            metadata=json.loads(row['metadata']),            version=row['version'],            global_position=row['global_position'],            created_at=row['created_at']        )  class ConcurrencyError(Exception):    """Raised when optimistic concurrency check fails."""    pass``` ### Template 3: EventStoreDB Usage ```pythonfrom esdbclient import EventStoreDBClient, NewEvent, StreamStateimport json # Connectclient = EventStoreDBClient(uri="esdb://localhost:2113?tls=false") # Append eventsdef append_events(stream_name: str, events: list, expected_revision=None):    new_events = [        NewEvent(            type=event['type'],            data=json.dumps(event['data']).encode(),            metadata=json.dumps(event.get('metadata', {})).encode()        )        for event in events    ]     if expected_revision is None:        state = StreamState.ANY    elif expected_revision == -1:        state = StreamState.NO_STREAM    else:        state = expected_revision     return client.append_to_stream(        stream_name=stream_name,        events=new_events,        current_version=state    ) # Read streamdef read_stream(stream_name: str, from_revision: int = 0):    events = client.get_stream(        
### Template 4: DynamoDB Event Store

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Optional

import boto3
from boto3.dynamodb.conditions import Key


class DynamoEventStore:
    def __init__(self, table_name: str):
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(table_name)

    def append_events(self, stream_id: str, events: list,
                      expected_version: Optional[int] = None):
        """Append events with a conditional write per item for concurrency."""
        # batch_writer() cannot attach a ConditionExpression, so each event
        # is written individually; the condition rejects an item whose key
        # (stream + version) already exists, surfacing concurrent writers.
        for i, event in enumerate(events):
            version = (expected_version or 0) + i + 1
            now = datetime.now(timezone.utc).isoformat()
            item = {
                'PK': f"STREAM#{stream_id}",
                'SK': f"VERSION#{version:020d}",
                'GSI1PK': 'EVENTS',
                'GSI1SK': now,
                'event_id': str(uuid.uuid4()),
                'stream_id': stream_id,
                'event_type': event['type'],
                'event_data': json.dumps(event['data']),
                'version': version,
                'created_at': now
            }
            self.table.put_item(
                Item=item,
                ConditionExpression='attribute_not_exists(SK)'
            )
        return events

    def read_stream(self, stream_id: str, from_version: int = 0):
        """Read events from a stream."""
        response = self.table.query(
            KeyConditionExpression=Key('PK').eq(f"STREAM#{stream_id}") &
                                   Key('SK').gte(f"VERSION#{from_version:020d}")
        )
        return [
            {
                'event_type': item['event_type'],
                'data': json.loads(item['event_data']),
                'version': item['version']
            }
            for item in response['Items']
        ]


# Table definition (CloudFormation/Terraform)
"""
DynamoDB Table:
  - PK (Partition Key): String
  - SK (Sort Key): String
  - GSI1PK, GSI1SK for global ordering

Capacity: On-demand or provisioned based on throughput needs
"""
```

## Best Practices

### Do's

- **Use stream IDs that include the aggregate type** - `Order-{uuid}`
- **Include correlation/causation IDs** - For tracing; see the sketch after these lists
- **Version events from day one** - Plan for schema evolution
- **Implement idempotency** - Use event IDs for deduplication
- **Index appropriately** - For your query patterns

### Don'ts

- **Don't update or delete events** - They're immutable facts
- **Don't store large payloads** - Keep events small
- **Don't skip optimistic concurrency** - Prevents data corruption
- **Don't ignore backpressure** - Handle slow consumers
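To make the first two Do's concrete, here is a hedged sketch of stamping tracing IDs and a schema version into event metadata, using the `Event` type from Template 2 (the factory and field names are illustrative conventions, not a fixed contract):

```python
from uuid import uuid4


def order_placed(data: dict, correlation_id: str, causation_id: str) -> Event:
    """Illustrative factory: a traced, versioned OrderPlaced event."""
    return Event(
        stream_id=f"Order-{uuid4()}",  # aggregate type embedded in the stream ID
        event_type="OrderPlaced",
        data=data,
        metadata={
            "correlation_id": correlation_id,  # ties together one business operation
            "causation_id": causation_id,      # the command/event that triggered this one
            "schema_version": 1,               # versioned from day one for evolution
        },
    )
```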