npx skills add https://github.com/wshobson/agents --skill event-store-designHow Event Store Design fits into a Paperclip company.
Event Store Design drops into any Paperclip agent that handles this kind of work. Assign it to a specialist inside a pre-configured PaperclipOrg company and the skill becomes available on every heartbeat — no prompt engineering, no tool wiring.
Pre-configured AI company — 20 agents, 9 skills, one-time purchase.
SKILL.md431 linesExpandCollapse
---name: event-store-designdescription: Design and implement event stores for event-sourced systems. Use when building event sourcing infrastructure, choosing event store technologies, or implementing event persistence patterns.--- # Event Store Design Comprehensive guide to designing event stores for event-sourced applications. ## When to Use This Skill - Designing event sourcing infrastructure- Choosing between event store technologies- Implementing custom event stores- Optimizing event storage and retrieval- Setting up event store schemas- Planning for event store scaling ## Core Concepts ### 1. Event Store Architecture ```┌─────────────────────────────────────────────────────┐│ Event Store │├─────────────────────────────────────────────────────┤│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ││ │ Stream 1 │ │ Stream 2 │ │ Stream 3 │ ││ │ (Aggregate) │ │ (Aggregate) │ │ (Aggregate) │ ││ ├─────────────┤ ├─────────────┤ ├─────────────┤ ││ │ Event 1 │ │ Event 1 │ │ Event 1 │ ││ │ Event 2 │ │ Event 2 │ │ Event 2 │ ││ │ Event 3 │ │ ... │ │ Event 3 │ ││ │ ... │ │ │ │ Event 4 │ ││ └─────────────┘ └─────────────┘ └─────────────┘ │├─────────────────────────────────────────────────────┤│ Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ... │└─────────────────────────────────────────────────────┘``` ### 2. Event Store Requirements | Requirement | Description || ----------------- | ---------------------------------- || **Append-only** | Events are immutable, only appends || **Ordered** | Per-stream and global ordering || **Versioned** | Optimistic concurrency control || **Subscriptions** | Real-time event notifications || **Idempotent** | Handle duplicate writes safely | ## Technology Comparison | Technology | Best For | Limitations || ---------------- | ------------------------- | -------------------------------- || **EventStoreDB** | Pure event sourcing | Single-purpose || **PostgreSQL** | Existing Postgres stack | Manual implementation || **Kafka** | High-throughput streaming | Not ideal for per-stream queries || **DynamoDB** | Serverless, AWS-native | Query limitations || **Marten** | .NET ecosystems | .NET specific | ## Templates ### Template 1: PostgreSQL Event Store Schema ```sql-- Events tableCREATE TABLE events ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), stream_id VARCHAR(255) NOT NULL, stream_type VARCHAR(255) NOT NULL, event_type VARCHAR(255) NOT NULL, event_data JSONB NOT NULL, metadata JSONB DEFAULT '{}', version BIGINT NOT NULL, global_position BIGSERIAL, created_at TIMESTAMPTZ DEFAULT NOW(), CONSTRAINT unique_stream_version UNIQUE (stream_id, version)); -- Index for stream queriesCREATE INDEX idx_events_stream_id ON events(stream_id, version); -- Index for global subscriptionCREATE INDEX idx_events_global_position ON events(global_position); -- Index for event type queriesCREATE INDEX idx_events_event_type ON events(event_type); -- Index for time-based queriesCREATE INDEX idx_events_created_at ON events(created_at); -- Snapshots tableCREATE TABLE snapshots ( stream_id VARCHAR(255) PRIMARY KEY, stream_type VARCHAR(255) NOT NULL, snapshot_data JSONB NOT NULL, version BIGINT NOT NULL, created_at TIMESTAMPTZ DEFAULT NOW()); -- Subscriptions checkpoint tableCREATE TABLE subscription_checkpoints ( subscription_id VARCHAR(255) PRIMARY KEY, last_position BIGINT NOT NULL DEFAULT 0, updated_at TIMESTAMPTZ DEFAULT NOW());``` ### Template 2: Python Event Store Implementation ```pythonfrom dataclasses import dataclass, fieldfrom datetime import datetimefrom typing import Any, Optional, Listfrom uuid import UUID, uuid4import jsonimport asyncpg @dataclassclass Event: stream_id: str event_type: str data: dict metadata: dict = field(default_factory=dict) event_id: UUID = field(default_factory=uuid4) version: Optional[int] = None global_position: Optional[int] = None created_at: datetime = field(default_factory=datetime.utcnow) class EventStore: def __init__(self, pool: asyncpg.Pool): self.pool = pool async def append_events( self, stream_id: str, stream_type: str, events: List[Event], expected_version: Optional[int] = None ) -> List[Event]: """Append events to a stream with optimistic concurrency.""" async with self.pool.acquire() as conn: async with conn.transaction(): # Check expected version if expected_version is not None: current = await conn.fetchval( "SELECT MAX(version) FROM events WHERE stream_id = $1", stream_id ) current = current or 0 if current != expected_version: raise ConcurrencyError( f"Expected version {expected_version}, got {current}" ) # Get starting version start_version = await conn.fetchval( "SELECT COALESCE(MAX(version), 0) + 1 FROM events WHERE stream_id = $1", stream_id ) # Insert events saved_events = [] for i, event in enumerate(events): event.version = start_version + i row = await conn.fetchrow( """ INSERT INTO events (id, stream_id, stream_type, event_type, event_data, metadata, version, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING global_position """, event.event_id, stream_id, stream_type, event.event_type, json.dumps(event.data), json.dumps(event.metadata), event.version, event.created_at ) event.global_position = row['global_position'] saved_events.append(event) return saved_events async def read_stream( self, stream_id: str, from_version: int = 0, limit: int = 1000 ) -> List[Event]: """Read events from a stream.""" async with self.pool.acquire() as conn: rows = await conn.fetch( """ SELECT id, stream_id, event_type, event_data, metadata, version, global_position, created_at FROM events WHERE stream_id = $1 AND version >= $2 ORDER BY version LIMIT $3 """, stream_id, from_version, limit ) return [self._row_to_event(row) for row in rows] async def read_all( self, from_position: int = 0, limit: int = 1000 ) -> List[Event]: """Read all events globally.""" async with self.pool.acquire() as conn: rows = await conn.fetch( """ SELECT id, stream_id, event_type, event_data, metadata, version, global_position, created_at FROM events WHERE global_position > $1 ORDER BY global_position LIMIT $2 """, from_position, limit ) return [self._row_to_event(row) for row in rows] async def subscribe( self, subscription_id: str, handler, from_position: int = 0, batch_size: int = 100 ): """Subscribe to all events from a position.""" # Get checkpoint async with self.pool.acquire() as conn: checkpoint = await conn.fetchval( """ SELECT last_position FROM subscription_checkpoints WHERE subscription_id = $1 """, subscription_id ) position = checkpoint or from_position while True: events = await self.read_all(position, batch_size) if not events: await asyncio.sleep(1) # Poll interval continue for event in events: await handler(event) position = event.global_position # Save checkpoint async with self.pool.acquire() as conn: await conn.execute( """ INSERT INTO subscription_checkpoints (subscription_id, last_position) VALUES ($1, $2) ON CONFLICT (subscription_id) DO UPDATE SET last_position = $2, updated_at = NOW() """, subscription_id, position ) def _row_to_event(self, row) -> Event: return Event( event_id=row['id'], stream_id=row['stream_id'], event_type=row['event_type'], data=json.loads(row['event_data']), metadata=json.loads(row['metadata']), version=row['version'], global_position=row['global_position'], created_at=row['created_at'] ) class ConcurrencyError(Exception): """Raised when optimistic concurrency check fails.""" pass``` ### Template 3: EventStoreDB Usage ```pythonfrom esdbclient import EventStoreDBClient, NewEvent, StreamStateimport json # Connectclient = EventStoreDBClient(uri="esdb://localhost:2113?tls=false") # Append eventsdef append_events(stream_name: str, events: list, expected_revision=None): new_events = [ NewEvent( type=event['type'], data=json.dumps(event['data']).encode(), metadata=json.dumps(event.get('metadata', {})).encode() ) for event in events ] if expected_revision is None: state = StreamState.ANY elif expected_revision == -1: state = StreamState.NO_STREAM else: state = expected_revision return client.append_to_stream( stream_name=stream_name, events=new_events, current_version=state ) # Read streamdef read_stream(stream_name: str, from_revision: int = 0): events = client.get_stream( stream_name=stream_name, stream_position=from_revision ) return [ { 'type': event.type, 'data': json.loads(event.data), 'metadata': json.loads(event.metadata) if event.metadata else {}, 'stream_position': event.stream_position, 'commit_position': event.commit_position } for event in events ] # Subscribe to allasync def subscribe_to_all(handler, from_position: int = 0): subscription = client.subscribe_to_all(commit_position=from_position) async for event in subscription: await handler({ 'type': event.type, 'data': json.loads(event.data), 'stream_id': event.stream_name, 'position': event.commit_position }) # Category projection ($ce-Category)def read_category(category: str): """Read all events for a category using system projection.""" return read_stream(f"$ce-{category}")``` ### Template 4: DynamoDB Event Store ```pythonimport boto3from boto3.dynamodb.conditions import Keyfrom datetime import datetimeimport jsonimport uuid class DynamoEventStore: def __init__(self, table_name: str): self.dynamodb = boto3.resource('dynamodb') self.table = self.dynamodb.Table(table_name) def append_events(self, stream_id: str, events: list, expected_version: int = None): """Append events with conditional write for concurrency.""" with self.table.batch_writer() as batch: for i, event in enumerate(events): version = (expected_version or 0) + i + 1 item = { 'PK': f"STREAM#{stream_id}", 'SK': f"VERSION#{version:020d}", 'GSI1PK': 'EVENTS', 'GSI1SK': datetime.utcnow().isoformat(), 'event_id': str(uuid.uuid4()), 'stream_id': stream_id, 'event_type': event['type'], 'event_data': json.dumps(event['data']), 'version': version, 'created_at': datetime.utcnow().isoformat() } batch.put_item(Item=item) return events def read_stream(self, stream_id: str, from_version: int = 0): """Read events from a stream.""" response = self.table.query( KeyConditionExpression=Key('PK').eq(f"STREAM#{stream_id}") & Key('SK').gte(f"VERSION#{from_version:020d}") ) return [ { 'event_type': item['event_type'], 'data': json.loads(item['event_data']), 'version': item['version'] } for item in response['Items'] ] # Table definition (CloudFormation/Terraform)"""DynamoDB Table: - PK (Partition Key): String - SK (Sort Key): String - GSI1PK, GSI1SK for global ordering Capacity: On-demand or provisioned based on throughput needs"""``` ## Best Practices ### Do's - **Use stream IDs that include aggregate type** - `Order-{uuid}`- **Include correlation/causation IDs** - For tracing- **Version events from day one** - Plan for schema evolution- **Implement idempotency** - Use event IDs for deduplication- **Index appropriately** - For your query patterns ### Don'ts - **Don't update or delete events** - They're immutable facts- **Don't store large payloads** - Keep events small- **Don't skip optimistic concurrency** - Prevents data corruption- **Don't ignore backpressure** - Handle slow consumersAccessibility Compliance
This walks you through implementing proper WCAG 2.2 compliance with real code patterns for screen readers, keyboard navigation, and mobile accessibility. It cov
Airflow Dag Patterns
If you're building data pipelines with Airflow, this skill gives you production-ready DAG patterns that actually work in the real world. It covers TaskFlow API
Angular Migration
Migrating from AngularJS to Angular is notoriously painful, and this skill tackles the practical stuff that makes or breaks these projects. It covers hybrid app