Claude Agent Skill · by Wshobson

Projection Patterns

If you're building event-sourced systems with CQRS, you'll eventually need to create read models from your event streams, and that's where this skill becomes essential.

Install
Terminal · npx
$ npx skills add https://github.com/wshobson/agents --skill projection-patterns

Source file: SKILL.md (485 lines)
---
name: projection-patterns
description: Build read models and projections from event streams. Use when implementing CQRS read sides, building materialized views, or optimizing query performance in event-sourced systems.
---

# Projection Patterns

Comprehensive guide to building projections and read models for event-sourced systems.

## When to Use This Skill

- Building CQRS read models
- Creating materialized views from events
- Optimizing query performance
- Implementing real-time dashboards
- Building search indexes from events
- Aggregating data across streams

## Core Concepts

### 1. Projection Architecture

```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ Event Store │────►│ Projector   │────►│ Read Model  │
│             │     │             │     │ (Database)  │
│ ┌─────────┐ │     │ ┌─────────┐ │     │ ┌─────────┐ │
│ │ Events  │ │     │ │ Handler │ │     │ │ Tables  │ │
│ └─────────┘ │     │ │ Logic   │ │     │ │ Views   │ │
│             │     │ └─────────┘ │     │ │ Cache   │ │
└─────────────┘     └─────────────┘     └─────────────┘
```

### 2. Projection Types

| Type           | Description                 | Use Case               |
| -------------- | --------------------------- | ---------------------- |
| **Live**       | Real-time from subscription | Current state queries  |
| **Catchup**    | Process historical events   | Rebuilding read models |
| **Persistent** | Stores checkpoint           | Resume after restart   |
| **Inline**     | Same transaction as write   | Strong consistency     |

## Templates

### Template 1: Basic Projector

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List

import asyncpg


@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    version: int
    global_position: int


class Projection(ABC):
    """Base class for projections."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Unique projection name for checkpointing."""
        pass

    @abstractmethod
    def handles(self) -> List[str]:
        """List of event types this projection handles."""
        pass

    @abstractmethod
    async def apply(self, event: Event) -> None:
        """Apply event to the read model."""
        pass


class Projector:
    """Runs projections from event store."""

    def __init__(self, event_store, checkpoint_store):
        self.event_store = event_store
        self.checkpoint_store = checkpoint_store
        self.projections: List[Projection] = []

    def register(self, projection: Projection):
        self.projections.append(projection)

    async def run(self, batch_size: int = 100):
        """Run all projections continuously."""
        while True:
            for projection in self.projections:
                await self._run_projection(projection, batch_size)
            await asyncio.sleep(0.1)

    async def _run_projection(self, projection: Projection, batch_size: int) -> int:
        """Process one batch and return the number of events read."""
        # A missing checkpoint (None) means the projection starts from position 0
        checkpoint = await self.checkpoint_store.get(projection.name)
        position = checkpoint or 0

        # Assumes read_all returns a list of up to batch_size events after `position`
        events = await self.event_store.read_all(position, batch_size)
        handled = set(projection.handles())

        for event in events:
            if event.event_type in handled:
                await projection.apply(event)

            # Advance the checkpoint for every event, handled or not
            await self.checkpoint_store.save(
                projection.name,
                event.global_position
            )

        return len(events)

    async def rebuild(self, projection: Projection):
        """Rebuild a projection from scratch."""
        await self.checkpoint_store.delete(projection.name)
        # Optionally clear read model tables
        batch_size = 1000
        # Keep reading until a short batch shows the store has been drained
        while True:
            processed = await self._run_projection(projection, batch_size)
            if processed < batch_size:
                break
