npx skills add https://github.com/wshobson/agents --skill microservices-patternsHow Microservices Patterns fits into a Paperclip company.
Microservices Patterns drops into any Paperclip agent that handles this kind of work. Assign it to a specialist inside a pre-configured PaperclipOrg company and the skill becomes available on every heartbeat — no prompt engineering, no tool wiring.
Pre-configured AI company — 18 agents, 18 skills, one-time purchase.
SKILL.md564 linesExpandCollapse
---name: microservices-patternsdescription: Design microservices architectures with service boundaries, event-driven communication, and resilience patterns. Use when building distributed systems, decomposing monoliths, or implementing microservices.--- # Microservices Patterns Master microservices architecture patterns including service boundaries, inter-service communication, data management, and resilience patterns for building distributed systems. ## When to Use This Skill - Decomposing monoliths into microservices- Designing service boundaries and contracts- Implementing inter-service communication- Managing distributed data and transactions- Building resilient distributed systems- Implementing service discovery and load balancing- Designing event-driven architectures ## Core Concepts ### 1. Service Decomposition Strategies **By Business Capability** - Organize services around business functions- Each service owns its domain- Example: OrderService, PaymentService, InventoryService **By Subdomain (DDD)** - Core domain, supporting subdomains- Bounded contexts map to services- Clear ownership and responsibility **Strangler Fig Pattern** - Gradually extract from monolith- New functionality as microservices- Proxy routes to old/new systems ### 2. Communication Patterns **Synchronous (Request/Response)** - REST APIs- gRPC- GraphQL **Asynchronous (Events/Messages)** - Event streaming (Kafka)- Message queues (RabbitMQ, SQS)- Pub/Sub patterns ### 3. Data Management **Database Per Service** - Each service owns its data- No shared databases- Loose coupling **Saga Pattern** - Distributed transactions- Compensating actions- Eventual consistency ### 4. Resilience Patterns **Circuit Breaker** - Fail fast on repeated errors- Prevent cascade failures **Retry with Backoff** - Transient fault handling- Exponential backoff **Bulkhead** - Isolate resources- Limit impact of failures ## Service Decomposition Patterns ### Pattern 1: By Business Capability ```python# E-commerce example # Order Serviceclass OrderService: """Handles order lifecycle.""" async def create_order(self, order_data: dict) -> Order: order = Order.create(order_data) # Publish event for other services await self.event_bus.publish( OrderCreatedEvent( order_id=order.id, customer_id=order.customer_id, items=order.items, total=order.total ) ) return order # Payment Service (separate service)class PaymentService: """Handles payment processing.""" async def process_payment(self, payment_request: PaymentRequest) -> PaymentResult: # Process payment result = await self.payment_gateway.charge( amount=payment_request.amount, customer=payment_request.customer_id ) if result.success: await self.event_bus.publish( PaymentCompletedEvent( order_id=payment_request.order_id, transaction_id=result.transaction_id ) ) return result # Inventory Service (separate service)class InventoryService: """Handles inventory management.""" async def reserve_items(self, order_id: str, items: List[OrderItem]) -> ReservationResult: # Check availability for item in items: available = await self.inventory_repo.get_available(item.product_id) if available < item.quantity: return ReservationResult( success=False, error=f"Insufficient inventory for {item.product_id}" ) # Reserve items reservation = await self.create_reservation(order_id, items) await self.event_bus.publish( InventoryReservedEvent( order_id=order_id, reservation_id=reservation.id ) ) return ReservationResult(success=True, reservation=reservation)``` ### Pattern 2: API Gateway ```pythonfrom fastapi import FastAPI, HTTPException, Dependsimport httpxfrom circuitbreaker import circuit app = FastAPI() class APIGateway: """Central entry point for all client requests.""" def __init__(self): self.order_service_url = "http://order-service:8000" self.payment_service_url = "http://payment-service:8001" self.inventory_service_url = "http://inventory-service:8002" self.http_client = httpx.AsyncClient(timeout=5.0) @circuit(failure_threshold=5, recovery_timeout=30) async def call_order_service(self, path: str, method: str = "GET", **kwargs): """Call order service with circuit breaker.""" response = await self.http_client.request( method, f"{self.order_service_url}{path}", **kwargs ) response.raise_for_status() return response.json() async def create_order_aggregate(self, order_id: str) -> dict: """Aggregate data from multiple services.""" # Parallel requests order, payment, inventory = await asyncio.gather( self.call_order_service(f"/orders/{order_id}"), self.call_payment_service(f"/payments/order/{order_id}"), self.call_inventory_service(f"/reservations/order/{order_id}"), return_exceptions=True ) # Handle partial failures result = {"order": order} if not isinstance(payment, Exception): result["payment"] = payment if not isinstance(inventory, Exception): result["inventory"] = inventory return result @app.post("/api/orders")async def create_order( order_data: dict, gateway: APIGateway = Depends()): """API Gateway endpoint.""" try: # Route to order service order = await gateway.call_order_service( "/orders", method="POST", json=order_data ) return {"order": order} except httpx.HTTPError as e: raise HTTPException(status_code=503, detail="Order service unavailable")``` ## Communication Patterns ### Pattern 1: Synchronous REST Communication ```python# Service A calls Service Bimport httpxfrom tenacity import retry, stop_after_attempt, wait_exponential class ServiceClient: """HTTP client with retries and timeout.""" def __init__(self, base_url: str): self.base_url = base_url self.client = httpx.AsyncClient( timeout=httpx.Timeout(5.0, connect=2.0), limits=httpx.Limits(max_keepalive_connections=20) ) @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10) ) async def get(self, path: str, **kwargs): """GET with automatic retries.""" response = await self.client.get(f"{self.base_url}{path}", **kwargs) response.raise_for_status() return response.json() async def post(self, path: str, **kwargs): """POST request.""" response = await self.client.post(f"{self.base_url}{path}", **kwargs) response.raise_for_status() return response.json() # Usagepayment_client = ServiceClient("http://payment-service:8001")result = await payment_client.post("/payments", json=payment_data)``` ### Pattern 2: Asynchronous Event-Driven ```python# Event-driven communication with Kafkafrom aiokafka import AIOKafkaProducer, AIOKafkaConsumerimport jsonfrom dataclasses import dataclass, asdictfrom datetime import datetime @dataclassclass DomainEvent: event_id: str event_type: str aggregate_id: str occurred_at: datetime data: dict class EventBus: """Event publishing and subscription.""" def __init__(self, bootstrap_servers: List[str]): self.bootstrap_servers = bootstrap_servers self.producer = None async def start(self): self.producer = AIOKafkaProducer( bootstrap_servers=self.bootstrap_servers, value_serializer=lambda v: json.dumps(v).encode() ) await self.producer.start() async def publish(self, event: DomainEvent): """Publish event to Kafka topic.""" topic = event.event_type await self.producer.send_and_wait( topic, value=asdict(event), key=event.aggregate_id.encode() ) async def subscribe(self, topic: str, handler: callable): """Subscribe to events.""" consumer = AIOKafkaConsumer( topic, bootstrap_servers=self.bootstrap_servers, value_deserializer=lambda v: json.loads(v.decode()), group_id="my-service" ) await consumer.start() try: async for message in consumer: event_data = message.value await handler(event_data) finally: await consumer.stop() # Order Service publishes eventasync def create_order(order_data: dict): order = await save_order(order_data) event = DomainEvent( event_id=str(uuid.uuid4()), event_type="OrderCreated", aggregate_id=order.id, occurred_at=datetime.now(), data={ "order_id": order.id, "customer_id": order.customer_id, "total": order.total } ) await event_bus.publish(event) # Inventory Service listens for OrderCreatedasync def handle_order_created(event_data: dict): """React to order creation.""" order_id = event_data["data"]["order_id"] items = event_data["data"]["items"] # Reserve inventory await reserve_inventory(order_id, items)``` ### Pattern 3: Saga Pattern (Distributed Transactions) ```python# Saga orchestration for order fulfillmentfrom enum import Enumfrom typing import List, Callable class SagaStep: """Single step in saga.""" def __init__( self, name: str, action: Callable, compensation: Callable ): self.name = name self.action = action self.compensation = compensation class SagaStatus(Enum): PENDING = "pending" COMPLETED = "completed" COMPENSATING = "compensating" FAILED = "failed" class OrderFulfillmentSaga: """Orchestrated saga for order fulfillment.""" def __init__(self): self.steps: List[SagaStep] = [ SagaStep( "create_order", action=self.create_order, compensation=self.cancel_order ), SagaStep( "reserve_inventory", action=self.reserve_inventory, compensation=self.release_inventory ), SagaStep( "process_payment", action=self.process_payment, compensation=self.refund_payment ), SagaStep( "confirm_order", action=self.confirm_order, compensation=self.cancel_order_confirmation ) ] async def execute(self, order_data: dict) -> SagaResult: """Execute saga steps.""" completed_steps = [] context = {"order_data": order_data} try: for step in self.steps: # Execute step result = await step.action(context) if not result.success: # Compensate await self.compensate(completed_steps, context) return SagaResult( status=SagaStatus.FAILED, error=result.error ) completed_steps.append(step) context.update(result.data) return SagaResult(status=SagaStatus.COMPLETED, data=context) except Exception as e: # Compensate on error await self.compensate(completed_steps, context) return SagaResult(status=SagaStatus.FAILED, error=str(e)) async def compensate(self, completed_steps: List[SagaStep], context: dict): """Execute compensating actions in reverse order.""" for step in reversed(completed_steps): try: await step.compensation(context) except Exception as e: # Log compensation failure print(f"Compensation failed for {step.name}: {e}") # Step implementations async def create_order(self, context: dict) -> StepResult: order = await order_service.create(context["order_data"]) return StepResult(success=True, data={"order_id": order.id}) async def cancel_order(self, context: dict): await order_service.cancel(context["order_id"]) async def reserve_inventory(self, context: dict) -> StepResult: result = await inventory_service.reserve( context["order_id"], context["order_data"]["items"] ) return StepResult( success=result.success, data={"reservation_id": result.reservation_id} ) async def release_inventory(self, context: dict): await inventory_service.release(context["reservation_id"]) async def process_payment(self, context: dict) -> StepResult: result = await payment_service.charge( context["order_id"], context["order_data"]["total"] ) return StepResult( success=result.success, data={"transaction_id": result.transaction_id}, error=result.error ) async def refund_payment(self, context: dict): await payment_service.refund(context["transaction_id"])``` ## Resilience Patterns ### Circuit Breaker Pattern ```pythonfrom enum import Enumfrom datetime import datetime, timedeltafrom typing import Callable, Any class CircuitState(Enum): CLOSED = "closed" # Normal operation OPEN = "open" # Failing, reject requests HALF_OPEN = "half_open" # Testing if recovered class CircuitBreaker: """Circuit breaker for service calls.""" def __init__( self, failure_threshold: int = 5, recovery_timeout: int = 30, success_threshold: int = 2 ): self.failure_threshold = failure_threshold self.recovery_timeout = recovery_timeout self.success_threshold = success_threshold self.failure_count = 0 self.success_count = 0 self.state = CircuitState.CLOSED self.opened_at = None async def call(self, func: Callable, *args, **kwargs) -> Any: """Execute function with circuit breaker.""" if self.state == CircuitState.OPEN: if self._should_attempt_reset(): self.state = CircuitState.HALF_OPEN else: raise CircuitBreakerOpenError("Circuit breaker is open") try: result = await func(*args, **kwargs) self._on_success() return result except Exception as e: self._on_failure() raise def _on_success(self): """Handle successful call.""" self.failure_count = 0 if self.state == CircuitState.HALF_OPEN: self.success_count += 1 if self.success_count >= self.success_threshold: self.state = CircuitState.CLOSED self.success_count = 0 def _on_failure(self): """Handle failed call.""" self.failure_count += 1 if self.failure_count >= self.failure_threshold: self.state = CircuitState.OPEN self.opened_at = datetime.now() if self.state == CircuitState.HALF_OPEN: self.state = CircuitState.OPEN self.opened_at = datetime.now() def _should_attempt_reset(self) -> bool: """Check if enough time passed to try again.""" return ( datetime.now() - self.opened_at > timedelta(seconds=self.recovery_timeout) ) # Usagebreaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30) async def call_payment_service(payment_data: dict): return await breaker.call( payment_client.process_payment, payment_data )```Accessibility Compliance
This walks you through implementing proper WCAG 2.2 compliance with real code patterns for screen readers, keyboard navigation, and mobile accessibility. It cov
Airflow Dag Patterns
If you're building data pipelines with Airflow, this skill gives you production-ready DAG patterns that actually work in the real world. It covers TaskFlow API
Angular Migration
Migrating from AngularJS to Angular is notoriously painful, and this skill tackles the practical stuff that makes or breaks these projects. It covers hybrid app