FastAPI Async/Await Patterns - Complete Guide
Async programming in FastAPI can significantly improve performance but is often misunderstood. This guide covers async/await patterns, common mistakes, and best practices for building high-performance FastAPI applications.
Understanding Async in FastAPI
When to Use Async vs Sync
# ✅ Use async for I/O-bound operations
@app.get("/async-endpoint")
async def async_endpoint():
    """Show three kinds of awaited I/O inside a single async handler.

    Only the database row is returned; the HTTP response and the file
    contents are fetched for illustration and then discarded.
    """
    # Database query
    row = await database.fetch_one("SELECT * FROM users")

    # Outbound HTTP request through a short-lived async client
    async with httpx.AsyncClient() as http:
        response = await http.get("https://api.example.com/data")

    # File read through aiofiles
    async with aiofiles.open("file.txt") as handle:
        content = await handle.read()

    return {"result": row}
# ✅ Use sync (plain def) for CPU-bound or blocking operations - FastAPI runs def endpoints in a threadpool, so the event loop stays free
@app.get("/sync-endpoint")
def sync_endpoint():
    """Run CPU-heavy work in a plain (non-async) handler.

    Both helpers are called in order; only the calculation result is
    returned, the processed image is not used further.
    """
    # Heavy computation
    outcome = complex_calculation()

    # Image processing
    processed_image = process_image(image_data)

    payload = {"result": outcome}
    return payload
Common Async Mistakes
1. Mixing Sync and Async Incorrectly
# ❌ Wrong: Calling a blocking sync function inside an async endpoint
@app.get("/wrong-sync-call")
async def wrong_sync_call():
    """Anti-pattern demo: a blocking call inside an async handler."""
    # time.sleep() is synchronous, so nothing else can run on the event
    # loop for the whole two seconds. It returns None, which is what ends
    # up in the response.
    outcome = time.sleep(2)  # Blocks the event loop!
    return {"result": outcome}
# ✅ Correct: Use asyncio.sleep for delays
@app.get("/correct-async-delay")
async def correct_async_delay():
    """Wait two seconds without blocking, then report completion."""
    delay_seconds = 2
    await asyncio.sleep(delay_seconds)  # Non-blocking
    return {"result": "completed"}
# ❌ Wrong: Not awaiting async functions
@app.get("/forgot-await")
async def forgot_await():
    """Anti-pattern demo: the coroutine is created but never awaited."""
    # Without `await`, this name is bound to a coroutine object, not to
    # the value the database call would produce.
    pending = async_database_call()  # Returns coroutine, not result!
    return {"result": pending}
# ✅ Correct: Always await async functions
@app.get("/proper-await")
async def proper_await():
    """Await the database coroutine and return the value it produces."""
    return {"result": await async_database_call()}
2. Inefficient Sequential Operations
# ❌ Wrong: Sequential async calls (slow)
@app.get("/slow-sequential")
async def slow_sequential():
    """Anti-pattern demo: each lookup starts only after the previous one ends."""
    payload = {}
    payload["user"] = await get_user(1)          # Takes 100ms
    payload["posts"] = await get_posts(1)        # Takes 200ms
    payload["comments"] = await get_comments(1)  # Takes 150ms
    # Total: 450ms, because the three waits add up
    return payload
# ✅ Correct: Concurrent async calls (fast)
@app.get("/fast-concurrent")
async def fast_concurrent():
    """Fetch user, posts and comments concurrently with asyncio.gather.

    gather() returns results in the order the awaitables were passed, so
    they can be unpacked positionally.
    """
    # Total: ~200ms (longest operation) instead of the 450ms sum
    user, posts, comments = await asyncio.gather(
        get_user(1),      # 100ms
        get_posts(1),     # 200ms
        get_comments(1),  # 150ms
    )
    return dict(user=user, posts=posts, comments=comments)
Database Async Patterns
1. SQLAlchemy Async with FastAPI
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
from fastapi import Depends
# Database setup
# The "+asyncpg" suffix in the URL selects the asyncpg driver.
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"

# echo=True is passed to the engine here; NOTE(review): this presumably
# logs every SQL statement - confirm before using outside development.
engine = create_async_engine(DATABASE_URL, echo=True)

# Session factory bound to the async engine.
AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
# Dependency
async def get_async_session() -> AsyncSession:
    """Yield one session from AsyncSessionLocal for use as a dependency.

    The session is opened with ``async with``, so it is closed once the
    consumer of this generator is finished with it.
    """
    # NOTE(review): because of the `yield`, this is an async generator;
    # the annotation names the yielded type rather than the generator type.
    async with AsyncSessionLocal() as db_session:
        yield db_session
