Back to Backend Development

rust-async-patterns

RustasyncTokioconcurrencyerror handlingbackend developmentperformancesystem programming
⭐ 36.8kπŸ“„ MITπŸ•’ 2026-06-16Source β†—

Install this skill

npx skills add wshobson/agents

Works across Claude Code, Cursor, Codex, Copilot & Antigravity

The rust-async-patterns skill provides a structured approach to managing non-blocking execution using the Tokio runtime. It covers the mechanics of future polling and state transitions, moving beyond simple async fn syntax into memory-safe concurrency. Developers gain command over task orchestration, inter-task communication via typed channels, and strategies for backpressure and error propagation. By utilizing tools like JoinSet for parallel execution and specialized channels like oneshot or broadcast, this skill minimizes common deadlocks and resource leaks in network-heavy applications. It emphasizes predictable behavior for state sharing and cleanup in long-running services. This repository of patterns ensures that developers maintain clear control over task lifecycles, ensuring that background workers do not outlive their context or starve the underlying executor of CPU time.

When to Use This Skill

  • β€’Building high-throughput microservices that fetch data from multiple external APIs concurrently
  • β€’Developing event-driven systems requiring state synchronization between different background workers
  • β€’Implementing server components that prioritize the fastest response among several redundant providers
  • β€’Standardizing error handling across distributed system boundaries

How to Invoke This Skill

Example prompts that trigger this skill in Claude Code, Cursor, or Antigravity:

  • β€œShow me how to run multiple tokio tasks in parallel
  • β€œHow do I communicate between async tasks in Rust
  • β€œImplement a concurrency limit for my async HTTP requests
  • β€œWhat is the best way to handle errors in async Rust code
  • β€œExplain how to use JoinSet for task management

Pro Tips

  • πŸ’‘Always design your async boundaries carefully: prefer `spawn` for top-level tasks and `join!` for awaiting multiple related futures.
  • πŸ’‘Leverage structured concurrency patterns (like `tokio::select!`) to manage the lifecycle of concurrent operations and prevent leaked tasks.
  • πŸ’‘Utilize `tracing` for observable async code; it's invaluable for debugging complex concurrent flows and understanding performance bottlenecks.

What this skill does

  • β€’Orchestrating concurrent execution with JoinSet and select!
  • β€’Implementing communication protocols using MPSC, broadcast, and oneshot channels
  • β€’Managing state propagation via watch channels
  • β€’Structuring complex error types with thiserror and anyhow
  • β€’Limiting request throughput using buffer_unordered

When not to use it

  • βœ•Small CLI tools where simple blocking I/O is sufficient and easier to maintain
  • βœ•Applications targeting constrained embedded hardware where the Tokio runtime footprint is too large

Example workflow

  1. Define a structured error enum using thiserror for domain-specific faults
  2. Initialize the Tokio runtime with tracing for observability
  3. Spawn distinct task groups using JoinSet to handle parallel API requests
  4. Use a channel to aggregate successful results back to the main thread
  5. Select over the result channel to implement a request timeout or race condition

Prerequisites

  • –Rust toolchain installed
  • –Working knowledge of Future trait mechanics
  • –Familiarity with Cargo.toml dependency management

Pitfalls & limitations

  • !Forgetting to await a spawned task, which leaves it orphaned
  • !Unbounded channel usage leading to uncontrolled memory growth under load
  • !Blocking the executor thread with heavy CPU-bound computations

FAQ

Why use JoinSet over creating multiple tokio::spawn handles?
JoinSet simplifies task collection and allows for cleaner cleanup and completion management compared to manually tracking a vector of JoinHandles.
What is the difference between broadcast and mpsc channels?
MPSC is for single-consumer workloads where each message goes to one recipient, while broadcast allows multiple listeners to receive copies of the same message.
Does async Rust guarantee that all tasks will finish?
No, if the main function or the runtime terminates, all spawned tasks will be dropped immediately. Use clean-up patterns if tasks must finish.

How it compares

Generic prompts often suggest basic spawn/await syntax, whereas these patterns provide production-hardened orchestration techniques for error handling and concurrency limits.

Source & trust

⭐ 37k starsπŸ“„ MITπŸ•’ Updated 2026-06-16
πŸ“„ Full skill instructions β€” original source: wshobson/agents
# Rust Async Patterns

Production patterns for async Rust programming with Tokio runtime, including tasks, channels, streams, and error handling.

## When to Use This Skill

