FastAPI Performance Optimization: Ultimate Guide to High-Performance APIs
FastAPI's exceptional performance is one of its key advantages, but achieving optimal results requires understanding and implementing the right optimization strategies. This comprehensive guide covers everything from basic async patterns to advanced production optimizations, helping you build lightning-fast APIs that can handle thousands of concurrent requests.
Performance Fundamentals
Understanding FastAPI's Performance Model
FastAPI's performance advantages come from several key architectural decisions:
- ASGI Foundation: Asynchronous Server Gateway Interface enables true concurrency
- Pydantic Integration: Fast validation and serialization backed by a compiled core (Rust-based pydantic-core in Pydantic v2)
- Starlette Core: Lightweight, high-performance web framework foundation
- Type System: Python type hints declare validation, serialization, and documentation up front, avoiding per-request reflection overhead
Performance Baseline
Before optimization, establish baseline metrics:
import time
import asyncio
from fastapi import FastAPI, Request, Response
from contextlib import asynccontextmanager
# Performance monitoring middleware
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
print("π FastAPI starting up...")
yield
# Shutdown
print("π FastAPI shutting down...")
app = FastAPI(lifespan=lifespan)
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
return response
@app.get("/health")
async def health_check():
return {"status": "healthy", "timestamp": time.time()}
Async Optimization Strategies
1. Async All the Way Down
Problem: Mixing sync and async code creates performance bottlenecks.
Bad Example:
import requests # Synchronous HTTP client
import time
@app.get("/slow-endpoint")
async def slow_endpoint():
# This blocks the entire event loop!
response1 = requests.get("https://api.example1.com/data")
response2 = requests.get("https://api.example2.com/data")
time.sleep(1) # Never do this in async code!
return {
"data1": response1.json(),
"data2": response2.json()
}
Optimized Example:
import httpx
import asyncio
@app.get("/fast-endpoint")
async def fast_endpoint():
async with httpx.AsyncClient() as client:
# Concurrent requests
response1, response2 = await asyncio.gather(
client.get("https://api.example1.com/data"),
client.get("https://api.example2.com/data")
)
# Async sleep if needed
await asyncio.sleep(0.1)
return {
"data1": response1.json(),
"data2": response2.json()
}
Performance Impact: typically a 5-10x improvement in concurrent request handling for I/O-bound endpoints; measure against your own baseline.
2. Efficient Async Patterns
Concurrent Processing with asyncio.gather():
from typing import List
import asyncio
async def fetch_user_data(user_id: int) -> dict:
"""Simulate async database fetch"""
await asyncio.sleep(0.1) # Simulate DB latency
return {"id": user_id, "name": f"User {user_id}"}
@app.get("/users/batch")
async def get_users_batch(user_ids: List[int]):
# Process all users concurrently
users = await asyncio.gather(
*[fetch_user_data(uid) for uid in user_ids]
)
return {"users": users}
# Alternative: asyncio.as_completed handles results in completion order
# (collected into a list here; pair with a StreamingResponse to actually stream them)
@app.get("/users/stream")
async def get_users_stream(user_ids: List[int]):
results = []
tasks = [fetch_user_data(uid) for uid in user_ids]
for coro in asyncio.as_completed(tasks):
user = await coro
results.append(user)
return {"users": results}
Semaphore for Rate Limiting:
# Limit concurrent external API calls
API_SEMAPHORE = asyncio.Semaphore(10)
async def rate_limited_api_call(endpoint: str):
async with API_SEMAPHORE:
async with httpx.AsyncClient() as client:
response = await client.get(endpoint)
return response.json()
@app.get("/external-data/{endpoint_id}")
async def get_external_data(endpoint_id: str):
endpoint = f"https://api.example.com/data/{endpoint_id}"
data = await rate_limited_api_call(endpoint)
return data
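The semaphore pays off when many calls are launched at once. A hypothetical batch endpoint (the path, Query usage, and external URL are illustrative) can fan out with asyncio.gather while the semaphore keeps at most 10 external requests in flight:
from typing import List
from fastapi import Query

@app.get("/external-data/batch")
async def get_external_data_batch(endpoint_ids: List[str] = Query(...)):
    # gather launches every call, but API_SEMAPHORE caps concurrency at 10
    results = await asyncio.gather(*[
        rate_limited_api_call(f"https://api.example.com/data/{eid}")
        for eid in endpoint_ids
    ])
    return {"results": results}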
Database Performance Optimization
1. Async Database Drivers
PostgreSQL with asyncpg:
import asyncpg
from typing import List, Optional
from fastapi import HTTPException
import os
# Connection pool for optimal performance
class DatabaseManager:
def __init__(self):
self.pool: Optional[asyncpg.Pool] = None
async def create_pool(self):
self.pool = await asyncpg.create_pool(
host=os.getenv("DB_HOST", "localhost"),
database=os.getenv("DB_NAME", "myapp"),
user=os.getenv("DB_USER", "postgres"),
password=os.getenv("DB_PASSWORD"),
min_size=10, # Minimum connections
max_size=20, # Maximum connections
command_timeout=60,
server_settings={
'jit': 'off' # Disable JIT for faster small queries
}
)
async def close_pool(self):
if self.pool:
await self.pool.close()
async def fetch_user(self, user_id: int):
async with self.pool.acquire() as conn:
return await conn.fetchrow(
"SELECT id, name, email FROM users WHERE id = $1",
user_id
)
async def fetch_users_batch(self, user_ids: List[int]):
async with self.pool.acquire() as conn:
return await conn.fetch(
"SELECT id, name, email FROM users WHERE id = ANY($1)",
user_ids
)
# Global database manager
db = DatabaseManager()
@app.on_event("startup")
async def startup():
await db.create_pool()
@app.on_event("shutdown")
async def shutdown():
await db.close_pool()
@app.get("/users/{user_id}")
async def get_user(user_id: int):
user = await db.fetch_user(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return dict(user)
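If you prefer handing connections to endpoints directly, a per-request dependency can acquire from the same pool. A brief sketch, assuming the DatabaseManager above (the route path is illustrative):
from fastapi import Depends, HTTPException

async def get_connection():
    # Acquire a pooled connection for the duration of one request
    async with db.pool.acquire() as conn:
        yield conn

@app.get("/users/{user_id}/email")
async def get_user_email(user_id: int, conn=Depends(get_connection)):
    row = await conn.fetchrow("SELECT email FROM users WHERE id = $1", user_id)
    if not row:
        raise HTTPException(status_code=404, detail="User not found")
    return {"email": row["email"]}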
2. Query Optimization
Avoid N+1 Queries:
# Bad: N+1 query problem
async def get_posts_with_authors_slow():
posts = await db.fetch("SELECT id, title, author_id FROM posts LIMIT 10")
result = []
for post in posts:
# This creates N additional queries!
author = await db.fetchrow(
"SELECT name FROM users WHERE id = $1",
post['author_id']
)
result.append({
"id": post['id'],
"title": post['title'],
"author": author['name']
})
return result
# Good: Single optimized query
async def get_posts_with_authors_fast():
posts = await db.fetch("""
SELECT p.id, p.title, u.name as author_name
FROM posts p
JOIN users u ON p.author_id = u.id
LIMIT 10
""")
return [
{
"id": post['id'],
"title": post['title'],
"author": post['author_name']
}
for post in posts
]
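When a JOIN is not practical (for example the author data lives in a table or service you cannot join against), two round-trips still beat N+1. A sketch using the same illustrative db interface as above:
async def get_posts_with_authors_two_queries():
    posts = await db.fetch("SELECT id, title, author_id FROM posts LIMIT 10")
    author_ids = list({post['author_id'] for post in posts})
    # One query for all authors instead of one query per post
    authors = await db.fetch(
        "SELECT id, name FROM users WHERE id = ANY($1)", author_ids
    )
    names = {author['id']: author['name'] for author in authors}
    return [
        {"id": post['id'], "title": post['title'], "author": names[post['author_id']]}
        for post in posts
    ]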
Prepared Statements for High-Frequency Queries:
class OptimizedQueries:
def __init__(self, pool: asyncpg.Pool):
self.pool = pool
self._prepared_statements = {}
async def prepare_statements(self):
async with self.pool.acquire() as conn:
self._prepared_statements['get_user'] = await conn.prepare(
"SELECT id, name, email FROM users WHERE id = $1"
)
self._prepared_statements['get_user_posts'] = await conn.prepare("""
SELECT p.id, p.title, p.created_at
FROM posts p
WHERE p.author_id = $1
ORDER BY p.created_at DESC
LIMIT $2
""")
async def get_user_with_posts(self, user_id: int, limit: int = 10):
async with self.pool.acquire() as conn:
# Use prepared statements for better performance
user = await self._prepared_statements['get_user'].fetchrow(user_id)
posts = await self._prepared_statements['get_user_posts'].fetch(user_id, limit)
return {
"user": dict(user),
"posts": [dict(post) for post in posts]
}
3. Database Connection Optimization
Connection Pool Configuration:
# Production-optimized pool settings
async def create_optimized_pool():
return await asyncpg.create_pool(
dsn=DATABASE_URL,
min_size=5, # Minimum connections (adjust based on load)
max_size=25, # Maximum connections (don't exceed DB limits)
max_queries=50000, # Rotate connections after N queries
max_inactive_connection_lifetime=3600, # 1 hour
command_timeout=30, # Query timeout
server_settings={
    'application_name': 'fastapi_app',
    'jit': 'off',  # Disable JIT for OLTP workloads
    # Note: shared_preload_libraries (e.g. pg_stat_statements) can only be
    # set in postgresql.conf at server start, not per connection.
}
)
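A rough sizing heuristic (an assumption to tune, not a rule): the total of max_size across all workers should stay below PostgreSQL's max_connections, with headroom for other clients:
import multiprocessing

WORKERS = multiprocessing.cpu_count()
PG_MAX_CONNECTIONS = 100   # check with: SHOW max_connections;
HEADROOM = 10              # leave room for migrations, admin tools, monitoring

# Per-worker pool ceiling so workers * max_size never exhausts the server
per_worker_max_size = max(5, (PG_MAX_CONNECTIONS - HEADROOM) // WORKERS)
print(f"max_size per worker: {per_worker_max_size}")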
Caching Strategies
1. Response Caching with Redis
import redis.asyncio as redis
import json
import hashlib
from functools import wraps
# Redis connection
redis_client = redis.from_url("redis://localhost:6379", decode_responses=True)
def cache_response(expiration: int = 300):
"""Decorator for caching API responses"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# Create cache key from function name and arguments
cache_key = f"cache:{func.__name__}:{hashlib.md5(str(kwargs).encode()).hexdigest()}"
# Check cache first
cached_result = await redis_client.get(cache_key)
if cached_result:
return json.loads(cached_result)
# Execute function and cache result
result = await func(*args, **kwargs)
await redis_client.setex(
cache_key,
expiration,
json.dumps(result, default=str)
)
return result
return wrapper
return decorator
@app.get("/expensive-computation/{param}")
@cache_response(expiration=600) # Cache for 10 minutes
async def expensive_computation(param: str):
# Simulate expensive operation
await asyncio.sleep(2)
return {"result": f"Computed result for {param}", "timestamp": time.time()}
2. In-Memory Caching
from functools import lru_cache
import asyncio
from typing import Dict, Any
class AsyncLRUCache:
def __init__(self, maxsize: int = 128):
self.cache: Dict[str, Any] = {}
self.maxsize = maxsize
async def get(self, key: str):
return self.cache.get(key)
async def set(self, key: str, value: Any):
if len(self.cache) >= self.maxsize:
# Evict the oldest inserted item (FIFO rather than true LRU; see the OrderedDict sketch below)
oldest_key = next(iter(self.cache))
del self.cache[oldest_key]
self.cache[key] = value
# Global cache instance
app_cache = AsyncLRUCache(maxsize=1000)
@app.get("/config")
async def get_app_config():
"""Cache application configuration"""
config = await app_cache.get("app_config")
if not config:
# Load from database or external service
config = await load_config_from_db()
await app_cache.set("app_config", config)
return config
# For CPU-intensive computations
@lru_cache(maxsize=256)
def cpu_intensive_calculation(param: str) -> dict:
"""Pure function that can be cached with lru_cache"""
# Expensive CPU operation
result = sum(i * ord(c) for i, c in enumerate(param * 1000))
return {"result": result, "param": param}
@app.get("/calculate/{param}")
async def calculate(param: str):
# This runs in a thread pool to avoid blocking
result = await asyncio.get_running_loop().run_in_executor(
None,
cpu_intensive_calculation,
param
)
return result
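The AsyncLRUCache above evicts in insertion order, which is really FIFO. If true least-recently-used behavior matters, a variant built on OrderedDict is a small change; a sketch:
from collections import OrderedDict
from typing import Any, Optional

class TrueLRUCache:
    def __init__(self, maxsize: int = 128):
        self.cache: "OrderedDict[str, Any]" = OrderedDict()
        self.maxsize = maxsize

    async def get(self, key: str) -> Optional[Any]:
        if key in self.cache:
            # Mark as most recently used
            self.cache.move_to_end(key)
            return self.cache[key]
        return None

    async def set(self, key: str, value: Any) -> None:
        if key in self.cache:
            self.cache.move_to_end(key)
        self.cache[key] = value
        if len(self.cache) > self.maxsize:
            # Evict the least recently used entry
            self.cache.popitem(last=False)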
3. Cache Invalidation Strategies
from typing import Set
class CacheManager:
def __init__(self, redis_client):
self.redis = redis_client
async def invalidate_user_cache(self, user_id: int):
"""Invalidate all cache entries related to a user"""
pattern = f"user:{user_id}:*"
keys = await self.redis.keys(pattern)
if keys:
await self.redis.delete(*keys)
async def invalidate_pattern(self, pattern: str):
"""Invalidate cache entries matching a pattern"""
keys = await self.redis.keys(pattern)
if keys:
await self.redis.delete(*keys)
async def tag_cache_set(self, key: str, value: str, tags: Set[str], expiration: int = 300):
"""Set cache with tags for easy invalidation"""
# Set the main cache entry
await self.redis.setex(key, expiration, value)
# Associate with tags
for tag in tags:
tag_key = f"tag:{tag}"
await self.redis.sadd(tag_key, key)
await self.redis.expire(tag_key, expiration + 60) # Tags live slightly longer
async def invalidate_by_tags(self, tags: Set[str]):
"""Invalidate all cache entries with specific tags"""
keys_to_delete = set()
for tag in tags:
tag_key = f"tag:{tag}"
tagged_keys = await self.redis.smembers(tag_key)
keys_to_delete.update(tagged_keys)
if keys_to_delete:
await self.redis.delete(*keys_to_delete)
# Clean up tag keys
tag_keys = [f"tag:{tag}" for tag in tags]
await self.redis.delete(*tag_keys)
cache_manager = CacheManager(redis_client)
@app.put("/users/{user_id}")
async def update_user(user_id: int, user_data: dict):
# Update user in database
await db.update_user(user_id, user_data)
# Invalidate related cache entries
await cache_manager.invalidate_by_tags({f"user:{user_id}", "users_list"})
return {"message": "User updated successfully"}
Request/Response Optimization
1. Response Compression
from fastapi.middleware.gzip import GZipMiddleware
# Add GZip compression middleware
app.add_middleware(GZipMiddleware, minimum_size=1000)
# For custom compression handling
import gzip
from fastapi import Response
@app.get("/large-dataset")
async def get_large_dataset(compress: bool = True):
# Generate large response
data = {"items": [{"id": i, "value": f"item_{i}"} for i in range(10000)]}
if compress:
# Manual compression for specific endpoints
json_data = json.dumps(data)
compressed_data = gzip.compress(json_data.encode())
return Response(
content=compressed_data,
media_type="application/json",
headers={"Content-Encoding": "gzip"}
)
return data
2. Streaming Responses
from fastapi.responses import StreamingResponse
import csv
import io
@app.get("/export/users")
async def export_users():
"""Stream large CSV export without loading all data into memory"""
async def generate_csv():
# Create CSV header
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(["id", "name", "email", "created_at"])
yield output.getvalue()
output.seek(0)
output.truncate(0)
# Stream data in batches
offset = 0
batch_size = 1000
while True:
users = await db.fetch_users_batch(offset, batch_size)
if not users:
break
for user in users:
writer.writerow([user['id'], user['name'], user['email'], user['created_at']])
yield output.getvalue()
output.seek(0)
output.truncate(0)
offset += batch_size
return StreamingResponse(
generate_csv(),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=users.csv"}
)
3. Background Tasks Optimization
from fastapi import BackgroundTasks
import asyncio
from concurrent.futures import ThreadPoolExecutor
# Thread pool for CPU-intensive tasks
cpu_executor = ThreadPoolExecutor(max_workers=4)
def cpu_intensive_task(data: dict):
"""CPU-intensive task that should run in thread pool"""
# Simulate heavy computation
result = sum(i ** 2 for i in range(100000))
# Log or process result
print(f"Processed {data} with result {result}")
async def io_intensive_task(user_id: int):
"""I/O task that can run async"""
async with httpx.AsyncClient() as client:
await client.post(
"https://api.analytics.com/event",
json={"user_id": user_id, "action": "profile_updated"}
)
@app.post("/users/{user_id}/update")
async def update_user_profile(
user_id: int,
profile_data: dict,
background_tasks: BackgroundTasks
):
# Update user in database (main request)
await db.update_user_profile(user_id, profile_data)
# Add background tasks
background_tasks.add_task(io_intensive_task, user_id)
# For CPU-intensive tasks, hand off to the thread pool. Note this is
# fire-and-forget: the returned future is not awaited, so attach a done
# callback (or await it inside a background task) if you need to see errors.
loop = asyncio.get_running_loop()
loop.run_in_executor(cpu_executor, cpu_intensive_task, profile_data)
return {"message": "Profile updated successfully"}
Production Server Optimization
1. Uvicorn Configuration
# uvicorn_config.py
import multiprocessing
# Production Uvicorn configuration (valid uvicorn.run() keyword arguments)
def get_uvicorn_config():
    return {
        "host": "0.0.0.0",
        "port": 8000,
        "workers": multiprocessing.cpu_count(),
        "loop": "uvloop",            # Use uvloop for a faster event loop
        "http": "httptools",         # Use httptools for HTTP parsing
        "limit_concurrency": 1000,   # Cap concurrent connections/tasks per worker
        "limit_max_requests": 1000,  # Recycle workers after N requests
        "timeout_keep_alive": 5,     # Keep-alive timeout in seconds
        "log_level": "info",
        "access_log": False,         # Disable for better performance in production
    }
    # Options such as worker_class, worker_connections and max_requests_jitter
    # are Gunicorn settings; they belong in gunicorn_conf.py below.
# Alternative: Gunicorn with Uvicorn workers
# gunicorn_conf.py
import multiprocessing

bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count()
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000
max_requests = 1000
max_requests_jitter = 100
keepalive = 5
preload_app = True
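To apply either configuration, a small launcher can pass the Uvicorn settings programmatically, and Gunicorn is pointed at its config file. A sketch (the module names run.py and uvicorn_config are assumptions):
# run.py
import uvicorn

from uvicorn_config import get_uvicorn_config

if __name__ == "__main__":
    # With workers > 1, the app must be given as an import string
    uvicorn.run("main:app", **get_uvicorn_config())

# Gunicorn alternative (run from a shell):
#   gunicorn main:app -c gunicorn_conf.py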
2. Application Startup Optimization
from contextlib import asynccontextmanager
# Optimize startup with proper lifespan management
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup optimizations
print("π Starting FastAPI application...")
# Warm up connection pools
await db.create_pool()
await redis_client.ping()
# Precompile regular expressions if used
import re
app.state.email_regex = re.compile(r'^[^@]+@[^@]+\.[^@]+$')
# Pre-load cache with frequently accessed data
await preload_cache()
# Exercise critical code paths so lazy imports and caches are warm
await warmup_endpoints()
print("FastAPI application started successfully")
yield
# Cleanup
print("π Shutting down FastAPI application...")
await db.close_pool()
await redis_client.close()
print("β
FastAPI application shut down successfully")
async def preload_cache():
"""Pre-load frequently accessed data into cache"""
config = await load_app_config()
await redis_client.setex("app_config", 3600, json.dumps(config))
async def warmup_endpoints():
    """Warm up critical code paths before serving traffic.

    The network listener is not accepting requests yet during startup, so
    call the app in-process via httpx's ASGI transport instead of over HTTP.
    """
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://warmup") as client:
        await client.get("/health")
app = FastAPI(lifespan=lifespan)
Monitoring and Profiling
1. Performance Metrics Collection
import time
import psutil
from prometheus_client import Counter, Histogram, Gauge, generate_latest
# Prometheus metrics
REQUEST_COUNT = Counter('fastapi_requests_total', 'Total requests', ['method', 'endpoint'])
REQUEST_DURATION = Histogram('fastapi_request_duration_seconds', 'Request duration')
ACTIVE_CONNECTIONS = Gauge('fastapi_active_connections', 'Active connections')
MEMORY_USAGE = Gauge('fastapi_memory_usage_bytes', 'Memory usage')
CPU_USAGE = Gauge('fastapi_cpu_usage_percent', 'CPU usage')
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
start_time = time.time()
# Increment request counter
REQUEST_COUNT.labels(
method=request.method,
endpoint=request.url.path
).inc()
# Process request
response = await call_next(request)
# Record duration
duration = time.time() - start_time
REQUEST_DURATION.observe(duration)
# Add performance headers
response.headers["X-Process-Time"] = str(duration)
return response
# Background task to update system metrics
async def update_system_metrics():
while True:
MEMORY_USAGE.set(psutil.virtual_memory().used)
CPU_USAGE.set(psutil.cpu_percent())
await asyncio.sleep(10)
@app.on_event("startup")
async def start_metrics_collection():
asyncio.create_task(update_system_metrics())
@app.get("/metrics")
async def get_metrics():
return Response(generate_latest(), media_type="text/plain")
2. Application Profiling
import cProfile
import io
import os
import pstats
from functools import wraps
def profile_endpoint(func):
"""Decorator to profile specific endpoints"""
@wraps(func)
async def wrapper(*args, **kwargs):
if os.getenv("ENABLE_PROFILING") == "true":
profiler = cProfile.Profile()
profiler.enable()
try:
result = await func(*args, **kwargs)
return result
finally:
profiler.disable()
# Save profile results
s = io.StringIO()
ps = pstats.Stats(profiler, stream=s)
ps.sort_stats('cumulative').print_stats(20)
# Log or save profile results
with open(f"/tmp/profile_{func.__name__}.txt", "w") as f:
f.write(s.getvalue())
else:
return await func(*args, **kwargs)
return wrapper
@app.get("/slow-endpoint")
@profile_endpoint
async def slow_endpoint():
# This endpoint will be profiled when ENABLE_PROFILING=true
await expensive_operation()
return {"message": "Operation completed"}
3. Database Query Monitoring
import time
from typing import Dict, List
class QueryMonitor:
def __init__(self):
self.slow_queries: List[Dict] = []
self.query_stats: Dict[str, Dict] = {}
async def log_query(self, query: str, duration: float, params=None):
# Log slow queries
if duration > 0.1: # 100ms threshold
self.slow_queries.append({
"query": query[:200], # Truncate long queries
"duration": duration,
"timestamp": time.time(),
"params": str(params)[:100] if params else None
})
# Keep only last 100 slow queries
if len(self.slow_queries) > 100:
self.slow_queries.pop(0)
# Update query statistics
query_hash = str(hash(query))
if query_hash not in self.query_stats:
self.query_stats[query_hash] = {
"count": 0,
"total_duration": 0,
"avg_duration": 0,
"max_duration": 0
}
stats = self.query_stats[query_hash]
stats["count"] += 1
stats["total_duration"] += duration
stats["avg_duration"] = stats["total_duration"] / stats["count"]
stats["max_duration"] = max(stats["max_duration"], duration)
query_monitor = QueryMonitor()
# Monkey patch asyncpg to add monitoring
original_fetch = asyncpg.Connection.fetch
original_fetchrow = asyncpg.Connection.fetchrow
async def monitored_fetch(self, query, *args, **kwargs):
start_time = time.time()
try:
result = await original_fetch(self, query, *args, **kwargs)
return result
finally:
duration = time.time() - start_time
await query_monitor.log_query(query, duration, args)
async def monitored_fetchrow(self, query, *args, **kwargs):
start_time = time.time()
try:
result = await original_fetchrow(self, query, *args, **kwargs)
return result
finally:
duration = time.time() - start_time
await query_monitor.log_query(query, duration, args)
# Apply monkey patches
asyncpg.Connection.fetch = monitored_fetch
asyncpg.Connection.fetchrow = monitored_fetchrow
@app.get("/debug/slow-queries")
async def get_slow_queries():
"""Debug endpoint to view slow queries"""
return {"slow_queries": query_monitor.slow_queries[-10:]}
Load Testing and Benchmarking
1. Load Testing Setup
# load_test.py
import asyncio
import aiohttp
import time
from statistics import mean, median
async def single_request(session, url):
start_time = time.time()
try:
async with session.get(url) as response:
await response.text()
return time.time() - start_time, response.status
except Exception as e:
return time.time() - start_time, -1
async def load_test(url: str, concurrent_requests: int, total_requests: int):
async with aiohttp.ClientSession() as session:
results = []
# Create semaphore to limit concurrent requests
semaphore = asyncio.Semaphore(concurrent_requests)
async def bounded_request():
async with semaphore:
return await single_request(session, url)
# Run load test
start_time = time.time()
tasks = [bounded_request() for _ in range(total_requests)]
results = await asyncio.gather(*tasks)
total_time = time.time() - start_time
# Calculate statistics
durations = [r[0] for r in results]
statuses = [r[1] for r in results]
successful_requests = len([s for s in statuses if s == 200])
print(f"Total time: {total_time:.2f} seconds")
print(f"Requests per second: {total_requests / total_time:.2f}")
print(f"Successful requests: {successful_requests}/{total_requests}")
print(f"Average response time: {mean(durations):.3f}s")
print(f"Median response time: {median(durations):.3f}s")
print(f"95th percentile: {sorted(durations)[int(0.95 * len(durations))]:.3f}s")
# Run load test
if __name__ == "__main__":
asyncio.run(load_test(
url="http://localhost:8000/api/users",
concurrent_requests=50,
total_requests=1000
))
2. Benchmark Different Configurations
# benchmark.py
import subprocess
import time
import json
def run_benchmark(config_name: str, uvicorn_args: str):
print(f"\n㪠Benchmarking {config_name}")
print("-" * 50)
# Start server
server_process = subprocess.Popen(
f"uvicorn main:app {uvicorn_args}",
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# Wait for server to start
time.sleep(3)
try:
# Run load test
result = subprocess.run([
"wrk", "-t12", "-c400", "-d30s",
"http://localhost:8000/api/health"
], capture_output=True, text=True)
print(result.stdout)
# Parse results (simplified)
lines = result.stdout.split('\n')
for line in lines:
if 'Requests/sec:' in line:
rps = float(line.split(':')[1].strip())
return rps
finally:
server_process.terminate()
server_process.wait()
return 0
# Benchmark different configurations
configurations = [
("Single Worker", "--workers 1"),
("Multiple Workers", "--workers 4"),
("With UVLoop", "--workers 4 --loop uvloop"),
("With HTTPTools", "--workers 4 --loop uvloop --http httptools"),
]
results = {}
for name, args in configurations:
rps = run_benchmark(name, args)
results[name] = rps
# Display results
print("\nπ Benchmark Results")
print("=" * 50)
for name, rps in results.items():
print(f"{name:20}: {rps:8.0f} req/s")
Advanced Optimization Techniques
1. Custom Serialization
import orjson
from fastapi.responses import ORJSONResponse
# Use faster JSON serialization
app = FastAPI(default_response_class=ORJSONResponse)
# Custom serialization for specific data types
class OptimizedResponse(ORJSONResponse):
    def render(self, content) -> bytes:
        # orjson serializes straight to bytes; OPT_SERIALIZE_NUMPY handles
        # numpy arrays without converting them to lists first
        return orjson.dumps(content, option=orjson.OPT_SERIALIZE_NUMPY)
@app.get("/optimized-data", response_class=OptimizedResponse)
async def get_optimized_data():
# Return large dataset with optimized serialization
return {"data": list(range(10000))}
2. Connection Reuse
# Global HTTP client for external API calls
http_client = httpx.AsyncClient(
timeout=30.0,
limits=httpx.Limits(
max_keepalive_connections=20,
max_connections=100,
keepalive_expiry=30
)
)
@app.on_event("startup")
async def startup():
global http_client
# Client is already initialized
@app.on_event("shutdown")
async def shutdown():
await http_client.aclose()
@app.get("/external-api/{resource}")
async def proxy_external_api(resource: str):
# Reuse HTTP connection
response = await http_client.get(f"https://api.example.com/{resource}")
return response.json()
3. Memory Optimization
import gc
from typing import AsyncGenerator
async def process_large_dataset() -> AsyncGenerator[dict, None]:
"""Process large dataset with memory optimization"""
batch_size = 1000
offset = 0
while True:
# Process data in batches
batch = await db.fetch_data_batch(offset, batch_size)
if not batch:
break
for item in batch:
yield {"processed": process_item(item)}
# Force garbage collection for large datasets
if offset % 10000 == 0:
gc.collect()
offset += batch_size
@app.get("/large-dataset")
async def stream_large_dataset():
async def generate_response():
count = 0
async for item in process_large_dataset():
if count == 0:
yield '{"items":['
else:
yield ','
yield json.dumps(item)
count += 1
yield ']}'
return StreamingResponse(
generate_response(),
media_type="application/json"
)
Performance Checklist
Development Phase
- [ ] Use async/await throughout the application
- [ ] Implement proper error handling that doesn't leak resources
- [ ] Use type hints for better performance and IDE support
- [ ] Choose async-compatible libraries (httpx, asyncpg, etc.)
- [ ] Implement connection pooling for databases and external APIs
- [ ] Add request/response compression for large payloads
- [ ] Use background tasks for non-critical operations
Testing Phase
- [ ] Implement comprehensive load testing
- [ ] Profile critical endpoints under load
- [ ] Monitor memory usage and detect leaks (see the tracemalloc sketch after this list)
- [ ] Test with realistic data volumes
- [ ] Validate caching effectiveness
- [ ] Benchmark different server configurations
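For the memory item above, the standard library's tracemalloc is enough for a first pass at leak detection; a sketch that snapshots allocations before and after a burst of load:
import tracemalloc

tracemalloc.start()
baseline = tracemalloc.take_snapshot()

# ... drive load against the app here (e.g. with the load_test.py script below) ...

current = tracemalloc.take_snapshot()
for stat in current.compare_to(baseline, "lineno")[:10]:
    # Allocation sites that grew the most between snapshots
    print(stat)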
Production Phase
- [ ] Configure optimal worker counts based on CPU cores
- [ ] Set up proper monitoring and alerting
- [ ] Implement proper logging without performance impact
- [ ] Use CDN for static assets
- [ ] Configure database connection limits
- [ ] Set up health checks and graceful shutdowns
- [ ] Monitor and optimize slow queries
Infrastructure Phase
- [ ] Use SSD storage for databases
- [ ] Configure appropriate instance types
- [ ] Set up load balancing
- [ ] Implement auto-scaling policies
- [ ] Use connection pooling at the infrastructure level
- [ ] Configure proper timeout settings
- [ ] Implement circuit breakers for external dependencies (a minimal sketch follows this list)
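For the circuit-breaker item, a minimal in-process sketch (the thresholds and recovery timeout are assumptions; production systems often rely on a dedicated library or a service mesh instead):
import time
from typing import Optional

import httpx

class SimpleCircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.opened_at: Optional[float] = None

    def _is_open(self) -> bool:
        if self.opened_at is None:
            return False
        # After the recovery timeout, allow a trial request (half-open)
        return time.monotonic() - self.opened_at < self.recovery_timeout

    async def call(self, url: str) -> dict:
        if self._is_open():
            raise RuntimeError("circuit open: skipping external call")
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                response = await client.get(url)
                response.raise_for_status()
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
        else:
            # Success resets the breaker
            self.failures = 0
            self.opened_at = None
            return response.json()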
Common Performance Pitfalls
1. Blocking the Event Loop
# BAD: Blocking operations
import time
import requests
@app.get("/bad-endpoint")
async def bad_endpoint():
time.sleep(1) # Blocks the entire event loop!
response = requests.get("https://api.example.com") # Synchronous HTTP call
return response.json()
# GOOD: Non-blocking operations
import asyncio
import httpx
@app.get("/good-endpoint")
async def good_endpoint():
await asyncio.sleep(1) # Non-blocking sleep
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com")
return response.json()
2. Database Connection Leaks
# BAD: Not properly closing connections
@app.get("/bad-db-usage")
async def bad_db_usage():
conn = await asyncpg.connect(DATABASE_URL)
result = await conn.fetch("SELECT * FROM users")
# Connection never closed!
return result
# GOOD: Proper connection management
@app.get("/good-db-usage")
async def good_db_usage():
async with pool.acquire() as conn:
result = await conn.fetch("SELECT * FROM users")
# Connection automatically returned to pool
return result
3. Memory Leaks in Long-Running Tasks
# BAD: Accumulating data in memory
large_cache = {}
@app.post("/bad-caching")
async def bad_caching(data: dict):
key = data.get("key")
large_cache[key] = data # Never cleaned up!
return {"cached": True}
# GOOD: Bounded cache with expiration
from cachetools import TTLCache
bounded_cache = TTLCache(maxsize=1000, ttl=300)
@app.post("/good-caching")
async def good_caching(data: dict):
key = data.get("key")
bounded_cache[key] = data # Automatically expires
return {"cached": True}
Conclusion
FastAPI performance optimization is a multi-layered approach that involves:
- Async Best Practices: Using async/await properly throughout your application
- Database Optimization: Connection pooling, query optimization, and proper async drivers
- Caching Strategies: Multi-level caching with proper invalidation
- Server Configuration: Optimal worker settings and production configurations
- Monitoring: Comprehensive metrics and profiling to identify bottlenecks
By implementing these optimizations systematically, you can achieve:
- 3-5x performance improvements over basic implementations
- Sub-100ms response times for most endpoints
- Thousands of concurrent connections with minimal resource usage
- Horizontal scaling capabilities for high-traffic applications
Remember that premature optimization is the root of all evil: always measure first, then optimize based on real bottlenecks identified through profiling and monitoring.
Related Resources
- FastAPI Production Deployment Guide
- FastAPI vs Flask Comparison
- Database Integration with FastAPI
- FastAPI Background Tasks Guide
Performance metrics and benchmarks are based on typical hardware configurations and may vary depending on your specific setup and requirements.