rust-async-patterns
Install this skill
npx skills add wshobson/agentsWorks across Claude Code, Cursor, Codex, Copilot & Antigravity
The rust-async-patterns skill provides a structured approach to managing non-blocking execution using the Tokio runtime. It covers the mechanics of future polling and state transitions, moving beyond simple async fn syntax into memory-safe concurrency. Developers gain command over task orchestration, inter-task communication via typed channels, and strategies for backpressure and error propagation. By utilizing tools like JoinSet for parallel execution and specialized channels like oneshot or broadcast, this skill minimizes common deadlocks and resource leaks in network-heavy applications. It emphasizes predictable behavior for state sharing and cleanup in long-running services. This repository of patterns ensures that developers maintain clear control over task lifecycles, ensuring that background workers do not outlive their context or starve the underlying executor of CPU time.
When to Use This Skill
- β’Building high-throughput microservices that fetch data from multiple external APIs concurrently
- β’Developing event-driven systems requiring state synchronization between different background workers
- β’Implementing server components that prioritize the fastest response among several redundant providers
- β’Standardizing error handling across distributed system boundaries
How to Invoke This Skill
Example prompts that trigger this skill in Claude Code, Cursor, or Antigravity:
- βShow me how to run multiple tokio tasks in parallel
- βHow do I communicate between async tasks in Rust
- βImplement a concurrency limit for my async HTTP requests
- βWhat is the best way to handle errors in async Rust code
- βExplain how to use JoinSet for task management
Pro Tips
- π‘Always design your async boundaries carefully: prefer `spawn` for top-level tasks and `join!` for awaiting multiple related futures.
- π‘Leverage structured concurrency patterns (like `tokio::select!`) to manage the lifecycle of concurrent operations and prevent leaked tasks.
- π‘Utilize `tracing` for observable async code; it's invaluable for debugging complex concurrent flows and understanding performance bottlenecks.
What this skill does
- β’Orchestrating concurrent execution with JoinSet and select!
- β’Implementing communication protocols using MPSC, broadcast, and oneshot channels
- β’Managing state propagation via watch channels
- β’Structuring complex error types with thiserror and anyhow
- β’Limiting request throughput using buffer_unordered
When not to use it
- βSmall CLI tools where simple blocking I/O is sufficient and easier to maintain
- βApplications targeting constrained embedded hardware where the Tokio runtime footprint is too large
Example workflow
- Define a structured error enum using thiserror for domain-specific faults
- Initialize the Tokio runtime with tracing for observability
- Spawn distinct task groups using JoinSet to handle parallel API requests
- Use a channel to aggregate successful results back to the main thread
- Select over the result channel to implement a request timeout or race condition
Prerequisites
- βRust toolchain installed
- βWorking knowledge of Future trait mechanics
- βFamiliarity with Cargo.toml dependency management
Pitfalls & limitations
- !Forgetting to await a spawned task, which leaves it orphaned
- !Unbounded channel usage leading to uncontrolled memory growth under load
- !Blocking the executor thread with heavy CPU-bound computations
FAQ
How it compares
Generic prompts often suggest basic spawn/await syntax, whereas these patterns provide production-hardened orchestration techniques for error handling and concurrency limits.
π Full skill instructions β original source: wshobson/agents
Production patterns for async Rust programming with Tokio runtime, including tasks, channels, streams, and error handling.
## When to Use This Skill
- Building async Rust applications
- Implementing concurrent network services
- Using Tokio for async I/O
- Handling async errors properly
- Debugging async code issues
- Optimizing async performance
## Core Concepts
### 1. Async Execution Model
Future (lazy) β poll() β Ready(value) | Pending
β β
Waker β Runtime schedules### 2. Key Abstractions
| Concept | Purpose |
| ---------- | ---------------------------------------- |
|
Future | Lazy computation that may complete later ||
async fn | Function returning impl Future ||
await | Suspend until future completes ||
Task | Spawned future running concurrently ||
Runtime | Executor that polls futures |## Quick Start
# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
futures = "0.3"
async-trait = "0.1"
anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"use tokio::time::{sleep, Duration};
use anyhow::Result;
#[tokio::main]
async fn main() -> Result<()> {
// Initialize tracing
tracing_subscriber::fmt::init();
// Async operations
let result = fetch_data("https://api.example.com").await?;
println!("Got: {}", result);
Ok(())
}
async fn fetch_data(url: &str) -> Result<String> {
// Simulated async operation
sleep(Duration::from_millis(100)).await;
Ok(format!("Data from {}", url))
}## Patterns
### Pattern 1: Concurrent Task Execution
use tokio::task::JoinSet;
use anyhow::Result;
// Spawn multiple concurrent tasks
async fn fetch_all_concurrent(urls: Vec<String>) -> Result<Vec<String>> {
let mut set = JoinSet::new();
for url in urls {
set.spawn(async move {
fetch_data(&url).await
});
}
let mut results = Vec::new();
while let Some(res) = set.join_next().await {
match res {
Ok(Ok(data)) => results.push(data),
Ok(Err(e)) => tracing::error!("Task failed: {}", e),
Err(e) => tracing::error!("Join error: {}", e),
}
}
Ok(results)
}
// With concurrency limit
use futures::stream::{self, StreamExt};
async fn fetch_with_limit(urls: Vec<String>, limit: usize) -> Vec<Result<String>> {
stream::iter(urls)
.map(|url| async move { fetch_data(&url).await })
.buffer_unordered(limit) // Max concurrent tasks
.collect()
.await
}
// Select first to complete
use tokio::select;
async fn race_requests(url1: &str, url2: &str) -> Result<String> {
select! {
result = fetch_data(url1) => result,
result = fetch_data(url2) => result,
}
}### Pattern 2: Channels for Communication
use tokio::sync::{mpsc, broadcast, oneshot, watch};
// Multi-producer, single-consumer
async fn mpsc_example() {
let (tx, mut rx) = mpsc::channel::<String>(100);
// Spawn producer
let tx2 = tx.clone();
tokio::spawn(async move {
tx2.send("Hello".to_string()).await.unwrap();
});
// Consume
while let Some(msg) = rx.recv().await {
println!("Got: {}", msg);
}
}
// Broadcast: multi-producer, multi-consumer
async fn broadcast_example() {
let (tx, _) = broadcast::channel::<String>(100);
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();
tx.send("Event".to_string()).unwrap();
// Both receivers get the message
let _ = rx1.recv().await;
let _ = rx2.recv().await;
}
// Oneshot: single value, single use
async fn oneshot_example() -> String {
let (tx, rx) = oneshot::channel::<String>();
tokio::spawn(async move {
tx.send("Result".to_string()).unwrap();
});
rx.await.unwrap()
}
// Watch: single producer, multi-consumer, latest value
async fn watch_example() {
let (tx, mut rx) = watch::channel("initial".to_string());
tokio::spawn(async move {
loop {
// Wait for changes
rx.changed().await.unwrap();
println!("New value: {}", *rx.borrow());
}
});
tx.send("updated".to_string()).unwrap();
}### Pattern 3: Async Error Handling
use anyhow::{Context, Result, bail};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum ServiceError {
#[error("Network error: {0}")]
Network(#[from] reqwest::Error),
#[error("Database error: {0}")]
Database(#[from] sqlx::Error),
#[error("Not found: {0}")]
NotFound(String),
#[error("Timeout after {0:?}")]
Timeout(std::time::Duration),
}
// Using anyhow for application errors
async fn process_request(id: &str) -> Result<Response> {
let data = fetch_data(id)
.await
.context("Failed to fetch data")?;
let parsed = parse_response(&data)
.context("Failed to parse response")?;
Ok(parsed)
}
// Using custom errors for library code
async fn get_user(id: &str) -> Result<User, ServiceError> {
let result = db.query(id).await?;
match result {
Some(user) => Ok(user),
None => Err(ServiceError::NotFound(id.to_string())),
}
}
// Timeout wrapper
use tokio::time::timeout;
async fn with_timeout<T, F>(duration: Duration, future: F) -> Result<T, ServiceError>
where
F: std::future::Future<Output = Result<T, ServiceError>>,
{
timeout(duration, future)
.await
.map_err(|_| ServiceError::Timeout(duration))?
}### Pattern 4: Graceful Shutdown
use tokio::signal;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
async fn run_server() -> Result<()> {
// Method 1: CancellationToken
let token = CancellationToken::new();
let token_clone = token.clone();
// Spawn task that respects cancellation
tokio::spawn(async move {
loop {
tokio::select! {
_ = token_clone.cancelled() => {
tracing::info!("Task shutting down");
break;
}
_ = do_work() => {}
}
}
});
// Wait for shutdown signal
signal::ctrl_c().await?;
tracing::info!("Shutdown signal received");
// Cancel all tasks
token.cancel();
// Give tasks time to cleanup
tokio::time::sleep(Duration::from_secs(5)).await;
Ok(())
}
// Method 2: Broadcast channel for shutdown
async fn run_with_broadcast() -> Result<()> {
let (shutdown_tx, _) = broadcast::channel::<()>(1);
let mut rx = shutdown_tx.subscribe();
tokio::spawn(async move {
tokio::select! {
_ = rx.recv() => {
tracing::info!("Received shutdown");
}
_ = async { loop { do_work().await } } => {}
}
});
signal::ctrl_c().await?;
let _ = shutdown_tx.send(());
Ok(())
}### Pattern 5: Async Traits
use async_trait::async_trait;
#[async_trait]
pub trait Repository {
async fn get(&self, id: &str) -> Result<Entity>;
async fn save(&self, entity: &Entity) -> Result<()>;
async fn delete(&self, id: &str) -> Result<()>;
}
pub struct PostgresRepository {
pool: sqlx::PgPool,
}
#[async_trait]
impl Repository for PostgresRepository {
async fn get(&self, id: &str) -> Result<Entity> {
sqlx::query_as!(Entity, "SELECT * FROM entities WHERE id = $1", id)
.fetch_one(&self.pool)
.await
.map_err(Into::into)
}
async fn save(&self, entity: &Entity) -> Result<()> {
sqlx::query!(
"INSERT INTO entities (id, data) VALUES ($1, $2)
ON CONFLICT (id) DO UPDATE SET data = $2",
entity.id,
entity.data
)
.execute(&self.pool)
.await?;
Ok(())
}
async fn delete(&self, id: &str) -> Result<()> {
sqlx::query!("DELETE FROM entities WHERE id = $1", id)
.execute(&self.pool)
.await?;
Ok(())
}
}
// Trait object usage
async fn process(repo: &dyn Repository, id: &str) -> Result<()> {
let entity = repo.get(id).await?;
// Process...
repo.save(&entity).await
}### Pattern 6: Streams and Async Iteration
use futures::stream::{self, Stream, StreamExt};
use async_stream::stream;
// Create stream from async iterator
fn numbers_stream() -> impl Stream<Item = i32> {
stream! {
for i in 0..10 {
tokio::time::sleep(Duration::from_millis(100)).await;
yield i;
}
}
}
// Process stream
async fn process_stream() {
let stream = numbers_stream();
// Map and filter
let processed: Vec<_> = stream
.filter(|n| futures::future::ready(*n % 2 == 0))
.map(|n| n * 2)
.collect()
.await;
println!("{:?}", processed);
}
// Chunked processing
async fn process_in_chunks() {
let stream = numbers_stream();
let mut chunks = stream.chunks(3);
while let Some(chunk) = chunks.next().await {
println!("Processing chunk: {:?}", chunk);
}
}
// Merge multiple streams
async fn merge_streams() {
let stream1 = numbers_stream();
let stream2 = numbers_stream();
let merged = stream::select(stream1, stream2);
merged
.for_each(|n| async move {
println!("Got: {}", n);
})
.await;
}### Pattern 7: Resource Management
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, Semaphore};
// Shared state with RwLock (prefer for read-heavy)
struct Cache {
data: RwLock<HashMap<String, String>>,
}
impl Cache {
async fn get(&self, key: &str) -> Option<String> {
self.data.read().await.get(key).cloned()
}
async fn set(&self, key: String, value: String) {
self.data.write().await.insert(key, value);
}
}
// Connection pool with semaphore
struct Pool {
semaphore: Semaphore,
connections: Mutex<Vec<Connection>>,
}
impl Pool {
fn new(size: usize) -> Self {
Self {
semaphore: Semaphore::new(size),
connections: Mutex::new((0..size).map(|_| Connection::new()).collect()),
}
}
async fn acquire(&self) -> PooledConnection<'_> {
let permit = self.semaphore.acquire().await.unwrap();
let conn = self.connections.lock().await.pop().unwrap();
PooledConnection { pool: self, conn: Some(conn), _permit: permit }
}
}
struct PooledConnection<'a> {
pool: &'a Pool,
conn: Option<Connection>,
_permit: tokio::sync::SemaphorePermit<'a>,
}
impl Drop for PooledConnection<'_> {
fn drop(&mut self) {
if let Some(conn) = self.conn.take() {
let pool = self.pool;
tokio::spawn(async move {
pool.connections.lock().await.push(conn);
});
}
}
}## Debugging Tips
// Enable tokio-console for runtime debugging
// Cargo.toml: tokio = { features = ["tracing"] }
// Run: RUSTFLAGS="--cfg tokio_unstable" cargo run
// Then: tokio-console
// Instrument async functions
use tracing::instrument;
#[instrument(skip(pool))]
async fn fetch_user(pool: &PgPool, id: &str) -> Result<User> {
tracing::debug!("Fetching user");
// ...
}
// Track task spawning
let span = tracing::info_span!("worker", id = %worker_id);
tokio::spawn(async move {
// Enters span when polled
}.instrument(span));## Best Practices
### Do's
- **Use
tokio::select!** - For racing futures- **Prefer channels** - Over shared state when possible
- **Use
JoinSet** - For managing multiple tasks- **Instrument with tracing** - For debugging async code
- **Handle cancellation** - Check
CancellationToken### Don'ts
- **Don't block** - Never use
std::thread::sleep in async- **Don't hold locks across awaits** - Causes deadlocks
- **Don't spawn unboundedly** - Use semaphores for limits
- **Don't ignore errors** - Propagate with
? or log- **Don't forget Send bounds** - For spawned futures
## Resources
- [Tokio Tutorial](https://tokio.rs/tokio/tutorial)
- [Async Book](https://rust-lang.github.io/async-book/)
- [Tokio Console](https://github.com/tokio-rs/console)
How to Use This Skill Unit
Option A: Project-Specific (Recommended)
- Click "Download" above
- In your project, create the directory:
.agent/skills/rust-async-patterns/ - Save the file as
SKILL.md - The agent will automatically discover the skill based on its description.
Option B: Global Installation (All Agents)
Save the file to these locations to make it available across all projects:
- Claude Code:
~/.claude/skills/wshobson/agents/rust-async-patterns/SKILL.md - Cursor:
~/.cursor/skills/wshobson/agents/rust-async-patterns/SKILL.md - Antigravity:
~/.gemini/antigravity/skills/wshobson/agents/rust-async-patterns/SKILL.md
π Install with CLI:npx skills add wshobson/agents