- Building async Rust applications
- Implementing concurrent network services
- Using Tokio for async I/O
- Handling async errors properly
- Debugging async code issues
- Optimizing async performance

## Core Concepts

### 1. Async Execution Model

Future (lazy) β†’ poll() β†’ Ready(value) | Pending
↑ ↓
Waker ← Runtime schedules


### 2. Key Abstractions

| Concept | Purpose |
| ---------- | ---------------------------------------- |
| Future | Lazy computation that may complete later |
| async fn | Function returning impl Future |
| await | Suspend until future completes |
| Task | Spawned future running concurrently |
| Runtime | Executor that polls futures |

## Quick Start

# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
futures = "0.3"
async-trait = "0.1"
anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"


use tokio::time::{sleep, Duration};
use anyhow::Result;

#[tokio::main]
async fn main() -> Result<()> {
// Initialize tracing
tracing_subscriber::fmt::init();

// Async operations
let result = fetch_data("https://api.example.com").await?;
println!("Got: {}", result);

Ok(())
}

async fn fetch_data(url: &str) -> Result<String> {
// Simulated async operation
sleep(Duration::from_millis(100)).await;
Ok(format!("Data from {}", url))
}


## Patterns

### Pattern 1: Concurrent Task Execution

use tokio::task::JoinSet;
use anyhow::Result;

// Spawn multiple concurrent tasks
async fn fetch_all_concurrent(urls: Vec<String>) -> Result<Vec<String>> {
let mut set = JoinSet::new();

for url in urls {
set.spawn(async move {
fetch_data(&url).await
});
}

let mut results = Vec::new();
while let Some(res) = set.join_next().await {
match res {
Ok(Ok(data)) => results.push(data),
Ok(Err(e)) => tracing::error!("Task failed: {}", e),
Err(e) => tracing::error!("Join error: {}", e),
}
}

Ok(results)
}

// With concurrency limit
use futures::stream::{self, StreamExt};

async fn fetch_with_limit(urls: Vec<String>, limit: usize) -> Vec<Result<String>> {
stream::iter(urls)
.map(|url| async move { fetch_data(&url).await })
.buffer_unordered(limit) // Max concurrent tasks
.collect()
.await
}

// Select first to complete
use tokio::select;

async fn race_requests(url1: &str, url2: &str) -> Result<String> {
select! {
result = fetch_data(url1) => result,
result = fetch_data(url2) => result,
}
}


### Pattern 2: Channels for Communication

use tokio::sync::{mpsc, broadcast, oneshot, watch};

// Multi-producer, single-consumer
async fn mpsc_example() {
let (tx, mut rx) = mpsc::channel::<String>(100);

// Spawn producer
let tx2 = tx.clone();
tokio::spawn(async move {
tx2.send("Hello".to_string()).await.unwrap();
});

// Consume
while let Some(msg) = rx.recv().await {
println!("Got: {}", msg);
}
}

// Broadcast: multi-producer, multi-consumer
async fn broadcast_example() {
let (tx, _) = broadcast::channel::<String>(100);

let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();

tx.send("Event".to_string()).unwrap();

// Both receivers get the message
let _ = rx1.recv().await;
let _ = rx2.recv().await;
}

// Oneshot: single value, single use
async fn oneshot_example() -> String {
let (tx, rx) = oneshot::channel::<String>();

tokio::spawn(async move {
tx.send("Result".to_string()).unwrap();
});

rx.await.unwrap()
}

// Watch: single producer, multi-consumer, latest value
async fn watch_example() {
let (tx, mut rx) = watch::channel("initial".to_string());

tokio::spawn(async move {
loop {
// Wait for changes
rx.changed().await.unwrap();
println!("New value: {}", *rx.borrow());
}
});

tx.send("updated".to_string()).unwrap();
}


### Pattern 3: Async Error Handling

use anyhow::{Context, Result, bail};
use thiserror::Error;

