Skip to content

FastAPI Async/Await Patterns - Complete Guide

Async programming in FastAPI can significantly improve performance but is often misunderstood. This guide covers async/await patterns, common mistakes, and best practices for building high-performance FastAPI applications.

Understanding Async in FastAPI

When to Use Async vs Sync

# ✅ Use async for I/O-bound operations
@app.get("/async-endpoint")
async def async_endpoint():
    # Database queries
    result = await database.fetch_one("SELECT * FROM users")

    # HTTP requests
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/data")

    # File operations
    async with aiofiles.open("file.txt") as f:
        content = await f.read()

    return {"result": result}

# ✅ Use sync for CPU-bound operations
@app.get("/sync-endpoint")
def sync_endpoint():
    # Heavy computation
    result = complex_calculation()

    # Image processing
    processed_image = process_image(image_data)

    return {"result": result}

Common Async Mistakes

1. Mixing Sync and Async Incorrectly

# ❌ Wrong: Calling sync function in async context without await
@app.get("/wrong-sync-call")
async def wrong_sync_call():
    result = time.sleep(2)  # Blocks the event loop!
    return {"result": result}

# ✅ Correct: Use asyncio.sleep for delays
@app.get("/correct-async-delay")
async def correct_async_delay():
    await asyncio.sleep(2)  # Non-blocking
    return {"result": "completed"}

# ❌ Wrong: Not awaiting async functions
@app.get("/forgot-await")
async def forgot_await():
    result = async_database_call()  # Returns coroutine, not result!
    return {"result": result}

# ✅ Correct: Always await async functions
@app.get("/proper-await")
async def proper_await():
    result = await async_database_call()
    return {"result": result}

2. Inefficient Sequential Operations

# ❌ Wrong: Sequential async calls (slow)
@app.get("/slow-sequential")
async def slow_sequential():
    user = await get_user(1)      # Takes 100ms
    posts = await get_posts(1)    # Takes 200ms
    comments = await get_comments(1)  # Takes 150ms
    # Total: 450ms

    return {"user": user, "posts": posts, "comments": comments}

# ✅ Correct: Concurrent async calls (fast)
@app.get("/fast-concurrent")
async def fast_concurrent():
    # All operations run concurrently
    results = await asyncio.gather(
        get_user(1),      # 100ms
        get_posts(1),     # 200ms
        get_comments(1)   # 150ms
    )
    # Total: ~200ms (longest operation)

    user, posts, comments = results
    return {"user": user, "posts": posts, "comments": comments}

Database Async Patterns

1. SQLAlchemy Async with FastAPI

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
from fastapi import Depends

# Database setup
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

# Dependency
async def get_async_session() -> AsyncSession:
    async with AsyncSessionLocal() as session:
        yield session

# ✅ Proper async database operations
@app.get("/users/")
async def get_users(session: AsyncSession = Depends(get_async_session)):
    # Async query
    result = await session.execute(select(User))
    users = result.scalars().all()
    return users

@app.post("/users/")
async def create_user(
    user_data: UserCreate, 
    session: AsyncSession = Depends(get_async_session)
):
    # Create new user
    user = User(**user_data.dict())
    session.add(user)

    # Commit changes
    await session.commit()
    await session.refresh(user)

    return user

HTTP Client Async Patterns

1. Reusable HTTP Client

import httpx
from typing import Optional, Dict, Any

class AsyncHTTPClient:
    def __init__(self):
        self.client: Optional[httpx.AsyncClient] = None

    async def start(self):
        self.client = httpx.AsyncClient(
            timeout=30.0,
            limits=httpx.Limits(max_keepalive_connections=10, max_connections=20)
        )

    async def stop(self):
        if self.client:
            await self.client.aclose()

    async def get(self, url: str, **kwargs) -> Dict[Any, Any]:
        if not self.client:
            raise RuntimeError("HTTP client not started")

        response = await self.client.get(url, **kwargs)
        response.raise_for_status()
        return response.json()

http_client = AsyncHTTPClient()

@app.on_event("startup")
async def startup():
    await http_client.start()

@app.on_event("shutdown")
async def shutdown():
    await http_client.stop()

# Usage
@app.get("/external-api-data/")
async def get_external_data():
    data = await http_client.get("https://api.example.com/data")
    return data

2. Multiple HTTP Requests with Error Handling

from typing import List, Optional
import logging

logger = logging.getLogger(__name__)