```

### Template 2: Order Summary Projection

```python
class OrderSummaryProjection(Projection):
    """Projects order events to a summary read model."""

    def __init__(self, db_pool: asyncpg.Pool):
        self.pool = db_pool

    @property
    def name(self) -> str:
        return "order_summary"

    def handles(self) -> List[str]:
        return [
            "OrderCreated",
            "OrderItemAdded",
            "OrderItemRemoved",
            "OrderShipped",
            "OrderCompleted",
            "OrderCancelled"
        ]

    async def apply(self, event: Event) -> None:
        # Dispatch table from event type to handler method
        handlers = {
            "OrderCreated": self._handle_created,
            "OrderItemAdded": self._handle_item_added,
            "OrderItemRemoved": self._handle_item_removed,
            "OrderShipped": self._handle_shipped,
            "OrderCompleted": self._handle_completed,
            "OrderCancelled": self._handle_cancelled,
        }

        handler = handlers.get(event.event_type)
        if handler:
            await handler(event)

    async def _handle_created(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO order_summaries
                (order_id, customer_id, status, total_amount, item_count, created_at)
                VALUES ($1, $2, $3, $4, $5, $6)
                """,
                event.data['order_id'],
                event.data['customer_id'],
                'pending',
                0,
                0,
                event.data['created_at']
            )

    async def _handle_item_added(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET total_amount = total_amount + $2,
                    item_count = item_count + 1,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['price'] * event.data['quantity']
            )

    async def _handle_item_removed(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET total_amount = total_amount - $2,
                    item_count = item_count - 1,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['price'] * event.data['quantity']
            )

    async def _handle_shipped(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET status = 'shipped',
                    shipped_at = $2,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['shipped_at']
            )

    async def _handle_completed(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET status = 'completed',
                    completed_at = $2,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['completed_at']
            )

    async def _handle_cancelled(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET status = 'cancelled',
                    cancelled_at = $2,
                    cancellation_reason = $3,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['cancelled_at'],
                event.data.get('reason')
            )
```

### Template 3: Elasticsearch Search Projection

```python
from elasticsearch import AsyncElasticsearch


class ProductSearchProjection(Projection):
    """Projects product events to Elasticsearch for full-text search."""

    def __init__(self, es_client: AsyncElasticsearch):
        self.es = es_client
        self.index = "products"

    @property
    def name(self) -> str:
        return "product_search"

    def handles(self) -> List[str]:
        return [
            "ProductCreated",
            "ProductUpdated",
            "ProductPriceChanged",
            "ProductDeleted"
        ]

    async def apply(self, event: Event) -> None:
        if event.event_type == "ProductCreated":
            # Index the full document, keyed by product_id
            await self.es.index(
                index=self.index,
                id=event.data['product_id'],
                document={
                    'name': event.data['name'],
                    'description': event.data['description'],
                    'category': event.data['category'],
                    'price': event.data['price'],
                    'tags': event.data.get('tags', []),
                    'created_at': event.data['created_at']
                }
            )

        elif event.event_type == "ProductUpdated":
            # Partial update: only the listed fields are changed
            await self.es.update(
                index=self.index,
                id=event.data['product_id'],
                doc={
                    'name': event.data['name'],
                    'description': event.data['description'],
                    'category': event.data['category'],
                    'tags': event.data.get('tags', []),
                    'updated_at': event.data['updated_at']
                }
            )

        elif event.event_type == "ProductPriceChanged":
            await self.es.update(
                index=self.index,
                id=event.data['product_id'],
                doc={
                    'price': event.data['new_price'],
                    'price_updated_at': event.data['changed_at']
                }
            )

        elif event.event_type == "ProductDeleted":
            await self.es.delete(
                index=self.index,
                id=event.data['product_id']
            )
```

### Template 4: Aggregating Projection

```python
class DailySalesProjection(Projection):
    """Aggregates sales data by day for reporting."""

    def __init__(self, db_pool: asyncpg.Pool):
        self.pool = db_pool

    @property
    def name(self) -> str:
        return "daily_sales"

    def handles(self) -> List[str]:
        return ["OrderCompleted", "OrderRefunded"]

    async def apply(self, event: Event) -> None:
        if event.event_type == "OrderCompleted":
            await self._increment_sales(event)
        elif event.event_type == "OrderRefunded":
            await self._decrement_sales(event)

    async def _increment_sales(self, event: Event):
        # Assumes an ISO 8601 timestamp string; the first 10 characters are YYYY-MM-DD.
        # If daily_sales.date is a DATE column, asyncpg expects a datetime.date,
        # so convert with datetime.date.fromisoformat(...) before passing it.
        date = event.data['completed_at'][:10]
        async with self.pool.acquire() as conn:
            # Upsert: create the day's row on first sale, otherwise accumulate
            await conn.execute(
                """
                INSERT INTO daily_sales (date, total_orders, total_revenue, total_items)
                VALUES ($1, 1, $2, $3)
                ON CONFLICT (date) DO UPDATE SET
                    total_orders = daily_sales.total_orders + 1,
                    total_revenue = daily_sales.total_revenue + $2,
                    total_items = daily_sales.total_items + $3,
                    updated_at = NOW()
                """,
                date,
                event.data['total_amount'],
                event.data['item_count']
            )

    async def _decrement_sales(self, event: Event):
        # Refunds are booked against the day the order originally completed
        date = event.data['original_completed_at'][:10]
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE daily_sales SET
                    total_orders = total_orders - 1,
                    total_revenue = total_revenue - $2,
                    total_refunds = total_refunds + $2,
                    updated_at = NOW()
                WHERE date = $1
                """,
                date,
                event.data['refund_amount']
            )
