FastAPI Performance Optimization: Ultimate Guide to High-Performance APIs

FastAPI's exceptional performance is one of its key advantages, but achieving optimal results requires understanding and implementing the right optimization strategies. This comprehensive guide covers everything from basic async patterns to advanced production optimizations, helping you build lightning-fast APIs that can handle thousands of concurrent requests.

Performance Fundamentals

Understanding FastAPI's Performance Model

FastAPI's performance advantages come from several key architectural decisions:

  • ASGI Foundation: Asynchronous Server Gateway Interface enables true concurrency
  • Pydantic Integration: Fast validation and serialization (Pydantic v2 runs its core in compiled Rust)
  • Starlette Core: Lightweight, high-performance web framework foundation
  • Type System: Python type hints drive request validation, serialization, and documentation at runtime; they are not compile-time optimizations

Performance Baseline

Before optimization, establish baseline metrics:

import time
import asyncio
from fastapi import FastAPI, Request, Response
from contextlib import asynccontextmanager

# Lifespan handler: runs once at startup and once at shutdown
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    print("🚀 FastAPI starting up...")
    yield
    # Shutdown
    print("🛑 FastAPI shutting down...")

app = FastAPI(lifespan=lifespan)

# Performance monitoring middleware: reports per-request processing time
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    # perf_counter() is monotonic, which makes it better suited to measuring durations
    start_time = time.perf_counter()
    response = await call_next(request)
    process_time = time.perf_counter() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    return response

@app.get("/health")
async def health_check():
    return {"status": "healthy", "timestamp": time.time()}
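A baseline is only useful once the raw timings are summarized. The following is a minimal sketch, assuming you have collected request durations in seconds (for example from the X-Process-Time header); the function name `summarize_latencies` and the sample values are illustrative, not part of FastAPI.

```python
import statistics

def summarize_latencies(samples_s):
    """Return mean, median, and 95th percentile (seconds) for request durations."""
    if len(samples_s) < 2:
        raise ValueError("need at least two samples")
    # quantiles(n=20) returns 19 cut points; the last one is the 95th percentile
    p95 = statistics.quantiles(samples_s, n=20, method="inclusive")[-1]
    return {
        "mean": statistics.fmean(samples_s),
        "median": statistics.median(samples_s),
        "p95": p95,
    }

# Illustrative durations: four fast requests and one slow outlier
baseline = summarize_latencies([0.010, 0.012, 0.011, 0.050, 0.013])
print(baseline)
```

Tracking the median together with a high percentile shows whether an optimization helps typical requests, the slow tail, or both.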

Async Optimization Strategies

1. Async All the Way Down

Problem: Mixing sync and async code creates performance bottlenecks.

Bad Example:

import requests  # Synchronous HTTP client
import time

@app.get("/slow-endpoint")
async def slow_endpoint():
    # This blocks the entire event loop!
    response1 = requests.get("https://api.example1.com/data")
    response2 = requests.get("https://api.example2.com/data")
    time.sleep(1)  # Never do this in async code!

    return {
        "data1": response1.json(),
        "data2": response2.json()
    }

Optimized Example:

import httpx
import asyncio

@app.get("/fast-endpoint")
async def fast_endpoint():
    async with httpx.AsyncClient() as client:
        # Concurrent requests
        response1, response2 = await asyncio.gather(
            client.get("https://api.example1.com/data"),
            client.get("https://api.example2.com/data")
        )

        # Async sleep if needed
        await asyncio.sleep(0.1)

    return {
        "data1": response1.json(),
        "data2": response2.json()
    }

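Performance Impact: With the blocking calls removed, the event loop keeps serving other requests while these wait on I/O. Improvements of 5-10x in concurrent request handling are plausible for I/O-bound endpoints, but the actual gain depends on your workload, so measure it.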
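The effect is easy to reproduce without any web framework. The sketch below uses only the standard library and simulated 50 ms waits (the `DELAY` and `CALLS` values are arbitrary assumptions) to compare a blocking call, an awaitable call, and a blocking call offloaded with `asyncio.to_thread`, which is an option when a synchronous library cannot be replaced.

```python
import asyncio
import time

DELAY = 0.05  # simulated I/O wait per call, in seconds
CALLS = 5

async def blocking_call():
    time.sleep(DELAY)  # blocks the event loop: nothing else can run

async def non_blocking_call():
    await asyncio.sleep(DELAY)  # yields to the event loop while waiting

async def offloaded_call():
    # For sync-only libraries: run the blocking function in a worker thread
    await asyncio.to_thread(time.sleep, DELAY)

async def timed(factory):
    """Run CALLS coroutines concurrently and return the elapsed time."""
    start = time.perf_counter()
    await asyncio.gather(*(factory() for _ in range(CALLS)))
    return time.perf_counter() - start

async def main():
    return {
        "blocking": await timed(blocking_call),
        "non_blocking": await timed(non_blocking_call),
        "offloaded": await timed(offloaded_call),
    }

timings = asyncio.run(main())
print({name: round(seconds, 3) for name, seconds in timings.items()})
```

The blocking variant takes roughly `CALLS * DELAY` because the calls run one after another, while the other two finish in about one `DELAY`.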

2. Efficient Async Patterns

Concurrent Processing with asyncio.gather():

from typing import List
import asyncio

from fastapi import Query

async def fetch_user_data(user_id: int) -> dict:
    """Simulate async database fetch"""
    await asyncio.sleep(0.1)  # Simulate DB latency
    return {"id": user_id, "name": f"User {user_id}"}

@app.get("/users/batch")
async def get_users_batch(user_ids: List[int] = Query(...)):
    # List parameters must be declared with Query() to be read from the
    # query string, e.g. /users/batch?user_ids=1&user_ids=2
    # Process all users concurrently
    users = await asyncio.gather(
        *[fetch_user_data(uid) for uid in user_ids]
    )
    return {"users": users}

# Alternative: asyncio.as_completed yields results in completion order.
# The endpoint still returns a single response once every task has finished.
@app.get("/users/stream")
async def get_users_stream(user_ids: List[int] = Query(...)):
    results = []
    tasks = [fetch_user_data(uid) for uid in user_ids]

    for coro in asyncio.as_completed(tasks):
        user = await coro
        results.append(user)

    return {"users": results}

Semaphore for Limiting Concurrency:

# Allow at most 10 external API calls in flight at once
# (this caps concurrency, not requests per second)
API_SEMAPHORE = asyncio.Semaphore(10)

async def rate_limited_api_call(endpoint: str):
    async with API_SEMAPHORE:
        async with httpx.AsyncClient() as client:
            response = await client.get(endpoint)
            return response.json()

@app.get("/external-data/{endpoint_id}")
async def get_external_data(endpoint_id: str):
    endpoint = f"https://api.example.com/data/{endpoint_id}"
    data = await rate_limited_api_call(endpoint)
    return data
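To see what the semaphore guarantees, it helps to count how many calls are in flight at once. This is a small standard-library sketch; `fake_api_call` stands in for the real HTTP request and the limit of 3 is an arbitrary choice for the demonstration.

```python
import asyncio

async def run_with_limit(total_calls: int, limit: int) -> int:
    """Run simulated API calls under a semaphore and return the peak concurrency."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def fake_api_call():
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # simulated network latency
            active -= 1

    await asyncio.gather(*(fake_api_call() for _ in range(total_calls)))
    return peak

peak_in_flight = asyncio.run(run_with_limit(total_calls=20, limit=3))
print(peak_in_flight)
```

All 20 calls complete, but never more than 3 run at the same time; the rest wait at the `async with` line.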

Database Performance Optimization

1. Async Database Drivers

PostgreSQL with asyncpg:

import asyncpg
from typing import List, Optional
import os

from fastapi import HTTPException

# Connection pool for optimal performance
class DatabaseManager:
    def __init__(self):
        self.pool: Optional[asyncpg.Pool] = None

    async def create_pool(self):
        self.pool = await asyncpg.create_pool(
            host=os.getenv("DB_HOST", "localhost"),
            database=os.getenv("DB_NAME", "myapp"),
            user=os.getenv("DB_USER", "postgres"),
            password=os.getenv("DB_PASSWORD"),
            min_size=10,  # Minimum connections
            max_size=20,  # Maximum connections
            command_timeout=60,
            server_settings={
                'jit': 'off'  # Disable JIT for faster small queries
            }
        )

    async def close_pool(self):
        if self.pool:
            await self.pool.close()

    async def fetch_user(self, user_id: int):
        async with self.pool.acquire() as conn:
            return await conn.fetchrow(
                "SELECT id, name, email FROM users WHERE id = $1",
                user_id
            )

    async def fetch_users_batch(self, user_ids: List[int]):
        async with self.pool.acquire() as conn:
            return await conn.fetch(
                "SELECT id, name, email FROM users WHERE id = ANY($1)",
                user_ids
            )

# Global database manager
db = DatabaseManager()

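# Note: on_event handlers are deprecated in favor of the lifespan handler,
# and they are not called when the app is created with lifespan=...;
# in that case, call create_pool()/close_pool() from the lifespan function.
@app.on_event("startup")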
async def startup():
    await db.create_pool()

@app.on_event("shutdown")
async def shutdown():
    await db.close_pool()

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    user = await db.fetch_user(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return dict(user)

2. Query Optimization

Avoid N+1 Queries:

# Bad: N+1 query problem
async def get_posts_with_authors_slow():
    posts = await db.pool.fetch("SELECT id, title, author_id FROM posts LIMIT 10")
    result = []

    for post in posts:
        # This creates N additional queries!
        author = await db.pool.fetchrow(
            "SELECT name FROM users WHERE id = $1", 
            post['author_id']
        )
        result.append({
            "id": post['id'],
            "title": post['title'],
            "author": author['name']
        })

    return result

# Good: Single optimized query
async def get_posts_with_authors_fast():
    posts = await db.pool.fetch("""
        SELECT p.id, p.title, u.name as author_name
        FROM posts p
        JOIN users u ON p.author_id = u.id
        LIMIT 10
    """)

    return [
        {
            "id": post['id'],
            "title": post['title'], 
            "author": post['author_name']
        }
        for post in posts
    ]
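The difference in round trips can be counted directly. The sketch below uses an in-memory SQLite database purely as a stand-in (the article's examples target PostgreSQL with asyncpg); the `run` helper and the sample rows are assumptions made for the demonstration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT, author_id INTEGER);
    INSERT INTO users VALUES (1, 'Ada');
    INSERT INTO users VALUES (2, 'Grace');
    INSERT INTO posts VALUES (1, 'First', 1);
    INSERT INTO posts VALUES (2, 'Second', 2);
    INSERT INTO posts VALUES (3, 'Third', 1);
""")

query_count = 0

def run(sql, params=()):
    """Execute a query and count it as one round trip."""
    global query_count
    query_count += 1
    return conn.execute(sql, params).fetchall()

def posts_n_plus_one():
    # One query for the posts, then one more query per post
    posts = run("SELECT id, title, author_id FROM posts ORDER BY id")
    return [
        (title, run("SELECT name FROM users WHERE id = ?", (author_id,))[0][0])
        for _post_id, title, author_id in posts
    ]

def posts_joined():
    # A single query that fetches posts and author names together
    return run("""
        SELECT p.title, u.name
        FROM posts p JOIN users u ON p.author_id = u.id
        ORDER BY p.id
    """)

query_count = 0
slow_result = posts_n_plus_one()
slow_queries = query_count

query_count = 0
fast_result = posts_joined()
fast_queries = query_count

print(slow_queries, fast_queries)
```

Both functions return the same rows, but the first issues one query per post plus one, while the JOIN needs a single query regardless of how many posts there are.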

Prepared Statements for High-Frequency Queries:

class OptimizedQueries:
    GET_USER = "SELECT id, name, email FROM users WHERE id = $1"
    GET_USER_POSTS = """
        SELECT p.id, p.title, p.created_at
        FROM posts p
        WHERE p.author_id = $1
        ORDER BY p.created_at DESC
        LIMIT $2
    """

    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def get_user_with_posts(self, user_id: int, limit: int = 10):
        async with self.pool.acquire() as conn:
            # A prepared statement belongs to the connection that created it,
            # so prepare on the connection acquired for this request rather
            # than reusing statements from a connection already returned to
            # the pool. asyncpg also keeps a per-connection statement cache
            # for ordinary fetch()/fetchrow() calls.
            get_user = await conn.prepare(self.GET_USER)
            get_user_posts = await conn.prepare(self.GET_USER_POSTS)

            user = await get_user.fetchrow(user_id)
            if user is None:
                return None
            posts = await get_user_posts.fetch(user_id, limit)

            return {
                "user": dict(user),
                "posts": [dict(post) for post in posts]
            }

3. Database Connection Optimization

Connection Pool Configuration:

# Production-oriented pool settings
import os

DATABASE_URL = os.getenv("DATABASE_URL")  # e.g. postgresql://user:password@host/dbname

async def create_optimized_pool():
    return await asyncpg.create_pool(
        dsn=DATABASE_URL,
        min_size=5,          # Minimum connections (adjust based on load)
        max_size=25,         # Maximum connections (don't exceed DB limits)
        max_queries=50000,   # Rotate connections after N queries
        max_inactive_connection_lifetime=3600,  # 1 hour
        command_timeout=30,  # Query timeout
        server_settings={
            'application_name': 'fastapi_app',
            'jit': 'off',           # Disable JIT for OLTP workloads
            # shared_preload_libraries (needed for pg_stat_statements) cannot
            # be set per connection; configure it in postgresql.conf and
            # restart the server instead.
        }
    )
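Each worker process creates its own pool, so the database sees `workers * max_size` connections in the worst case. A rough sizing helper, under the assumption that you know the server's connection limit and want to keep some connections in reserve for migrations and admin sessions (the function name and numbers are illustrative):

```python
def pool_size_per_worker(db_max_connections: int, reserved: int, workers: int) -> int:
    """Largest per-worker max_size that keeps the total under the database limit."""
    usable = db_max_connections - reserved
    if workers < 1:
        raise ValueError("workers must be at least 1")
    if usable < workers:
        raise ValueError("not enough connections for one per worker")
    return usable // workers

# Example: a limit of 100 connections, 10 held back, 4 worker processes
max_size = pool_size_per_worker(db_max_connections=100, reserved=10, workers=4)
print(max_size)
```

If several application instances share one database, divide by the total number of worker processes across all instances.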

Caching Strategies

1. Response Caching with Redis

import redis.asyncio as redis
import json
import hashlib
from functools import wraps

# Redis connection
redis_client = redis.from_url("redis://localhost:6379", decode_responses=True)

def cache_response(expiration: int = 300):
    """Decorator for caching API responses"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
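            # Create cache key from function name and arguments; sorting the
            # keyword arguments keeps the key stable regardless of their order
            key_source = json.dumps(kwargs, sort_keys=True, default=str)
            cache_key = f"cache:{func.__name__}:{hashlib.md5(key_source.encode()).hexdigest()}"

            # Check cache first
            cached_result = await redis_client.get(cache_key)
            if cached_result is not None:
                return json.loads(cached_result)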

            # Execute function and cache result
            result = await func(*args, **kwargs)
            await redis_client.setex(
                cache_key, 
                expiration, 
                json.dumps(result, default=str)
            )

            return result
        return wrapper
    return decorator

@app.get("/expensive-computation/{param}")
@cache_response(expiration=600)  # Cache for 10 minutes
async def expensive_computation(param: str):
    # Simulate expensive operation
    await asyncio.sleep(2)
    return {"result": f"Computed result for {param}", "timestamp": time.time()}
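The cache key deserves some care, because two calls with the same arguments must map to the same key. A minimal sketch of a deterministic key builder using only the standard library; `make_cache_key` is a hypothetical helper, not part of Redis or FastAPI.

```python
import hashlib
import json

def make_cache_key(func_name: str, *args, **kwargs) -> str:
    """Build a stable cache key from a function name and its arguments."""
    # sort_keys makes the serialized form independent of keyword order;
    # default=str covers values that JSON cannot encode directly
    payload = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode()).hexdigest()
    return f"cache:{func_name}:{digest}"

key_a = make_cache_key("expensive_computation", param="x", page=1)
key_b = make_cache_key("expensive_computation", page=1, param="x")
key_c = make_cache_key("expensive_computation", param="y", page=1)
print(key_a == key_b, key_a == key_c)
```

Including the positional arguments as well as the keyword arguments avoids collisions between calls that differ only in how the arguments were passed.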

2. In-Memory Caching

from collections import OrderedDict
from functools import lru_cache
import asyncio
from typing import Any

class AsyncLRUCache:
    def __init__(self, maxsize: int = 128):
        self.cache: "OrderedDict[str, Any]" = OrderedDict()
        self.maxsize = maxsize

    async def get(self, key: str):
        if key not in self.cache:
            return None
        # Mark the entry as most recently used
        self.cache.move_to_end(key)
        return self.cache[key]

    async def set(self, key: str, value: Any):
        if key in self.cache:
            self.cache.move_to_end(key)
        elif len(self.cache) >= self.maxsize:
            # Evict the least recently used entry (front of the OrderedDict)
            self.cache.popitem(last=False)

        self.cache[key] = value

# Global cache instance
app_cache = AsyncLRUCache(maxsize=1000)

@app.get("/config")
async def get_app_config():
    """Cache application configuration"""
    config = await app_cache.get("app_config")

    if not config:
        # Load from database or external service
        config = await load_config_from_db()
        await app_cache.set("app_config", config)

    return config

# For CPU-intensive computations
@lru_cache(maxsize=256)
def cpu_intensive_calculation(param: str) -> dict:
    """Pure function that can be cached with lru_cache"""
    # Expensive CPU operation
    result = sum(i * ord(c) for i, c in enumerate(param * 1000))
    return {"result": result, "param": param}

@app.get("/calculate/{param}")
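async def calculate(param: str):
    # Run in a worker thread so the event loop stays responsive. Because of
    # the GIL, threads do not speed up pure-Python CPU work; use a process
    # pool when the computation itself must scale across cores.
    result = await asyncio.to_thread(cpu_intensive_calculation, param)
    return result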
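One gap in a simple check-then-load cache is that many concurrent requests can miss at the same moment and all trigger the expensive load. The sketch below shows one way to collapse those into a single load using a per-key `asyncio.Lock`; the class name `SingleFlightCache` and the `load_config` loader are hypothetical.

```python
import asyncio

class SingleFlightCache:
    """In-memory cache that runs at most one loader per key at a time."""

    def __init__(self):
        self._values = {}
        self._locks = {}

    async def get_or_load(self, key, loader):
        if key in self._values:
            return self._values[key]
        lock = self._locks.setdefault(key, asyncio.Lock())
        async with lock:
            # Re-check: another coroutine may have filled the cache while we waited
            if key not in self._values:
                self._values[key] = await loader()
        return self._values[key]

load_calls = 0

async def load_config():
    """Stand-in for a slow database or network lookup."""
    global load_calls
    load_calls += 1
    await asyncio.sleep(0.01)
    return {"feature_x": True}

async def main():
    cache = SingleFlightCache()
    return await asyncio.gather(
        *(cache.get_or_load("app_config", load_config) for _ in range(10))
    )

results = asyncio.run(main())
print(load_calls, len(results))
```

Ten concurrent callers receive the same value, and the loader runs only once.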

3. Cache Invalidation Strategies

from typing import Set

class CacheManager:
    def __init__(self, redis_client):
        self.redis = redis_client

    async def invalidate_user_cache(self, user_id: int):
        """Invalidate all cache entries related to a user"""
        await self.invalidate_pattern(f"user:{user_id}:*")

    async def invalidate_pattern(self, pattern: str):
        """Invalidate cache entries matching a pattern"""
        # SCAN iterates incrementally; KEYS would block Redis while it walks
        # the whole keyspace
        keys = [key async for key in self.redis.scan_iter(match=pattern)]
        if keys:
            await self.redis.delete(*keys)

    async def tag_cache_set(self, key: str, value: str, tags: Set[str], expiration: int = 300):
        """Set cache with tags for easy invalidation"""
        # Set the main cache entry
        await self.redis.setex(key, expiration, value)

        # Associate with tags
        for tag in tags:
            tag_key = f"tag:{tag}"
            await self.redis.sadd(tag_key, key)
            await self.redis.expire(tag_key, expiration + 60)  # Tags live slightly longer

    async def invalidate_by_tags(self, tags: Set[str]):
        """Invalidate all cache entries with specific tags"""
        keys_to_delete = set()

        for tag in tags:
            tag_key = f"tag:{tag}"
            tagged_keys = await self.redis.smembers(tag_key)
            keys_to_delete.update(tagged_keys)

        if keys_to_delete:
            await self.redis.delete(*keys_to_delete)

        # Clean up tag keys
        tag_keys = [f"tag:{tag}" for tag in tags]
        await self.redis.delete(*tag_keys)

cache_manager = CacheManager(redis_client)

@app.put("/users/{user_id}")
async def update_user(user_id: int, user_data: dict):
    # Update user in database
    await db.update_user(user_id, user_data)

    # Invalidate related cache entries
    await cache_manager.invalidate_by_tags({f"user:{user_id}", "users_list"})

    return {"message": "User updated successfully"}
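The tag mechanism is easier to follow in a plain in-memory model. This sketch mirrors the idea with dictionaries and sets instead of Redis; `TaggedCache` and the sample keys are illustrative only.

```python
from collections import defaultdict

class TaggedCache:
    """In-memory model of tag-based invalidation."""

    def __init__(self):
        self.entries = {}
        self.tag_index = defaultdict(set)  # tag -> keys carrying that tag

    def set(self, key, value, tags):
        self.entries[key] = value
        for tag in tags:
            self.tag_index[tag].add(key)

    def invalidate_by_tags(self, tags):
        removed = set()
        for tag in tags:
            # Drop the tag and collect every key that was registered under it
            removed |= self.tag_index.pop(tag, set())
        for key in removed:
            self.entries.pop(key, None)
        return removed

cache = TaggedCache()
cache.set("user:42:profile", {"name": "Ada"}, tags={"user:42"})
cache.set("users:page:1", ["Ada", "Grace"], tags={"users_list", "user:42"})
cache.set("user:7:profile", {"name": "Grace"}, tags={"user:7"})

removed = cache.invalidate_by_tags({"user:42"})
print(sorted(removed), list(cache.entries))
```

Invalidating one tag removes every entry registered under it, including list pages that merely contain the user, while unrelated entries stay cached.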

Request/Response Optimization

1. Response Compression

from fastapi.middleware.gzip import GZipMiddleware

# Add GZip compression middleware
app.add_middleware(GZipMiddleware, minimum_size=1000)

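# For custom compression handling on a specific endpoint
# (use either the middleware or manual compression for a given endpoint, not both)
import gzip
import json
from fastapi import Request, Response

@app.get("/large-dataset")
async def get_large_dataset(request: Request, compress: bool = True):
    # Generate large response
    data = {"items": [{"id": i, "value": f"item_{i}"} for i in range(10000)]}

    # Only compress when the client advertises gzip support
    if compress and "gzip" in request.headers.get("accept-encoding", ""):
        # Manual compression for specific endpoints
        json_data = json.dumps(data)
        compressed_data = gzip.compress(json_data.encode())

        return Response(
            content=compressed_data,
            media_type="application/json",
            headers={"Content-Encoding": "gzip", "Vary": "Accept-Encoding"}
        )

    return data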

2. Streaming Responses

from fastapi.responses import StreamingResponse
import csv
import io

@app.get("/export/users")
async def export_users():
    """Stream large CSV export without loading all data into memory"""

    async def generate_csv():
        # Create CSV header
        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(["id", "name", "email", "created_at"])
        yield output.getvalue()
        output.seek(0)
        output.truncate(0)

        # Stream data in batches
        offset = 0
        batch_size = 1000

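        while True:
            # fetch_users_page is a placeholder for a paginated query
            # (e.g. ORDER BY id LIMIT ... OFFSET ...); it is not defined above
            users = await db.fetch_users_page(offset, batch_size)
            if not users:
                break

            for user in users:
                writer.writerow([user['id'], user['name'], user['email'], user['created_at']])

            # Yield one chunk per batch rather than one per row
            yield output.getvalue()
            output.seek(0)
            output.truncate(0)

            offset += batch_size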

    return StreamingResponse(
        generate_csv(),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=users.csv"}
    )
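The batching logic can be exercised on its own, without a database or a running server. In this sketch `fetch_page` and `ROWS` are stand-ins for a paginated query; the generator is the same shape as the one you would hand to StreamingResponse.

```python
import asyncio
import csv
import io

ROWS = [(i, f"user{i}", f"user{i}@example.com") for i in range(1, 6)]

async def fetch_page(offset: int, limit: int):
    """Stand-in for a paginated database query."""
    await asyncio.sleep(0)
    return ROWS[offset:offset + limit]

async def generate_csv(batch_size: int = 2):
    buffer = io.StringIO()
    writer = csv.writer(buffer)

    def flush() -> str:
        # Return what has been written so far and reset the buffer
        chunk = buffer.getvalue()
        buffer.seek(0)
        buffer.truncate(0)
        return chunk

    writer.writerow(["id", "name", "email"])
    yield flush()

    offset = 0
    while True:
        page = await fetch_page(offset, batch_size)
        if not page:
            break
        writer.writerows(page)
        yield flush()  # one chunk per batch
        offset += batch_size

async def collect():
    return [chunk async for chunk in generate_csv()]

chunks = asyncio.run(collect())
print(len(chunks))
```

Memory use stays bounded by the batch size, because each chunk is discarded from the buffer as soon as it has been yielded.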

3. Background Tasks Optimization

from fastapi import BackgroundTasks
import asyncio
from concurrent.futures import ProcessPoolExecutor

# Process pool for CPU-intensive tasks (threads share the GIL, so they do
# not parallelize pure-Python computation)
cpu_executor = ProcessPoolExecutor(max_workers=4)

def cpu_intensive_task(data: dict):
    """CPU-intensive task that runs in the executor; its arguments must be picklable"""
    # Simulate heavy computation
    result = sum(i ** 2 for i in range(100000))
    # Log or process result
    print(f"Processed {data} with result {result}")

async def io_intensive_task(user_id: int):
    """I/O task that can run async"""
    async with httpx.AsyncClient() as client:
        await client.post(
            "https://api.analytics.com/event",
            json={"user_id": user_id, "action": "profile_updated"}
        )

@app.post("/users/{user_id}/update")
async def update_user_profile(
    user_id: int, 
    profile_data: dict, 
    background_tasks: BackgroundTasks
):
    # Update user in database (main request)
    await db.update_user_profile(user_id, profile_data)

    # Add background tasks
    background_tasks.add_task(io_intensive_task, user_id)

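    # For CPU-intensive tasks, offload to the executor. The returned future
    # is not awaited, so exceptions raised by the task will not reach this request.
    loop = asyncio.get_running_loop()
    loop.run_in_executor(cpu_executor, cpu_intensive_task, profile_data)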

    return {"message": "Profile updated successfully"}

Production Server Optimization

1. Uvicorn Configuration

# uvicorn_config.py
import multiprocessing

# Production Uvicorn configuration (keyword arguments for uvicorn.run;
# pass the app as an import string such as "main:app" when using workers)
def get_uvicorn_config():
    return {
        "host": "0.0.0.0",
        "port": 8000,
        "workers": multiprocessing.cpu_count(),
        "limit_concurrency": 1000,   # Beyond this many concurrent connections, respond with 503
        "limit_max_requests": 1000,  # Terminate the process after this many requests
        "timeout_keep_alive": 5,
        "loop": "uvloop",  # Use uvloop for better performance
        "http": "httptools",  # Use httptools for HTTP parsing
        "log_level": "info",
        "access_log": False,  # Disable for better performance in production
    }

# Alternative: Gunicorn with Uvicorn workers
# gunicorn_conf.py
import multiprocessing

bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count()
worker_class = "uvicorn.workers.UvicornWorker"
max_requests = 1000
max_requests_jitter = 100
keepalive = 5
preload_app = True

2. Application Startup Optimization

import asyncio
from contextlib import asynccontextmanager

import httpx

# Optimize startup with proper lifespan management
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup optimizations
    print("🚀 Starting FastAPI application...")

    # Warm up connection pools
    await db.create_pool()
    await redis_client.ping()

    # Precompile regular expressions if used
    import re
    app.state.email_regex = re.compile(r'^[^@]+@[^@]+\.[^@]+$')

    # Pre-load cache with frequently accessed data
    await preload_cache()

    # The server only accepts requests once startup completes, so run the
    # HTTP warm-up as a background task instead of awaiting it here
    warmup_task = asyncio.create_task(warmup_endpoints())

    print("✅ FastAPI application started successfully")
    yield

    # Cleanup
    print("🛑 Shutting down FastAPI application...")
    warmup_task.cancel()
    await db.close_pool()
    await redis_client.close()
    print("✅ FastAPI application shut down successfully")

async def preload_cache():
    """Pre-load frequently accessed data into cache"""
    config = await load_app_config()
    await redis_client.setex("app_config", 3600, json.dumps(config))

async def warmup_endpoints():
    """Hit critical endpoints once so first-request costs are paid early"""
    await asyncio.sleep(1)  # Give the server a moment to start listening
    try:
        async with httpx.AsyncClient() as client:
            # Hit critical endpoints to warm up
            await client.get("http://localhost:8000/health")
    except httpx.HTTPError:
        pass  # Warm-up is best effort

app = FastAPI(lifespan=lifespan)

Monitoring and Profiling

1. Performance Metrics Collection

import asyncio
import time
import psutil
from fastapi import Request, Response
from prometheus_client import Counter, Histogram, Gauge, generate_latest

# Prometheus metrics
REQUEST_COUNT = Counter('fastapi_requests_total', 'Total requests', ['method', 'endpoint'])
REQUEST_DURATION = Histogram('fastapi_request_duration_seconds', 'Request duration')
ACTIVE_CONNECTIONS = Gauge('fastapi_active_connections', 'Active connections')
MEMORY_USAGE = Gauge('fastapi_memory_usage_bytes', 'Memory usage')
CPU_USAGE = Gauge('fastapi_cpu_usage_percent', 'CPU usage')

@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()

    # Increment request counter
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path
    ).inc()

    # Process request
    response = await call_next(request)

    # Record duration
    duration = time.time() - start_time
    REQUEST_DURATION.observe(duration)

    # Add performance headers
    response.headers["X-Process-Time"] = str(duration)

    return response

# Background task to update system metrics
async def update_system_metrics():
    while True:
        MEMORY_USAGE.set(psutil.virtual_memory().used)
        CPU_USAGE.set(psutil.cpu_percent())
        await asyncio.sleep(10)

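# Keep a reference to the task so it is not garbage-collected while running
metrics_task = None

@app.on_event("startup")
async def start_metrics_collection():
    global metrics_task
    metrics_task = asyncio.create_task(update_system_metrics())

from prometheus_client import CONTENT_TYPE_LATEST

@app.get("/metrics")
async def get_metrics():
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)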

2. Application Profiling

import cProfile
import io
import os
import pstats
from functools import wraps

def profile_endpoint(func):
    """Decorator to profile specific endpoints"""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        if os.getenv("ENABLE_PROFILING") == "true":
            profiler = cProfile.Profile()
            profiler.enable()

            try:
                result = await func(*args, **kwargs)
                return result
            finally:
                profiler.disable()

                # Save profile results
                s = io.StringIO()
                ps = pstats.Stats(profiler, stream=s)
                ps.sort_stats('cumulative').print_stats(20)

                # Log or save profile results
                with open(f"/tmp/profile_{func.__name__}.txt", "w") as f:
                    f.write(s.getvalue())
        else:
            return await func(*args, **kwargs)

    return wrapper

@app.get("/slow-endpoint")
@profile_endpoint
async def slow_endpoint():
    # This endpoint will be profiled when ENABLE_PROFILING=true
    await expensive_operation()
    return {"message": "Operation completed"}
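Before wiring the profiler into an endpoint, it can be useful to see what the report looks like on an ordinary function. A minimal standard-library sketch; `busy_work` and `profile_call` are illustrative names.

```python
import cProfile
import io
import pstats

def busy_work(n: int) -> int:
    """CPU-bound function used as the profiling target."""
    return sum(i * i for i in range(n))

def profile_call(func, *args):
    """Run func under cProfile and return (result, text report)."""
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        result = func(*args)
    finally:
        profiler.disable()

    stream = io.StringIO()
    stats = pstats.Stats(profiler, stream=stream)
    stats.sort_stats("cumulative").print_stats(5)  # top 5 by cumulative time
    return result, stream.getvalue()

result, report = profile_call(busy_work, 10_000)
print(report)
```

The report lists call counts and cumulative time per function. Keep in mind that with async endpoints the profiler also records whatever else the event loop runs while the profiled coroutine is suspended, so profiles taken under concurrent load need careful reading.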

3. Database Query Monitoring

import time
from typing import Dict, List

import asyncpg

class QueryMonitor:
    def __init__(self):
        self.slow_queries: List[Dict] = []
        self.query_stats: Dict[str, Dict] = {}

    async def log_query(self, query: str, duration: float, params=None):
        # Log slow queries
        if duration > 0.1:  # 100ms threshold
            self.slow_queries.append({
                "query": query[:200],  # Truncate long queries
                "duration": duration,
                "timestamp": time.time(),
                "params": str(params)[:100] if params else None
            })

            # Keep only last 100 slow queries
            if len(self.slow_queries) > 100:
                self.slow_queries.pop(0)

        # Update query statistics
        query_hash = str(hash(query))
        if query_hash not in self.query_stats:
            self.query_stats[query_hash] = {
                "count": 0,
                "total_duration": 0,
                "avg_duration": 0,
                "max_duration": 0
            }

        stats = self.query_stats[query_hash]
        stats["count"] += 1
        stats["total_duration"] += duration
        stats["avg_duration"] = stats["total_duration"] / stats["count"]
        stats["max_duration"] = max(stats["max_duration"], duration)

query_monitor = QueryMonitor()

# Monkey patch asyncpg to add monitoring
original_fetch = asyncpg.Connection.fetch
original_fetchrow = asyncpg.Connection.fetchrow

async def monitored_fetch(self, query, *args, **kwargs):
    start_time = time.time()
    try:
        result = await original_fetch(self, query, *args, **kwargs)
        return result
    finally:
        duration = time.time() - start_time
        await query_monitor.log_query(query, duration, args)

async def monitored_fetchrow(self, query, *args, **kwargs):
    start_time = time.time()
    try:
        result = await original_fetchrow(self, query, *args, **kwargs)
        return result
    finally:
        duration = time.time() - start_time
        await query_monitor.log_query(query, duration, args)

# Apply monkey patches
asyncpg.Connection.fetch = monitored_fetch
asyncpg.Connection.fetchrow = monitored_fetchrow

@app.get("/debug/slow-queries")
async def get_slow_queries():
    """Debug endpoint to view slow queries"""
    return {"slow_queries": query_monitor.slow_queries[-10:]}

Load Testing and Benchmarking

1. Load Testing Setup

# load_test.py
import asyncio
import aiohttp
import time
from statistics import mean, median

async def single_request(session, url):
    start_time = time.time()
    try:
        async with session.get(url) as response:
            await response.text()
            return time.time() - start_time, response.status
    except Exception as e:
        return time.time() - start_time, -1

async def load_test(url: str, concurrent_requests: int, total_requests: int):
    async with aiohttp.ClientSession() as session:
        results = []

        # Create semaphore to limit concurrent requests
        semaphore = asyncio.Semaphore(concurrent_requests)

        async def bounded_request():
            async with semaphore:
                return await single_request(session, url)

        # Run load test
        start_time = time.time()
        tasks = [bounded_request() for _ in range(total_requests)]
        results = await asyncio.gather(*tasks)
        total_time = time.time() - start_time

        # Calculate statistics
        durations = [r[0] for r in results]
        statuses = [r[1] for r in results]

        successful_requests = len([s for s in statuses if s == 200])

        print(f"Total time: {total_time:.2f} seconds")
        print(f"Requests per second: {total_requests / total_time:.2f}")
        print(f"Successful requests: {successful_requests}/{total_requests}")
        print(f"Average response time: {mean(durations):.3f}s")
        print(f"Median response time: {median(durations):.3f}s")
        print(f"95th percentile: {sorted(durations)[int(0.95 * len(durations))]:.3f}s")

# Run load test
if __name__ == "__main__":
    asyncio.run(load_test(
        url="http://localhost:8000/api/users",
        concurrent_requests=50,
        total_requests=1000
    ))

2. Benchmark Different Configurations

# benchmark.py
import subprocess
import time
import json

def run_benchmark(config_name: str, uvicorn_args: str):
    print(f"\n🔬 Benchmarking {config_name}")
    print("-" * 50)

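    # Start server with an argument list instead of a shell, so that
    # terminate() reaches uvicorn itself; output is discarded so that
    # unread pipes cannot fill up and stall the server
    server_process = subprocess.Popen(
        ["uvicorn", "main:app", *uvicorn_args.split()],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL
    )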

    # Wait for server to start
    time.sleep(3)

    try:
        # Run load test
        result = subprocess.run([
            "wrk", "-t12", "-c400", "-d30s", 
            "http://localhost:8000/api/health"
        ], capture_output=True, text=True)

        print(result.stdout)

        # Parse results (simplified)
        lines = result.stdout.split('\n')
        for line in lines:
            if 'Requests/sec:' in line:
                rps = float(line.split(':')[1].strip())
                return rps

    finally:
        server_process.terminate()
        server_process.wait()

    return 0

# Benchmark different configurations
configurations = [
    ("Single Worker", "--workers 1"),
    ("Multiple Workers", "--workers 4"),
    ("With UVLoop", "--workers 4 --loop uvloop"),
    ("With HTTPTools", "--workers 4 --loop uvloop --http httptools"),
]

results = {}
for name, args in configurations:
    rps = run_benchmark(name, args)
    results[name] = rps

# Display results
print("\n📊 Benchmark Results")
print("=" * 50)
for name, rps in results.items():
    print(f"{name:20}: {rps:8.0f} req/s")

Advanced Optimization Techniques

1. Custom Serialization

import orjson
from fastapi.responses import ORJSONResponse

# Use faster JSON serialization
app = FastAPI(default_response_class=ORJSONResponse)

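# Custom serialization for specific data types
class OptimizedResponse(ORJSONResponse):
    def render(self, content) -> bytes:
        return orjson.dumps(
            content,
            # OPT_SERIALIZE_NUMPY serializes numpy arrays natively;
            # OPT_UTC_Z writes UTC datetimes with a "Z" suffix
            option=orjson.OPT_SERIALIZE_NUMPY | orjson.OPT_UTC_Z
        )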

@app.get("/optimized-data", response_class=OptimizedResponse)
async def get_optimized_data():
    # Return large dataset with optimized serialization
    return {"data": list(range(10000))}

2. Connection Reuse

# Global HTTP client for external API calls
http_client = httpx.AsyncClient(
    timeout=30.0,
    limits=httpx.Limits(
        max_keepalive_connections=20,
        max_connections=100,
        keepalive_expiry=30
    )
)

# The client is created at import time, so only shutdown cleanup is needed
@app.on_event("shutdown")
async def shutdown():
    await http_client.aclose()

@app.get("/external-api/{resource}")
async def proxy_external_api(resource: str):
    # Reuse HTTP connection
    response = await http_client.get(f"https://api.example.com/{resource}")
    return response.json()

3. Memory Optimization

import gc
from typing import AsyncGenerator

async def process_large_dataset() -> AsyncGenerator[dict, None]:
    """Process large dataset with memory optimization"""
    batch_size = 1000
    offset = 0

    while True:
        # Process data in batches
        batch = await db.fetch_data_batch(offset, batch_size)
        if not batch:
            break

        for item in batch:
            yield {"processed": process_item(item)}

        # Force garbage collection for large datasets
        if offset % 10000 == 0:
            gc.collect()

        offset += batch_size

@app.get("/large-dataset")
async def stream_large_dataset():
    async def generate_response():
        # Emit the opening bracket up front so that an empty dataset still
        # produces valid JSON
        yield '{"items":['
        first = True
        async for item in process_large_dataset():
            if not first:
                yield ','
            yield json.dumps(item)
            first = False
        yield ']}'

    return StreamingResponse(
        generate_response(),
        media_type="application/json"
    )
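When streaming JSON by hand, the output has to be valid for every input size, including zero items. The following self-contained sketch checks exactly that; `items` is a stand-in for the real data source and `stream_json_array` is an illustrative helper name.

```python
import asyncio
import json

async def items(n: int):
    """Stand-in async data source that yields n small records."""
    for i in range(n):
        await asyncio.sleep(0)
        yield {"id": i}

async def stream_json_array(source):
    """Yield the pieces of a JSON object of the form {"items": [...]}."""
    yield '{"items":['
    first = True
    async for item in source:
        if not first:
            yield ","  # separator goes before every item except the first
        yield json.dumps(item)
        first = False
    yield "]}"

async def render(n: int) -> str:
    return "".join([chunk async for chunk in stream_json_array(items(n))])

empty_body = asyncio.run(render(0))
full_body = asyncio.run(render(3))
print(empty_body, full_body)
```

Parsing both bodies with `json.loads` is a cheap regression test for the streaming generator.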

Performance Checklist

Development Phase

  • [ ] Use async/await throughout the application
  • [ ] Implement proper error handling that doesn't leak resources
  • [ ] Use type hints for request validation, serialization, and IDE support
  • [ ] Choose async-compatible libraries (httpx, asyncpg, etc.)
  • [ ] Implement connection pooling for databases and external APIs
  • [ ] Add request/response compression for large payloads
  • [ ] Use background tasks for non-critical operations

Testing Phase

  • [ ] Implement comprehensive load testing
  • [ ] Profile critical endpoints under load
  • [ ] Monitor memory usage and detect leaks
  • [ ] Test with realistic data volumes
  • [ ] Validate caching effectiveness
  • [ ] Benchmark different server configurations

Production Phase

  • [ ] Configure optimal worker counts based on CPU cores
  • [ ] Set up proper monitoring and alerting
  • [ ] Implement proper logging without performance impact
  • [ ] Use CDN for static assets
  • [ ] Configure database connection limits
  • [ ] Set up health checks and graceful shutdowns
  • [ ] Monitor and optimize slow queries

Infrastructure Phase

  • [ ] Use SSD storage for databases
  • [ ] Configure appropriate instance types
  • [ ] Set up load balancing
  • [ ] Implement auto-scaling policies
  • [ ] Use connection pooling at the infrastructure level
  • [ ] Configure proper timeout settings
  • [ ] Implement circuit breakers for external dependencies

Common Performance Pitfalls

1. Blocking the Event Loop

# ❌ BAD: Blocking operations
import time
import requests

@app.get("/bad-endpoint")
async def bad_endpoint():
    time.sleep(1)  # Blocks the entire event loop!
    response = requests.get("https://api.example.com")  # Synchronous HTTP call
    return response.json()

# ✅ GOOD: Non-blocking operations
import asyncio
import httpx

@app.get("/good-endpoint")
async def good_endpoint():
    await asyncio.sleep(1)  # Non-blocking sleep
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com")
    return response.json()
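Blocking calls are not always obvious from reading the code, so it can help to measure event loop lag directly. This is a rough standard-library sketch: a monitor coroutine sleeps for a short interval and records how late it wakes up. The function names and the 100 ms simulated block are assumptions for the demonstration.

```python
import asyncio
import time

async def measure_loop_lag(interval: float, samples: int) -> float:
    """Return the worst observed delay beyond `interval` between wake-ups."""
    worst = 0.0
    for _ in range(samples):
        start = time.perf_counter()
        await asyncio.sleep(interval)
        worst = max(worst, time.perf_counter() - start - interval)
    return worst

async def blocking_handler():
    await asyncio.sleep(0.01)
    time.sleep(0.1)  # simulated blocking call inside async code

async def main():
    lag, _ = await asyncio.gather(measure_loop_lag(0.01, 10), blocking_handler())
    return lag

worst_lag = asyncio.run(main())
print(f"worst event loop lag: {worst_lag:.3f}s")
```

A healthy loop shows lag close to zero; a spike near the duration of the blocking call means some coroutine held the loop for that long.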

2. Database Connection Leaks

# ❌ BAD: Not properly closing connections
@app.get("/bad-db-usage")
async def bad_db_usage():
    conn = await asyncpg.connect(DATABASE_URL)
    result = await conn.fetch("SELECT * FROM users")
    # Connection never closed!
    return result

# ✅ GOOD: Proper connection management
@app.get("/good-db-usage")
async def good_db_usage():
    async with db.pool.acquire() as conn:
        result = await conn.fetch("SELECT * FROM users")
    # Connection automatically returned to pool
    return [dict(row) for row in result]

3. Memory Leaks in Long-Running Tasks

# ❌ BAD: Accumulating data in memory
large_cache = {}

@app.post("/bad-caching")
async def bad_caching(data: dict):
    key = data.get("key")
    large_cache[key] = data  # Never cleaned up!
    return {"cached": True}

# ✅ GOOD: Bounded cache with expiration
from cachetools import TTLCache

bounded_cache = TTLCache(maxsize=1000, ttl=300)

@app.post("/good-caching")
async def good_caching(data: dict):
    key = data.get("key")
    bounded_cache[key] = data  # Automatically expires
    return {"cached": True}

Conclusion

FastAPI performance optimization is a multi-layered approach that involves:

  1. Async Best Practices: Using async/await properly throughout your application
  2. Database Optimization: Connection pooling, query optimization, and proper async drivers
  3. Caching Strategies: Multi-level caching with proper invalidation
  4. Server Configuration: Optimal worker settings and production configurations
  5. Monitoring: Comprehensive metrics and profiling to identify bottlenecks

By implementing these optimizations systematically, you can achieve:

  • 3-5x performance improvements over basic implementations
  • Sub-100ms response times for most endpoints
  • Thousands of concurrent connections with minimal resource usage
  • Horizontal scaling capabilities for high-traffic applications

Remember that premature optimization is the root of all evil: always measure first, then optimize based on real bottlenecks identified through profiling and monitoring.


Performance metrics and benchmarks are based on typical hardware configurations and may vary depending on your specific setup and requirements.