FastAPI Custom Exception Handling - Complete Guide
Proper exception handling in FastAPI ensures robust applications with meaningful error responses and effective debugging capabilities.
1. Custom Exception Classes
from fastapi import HTTPException, status
from typing import Optional, Dict, Any
class BaseAPIException(HTTPException):
"""Base exception class for API errors"""
def __init__(
self,
status_code: int,
detail: str,
error_code: Optional[str] = None,
headers: Optional[Dict[str, Any]] = None
):
super().__init__(status_code=status_code, detail=detail, headers=headers)
self.error_code = error_code
class ValidationException(BaseAPIException):
"""Raised when data validation fails"""
def __init__(self, detail: str, field: Optional[str] = None):
super().__init__(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=detail,
error_code="VALIDATION_ERROR"
)
self.field = field
class ResourceNotFoundException(BaseAPIException):
"""Raised when a requested resource is not found"""
def __init__(self, resource: str, identifier: str):
super().__init__(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"{resource} with id '{identifier}' not found",
error_code="RESOURCE_NOT_FOUND"
)
self.resource = resource
self.identifier = identifier
class DuplicateResourceException(BaseAPIException):
"""Raised when trying to create a duplicate resource"""
def __init__(self, resource: str, field: str, value: str):
super().__init__(
status_code=status.HTTP_409_CONFLICT,
detail=f"{resource} with {field} '{value}' already exists",
error_code="DUPLICATE_RESOURCE"
)
class InsufficientPermissionsException(BaseAPIException):
"""Raised when user lacks required permissions"""
def __init__(self, action: str, resource: str):
super().__init__(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Insufficient permissions to {action} {resource}",
error_code="INSUFFICIENT_PERMISSIONS"
)
class RateLimitException(BaseAPIException):
"""Raised when rate limit is exceeded"""
def __init__(self, retry_after: int):
super().__init__(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Rate limit exceeded",
error_code="RATE_LIMIT_EXCEEDED",
headers={"Retry-After": str(retry_after)}
)
2. Global Exception Handlers
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
import logging
import traceback
from datetime import datetime
app = FastAPI()
logger = logging.getLogger(__name__)
@app.exception_handler(BaseAPIException)
async def base_api_exception_handler(request: Request, exc: BaseAPIException):
"""Handle custom API exceptions"""
return JSONResponse(
status_code=exc.status_code,
content={
"error": {
"code": exc.error_code,
"message": exc.detail,
"timestamp": datetime.utcnow().isoformat(),
"path": request.url.path
}
},
headers=exc.headers
)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
"""Handle Pydantic validation errors"""
errors = []
for error in exc.errors():
field_path = " -> ".join(str(loc) for loc in error["loc"][1:])
errors.append({
"field": field_path,
"message": error["msg"],
"type": error["type"],
"input": error.get("input")
})
return JSONResponse(
status_code=422,
content={
"error": {
"code": "VALIDATION_ERROR",
"message": "Request validation failed",
"details": errors,
"timestamp": datetime.utcnow().isoformat(),
"path": request.url.path
}
}
)
@app.exception_handler(500)
async def internal_server_error_handler(request: Request, exc: Exception):
"""Handle unexpected server errors"""
error_id = str(uuid.uuid4())
logger.error(
f"Internal server error [{error_id}]: {str(exc)}",
extra={
"error_id": error_id,
"path": request.url.path,
"method": request.method,
"traceback": traceback.format_exc()
}
)
return JSONResponse(
status_code=500,
content={
"error": {
"code": "INTERNAL_SERVER_ERROR",
"message": "An unexpected error occurred",
"error_id": error_id,
"timestamp": datetime.utcnow().isoformat(),
"path": request.url.path
}
}
)
@app.exception_handler(404)
async def not_found_handler(request: Request, exc: Exception):
"""Handle 404 Not Found errors"""
return JSONResponse(
status_code=404,
content={
"error": {
"code": "NOT_FOUND",
"message": "The requested resource was not found",
"timestamp": datetime.utcnow().isoformat(),
"path": request.url.path
}
}
)
3. Context-Aware Exception Handling
from contextlib import asynccontextmanager
from typing import AsyncGenerator
class ErrorContext:
"""Context manager for error handling with additional metadata"""
def __init__(self, operation: str, user_id: Optional[str] = None):
self.operation = operation
self.user_id = user_id
self.start_time = None
async def __aenter__(self):
self.start_time = datetime.utcnow()
logger.info(f"Starting operation: {self.operation}", extra={
"user_id": self.user_id,
"operation": self.operation
})
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
duration = datetime.utcnow() - self.start_time
if exc_type is None:
logger.info(f"Operation completed: {self.operation}", extra={
"user_id": self.user_id,
"operation": self.operation,
"duration_ms": duration.total_seconds() * 1000
})
else:
logger.error(f"Operation failed: {self.operation}", extra={
"user_id": self.user_id,
"operation": self.operation,
"duration_ms": duration.total_seconds() * 1000,
"error": str(exc_val),
"error_type": exc_type.__name__
})
# Usage in endpoints
@app.get("/users/{user_id}")
async def get_user(user_id: str, current_user: dict = Depends(get_current_user)):
async with ErrorContext("get_user", current_user.get("id")):
try:
user = await user_service.get_user(user_id)
if not user:
raise ResourceNotFoundException("User", user_id)
return user
except DatabaseConnectionError as e:
logger.error(f"Database connection failed: {e}")
raise HTTPException(
status_code=503,
detail="Service temporarily unavailable"
)
4. Database Exception Handling
from sqlalchemy.exc import IntegrityError, DatabaseError
from asyncpg.exceptions import UniqueViolationError, ForeignKeyViolationError
@app.exception_handler(IntegrityError)
async def integrity_error_handler(request: Request, exc: IntegrityError):
"""Handle database integrity constraint violations"""
error_message = str(exc.orig)
# Parse different types of constraint violations
if "UNIQUE constraint" in error_message or isinstance(exc.orig, UniqueViolationError):
return JSONResponse(
status_code=409,
content={
"error": {
"code": "DUPLICATE_RESOURCE",
"message": "A resource with these values already exists",
"timestamp": datetime.utcnow().isoformat()
}
}
)
elif "FOREIGN KEY constraint" in error_message or isinstance(exc.orig, ForeignKeyViolationError):
return JSONResponse(
status_code=400,
content={
"error": {
"code": "INVALID_REFERENCE",
"message": "Referenced resource does not exist",
"timestamp": datetime.utcnow().isoformat()
}
}
)
# Generic integrity error
return JSONResponse(
status_code=400,
content={
"error": {
"code": "DATA_INTEGRITY_ERROR",
"message": "Data integrity constraint violated",
"timestamp": datetime.utcnow().isoformat()
}
}
)
@app.exception_handler(DatabaseError)
async def database_error_handler(request: Request, exc: DatabaseError):
"""Handle general database errors"""
logger.error(f"Database error: {exc}", extra={
"path": request.url.path,
"method": request.method
})
return JSONResponse(
status_code=503,
content={
"error": {
"code": "DATABASE_ERROR",
"message": "Database service is temporarily unavailable",
"timestamp": datetime.utcnow().isoformat()
}
}
)
5. Service Layer Exception Patterns
# services/user_service.py
from typing import Optional
import asyncio
class UserService:
def __init__(self, db_session, cache_client):
self.db = db_session
self.cache = cache_client
async def create_user(self, user_data: dict) -> dict:
"""Create a new user with proper error handling"""
try:
# Check if user already exists
existing_user = await self.get_user_by_email(user_data["email"])
if existing_user:
raise DuplicateResourceException("User", "email", user_data["email"])
# Validate business rules
if not self._is_valid_age(user_data.get("age")):
raise ValidationException("Age must be between 18 and 120", "age")
# Create user
user = await self.db.create_user(user_data)
# Async operations with timeout
try:
await asyncio.wait_for(
self._send_welcome_email(user["email"]),
timeout=10.0
)
except asyncio.TimeoutError:
logger.warning(f"Welcome email timeout for user {user['id']}")
# Don't fail the request, just log the issue
return user
except ValidationException:
raise # Re-raise validation errors
except DuplicateResourceException:
raise # Re-raise duplicate errors
except Exception as e:
logger.error(f"Unexpected error creating user: {e}")
raise HTTPException(
status_code=500,
detail="Failed to create user"
)
async def get_user(self, user_id: str) -> Optional[dict]:
"""Get user with caching and error handling"""
try:
# Try cache first
cached_user = await self.cache.get(f"user:{user_id}")
if cached_user:
return cached_user
# Fallback to database
user = await self.db.get_user(user_id)
if not user:
return None
# Cache the result
await self.cache.set(f"user:{user_id}", user, expire=300)
return user
except Exception as e:
logger.error(f"Error fetching user {user_id}: {e}")
# Degrade gracefully - try database without cache
try:
return await self.db.get_user(user_id)
except Exception as db_error:
logger.error(f"Database fallback failed for user {user_id}: {db_error}")
raise HTTPException(
status_code=503,
detail="User service temporarily unavailable"
)
6. Middleware for Error Logging
from fastapi import Request, Response
import time
import uuid
@app.middleware("http")
async def error_logging_middleware(request: Request, call_next):
"""Middleware to log errors and add request tracking"""
request_id = str(uuid.uuid4())
start_time = time.time()
# Add request ID to headers for tracing
request.state.request_id = request_id
try:
response = await call_next(request)
# Log successful requests
process_time = time.time() - start_time
logger.info(
"Request completed",
extra={
"request_id": request_id,
"method": request.method,
"path": request.url.path,
"status_code": response.status_code,
"process_time": process_time,
"user_agent": request.headers.get("user-agent"),
"client_ip": request.client.host
}
)
# Add request ID to response headers
response.headers["X-Request-ID"] = request_id
return response
except Exception as e:
# Log errors with full context
process_time = time.time() - start_time
logger.error(
f"Request failed: {str(e)}",
extra={
"request_id": request_id,
"method": request.method,
"path": request.url.path,
"process_time": process_time,
"error_type": type(e).__name__,
"user_agent": request.headers.get("user-agent"),
"client_ip": request.client.host,
"traceback": traceback.format_exc()
}
)
# Re-raise the exception to be handled by exception handlers
raise
7. Health Check with Error Details
@app.get("/health")
async def health_check():
"""Health check endpoint with component status"""
health_status = {
"status": "healthy",
"timestamp": datetime.utcnow().isoformat(),
"components": {}
}
# Check database
try:
await db.execute("SELECT 1")
health_status["components"]["database"] = {"status": "healthy"}
except Exception as e:
health_status["components"]["database"] = {
"status": "unhealthy",
"error": str(e)
}
health_status["status"] = "unhealthy"
# Check external services
try:
async with httpx.AsyncClient() as client:
response = await client.get(
"https://api.external-service.com/health",
timeout=5.0
)
if response.status_code == 200:
health_status["components"]["external_api"] = {"status": "healthy"}
else:
health_status["components"]["external_api"] = {
"status": "unhealthy",
"error": f"HTTP {response.status_code}"
}
except Exception as e:
health_status["components"]["external_api"] = {
"status": "unhealthy",
"error": str(e)
}
# Return appropriate status code
status_code = 200 if health_status["status"] == "healthy" else 503
return JSONResponse(
status_code=status_code,
content=health_status
)
8. Production Error Monitoring
import sentry_sdk
from sentry_sdk.integrations.fastapi import FastApiIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
# Initialize Sentry for production error tracking
sentry_sdk.init(
dsn="YOUR_SENTRY_DSN",
integrations=[
FastApiIntegration(auto_enabling_integrations=False),
SqlalchemyIntegration(),
],
traces_sample_rate=0.1,
send_default_pii=False
)
@app.middleware("http")
async def sentry_context_middleware(request: Request, call_next):
"""Add request context to Sentry errors"""
with sentry_sdk.configure_scope() as scope:
scope.set_tag("path", request.url.path)
scope.set_tag("method", request.method)
scope.set_context("request", {
"url": str(request.url),
"headers": dict(request.headers),
"query_params": dict(request.query_params)
})
response = await call_next(request)
scope.set_tag("status_code", response.status_code)
return response
Best Practices Summary
1. Exception Hierarchy
- Create a clear hierarchy of custom exceptions
- Include relevant context in exception messages
- Use appropriate HTTP status codes
2. Error Responses
- Return consistent error response formats
- Include error codes for programmatic handling
- Provide helpful error messages for debugging
3. Logging Strategy
- Log errors with sufficient context
- Use structured logging for better parsing
- Include request IDs for tracing
4. Graceful Degradation
- Handle external service failures gracefully
- Implement circuit breakers for unstable services
- Provide fallback mechanisms where possible