```

### Template 5: Multi-Table Projection

```python
class CustomerActivityProjection(Projection):
    """Projects customer activity across multiple tables."""

    def __init__(self, db_pool: asyncpg.Pool):
        self.pool = db_pool

    @property
    def name(self) -> str:
        return "customer_activity"

    def handles(self) -> List[str]:
        return [
            "CustomerCreated",
            "OrderCompleted",
            "ReviewSubmitted",
            "CustomerTierChanged"
        ]

    async def apply(self, event: Event) -> None:
        async with self.pool.acquire() as conn:
            # One transaction per event keeps the tables consistent with each other
            async with conn.transaction():
                if event.event_type == "CustomerCreated":
                    # Insert into customers table
                    await conn.execute(
                        """
                        INSERT INTO customers (customer_id, email, name, tier, created_at)
                        VALUES ($1, $2, $3, 'bronze', $4)
                        """,
                        event.data['customer_id'],
                        event.data['email'],
                        event.data['name'],
                        event.data['created_at']
                    )
                    # Initialize activity summary
                    await conn.execute(
                        """
                        INSERT INTO customer_activity_summary
                        (customer_id, total_orders, total_spent, total_reviews)
                        VALUES ($1, 0, 0, 0)
                        """,
                        event.data['customer_id']
                    )

                elif event.event_type == "OrderCompleted":
                    # Update activity summary
                    await conn.execute(
                        """
                        UPDATE customer_activity_summary SET
                            total_orders = total_orders + 1,
                            total_spent = total_spent + $2,
                            last_order_at = $3
                        WHERE customer_id = $1
                        """,
                        event.data['customer_id'],
                        event.data['total_amount'],
                        event.data['completed_at']
                    )
                    # Insert into order history
                    await conn.execute(
                        """
                        INSERT INTO customer_order_history
                        (customer_id, order_id, amount, completed_at)
                        VALUES ($1, $2, $3, $4)
                        """,
                        event.data['customer_id'],
                        event.data['order_id'],
                        event.data['total_amount'],
                        event.data['completed_at']
                    )

                elif event.event_type == "ReviewSubmitted":
                    await conn.execute(
                        """
                        UPDATE customer_activity_summary SET
                            total_reviews = total_reviews + 1,
                            last_review_at = $2
                        WHERE customer_id = $1
                        """,
                        event.data['customer_id'],
                        event.data['submitted_at']
                    )

                elif event.event_type == "CustomerTierChanged":
                    await conn.execute(
                        """
                        UPDATE customers SET tier = $2, updated_at = NOW()
                        WHERE customer_id = $1
                        """,
                        event.data['customer_id'],
                        event.data['new_tier']
                    )
```

## Best Practices

### Do's

- **Make projections idempotent** - Safe to replay
- **Use transactions** - For multi-table updates
- **Store checkpoints** - Resume after failures
- **Monitor lag** - Alert on projection delays
- **Plan for rebuilds** - Design for reconstruction

### Don'ts

- **Don't couple projections** - Each is independent
- **Don't skip error handling** - Log and alert on failures
- **Don't ignore ordering** - Events must be processed in order
- **Don't over-normalize** - Denormalize for query patterns