Claude Agent Skill · by Wshobson

CQRS Implementation

Solid foundation for implementing CQRS patterns with complete command and query infrastructure templates. Includes command buses, handlers, event sourcing integ

Install
Terminal · npx
$ npx skills add https://github.com/wshobson/agents --skill cqrs-implementation
Source file
SKILL.md · 549 lines
---
name: cqrs-implementation
description: Implement Command Query Responsibility Segregation for scalable architectures. Use when separating read and write models, optimizing query performance, or building event-sourced systems.
---

# CQRS Implementation

Comprehensive guide to implementing CQRS (Command Query Responsibility Segregation) patterns.

## When to Use This Skill

- Separating read and write concerns
- Scaling reads independently from writes
- Building event-sourced systems
- Optimizing complex query scenarios
- Different read/write data models needed
- High-performance reporting requirements

## Core Concepts

### 1. CQRS Architecture

```
                    ┌─────────────┐
                    │   Client    │
                    └──────┬──────┘
              ┌────────────┴────────────┐
              │                         │
              ▼                         ▼
       ┌─────────────┐           ┌─────────────┐
       │  Commands   │           │   Queries   │
       │    API      │           │    API      │
       └──────┬──────┘           └──────┬──────┘
              │                         │
              ▼                         ▼
       ┌─────────────┐           ┌─────────────┐
       │  Command    │           │   Query     │
       │  Handlers   │           │  Handlers   │
       └──────┬──────┘           └──────┬──────┘
              │                         │
              ▼                         ▼
       ┌─────────────┐           ┌─────────────┐
       │   Write     │──────────►│    Read     │
       │   Model     │  Events   │   Model     │
       └─────────────┘           └─────────────┘
```

### 2. Key Components

| Component           | Responsibility                  |
| ------------------- | ------------------------------- |
| **Command**         | Intent to change state          |
| **Command Handler** | Validates and executes commands |
| **Event**           | Record of state change          |
| **Query**           | Request for data                |
| **Query Handler**   | Retrieves data from read model  |
| **Projector**       | Updates read model from events  |

## Templates

### Template 1: Command Infrastructure

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, Generic, Optional, Type, TypeVar
import uuid


# Command base.
# kw_only=True (Python 3.10+) makes the defaulted base fields keyword-only, so
# subclasses can add required fields without hitting
# "non-default argument follows default argument".
@dataclass(kw_only=True)
class Command:
    command_id: Optional[str] = None
    timestamp: Optional[datetime] = None

    def __post_init__(self):
        self.command_id = self.command_id or str(uuid.uuid4())
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp.
        self.timestamp = self.timestamp or datetime.now(timezone.utc)


# Concrete commands
@dataclass
class CreateOrder(Command):
    customer_id: str
    items: list
    shipping_address: dict


@dataclass
class AddOrderItem(Command):
    order_id: str
    product_id: str
    quantity: int
    price: float


@dataclass
class CancelOrder(Command):
    order_id: str
    reason: str


# Command handler base
T = TypeVar('T', bound=Command)


class CommandHandler(ABC, Generic[T]):
    @abstractmethod
    async def handle(self, command: T) -> Any:
        pass


# Command bus: routes each command to the single handler registered for its type
class CommandBus:
    def __init__(self):
        self._handlers: Dict[Type[Command], CommandHandler] = {}

    def register(self, command_type: Type[Command], handler: CommandHandler):
        self._handlers[command_type] = handler

    async def dispatch(self, command: Command) -> Any:
        handler = self._handlers.get(type(command))
        if not handler:
            raise ValueError(f"No handler for {type(command).__name__}")
        return await handler.handle(command)


# Command handler implementation
class CreateOrderHandler(CommandHandler[CreateOrder]):
    def __init__(self, order_repository, event_store):
        self.order_repository = order_repository
        self.event_store = event_store

    async def handle(self, command: CreateOrder) -> str:
        # Validate
        if not command.items:
            raise ValueError("Order must have at least one item")

        # Create aggregate (`Order` is the domain aggregate, defined elsewhere
        # in the write model)
        order = Order.create(
            customer_id=command.customer_id,
            items=command.items,
            shipping_address=command.shipping_address
        )

        # Persist events
        await self.event_store.append_events(
            stream_id=f"Order-{order.id}",
            stream_type="Order",
            events=order.uncommitted_events
        )

        return order.id
```

### Template 2: Query Infrastructure

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar


# Query base
@dataclass
class Query:
    pass


# Concrete queries
@dataclass
class GetOrderById(Query):
    order_id: str


@dataclass
class GetCustomerOrders(Query):
    customer_id: str
    status: Optional[str] = None
    page: int = 1
    page_size: int = 20


@dataclass
class SearchOrders(Query):
    query: str
    filters: Optional[dict] = None
    sort_by: str = "created_at"
    sort_order: str = "desc"


# Query result types
@dataclass
class OrderView:
    order_id: str
    customer_id: str
    status: str
    total_amount: float
    item_count: int
    created_at: datetime
    shipped_at: Optional[datetime] = None


# Item type for paginated results; it must be defined before PaginatedResult
# and must not be bound to Query.
TItem = TypeVar('TItem')


@dataclass
class PaginatedResult(Generic[TItem]):
    items: List[TItem]
    total: int
    page: int
    page_size: int

    @property
    def total_pages(self) -> int:
        # Ceiling division without floats
        return (self.total + self.page_size - 1) // self.page_size


# Query handler base
T = TypeVar('T', bound=Query)
R = TypeVar('R')


class QueryHandler(ABC, Generic[T, R]):
    @abstractmethod
    async def handle(self, query: T) -> R:
        pass


# Query bus
class QueryBus:
    def __init__(self):
        self._handlers: Dict[Type[Query], QueryHandler] = {}

    def register(self, query_type: Type[Query], handler: QueryHandler):
        self._handlers[query_type] = handler

    async def dispatch(self, query: Query) -> Any:
        handler = self._handlers.get(type(query))
        if not handler:
            raise ValueError(f"No handler for {type(query).__name__}")
        return await handler.handle(query)


# Query handler implementation
class GetOrderByIdHandler(QueryHandler[GetOrderById, Optional[OrderView]]):
    def __init__(self, read_db):
        self.read_db = read_db

    async def handle(self, query: GetOrderById) -> Optional[OrderView]:
        async with self.read_db.acquire() as conn:
            row = await conn.fetchrow(
                """
                SELECT order_id, customer_id, status, total_amount,
                       item_count, created_at, shipped_at
                FROM order_views
                WHERE order_id = $1
                """,
                query.order_id
            )
            if row:
                return OrderView(**dict(row))
            return None


class GetCustomerOrdersHandler(QueryHandler[GetCustomerOrders, PaginatedResult[OrderView]]):
    def __init__(self, read_db):
        self.read_db = read_db

    async def handle(self, query: GetCustomerOrders) -> PaginatedResult[OrderView]:
        async with self.read_db.acquire() as conn:
            # Build query with optional status filter. Only fixed SQL fragments
            # are interpolated; all values are passed as bound parameters.
            where_clause = "customer_id = $1"
            params = [query.customer_id]

            if query.status:
                where_clause += " AND status = $2"
                params.append(query.status)

            # Get total count
            total = await conn.fetchval(
                f"SELECT COUNT(*) FROM order_views WHERE {where_clause}",
                *params
            )

            # Get paginated results
            offset = (query.page - 1) * query.page_size
            rows = await conn.fetch(
                f"""
                SELECT order_id, customer_id, status, total_amount,
                       item_count, created_at, shipped_at
                FROM order_views
                WHERE {where_clause}
                ORDER BY created_at DESC
                LIMIT ${len(params) + 1} OFFSET ${len(params) + 2}
                """,
                *params, query.page_size, offset
            )

            return PaginatedResult(
                items=[OrderView(**dict(row)) for row in rows],
                total=total,
                page=query.page,
                page_size=query.page_size
            )
```

### Template 3: FastAPI CQRS Application

```python
from datetime import datetime
from typing import List, Optional

from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel

# CommandBus, QueryBus, and the command and query classes come from
# Templates 1 and 2.

app = FastAPI()


# Request/Response models
class CreateOrderRequest(BaseModel):
    customer_id: str
    items: List[dict]
    shipping_address: dict


class OrderResponse(BaseModel):
    order_id: str
    customer_id: str
    status: str
    total_amount: float
    item_count: int
    created_at: datetime


# Dependency injection
def get_command_bus() -> CommandBus:
    return app.state.command_bus


def get_query_bus() -> QueryBus:
    return app.state.query_bus


# Command endpoints (POST, PUT, DELETE)
@app.post("/orders", response_model=dict)
async def create_order(
    request: CreateOrderRequest,
    command_bus: CommandBus = Depends(get_command_bus)
):
    command = CreateOrder(
        customer_id=request.customer_id,
        items=request.items,
        shipping_address=request.shipping_address
    )
    order_id = await command_bus.dispatch(command)
    return {"order_id": order_id}


@app.post("/orders/{order_id}/items")
async def add_item(
    order_id: str,
    product_id: str,
    quantity: int,
    price: float,
    command_bus: CommandBus = Depends(get_command_bus)
):
    command = AddOrderItem(
        order_id=order_id,
        product_id=product_id,
        quantity=quantity,
        price=price
    )
    await command_bus.dispatch(command)
    return {"status": "item_added"}


@app.delete("/orders/{order_id}")
async def cancel_order(
    order_id: str,
    reason: str,
    command_bus: CommandBus = Depends(get_command_bus)
):
    command = CancelOrder(order_id=order_id, reason=reason)
    await command_bus.dispatch(command)
    return {"status": "cancelled"}


# Query endpoints (GET)
# The fixed path /orders/search must be registered before /orders/{order_id}.
# FastAPI matches routes in declaration order, so otherwise "search" would be
# captured as an order_id.
@app.get("/orders/search")
async def search_orders(
    q: str,
    sort_by: str = "created_at",
    query_bus: QueryBus = Depends(get_query_bus)
):
    query = SearchOrders(query=q, sort_by=sort_by)
    return await query_bus.dispatch(query)


@app.get("/orders/{order_id}", response_model=OrderResponse)
async def get_order(
    order_id: str,
    query_bus: QueryBus = Depends(get_query_bus)
):
    query = GetOrderById(order_id=order_id)
    result = await query_bus.dispatch(query)
    if not result:
        raise HTTPException(status_code=404, detail="Order not found")
    return result


@app.get("/customers/{customer_id}/orders")
async def get_customer_orders(
    customer_id: str,
    status: Optional[str] = None,
    page: int = 1,
    page_size: int = 20,
    query_bus: QueryBus = Depends(get_query_bus)
):
    query = GetCustomerOrders(
        customer_id=customer_id,
        status=status,
        page=page,
        page_size=page_size
    )
    return await query_bus.dispatch(query)
```

### Template 4: Read Model Synchronization

```python
import asyncio
import logging
from typing import List

logger = logging.getLogger(__name__)

# `Projection` is assumed to expose `name`, `handles()`, `apply(event)`, and
# `clear()`. `_get_checkpoint` and `_save_checkpoint` are persistence helpers
# left to the implementer.


class ReadModelSynchronizer:
    """Keeps read models in sync with events."""

    def __init__(self, event_store, read_db, projections: List["Projection"]):
        self.event_store = event_store
        self.read_db = read_db
        self.projections = {p.name: p for p in projections}

    async def run(self):
        """Continuously sync read models."""
        while True:
            for name, projection in self.projections.items():
                await self._sync_projection(projection)
            await asyncio.sleep(0.1)

    async def _sync_projection(self, projection: "Projection"):
        checkpoint = await self._get_checkpoint(projection.name)

        events = await self.event_store.read_all(
            from_position=checkpoint,
            limit=100
        )

        for event in events:
            if event.event_type in projection.handles():
                try:
                    await projection.apply(event)
                except Exception as e:
                    # Log error, possibly retry or skip. As written, the failed
                    # event is skipped once a later event advances the checkpoint.
                    logger.error(f"Projection error: {e}")
                    continue

            await self._save_checkpoint(projection.name, event.global_position)

    async def rebuild_projection(self, projection_name: str):
        """Rebuild a projection from scratch."""
        projection = self.projections[projection_name]

        # Clear existing data
        await projection.clear()

        # Reset checkpoint
        await self._save_checkpoint(projection_name, 0)

        # Rebuild in batches until the event store is exhausted
        while True:
            checkpoint = await self._get_checkpoint(projection_name)
            events = await self.event_store.read_all(checkpoint, 1000)

            if not events:
                break

            for event in events:
                if event.event_type in projection.handles():
                    await projection.apply(event)

            await self._save_checkpoint(
                projection_name,
                events[-1].global_position
            )
```

### Template 5: Eventual Consistency Handling

```python
import asyncio
import time

# `Query` comes from Template 2. `execute_query` is left to the implementer.


class ConsistentQueryHandler:
    """Query handler that can wait for consistency."""

    def __init__(self, read_db, event_store):
        self.read_db = read_db
        self.event_store = event_store

    async def query_after_command(
        self,
        query: "Query",
        expected_version: int,
        stream_id: str,
        timeout: float = 5.0
    ):
        """
        Execute query, ensuring read model is at expected version.
        Used for read-your-writes consistency.
        """
        # A monotonic clock is unaffected by system clock adjustments
        start_time = time.monotonic()

        while time.monotonic() - start_time < timeout:
            # Check if read model is caught up
            projection_version = await self._get_projection_version(stream_id)

            if projection_version >= expected_version:
                return await self.execute_query(query)

            # Wait a bit and retry
            await asyncio.sleep(0.1)

        # Timeout - return stale data with warning
        return {
            "data": await self.execute_query(query),
            "_warning": "Data may be stale"
        }

    async def _get_projection_version(self, stream_id: str) -> int:
        """Get the last processed event version for a stream."""
        async with self.read_db.acquire() as conn:
            return await conn.fetchval(
                "SELECT last_event_version FROM projection_state WHERE stream_id = $1",
                stream_id
            ) or 0
```

## Best Practices

### Do's

- **Separate command and query models** - Different needs
- **Use eventual consistency** - Accept propagation delay
- **Validate in command handlers** - Before state change
- **Denormalize read models** - Optimize for queries
- **Version your events** - For schema evolution

### Don'ts

- **Don't query in commands** - Use only for writes
- **Don't couple read/write schemas** - Independent evolution
- **Don't over-engineer** - Start simple
- **Don't ignore consistency SLAs** - Define acceptable lag
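
### Example: A Minimal Projection

The "Denormalize read models" practice and the Projector role from the Key Components table can be illustrated with a small projection. This is a sketch under stated assumptions rather than part of the templates above: `StoredEvent`, `OrderSummaryProjection`, the event type names `OrderCreated` and `OrderCancelled`, the event payload shape, and the in-memory dict standing in for the `order_views` table are all hypothetical. It exposes the `name`, `handles()`, `apply()`, and `clear()` members that Template 4 expects from a projection.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List, Set


@dataclass
class StoredEvent:
    """Hypothetical event envelope; field names mirror those used in Template 4."""
    event_type: str
    data: Dict[str, Any]
    global_position: int


class OrderSummaryProjection:
    """Builds one denormalized row per order from the event stream."""

    name = "order_summary"

    def __init__(self) -> None:
        # In-memory stand-in for the order_views table (assumption for this sketch).
        self.rows: Dict[str, Dict[str, Any]] = {}

    def handles(self) -> Set[str]:
        # Hypothetical event type names.
        return {"OrderCreated", "OrderCancelled"}

    async def apply(self, event: StoredEvent) -> None:
        order_id = event.data["order_id"]
        if event.event_type == "OrderCreated":
            items = event.data["items"]
            # Precompute totals at write time so queries need no joins or aggregation.
            self.rows[order_id] = {
                "order_id": order_id,
                "customer_id": event.data["customer_id"],
                "status": "created",
                "item_count": sum(i["quantity"] for i in items),
                "total_amount": sum(i["quantity"] * i["price"] for i in items),
            }
        elif event.event_type == "OrderCancelled":
            row = self.rows.get(order_id)
            if row is not None:
                row["status"] = "cancelled"

    async def clear(self) -> None:
        self.rows.clear()


async def demo() -> Dict[str, Any]:
    projection = OrderSummaryProjection()
    events: List[StoredEvent] = [
        StoredEvent("OrderCreated", {
            "order_id": "o-1",
            "customer_id": "c-1",
            "items": [
                {"quantity": 2, "price": 10.0},
                {"quantity": 1, "price": 5.0},
            ],
        }, global_position=1),
        StoredEvent("OrderShipped", {"order_id": "o-1"}, global_position=2),
        StoredEvent("OrderCancelled", {"order_id": "o-1"}, global_position=3),
    ]
    for event in events:
        # Same filter as Template 4: unhandled event types are ignored.
        if event.event_type in projection.handles():
            await projection.apply(event)
    return projection.rows["o-1"]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because `apply` overwrites the row by `order_id` and sets `status` to a fixed value, replaying the same events yields the same row, which is what lets a rebuild from position 0 converge to the same read model.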