async-python-patterns
Install this skill
npx skills add wshobson/agentsWorks across Claude Code, Cursor, Codex, Copilot & Antigravity
The async-python-patterns skill provides the structural foundation for writing non-blocking I/O code in Python. It centers on the asyncio library, enabling execution of concurrent tasks within a single-threaded event loop. By using async/await syntax, developers can suspend long-running operations—such as database queries, network requests, or file system access—without stopping the entire execution flow of the application. This skill covers the lifecycle of coroutines, the scheduling of background tasks, and the management of future objects. It standardizes how developers handle timeouts, catch concurrent exceptions, and orchestrate multiple simultaneous requests using gather or task group logic. Implementing these patterns ensures efficient resource management and responsiveness in I/O-bound systems, preventing the performance bottlenecks common in standard synchronous procedural code.
When to Use This Skill
- •Building high-concurrency web API backends
- •Writing efficient network scrapers for data collection
- •Developing real-time communication via WebSocket servers
- •Executing background batch jobs while maintaining application responsiveness
How to Invoke This Skill
Example prompts that trigger this skill in Claude Code, Cursor, or Antigravity:
- “how to run multiple async tasks in python
- “implementing asyncio for network requests
- “handling timeouts in python coroutines
- “running concurrent background tasks in python
- “safe error handling with asyncio.gather
Pro Tips
- 💡**Leverage `asyncio.gather()` for Parallel Execution:** When you have multiple independent coroutines, use `asyncio.gather()` to run them concurrently. This significantly reduces total execution time compared to awaiting them sequentially, making your application more efficient.
- 💡**Avoid Blocking Calls in Coroutines:** Never perform blocking I/O (e.g., `requests.get()`, `time.sleep()`) directly within an `async def` function. Instead, wrap them using `loop.run_in_executor()` to offload the blocking work to a separate thread pool, preventing your event loop from freezing.
- 💡**Handle Task Exceptions Gracefully:** Always ensure you await tasks created with `asyncio.create_task()`. Unawaited tasks might swallow exceptions, making debugging difficult. Consider using `try...except` blocks within your coroutines or gathering tasks to catch and manage errors effectively.
What this skill does
- •Orchestrating concurrent execution using gather or create_task
- •Managing resource lifecycles with async context managers
- •Implementing timeout constraints on volatile operations
- •Processing asynchronous streams through async iteration
- •Handling and isolating task exceptions during parallel execution
When not to use it
- ✕CPU-bound computational tasks that require genuine parallelism
- ✕Simple scripts where sequential logic is more readable and performant
Example workflow
- Define async functions for I/O operations
- Initialize the event loop via asyncio.run
- Create tasks for simultaneous execution
- Await individual or batched results
- Clean up resources using async context managers
Prerequisites
- –Foundational knowledge of Python 3.7+ syntax
- –Understanding of basic I/O concepts
Pitfalls & limitations
- !Blocking the event loop with synchronous, CPU-intensive code
- !Forgetting to await tasks, which leaves them orphaned
- !Difficulty debugging stack traces across nested coroutines
FAQ
How it compares
Unlike manual threading, this skill uses a single-threaded cooperative model that avoids the overhead and race conditions associated with preemptive thread management.
📄 Full skill instructions — original source: wshobson/agents
Comprehensive guidance for implementing asynchronous Python applications using asyncio, concurrent programming patterns, and async/await for building high-performance, non-blocking systems.
## When to Use This Skill
- Building async web APIs (FastAPI, aiohttp, Sanic)
- Implementing concurrent I/O operations (database, file, network)
- Creating web scrapers with concurrent requests
- Developing real-time applications (WebSocket servers, chat systems)
- Processing multiple independent tasks simultaneously
- Building microservices with async communication
- Optimizing I/O-bound workloads
- Implementing async background tasks and queues
## Core Concepts
### 1. Event Loop
The event loop is the heart of asyncio, managing and scheduling asynchronous tasks.
**Key characteristics:**
- Single-threaded cooperative multitasking
- Schedules coroutines for execution
- Handles I/O operations without blocking
- Manages callbacks and futures
### 2. Coroutines
Functions defined with
async def that can be paused and resumed.**Syntax:**
async def my_coroutine():
result = await some_async_operation()
return result### 3. Tasks
Scheduled coroutines that run concurrently on the event loop.
### 4. Futures
Low-level objects representing eventual results of async operations.
### 5. Async Context Managers
Resources that support
async with for proper cleanup.### 6. Async Iterators
Objects that support
async for for iterating over async data sources.## Quick Start
import asyncio
async def main():
print("Hello")
await asyncio.sleep(1)
print("World")
# Python 3.7+
asyncio.run(main())## Fundamental Patterns
### Pattern 1: Basic Async/Await
import asyncio
async def fetch_data(url: str) -> dict:
"""Fetch data from URL asynchronously."""
await asyncio.sleep(1) # Simulate I/O
return {"url": url, "data": "result"}
async def main():
result = await fetch_data("https://api.example.com")
print(result)
asyncio.run(main())### Pattern 2: Concurrent Execution with gather()
import asyncio
from typing import List
async def fetch_user(user_id: int) -> dict:
"""Fetch user data."""
await asyncio.sleep(0.5)
return {"id": user_id, "name": f"User {user_id}"}
async def fetch_all_users(user_ids: List[int]) -> List[dict]:
"""Fetch multiple users concurrently."""
tasks = [fetch_user(uid) for uid in user_ids]
results = await asyncio.gather(*tasks)
return results
async def main():
user_ids = [1, 2, 3, 4, 5]
users = await fetch_all_users(user_ids)
print(f"Fetched {len(users)} users")
asyncio.run(main())### Pattern 3: Task Creation and Management
import asyncio
async def background_task(name: str, delay: int):
"""Long-running background task."""
print(f"{name} started")
await asyncio.sleep(delay)
print(f"{name} completed")
return f"Result from {name}"
async def main():
# Create tasks
task1 = asyncio.create_task(background_task("Task 1", 2))
task2 = asyncio.create_task(background_task("Task 2", 1))
# Do other work
print("Main: doing other work")
await asyncio.sleep(0.5)
# Wait for tasks
result1 = await task1
result2 = await task2
print(f"Results: {result1}, {result2}")
asyncio.run(main())### Pattern 4: Error Handling in Async Code
import asyncio
from typing import List, Optional
async def risky_operation(item_id: int) -> dict:
"""Operation that might fail."""
await asyncio.sleep(0.1)
if item_id % 3 == 0:
raise ValueError(f"Item {item_id} failed")
return {"id": item_id, "status": "success"}
async def safe_operation(item_id: int) -> Optional[dict]:
"""Wrapper with error handling."""
try:
return await risky_operation(item_id)
except ValueError as e:
print(f"Error: {e}")
return None
async def process_items(item_ids: List[int]):
"""Process multiple items with error handling."""
tasks = [safe_operation(iid) for iid in item_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Filter out failures
successful = [r for r in results if r is not None and not isinstance(r, Exception)]
failed = [r for r in results if isinstance(r, Exception)]
print(f"Success: {len(successful)}, Failed: {len(failed)}")
return successful
asyncio.run(process_items([1, 2, 3, 4, 5, 6]))### Pattern 5: Timeout Handling
import asyncio
async def slow_operation(delay: int) -> str:
"""Operation that takes time."""
await asyncio.sleep(delay)
return f"Completed after {delay}s"
async def with_timeout():
"""Execute operation with timeout."""
try:
result = await asyncio.wait_for(slow_operation(5), timeout=2.0)
print(result)
except asyncio.TimeoutError:
print("Operation timed out")
asyncio.run(with_timeout())## Advanced Patterns
### Pattern 6: Async Context Managers
import asyncio
from typing import Optional
class AsyncDatabaseConnection:
"""Async database connection context manager."""
def __init__(self, dsn: str):
self.dsn = dsn
self.connection: Optional[object] = None
async def __aenter__(self):
print("Opening connection")
await asyncio.sleep(0.1) # Simulate connection
self.connection = {"dsn": self.dsn, "connected": True}
return self.connection
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Closing connection")
await asyncio.sleep(0.1) # Simulate cleanup
self.connection = None
async def query_database():
"""Use async context manager."""
async with AsyncDatabaseConnection("postgresql://localhost") as conn:
print(f"Using connection: {conn}")
await asyncio.sleep(0.2) # Simulate query
return {"rows": 10}
asyncio.run(query_database())### Pattern 7: Async Iterators and Generators
import asyncio
from typing import AsyncIterator
async def async_range(start: int, end: int, delay: float = 0.1) -> AsyncIterator[int]:
"""Async generator that yields numbers with delay."""
for i in range(start, end):
await asyncio.sleep(delay)
yield i
async def fetch_pages(url: str, max_pages: int) -> AsyncIterator[dict]:
"""Fetch paginated data asynchronously."""
for page in range(1, max_pages + 1):
await asyncio.sleep(0.2) # Simulate API call
yield {
"page": page,
"url": f"{url}?page={page}",
"data": [f"item_{page}_{i}" for i in range(5)]
}
async def consume_async_iterator():
"""Consume async iterator."""
async for number in async_range(1, 5):
print(f"Number: {number}")
print("\nFetching pages:")
async for page_data in fetch_pages("https://api.example.com/items", 3):
print(f"Page {page_data['page']}: {len(page_data['data'])} items")
asyncio.run(consume_async_iterator())### Pattern 8: Producer-Consumer Pattern
import asyncio
from asyncio import Queue
from typing import Optional
async def producer(queue: Queue, producer_id: int, num_items: int):
"""Produce items and put them in queue."""
for i in range(num_items):
item = f"Item-{producer_id}-{i}"
await queue.put(item)
print(f"Producer {producer_id} produced: {item}")
await asyncio.sleep(0.1)
await queue.put(None) # Signal completion
async def consumer(queue: Queue, consumer_id: int):
"""Consume items from queue."""
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
print(f"Consumer {consumer_id} processing: {item}")
await asyncio.sleep(0.2) # Simulate work
queue.task_done()
async def producer_consumer_example():
"""Run producer-consumer pattern."""
queue = Queue(maxsize=10)
# Create tasks
producers = [
asyncio.create_task(producer(queue, i, 5))
for i in range(2)
]
consumers = [
asyncio.create_task(consumer(queue, i))
for i in range(3)
]
# Wait for producers
await asyncio.gather(*producers)
# Wait for queue to be empty
await queue.join()
# Cancel consumers
for c in consumers:
c.cancel()
asyncio.run(producer_consumer_example())### Pattern 9: Semaphore for Rate Limiting
import asyncio
from typing import List
async def api_call(url: str, semaphore: asyncio.Semaphore) -> dict:
"""Make API call with rate limiting."""
async with semaphore:
print(f"Calling {url}")
await asyncio.sleep(0.5) # Simulate API call
return {"url": url, "status": 200}
async def rate_limited_requests(urls: List[str], max_concurrent: int = 5):
"""Make multiple requests with rate limiting."""
semaphore = asyncio.Semaphore(max_concurrent)
tasks = [api_call(url, semaphore) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def main():
urls = [f"https://api.example.com/item/{i}" for i in range(20)]
results = await rate_limited_requests(urls, max_concurrent=3)
print(f"Completed {len(results)} requests")
asyncio.run(main())### Pattern 10: Async Locks and Synchronization
import asyncio
class AsyncCounter:
"""Thread-safe async counter."""
def __init__(self):
self.value = 0
self.lock = asyncio.Lock()
async def increment(self):
"""Safely increment counter."""
async with self.lock:
current = self.value
await asyncio.sleep(0.01) # Simulate work
self.value = current + 1
async def get_value(self) -> int:
"""Get current value."""
async with self.lock:
return self.value
async def worker(counter: AsyncCounter, worker_id: int):
"""Worker that increments counter."""
for _ in range(10):
await counter.increment()
print(f"Worker {worker_id} incremented")
async def test_counter():
"""Test concurrent counter."""
counter = AsyncCounter()
workers = [asyncio.create_task(worker(counter, i)) for i in range(5)]
await asyncio.gather(*workers)
final_value = await counter.get_value()
print(f"Final counter value: {final_value}")
asyncio.run(test_counter())## Real-World Applications
### Web Scraping with aiohttp
import asyncio
import aiohttp
from typing import List, Dict
async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict:
"""Fetch single URL."""
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
text = await response.text()
return {
"url": url,
"status": response.status,
"length": len(text)
}
except Exception as e:
return {"url": url, "error": str(e)}
async def scrape_urls(urls: List[str]) -> List[Dict]:
"""Scrape multiple URLs concurrently."""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/status/404",
]
results = await scrape_urls(urls)
for result in results:
print(result)
asyncio.run(main())### Async Database Operations
import asyncio
from typing import List, Optional
# Simulated async database client
class AsyncDB:
"""Simulated async database."""
async def execute(self, query: str) -> List[dict]:
"""Execute query."""
await asyncio.sleep(0.1)
return [{"id": 1, "name": "Example"}]
async def fetch_one(self, query: str) -> Optional[dict]:
"""Fetch single row."""
await asyncio.sleep(0.1)
return {"id": 1, "name": "Example"}
async def get_user_data(db: AsyncDB, user_id: int) -> dict:
"""Fetch user and related data concurrently."""
user_task = db.fetch_one(f"SELECT * FROM users WHERE id = {user_id}")
orders_task = db.execute(f"SELECT * FROM orders WHERE user_id = {user_id}")
profile_task = db.fetch_one(f"SELECT * FROM profiles WHERE user_id = {user_id}")
user, orders, profile = await asyncio.gather(user_task, orders_task, profile_task)
return {
"user": user,
"orders": orders,
"profile": profile
}
async def main():
db = AsyncDB()
user_data = await get_user_data(db, 1)
print(user_data)
asyncio.run(main())### WebSocket Server
import asyncio
from typing import Set
# Simulated WebSocket connection
class WebSocket:
"""Simulated WebSocket."""
def __init__(self, client_id: str):
self.client_id = client_id
async def send(self, message: str):
"""Send message."""
print(f"Sending to {self.client_id}: {message}")
await asyncio.sleep(0.01)
async def recv(self) -> str:
"""Receive message."""
await asyncio.sleep(1)
return f"Message from {self.client_id}"
class WebSocketServer:
"""Simple WebSocket server."""
def __init__(self):
self.clients: Set[WebSocket] = set()
async def register(self, websocket: WebSocket):
"""Register new client."""
self.clients.add(websocket)
print(f"Client {websocket.client_id} connected")
async def unregister(self, websocket: WebSocket):
"""Unregister client."""
self.clients.remove(websocket)
print(f"Client {websocket.client_id} disconnected")
async def broadcast(self, message: str):
"""Broadcast message to all clients."""
if self.clients:
tasks = [client.send(message) for client in self.clients]
await asyncio.gather(*tasks)
async def handle_client(self, websocket: WebSocket):
"""Handle individual client connection."""
await self.register(websocket)
try:
async for message in self.message_iterator(websocket):
await self.broadcast(f"{websocket.client_id}: {message}")
finally:
await self.unregister(websocket)
async def message_iterator(self, websocket: WebSocket):
"""Iterate over messages from client."""
for _ in range(3): # Simulate 3 messages
yield await websocket.recv()## Performance Best Practices
### 1. Use Connection Pools
import asyncio
import aiohttp
async def with_connection_pool():
"""Use connection pool for efficiency."""
connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [session.get(f"https://api.example.com/item/{i}") for i in range(50)]
responses = await asyncio.gather(*tasks)
return responses### 2. Batch Operations
async def batch_process(items: List[str], batch_size: int = 10):
"""Process items in batches."""
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
tasks = [process_item(item) for item in batch]
await asyncio.gather(*tasks)
print(f"Processed batch {i // batch_size + 1}")
async def process_item(item: str):
"""Process single item."""
await asyncio.sleep(0.1)
return f"Processed: {item}"### 3. Avoid Blocking Operations
import asyncio
import concurrent.futures
from typing import Any
def blocking_operation(data: Any) -> Any:
"""CPU-intensive blocking operation."""
import time
time.sleep(1)
return data * 2
async def run_in_executor(data: Any) -> Any:
"""Run blocking operation in thread pool."""
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, blocking_operation, data)
return result
async def main():
results = await asyncio.gather(*[run_in_executor(i) for i in range(5)])
print(results)
asyncio.run(main())## Common Pitfalls
### 1. Forgetting await
# Wrong - returns coroutine object, doesn't execute
result = async_function()
# Correct
result = await async_function()### 2. Blocking the Event Loop
# Wrong - blocks event loop
import time
async def bad():
time.sleep(1) # Blocks!
# Correct
async def good():
await asyncio.sleep(1) # Non-blocking### 3. Not Handling Cancellation
async def cancelable_task():
"""Task that handles cancellation."""
try:
while True:
await asyncio.sleep(1)
print("Working...")
except asyncio.CancelledError:
print("Task cancelled, cleaning up...")
# Perform cleanup
raise # Re-raise to propagate cancellation### 4. Mixing Sync and Async Code
# Wrong - can't call async from sync directly
def sync_function():
result = await async_function() # SyntaxError!
# Correct
def sync_function():
result = asyncio.run(async_function())## Testing Async Code
import asyncio
import pytest
# Using pytest-asyncio
@pytest.mark.asyncio
async def test_async_function():
"""Test async function."""
result = await fetch_data("https://api.example.com")
assert result is not None
@pytest.mark.asyncio
async def test_with_timeout():
"""Test with timeout."""
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(slow_operation(5), timeout=1.0)## Resources
- **Python asyncio documentation**: https://docs.python.org/3/library/asyncio.html
- **aiohttp**: Async HTTP client/server
- **FastAPI**: Modern async web framework
- **asyncpg**: Async PostgreSQL driver
- **motor**: Async MongoDB driver
## Best Practices Summary
1. **Use asyncio.run()** for entry point (Python 3.7+)
2. **Always await coroutines** to execute them
3. **Use gather() for concurrent execution** of multiple tasks
4. **Implement proper error handling** with try/except
5. **Use timeouts** to prevent hanging operations
6. **Pool connections** for better performance
7. **Avoid blocking operations** in async code
8. **Use semaphores** for rate limiting
9. **Handle task cancellation** properly
10. **Test async code** with pytest-asyncio
How to Use This Skill Unit
Option A: Project-Specific (Recommended)
- Click "Download" above
- In your project, create the directory:
.agent/skills/async-python-patterns/ - Save the file as
SKILL.md - The agent will automatically discover the skill based on its description.
Option B: Global Installation (All Agents)
Save the file to these locations to make it available across all projects:
- Claude Code:
~/.claude/skills/wshobson/agents/async-python-patterns/SKILL.md - Cursor:
~/.cursor/skills/wshobson/agents/async-python-patterns/SKILL.md - Antigravity:
~/.gemini/antigravity/skills/wshobson/agents/async-python-patterns/SKILL.md
🚀 Install with CLI:npx skills add wshobson/agents