# ✅ Proper async database operations
@app.get("/users/")
async def get_users(session: AsyncSession = Depends(get_async_session)):
    """Return every User row, loaded through the injected async session."""
    query = select(User)
    rows = await session.execute(query)
    return rows.scalars().all()
@app.post("/users/")
async def create_user(
    user_data: UserCreate,
    session: AsyncSession = Depends(get_async_session),
):
    """Persist a new User built from the request body and return it."""
    # NOTE(review): .dict() looks like the Pydantic v1 spelling - confirm
    # against the Pydantic version in use.
    fields = user_data.dict()
    new_user = User(**fields)
    session.add(new_user)

    # Commit, then refresh the instance from the database before returning.
    await session.commit()
    await session.refresh(new_user)
    return new_user
HTTP Client Async Patterns
1. Reusable HTTP Client
import httpx
from typing import Optional, Dict, Any
class AsyncHTTPClient:
    """Wrapper that owns a single shared ``httpx.AsyncClient``.

    The underlying client is created by :meth:`start` and released by
    :meth:`stop`. ``self.client`` is ``None`` whenever the wrapper is not
    started, so :meth:`get` can reject calls made outside that window.
    """

    def __init__(self):
        # None means "not started"; set by start() and cleared by stop().
        self.client: Optional[httpx.AsyncClient] = None

    async def start(self):
        """Create the shared client; a no-op if one is already open."""
        if self.client is not None:
            # Overwriting a live client would leak it without closing it.
            return
        self.client = httpx.AsyncClient(
            timeout=30.0,
            limits=httpx.Limits(max_keepalive_connections=10, max_connections=20),
        )

    async def stop(self):
        """Close the shared client, if any, and mark the wrapper as stopped."""
        if self.client is None:
            return
        # Clear the attribute first so a closed client is never left behind
        # for get() to pick up, even if aclose() raises.
        client, self.client = self.client, None
        await client.aclose()

    async def get(self, url: str, **kwargs) -> Dict[Any, Any]:
        """GET ``url`` and return the decoded JSON body.

        Extra keyword arguments are forwarded to the client's ``get``.

        Raises:
            RuntimeError: if the wrapper has not been started (or was stopped).
            Whatever ``raise_for_status()`` raises for an error response.
        """
        if self.client is None:
            raise RuntimeError("HTTP client not started")
        response = await self.client.get(url, **kwargs)
        response.raise_for_status()
        return response.json()
# Module-level wrapper instance; the startup/shutdown handlers below call
# its start() and stop().
http_client = AsyncHTTPClient()
@app.on_event("startup")
async def startup():
    """Open the shared HTTP client when the application starts."""
    # NOTE(review): newer FastAPI releases favour lifespan handlers over
    # on_event - confirm against the FastAPI version in use.
    await http_client.start()
@app.on_event("shutdown")
async def shutdown():
    """Close the shared HTTP client when the application shuts down."""
    await http_client.stop()
# Usage
@app.get("/external-api-data/")
async def get_external_data():
    """Return the JSON fetched from the external API via the shared client."""
    return await http_client.get("https://api.example.com/data")
2. Multiple HTTP Requests with Error Handling
from typing import List, Optional
import logging
# Logger named after this module; used by the error-handling helpers below.
logger = logging.getLogger(__name__)
async def fetch_url_safe(
    session: httpx.AsyncClient,
    url: str
) -> Optional[Dict[str, Any]]:
    """Fetch ``url`` and return its JSON body, or None on any failure.

    Every error is logged and swallowed so that one bad URL cannot break
    a batch of concurrent fetches.
    """
    payload = None
    try:
        reply = await session.get(url, timeout=10.0)
        reply.raise_for_status()
        # Decoding stays inside the try so a malformed body is also caught.
        payload = reply.json()
    except httpx.HTTPStatusError as exc:
        logger.error(f"HTTP error for {url}: {exc.response.status_code}")
    except httpx.RequestError as exc:
        logger.error(f"Request error for {url}: {exc}")
    except Exception as exc:
        logger.error(f"Unexpected error for {url}: {exc}")
    return payload
@app.get("/fetch-multiple-apis/")
async def fetch_multiple_apis():
    """Fetch three example APIs concurrently and report what succeeded.

    Failed requests come back from fetch_url_safe as None and are left
    out of "results"; "total_requests" is the number of URLs attempted.
    """
    urls = [
        "https://api1.example.com/data",
        "https://api2.example.com/info",
        "https://api3.example.com/stats"
    ]

    async with httpx.AsyncClient() as client:
        # One client is shared by all requests; gather runs them concurrently.
        results = await asyncio.gather(
            *(fetch_url_safe(client, url) for url in urls)
        )

    # Drop the None placeholders left by failed requests.
    successful_results = [item for item in results if item is not None]

    return {
        "results": successful_results,
        "success_count": len(successful_results),
        "total_requests": len(urls)
    }
