npx skills add https://github.com/wshobson/agents --skill rust-async-patternsHow Rust Async Patterns fits into a Paperclip company.
Rust Async Patterns drops into any Paperclip agent that handles this kind of work. Assign it to a specialist inside a pre-configured PaperclipOrg company and the skill becomes available on every heartbeat — no prompt engineering, no tool wiring.
Pre-configured AI company — 18 agents, 18 skills, one-time purchase.
SKILL.md513 linesExpandCollapse
---name: rust-async-patternsdescription: Master Rust async programming with Tokio, async traits, error handling, and concurrent patterns. Use when building async Rust applications, implementing concurrent systems, or debugging async code.--- # Rust Async Patterns Production patterns for async Rust programming with Tokio runtime, including tasks, channels, streams, and error handling. ## When to Use This Skill - Building async Rust applications- Implementing concurrent network services- Using Tokio for async I/O- Handling async errors properly- Debugging async code issues- Optimizing async performance ## Core Concepts ### 1. Async Execution Model ```Future (lazy) → poll() → Ready(value) | Pending ↑ ↓ Waker ← Runtime schedules``` ### 2. Key Abstractions | Concept | Purpose || ---------- | ---------------------------------------- || `Future` | Lazy computation that may complete later || `async fn` | Function returning impl Future || `await` | Suspend until future completes || `Task` | Spawned future running concurrently || `Runtime` | Executor that polls futures | ## Quick Start ```toml# Cargo.toml[dependencies]tokio = { version = "1", features = ["full"] }futures = "0.3"async-trait = "0.1"anyhow = "1.0"tracing = "0.1"tracing-subscriber = "0.3"``` ```rustuse tokio::time::{sleep, Duration};use anyhow::Result; #[tokio::main]async fn main() -> Result<()> { // Initialize tracing tracing_subscriber::fmt::init(); // Async operations let result = fetch_data("https://api.example.com").await?; println!("Got: {}", result); Ok(())} async fn fetch_data(url: &str) -> Result<String> { // Simulated async operation sleep(Duration::from_millis(100)).await; Ok(format!("Data from {}", url))}``` ## Patterns ### Pattern 1: Concurrent Task Execution ```rustuse tokio::task::JoinSet;use anyhow::Result; // Spawn multiple concurrent tasksasync fn fetch_all_concurrent(urls: Vec<String>) -> Result<Vec<String>> { let mut set = JoinSet::new(); for url in urls { set.spawn(async move { fetch_data(&url).await }); } let mut results = Vec::new(); while let Some(res) = set.join_next().await { match res { Ok(Ok(data)) => results.push(data), Ok(Err(e)) => tracing::error!("Task failed: {}", e), Err(e) => tracing::error!("Join error: {}", e), } } Ok(results)} // With concurrency limituse futures::stream::{self, StreamExt}; async fn fetch_with_limit(urls: Vec<String>, limit: usize) -> Vec<Result<String>> { stream::iter(urls) .map(|url| async move { fetch_data(&url).await }) .buffer_unordered(limit) // Max concurrent tasks .collect() .await} // Select first to completeuse tokio::select; async fn race_requests(url1: &str, url2: &str) -> Result<String> { select! { result = fetch_data(url1) => result, result = fetch_data(url2) => result, }}``` ### Pattern 2: Channels for Communication ```rustuse tokio::sync::{mpsc, broadcast, oneshot, watch}; // Multi-producer, single-consumerasync fn mpsc_example() { let (tx, mut rx) = mpsc::channel::<String>(100); // Spawn producer let tx2 = tx.clone(); tokio::spawn(async move { tx2.send("Hello".to_string()).await.unwrap(); }); // Consume while let Some(msg) = rx.recv().await { println!("Got: {}", msg); }} // Broadcast: multi-producer, multi-consumerasync fn broadcast_example() { let (tx, _) = broadcast::channel::<String>(100); let mut rx1 = tx.subscribe(); let mut rx2 = tx.subscribe(); tx.send("Event".to_string()).unwrap(); // Both receivers get the message let _ = rx1.recv().await; let _ = rx2.recv().await;} // Oneshot: single value, single useasync fn oneshot_example() -> String { let (tx, rx) = oneshot::channel::<String>(); tokio::spawn(async move { tx.send("Result".to_string()).unwrap(); }); rx.await.unwrap()} // Watch: single producer, multi-consumer, latest valueasync fn watch_example() { let (tx, mut rx) = watch::channel("initial".to_string()); tokio::spawn(async move { loop { // Wait for changes rx.changed().await.unwrap(); println!("New value: {}", *rx.borrow()); } }); tx.send("updated".to_string()).unwrap();}``` ### Pattern 3: Async Error Handling ```rustuse anyhow::{Context, Result, bail};use thiserror::Error; #[derive(Error, Debug)]pub enum ServiceError { #[error("Network error: {0}")] Network(#[from] reqwest::Error), #[error("Database error: {0}")] Database(#[from] sqlx::Error), #[error("Not found: {0}")] NotFound(String), #[error("Timeout after {0:?}")] Timeout(std::time::Duration),} // Using anyhow for application errorsasync fn process_request(id: &str) -> Result<Response> { let data = fetch_data(id) .await .context("Failed to fetch data")?; let parsed = parse_response(&data) .context("Failed to parse response")?; Ok(parsed)} // Using custom errors for library codeasync fn get_user(id: &str) -> Result<User, ServiceError> { let result = db.query(id).await?; match result { Some(user) => Ok(user), None => Err(ServiceError::NotFound(id.to_string())), }} // Timeout wrapperuse tokio::time::timeout; async fn with_timeout<T, F>(duration: Duration, future: F) -> Result<T, ServiceError>where F: std::future::Future<Output = Result<T, ServiceError>>,{ timeout(duration, future) .await .map_err(|_| ServiceError::Timeout(duration))?}``` ### Pattern 4: Graceful Shutdown ```rustuse tokio::signal;use tokio::sync::broadcast;use tokio_util::sync::CancellationToken; async fn run_server() -> Result<()> { // Method 1: CancellationToken let token = CancellationToken::new(); let token_clone = token.clone(); // Spawn task that respects cancellation tokio::spawn(async move { loop { tokio::select! { _ = token_clone.cancelled() => { tracing::info!("Task shutting down"); break; } _ = do_work() => {} } } }); // Wait for shutdown signal signal::ctrl_c().await?; tracing::info!("Shutdown signal received"); // Cancel all tasks token.cancel(); // Give tasks time to cleanup tokio::time::sleep(Duration::from_secs(5)).await; Ok(())} // Method 2: Broadcast channel for shutdownasync fn run_with_broadcast() -> Result<()> { let (shutdown_tx, _) = broadcast::channel::<()>(1); let mut rx = shutdown_tx.subscribe(); tokio::spawn(async move { tokio::select! { _ = rx.recv() => { tracing::info!("Received shutdown"); } _ = async { loop { do_work().await } } => {} } }); signal::ctrl_c().await?; let _ = shutdown_tx.send(()); Ok(())}``` ### Pattern 5: Async Traits ```rustuse async_trait::async_trait; #[async_trait]pub trait Repository { async fn get(&self, id: &str) -> Result<Entity>; async fn save(&self, entity: &Entity) -> Result<()>; async fn delete(&self, id: &str) -> Result<()>;} pub struct PostgresRepository { pool: sqlx::PgPool,} #[async_trait]impl Repository for PostgresRepository { async fn get(&self, id: &str) -> Result<Entity> { sqlx::query_as!(Entity, "SELECT * FROM entities WHERE id = $1", id) .fetch_one(&self.pool) .await .map_err(Into::into) } async fn save(&self, entity: &Entity) -> Result<()> { sqlx::query!( "INSERT INTO entities (id, data) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET data = $2", entity.id, entity.data ) .execute(&self.pool) .await?; Ok(()) } async fn delete(&self, id: &str) -> Result<()> { sqlx::query!("DELETE FROM entities WHERE id = $1", id) .execute(&self.pool) .await?; Ok(()) }} // Trait object usageasync fn process(repo: &dyn Repository, id: &str) -> Result<()> { let entity = repo.get(id).await?; // Process... repo.save(&entity).await}``` ### Pattern 6: Streams and Async Iteration ```rustuse futures::stream::{self, Stream, StreamExt};use async_stream::stream; // Create stream from async iteratorfn numbers_stream() -> impl Stream<Item = i32> { stream! { for i in 0..10 { tokio::time::sleep(Duration::from_millis(100)).await; yield i; } }} // Process streamasync fn process_stream() { let stream = numbers_stream(); // Map and filter let processed: Vec<_> = stream .filter(|n| futures::future::ready(*n % 2 == 0)) .map(|n| n * 2) .collect() .await; println!("{:?}", processed);} // Chunked processingasync fn process_in_chunks() { let stream = numbers_stream(); let mut chunks = stream.chunks(3); while let Some(chunk) = chunks.next().await { println!("Processing chunk: {:?}", chunk); }} // Merge multiple streamsasync fn merge_streams() { let stream1 = numbers_stream(); let stream2 = numbers_stream(); let merged = stream::select(stream1, stream2); merged .for_each(|n| async move { println!("Got: {}", n); }) .await;}``` ### Pattern 7: Resource Management ```rustuse std::sync::Arc;use tokio::sync::{Mutex, RwLock, Semaphore}; // Shared state with RwLock (prefer for read-heavy)struct Cache { data: RwLock<HashMap<String, String>>,} impl Cache { async fn get(&self, key: &str) -> Option<String> { self.data.read().await.get(key).cloned() } async fn set(&self, key: String, value: String) { self.data.write().await.insert(key, value); }} // Connection pool with semaphorestruct Pool { semaphore: Semaphore, connections: Mutex<Vec<Connection>>,} impl Pool { fn new(size: usize) -> Self { Self { semaphore: Semaphore::new(size), connections: Mutex::new((0..size).map(|_| Connection::new()).collect()), } } async fn acquire(&self) -> PooledConnection<'_> { let permit = self.semaphore.acquire().await.unwrap(); let conn = self.connections.lock().await.pop().unwrap(); PooledConnection { pool: self, conn: Some(conn), _permit: permit } }} struct PooledConnection<'a> { pool: &'a Pool, conn: Option<Connection>, _permit: tokio::sync::SemaphorePermit<'a>,} impl Drop for PooledConnection<'_> { fn drop(&mut self) { if let Some(conn) = self.conn.take() { let pool = self.pool; tokio::spawn(async move { pool.connections.lock().await.push(conn); }); } }}``` ## Debugging Tips ```rust// Enable tokio-console for runtime debugging// Cargo.toml: tokio = { features = ["tracing"] }// Run: RUSTFLAGS="--cfg tokio_unstable" cargo run// Then: tokio-console // Instrument async functionsuse tracing::instrument; #[instrument(skip(pool))]async fn fetch_user(pool: &PgPool, id: &str) -> Result<User> { tracing::debug!("Fetching user"); // ...} // Track task spawninglet span = tracing::info_span!("worker", id = %worker_id);tokio::spawn(async move { // Enters span when polled}.instrument(span));``` ## Best Practices ### Do's - **Use `tokio::select!`** - For racing futures- **Prefer channels** - Over shared state when possible- **Use `JoinSet`** - For managing multiple tasks- **Instrument with tracing** - For debugging async code- **Handle cancellation** - Check `CancellationToken` ### Don'ts - **Don't block** - Never use `std::thread::sleep` in async- **Don't hold locks across awaits** - Causes deadlocks- **Don't spawn unboundedly** - Use semaphores for limits- **Don't ignore errors** - Propagate with `?` or log- **Don't forget Send bounds** - For spawned futuresAccessibility Compliance
This walks you through implementing proper WCAG 2.2 compliance with real code patterns for screen readers, keyboard navigation, and mobile accessibility. It cov
Airflow Dag Patterns
If you're building data pipelines with Airflow, this skill gives you production-ready DAG patterns that actually work in the real world. It covers TaskFlow API
Angular Migration
Migrating from AngularJS to Angular is notoriously painful, and this skill tackles the practical stuff that makes or breaks these projects. It covers hybrid app