async def fetch_url_safe(
    session: httpx.AsyncClient, 
    url: str
) -> Optional[Dict[str, Any]]:
    """Safely fetch URL with error handling"""
    try:
        response = await session.get(url, timeout=10.0)
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as e:
        logger.error(f"HTTP error for {url}: {e.response.status_code}")
        return None
    except httpx.RequestError as e:
        logger.error(f"Request error for {url}: {e}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error for {url}: {e}")
        return None

@app.get("/fetch-multiple-apis/")
async def fetch_multiple_apis():
    urls = [
        "https://api1.example.com/data",
        "https://api2.example.com/info",
        "https://api3.example.com/stats"
    ]

    async with httpx.AsyncClient() as client:
        # Fetch all URLs concurrently
        tasks = [fetch_url_safe(client, url) for url in urls]
        results = await asyncio.gather(*tasks)

    # Filter out None results (failed requests)
    successful_results = [result for result in results if result is not None]

    return {
        "results": successful_results,
        "success_count": len(successful_results),
        "total_requests": len(urls)
    }

Background Tasks and Queues

1. Async Background Tasks

from fastapi import BackgroundTasks
import asyncio

async def send_email_async(email: str, message: str):
    """Simulate async email sending"""
    await asyncio.sleep(2)  # Simulate email sending delay
    print(f"Email sent to {email}: {message}")

async def process_data_async(data: Dict[str, Any]):
    """Simulate async data processing"""
    await asyncio.sleep(5)  # Simulate processing time
    print(f"Data processed: {len(data)} items")

@app.post("/trigger-background-tasks/")
async def trigger_background_tasks(
    email: str,
    data: Dict[str, Any],
    background_tasks: BackgroundTasks
):
    # Add async background tasks
    background_tasks.add_task(send_email_async, email, "Processing started")
    background_tasks.add_task(process_data_async, data)

    return {"message": "Tasks queued for background processing"}

Error Handling in Async Code

1. Graceful Error Handling

import asyncio
from typing import Optional

async def safe_async_operation(
    operation_id: str,
    timeout: float = 10.0
) -> Optional[Dict[str, Any]]:
    """Safely execute async operation with timeout and error handling"""
    try:
        # Simulate async operation that might fail or timeout
        result = await asyncio.wait_for(
            risky_async_operation(operation_id),
            timeout=timeout
        )
        return result

    except asyncio.TimeoutError:
        logger.error(f"Operation {operation_id} timed out after {timeout}s")
        return None

    except ValueError as e:
        logger.error(f"Invalid data in operation {operation_id}: {e}")
        return None

    except Exception as e:
        logger.error(f"Unexpected error in operation {operation_id}: {e}")
        return None

@app.get("/robust-async-endpoint/")
async def robust_async_endpoint():
    operations = ["op1", "op2", "op3", "op4", "op5"]

    # Execute all operations with individual error handling
    tasks = [safe_async_operation(op_id) for op_id in operations]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Separate successful results from errors
    successful_results = []
    errors = []

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            errors.append(f"Operation {operations[i]} failed: {result}")
        elif result is not None:
            successful_results.append(result)

    return {
        "successful_operations": len(successful_results),
        "failed_operations": len(errors),
        "results": successful_results,
        "errors": errors
    }

Testing Async Code

1. Testing Async Endpoints

import pytest
import asyncio
from httpx import AsyncClient
from fastapi.testclient import TestClient

# Sync testing with TestClient
def test_sync_endpoint():
    client = TestClient(app)
    response = client.get("/users/1")
    assert response.status_code == 200

# Async testing with AsyncClient
@pytest.mark.asyncio
async def test_async_endpoint():
    async with AsyncClient(app=app, base_url="http://test") as client:
        response = await client.get("/users/1")
        assert response.status_code == 200

# Testing async functions directly
@pytest.mark.asyncio
async def test_async_function():
    result = await fetch_user_data(1)
    assert result["user_id"] == 1
    assert "name" in result

Best Practices Summary

  1. Use async for I/O-bound operations - database queries, HTTP requests, file operations
  2. Keep CPU-bound operations synchronous - calculations, image processing
  3. Always await async functions - never forget the await keyword
  4. Use asyncio.gather() for concurrent operations - run multiple async operations in parallel
  5. Properly manage resources - use async context managers
  6. Handle errors gracefully - implement timeout and error handling
  7. Monitor performance - track async operation durations
  8. Use connection pools - for database and HTTP connections
  9. Test async code properly - use pytest-asyncio for testing

External Resources