Background Tasks and Queues
1. Async Background Tasks
from fastapi import BackgroundTasks
import asyncio
async def send_email_async(email: str, message: str):
    """Simulate async email sending"""
    simulated_delay = 2  # Simulate email sending delay
    await asyncio.sleep(simulated_delay)
    print(f"Email sent to {email}: {message}")
async def process_data_async(data: Dict[str, Any]):
    """Simulate async data processing"""
    simulated_duration = 5  # Simulate processing time
    await asyncio.sleep(simulated_duration)
    item_count = len(data)
    print(f"Data processed: {item_count} items")
@app.post("/trigger-background-tasks/")
async def trigger_background_tasks(
    email: str,
    data: Dict[str, Any],
    background_tasks: BackgroundTasks
):
    """Queue the email and data-processing tasks, then acknowledge.

    The handler only registers the work on ``background_tasks``; it does
    not await either task itself.
    """
    queued = (
        (send_email_async, (email, "Processing started")),
        (process_data_async, (data,)),
    )
    for task, args in queued:
        background_tasks.add_task(task, *args)

    return {"message": "Tasks queued for background processing"}
Error Handling in Async Code
1. Graceful Error Handling
import asyncio
from typing import Optional
async def safe_async_operation(
    operation_id: str,
    timeout: float = 10.0
) -> Optional[Dict[str, Any]]:
    """Run risky_async_operation under a timeout, returning None on failure.

    Timeouts, ValueError and any other Exception are logged and swallowed;
    the caller only ever sees a result or None.
    """
    outcome = None
    try:
        # wait_for raises TimeoutError once `timeout` seconds have elapsed.
        outcome = await asyncio.wait_for(
            risky_async_operation(operation_id),
            timeout=timeout
        )
    except asyncio.TimeoutError:
        logger.error(f"Operation {operation_id} timed out after {timeout}s")
    except ValueError as exc:
        logger.error(f"Invalid data in operation {operation_id}: {exc}")
    except Exception as exc:
        logger.error(f"Unexpected error in operation {operation_id}: {exc}")
    return outcome
@app.get("/robust-async-endpoint/")
async def robust_async_endpoint():
    """Run five operations concurrently and report successes and failures.

    safe_async_operation logs and swallows ordinary errors and returns
    None, so a None result is counted as a failed operation here rather
    than being dropped. Without that, "failed_operations" would always
    be 0.
    """
    operations = ["op1", "op2", "op3", "op4", "op5"]

    # Execute all operations with individual error handling
    tasks = [safe_async_operation(op_id) for op_id in operations]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Separate successful results from errors
    successful_results = []
    errors = []
    for op_id, result in zip(operations, results):
        if isinstance(result, BaseException):
            # BaseException rather than Exception, so a returned
            # CancelledError is not mistaken for a successful result.
            errors.append(f"Operation {op_id} failed: {result}")
        elif result is None:
            # The helper already logged the cause before returning None.
            errors.append(f"Operation {op_id} failed: no result (error or timeout)")
        else:
            successful_results.append(result)

    return {
        "successful_operations": len(successful_results),
        "failed_operations": len(errors),
        "results": successful_results,
        "errors": errors
    }
Testing Async Code
1. Testing Async Endpoints
import pytest
import asyncio
from httpx import AsyncClient
from fastapi.testclient import TestClient
# Sync testing with TestClient
def test_sync_endpoint():
    """GET /users/1 through the synchronous TestClient and expect 200."""
    response = TestClient(app).get("/users/1")
    assert response.status_code == 200
# Async testing with AsyncClient
@pytest.mark.asyncio
async def test_async_endpoint():
    """GET /users/1 through httpx.AsyncClient against the ASGI app."""
    # Imported locally so this snippet needs no change to the import list.
    from httpx import ASGITransport

    # The `app=` shortcut on AsyncClient is deprecated in httpx; routing
    # requests to the app through an explicit ASGITransport replaces it.
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        response = await client.get("/users/1")
    assert response.status_code == 200
# Testing async functions directly
@pytest.mark.asyncio
async def test_async_function():
    """Await fetch_user_data directly and check the returned mapping."""
    user = await fetch_user_data(1)
    assert user["user_id"] == 1
    assert "name" in user
Best Practices Summary
- Use async for I/O-bound operations - database queries, HTTP requests, file operations
- Keep CPU-bound operations synchronous - calculations, image processing
- Always await async functions - never forget the `await` keyword
- Use asyncio.gather() for concurrent operations - run multiple async operations concurrently
- Properly manage resources - use async context managers
- Handle errors gracefully - implement timeout and error handling
- Monitor performance - track async operation durations
- Use connection pools - for database and HTTP connections
- Test async code properly - use pytest-asyncio for testing