#[derive(Error, Debug)]
pub enum ServiceError {
#[error("Network error: {0}")]
Network(#[from] reqwest::Error),

#[error("Database error: {0}")]
Database(#[from] sqlx::Error),

#[error("Not found: {0}")]
NotFound(String),

#[error("Timeout after {0:?}")]
Timeout(std::time::Duration),
}

// Using anyhow for application errors
async fn process_request(id: &str) -> Result<Response> {
let data = fetch_data(id)
.await
.context("Failed to fetch data")?;

let parsed = parse_response(&data)
.context("Failed to parse response")?;

Ok(parsed)
}

// Using custom errors for library code
async fn get_user(id: &str) -> Result<User, ServiceError> {
let result = db.query(id).await?;

match result {
Some(user) => Ok(user),
None => Err(ServiceError::NotFound(id.to_string())),
}
}

// Timeout wrapper
use tokio::time::timeout;

async fn with_timeout<T, F>(duration: Duration, future: F) -> Result<T, ServiceError>
where
F: std::future::Future<Output = Result<T, ServiceError>>,
{
timeout(duration, future)
.await
.map_err(|_| ServiceError::Timeout(duration))?
}


### Pattern 4: Graceful Shutdown

use tokio::signal;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;

async fn run_server() -> Result<()> {
// Method 1: CancellationToken
let token = CancellationToken::new();
let token_clone = token.clone();

// Spawn task that respects cancellation
tokio::spawn(async move {
loop {
tokio::select! {
_ = token_clone.cancelled() => {
tracing::info!("Task shutting down");
break;
}
_ = do_work() => {}
}
}
});

// Wait for shutdown signal
signal::ctrl_c().await?;
tracing::info!("Shutdown signal received");

// Cancel all tasks
token.cancel();

// Give tasks time to cleanup
tokio::time::sleep(Duration::from_secs(5)).await;

Ok(())
}

// Method 2: Broadcast channel for shutdown
async fn run_with_broadcast() -> Result<()> {
let (shutdown_tx, _) = broadcast::channel::<()>(1);

let mut rx = shutdown_tx.subscribe();
tokio::spawn(async move {
tokio::select! {
_ = rx.recv() => {
tracing::info!("Received shutdown");
}
_ = async { loop { do_work().await } } => {}
}
});

signal::ctrl_c().await?;
let _ = shutdown_tx.send(());

Ok(())
}


### Pattern 5: Async Traits

use async_trait::async_trait;

#[async_trait]
pub trait Repository {
async fn get(&self, id: &str) -> Result<Entity>;
async fn save(&self, entity: &Entity) -> Result<()>;
async fn delete(&self, id: &str) -> Result<()>;
}

pub struct PostgresRepository {
pool: sqlx::PgPool,
}

#[async_trait]
impl Repository for PostgresRepository {
async fn get(&self, id: &str) -> Result<Entity> {
sqlx::query_as!(Entity, "SELECT * FROM entities WHERE id = $1", id)
.fetch_one(&self.pool)
.await
.map_err(Into::into)
}

async fn save(&self, entity: &Entity) -> Result<()> {
sqlx::query!(
"INSERT INTO entities (id, data) VALUES ($1, $2)
ON CONFLICT (id) DO UPDATE SET data = $2",
entity.id,
entity.data
)
.execute(&self.pool)
.await?;
Ok(())
}

async fn delete(&self, id: &str) -> Result<()> {
sqlx::query!("DELETE FROM entities WHERE id = $1", id)
.execute(&self.pool)
.await?;
Ok(())
}
}

// Trait object usage
async fn process(repo: &dyn Repository, id: &str) -> Result<()> {
let entity = repo.get(id).await?;
// Process...
repo.save(&entity).await
}


### Pattern 6: Streams and Async Iteration

use futures::stream::{self, Stream, StreamExt};
use async_stream::stream;

// Create stream from async iterator
fn numbers_stream() -> impl Stream<Item = i32> {
stream! {
for i in 0..10 {
tokio::time::sleep(Duration::from_millis(100)).await;
yield i;
}
}
}

// Process stream
async fn process_stream() {
let stream = numbers_stream();

// Map and filter
let processed: Vec<_> = stream
.filter(|n| futures::future::ready(*n % 2 == 0))
.map(|n| n * 2)
.collect()
.await;

println!("{:?}", processed);
}

// Chunked processing
async fn process_in_chunks() {
let stream = numbers_stream();

let mut chunks = stream.chunks(3);

while let Some(chunk) = chunks.next().await {
println!("Processing chunk: {:?}", chunk);
}
}

// Merge multiple streams
async fn merge_streams() {
let stream1 = numbers_stream();
let stream2 = numbers_stream();

let merged = stream::select(stream1, stream2);

merged
.for_each(|n| async move {
println!("Got: {}", n);
})
.await;
}


### Pattern 7: Resource Management

use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, Semaphore};

// Shared state with RwLock (prefer for read-heavy)
struct Cache {
data: RwLock<HashMap<String, String>>,
}

impl Cache {
async fn get(&self, key: &str) -> Option<String> {
self.data.read().await.get(key).cloned()
}

async fn set(&self, key: String, value: String) {
self.data.write().await.insert(key, value);
}
}

// Connection pool with semaphore
struct Pool {
semaphore: Semaphore,
connections: Mutex<Vec<Connection>>,
}

impl Pool {
fn new(size: usize) -> Self {
Self {
semaphore: Semaphore::new(size),
connections: Mutex::new((0..size).map(|_| Connection::new()).collect()),
}
}

async fn acquire(&self) -> PooledConnection<'_> {
let permit = self.semaphore.acquire().await.unwrap();
let conn = self.connections.lock().await.pop().unwrap();
PooledConnection { pool: self, conn: Some(conn), _permit: permit }
}
}

struct PooledConnection<'a> {
pool: &'a Pool,
conn: Option<Connection>,
_permit: tokio::sync::SemaphorePermit<'a>,
}

impl Drop for PooledConnection<'_> {
fn drop(&mut self) {
if let Some(conn) = self.conn.take() {
let pool = self.pool;
tokio::spawn(async move {
pool.connections.lock().await.push(conn);
});
}
}
}


## Debugging Tips

// Enable tokio-console for runtime debugging
// Cargo.toml: tokio = { features = ["tracing"] }
// Run: RUSTFLAGS="--cfg tokio_unstable" cargo run
// Then: tokio-console

// Instrument async functions
use tracing::instrument;

#[instrument(skip(pool))]
async fn fetch_user(pool: &PgPool, id: &str) -> Result<User> {
tracing::debug!("Fetching user");
// ...
}

// Track task spawning
let span = tracing::info_span!("worker", id = %worker_id);
tokio::spawn(async move {
// Enters span when polled
}.instrument(span));


## Best Practices

### Do's

- **Use tokio::select!** - For racing futures
- **Prefer channels** - Over shared state when possible
- **Use JoinSet** - For managing multiple tasks
- **Instrument with tracing** - For debugging async code
- **Handle cancellation** - Check CancellationToken

### Don'ts

- **Don't block** - Never use std::thread::sleep in async
- **Don't hold locks across awaits** - Causes deadlocks
- **Don't spawn unboundedly** - Use semaphores for limits
- **Don't ignore errors** - Propagate with ? or log
- **Don't forget Send bounds** - For spawned futures

## Resources

- [Tokio Tutorial](https://tokio.rs/tokio/tutorial)
- [Async Book](https://rust-lang.github.io/async-book/)
- [Tokio Console](https://github.com/tokio-rs/console)

How to Use This Skill Unit

Option A: Project-Specific (Recommended)

  1. Click "Download" above
  2. In your project, create the directory: .agent/skills/rust-async-patterns/
  3. Save the file as SKILL.md
  4. The agent will automatically discover the skill based on its description.

Option B: Global Installation (All Agents)

Save the file to these locations to make it available across all projects:

  • Claude Code: ~/.claude/skills/wshobson/agents/rust-async-patterns/SKILL.md
  • Cursor: ~/.cursor/skills/wshobson/agents/rust-async-patterns/SKILL.md
  • Antigravity: ~/.gemini/antigravity/skills/wshobson/agents/rust-async-patterns/SKILL.md

πŸš€ Install with CLI:
npx skills add wshobson/agents

Read the Master Guide: Mastering Agent Skills β†’

Recommended Rules

View more rules β†’

Recommended Workflows

View more workflows β†’

Recommended MCP Servers

View more MCP servers β†’

Take It Further

Maximize your productivity with these powerful resources

πŸ“‹

Define Your Standards

Set up coding standards to ensure this workflow produces consistent, high-quality results.

Browse Rules Library
πŸ“–

Master Workflows

Learn how to create custom workflows, use Turbo Mode, and build your automation library.

Complete Guide

How to use this Skill in Claude Code & Cursor

For Claude Code (CLI)

To use this skill in Claude Code, copy the rule content into your project's custom instructions or follow our Add-Skill CLI guide. This ensures Claude follows your standards during every code generation.

For Cursor & Windsurf

For Cursor or Windsurf, individual skills are best used in the "Rules for AI" section. This specific unit helps the agent avoid backend development issues, leading to cleaner, more efficient code.

Why the skill format matters: the standardized Agent Skills format lets your AI agent load detailed instructions only when they are relevant, keeping your prompt clean while improving results.

Source & attribution

This skill is categorized under Backend Development and is published by W. Shobson, maintained in wshobson/agents.

← Browse All Agent Skills
Sponsored AI assistant. Recommendations may be paid.