FastAPI + MCP Integration Guide: Building Production AI Agents in 2025
FastAPI has emerged as one of the strongest frameworks for MCP integration in 2025, combining Python's AI ecosystem with high-performance async capabilities. This guide shows you how to build production-grade AI agents using FastAPI and the Model Context Protocol (MCP).
Why FastAPI + MCP is a Winning Combination
FastAPI Advantages for MCP
- Native Async Support: Perfect for I/O-heavy AI operations
- Automatic API Documentation: OpenAPI specs that integrate with MCP discovery
- Type Safety: Pydantic models ensure reliable tool contracts
- High Performance: Built on Starlette and Uvicorn for production speed
- Easy Testing: Built-in test client for MCP server validation
MCP Benefits for FastAPI
- Universal Tool Access: Connect to any MCP-compatible service
- Standardized Integration: No more custom API clients (see the tool-schema sketch after these lists)
- Real-time Context: Live data access for AI decision-making
- Scalable Architecture: Handle thousands of tools with minimal overhead
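To make "standardized integration" concrete: fastapi-mcp derives an MCP tool from each exposed FastAPI endpoint, with an input schema generated from your Pydantic models. Roughly, a discovered tool entry looks like the sketch below (the field names are illustrative, not the exact wire format):
# Illustrative shape of a discovered MCP tool entry
weather_tool = {
    "name": "get_weather",
    "description": "Get current weather for a city",
    "input_schema": {
        "type": "object",
        "properties": {
            "city": {"type": "string"},
            "units": {"type": "string", "default": "metric"},
        },
        "required": ["city"],
    },
}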
Architecture Overview: FastAPI MCP Integration
graph TB
A[AI Agent] --> B[MCP Client]
B --> C[FastAPI MCP Server]
C --> D[FastAPI Application]
D --> E[Database]
D --> F[External APIs]
D --> G[File System]
D --> H[Business Logic]
subgraph "FastAPI MCP Stack"
C
D
I[Pydantic Models]
J[Async Handlers]
K[Authentication]
L[Monitoring]
end
Quick Start: Your First FastAPI MCP Server
Installation
# Install FastAPI MCP dependencies
pip install fastapi-mcp uvicorn httpx pydantic python-dotenv
# Or use uv (recommended for 2025)
uv add fastapi-mcp uvicorn httpx pydantic python-dotenv
Basic MCP Server Setup
# server.py
from fastapi import FastAPI
from fastapi_mcp import FastApiMCP
from pydantic import BaseModel
from typing import List, Optional
import httpx
import os
# Create FastAPI app
app = FastAPI(
title="My AI Agent API",
description="FastAPI server with MCP integration",
version="1.0.0"
)
# Pydantic models for type safety
class WeatherRequest(BaseModel):
city: str
units: str = "metric"
class WeatherResponse(BaseModel):
temperature: float
description: str
humidity: int
# Standard FastAPI endpoints
@app.get("/health")
async def health_check():
return {"status": "healthy", "service": "FastAPI MCP Server"}
@app.post("/weather", response_model=WeatherResponse)
async def get_weather(request: WeatherRequest):
"""Get current weather for a city"""
# Your weather API logic here
api_key = os.getenv("WEATHER_API_KEY")
async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://api.openweathermap.org/data/2.5/weather",
            params={
                "q": request.city,
                "appid": api_key,
                "units": request.units
            }
        )
        response.raise_for_status()
        data = response.json()
return WeatherResponse(
temperature=data["main"]["temp"],
description=data["weather"][0]["description"],
humidity=data["main"]["humidity"]
)
# Initialize MCP integration
mcp = FastApiMCP(
app,
name="Weather Agent API",
description="AI-powered weather information service",
base_url="http://localhost:8000"
)
# Mount MCP server to FastAPI app
mcp.mount()
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Testing Your MCP Server
# Run the server
uvicorn server:app --reload
# Test standard FastAPI endpoint
curl -X POST "http://localhost:8000/weather" \
-H "Content-Type: application/json" \
-d '{"city": "San Francisco"}'
# fastapi-mcp mounts the MCP endpoint (SSE transport by default) at /mcp;
# MCP clients connect there rather than via plain REST calls
# MCP tools are now available at http://localhost:8000/mcp
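You can also connect with the official Python MCP SDK to confirm tool discovery end-to-end. A minimal sketch, assuming the server above is running and fastapi-mcp's default SSE transport at /mcp:
# smoke_test_client.py -- list the tools exposed by the server above
import asyncio
from mcp import ClientSession
from mcp.client.sse import sse_client

async def main():
    async with sse_client("http://localhost:8000/mcp") as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            for tool in tools.tools:
                print(tool.name, "-", tool.description)

asyncio.run(main())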
Advanced Integration Patterns
1. Database Integration with SQLAlchemy
# models.py
from sqlalchemy import Column, Integer, String, DateTime, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
Base = declarative_base()
class UserActivity(Base):
__tablename__ = "user_activities"
id = Column(Integer, primary_key=True)
user_id = Column(String, nullable=False)
activity = Column(String, nullable=False)
timestamp = Column(DateTime, default=datetime.utcnow)
# database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()
# enhanced_server.py
from fastapi import FastAPI, Depends, HTTPException
from fastapi_mcp import FastApiMCP
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from database import get_db
from models import UserActivity
from typing import List
app = FastAPI()
@app.post("/user-activity", operation_id="log_user_activity")
async def log_user_activity(
user_id: str,
activity: str,
db: AsyncSession = Depends(get_db)
):
"""Log user activity for AI analysis"""
new_activity = UserActivity(user_id=user_id, activity=activity)
db.add(new_activity)
await db.commit()
await db.refresh(new_activity)
return {
"id": new_activity.id,
"user_id": user_id,
"activity": activity,
"timestamp": new_activity.timestamp
}
@app.get("/user-activities/{user_id}", operation_id="get_user_activities")
async def get_user_activities(
user_id: str,
limit: int = 10,
db: AsyncSession = Depends(get_db)
) -> List[dict]:
"""Retrieve user activities for AI context"""
    query = (
        select(UserActivity)
        .where(UserActivity.user_id == user_id)
        .order_by(UserActivity.timestamp.desc())
        .limit(limit)
    )
result = await db.execute(query)
activities = result.scalars().all()
return [
{
"id": activity.id,
"activity": activity.activity,
"timestamp": activity.timestamp.isoformat()
}
for activity in activities
]
# MCP Integration with explicit operation IDs
mcp = FastApiMCP(
app,
name="User Activity Tracker",
description="Track and analyze user activities for AI insights",
include_operations=["log_user_activity", "get_user_activities"]
)
mcp.mount()
2. Authentication and Security
# auth.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from datetime import datetime, timedelta
import os
security = HTTPBearer()
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key")  # never ship the fallback value
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
try:
payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials"
)
return user_id
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials"
)
# secure_server.py
from fastapi import FastAPI, Depends, HTTPException
from fastapi_mcp import FastApiMCP
from auth import get_current_user
from datetime import datetime
from typing import Dict, Any
app = FastAPI()
@app.post("/secure-operation", operation_id="secure_ai_operation")
async def secure_ai_operation(
data: Dict[str, Any],
current_user: str = Depends(get_current_user)
):
"""Secure operation that requires authentication"""
# Only authenticated users can access this MCP tool
return {
"message": f"Secure operation completed for user {current_user}",
"data": data,
"timestamp": datetime.utcnow().isoformat()
}
# MCP with authentication dependency
mcp = FastApiMCP(
app,
name="Secure AI Operations",
description="Authenticated MCP tools for enterprise AI",
# FastAPI-MCP will respect your dependency injection
)
mcp.mount()
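A quick way to exercise the secured tool, assuming the helpers above: mint a token with create_access_token and call the endpoint with httpx (the URL and payload are illustrative):
# try_secure_op.py -- smoke test for the authenticated endpoint
import httpx
from auth import create_access_token

token = create_access_token({"sub": "demo_user"})

response = httpx.post(
    "http://localhost:8000/secure-operation",
    json={"note": "hello from an authenticated client"},
    headers={"Authorization": f"Bearer {token}"},
)
print(response.status_code, response.json())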
3. Background Tasks and Async Processing
# async_server.py
from fastapi import FastAPI, BackgroundTasks
from fastapi_mcp import FastApiMCP
from celery import Celery
import asyncio
from typing import Dict, List
app = FastAPI()
# Celery for heavy background processing
celery_app = Celery(
"mcp_tasks",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0"
)
@celery_app.task
def process_large_dataset(data: List[Dict]):
"""Heavy processing task for AI analysis"""
# Simulate data processing
import time
time.sleep(10) # Simulate heavy computation
return {"processed_items": len(data), "status": "completed"}
@app.post("/analyze-dataset", operation_id="analyze_large_dataset")
async def analyze_dataset(
dataset: List[Dict],
background_tasks: BackgroundTasks
):
"""Trigger analysis of large dataset for AI insights"""
# Start background processing
task = process_large_dataset.delay(dataset)
# Add monitoring task
background_tasks.add_task(monitor_task_progress, task.id)
return {
"task_id": task.id,
"status": "processing",
"message": "Dataset analysis started"
}
async def monitor_task_progress(task_id: str):
    """Poll the Celery task until it finishes and report the outcome"""
    # Minimal polling loop; swap in real alerting/metrics as needed
    while True:
        task = process_large_dataset.AsyncResult(task_id)
        if task.ready():
            print(f"Task {task_id} finished with status {task.status}")
            break
        await asyncio.sleep(5)
@app.get("/task-status/{task_id}", operation_id="check_task_status")
async def check_task_status(task_id: str):
"""Check status of background processing task"""
task = process_large_dataset.AsyncResult(task_id)
return {
"task_id": task_id,
"status": task.status,
"result": task.result if task.ready() else None
}
mcp = FastApiMCP(
app,
name="Async Data Processor",
description="Handle large-scale data processing for AI analysis"
)
mcp.mount()
Production Deployment Strategies
1. Docker Containerization
# Dockerfile
FROM python:3.12-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Copy requirements first for better caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Expose MCP port
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Run with Gunicorn for production
CMD ["gunicorn", "server:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]
2. Kubernetes Deployment
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: fastapi-mcp-server
spec:
replicas: 3
selector:
matchLabels:
app: fastapi-mcp-server
template:
metadata:
labels:
app: fastapi-mcp-server
spec:
containers:
- name: fastapi-mcp
image: your-registry/fastapi-mcp:latest
ports:
- containerPort: 8000
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: db-secret
key: url
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: fastapi-mcp-service
spec:
selector:
app: fastapi-mcp-server
ports:
- protocol: TCP
port: 80
targetPort: 8000
type: LoadBalancer
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: fastapi-mcp-ingress
annotations:
nginx.ingress.kubernetes.io/proxy-buffering: "off"
nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
spec:
rules:
- host: mcp-api.yourcompany.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: fastapi-mcp-service
port:
number: 80
3. Environment Configuration
# config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import Optional
class Settings(BaseSettings):
# Database
database_url: str = "postgresql+asyncpg://localhost/mcpdb"
# Authentication
secret_key: str
jwt_algorithm: str = "HS256"
access_token_expire_minutes: int = 30
# MCP Configuration
mcp_server_name: str = "Production MCP Server"
mcp_base_url: str = "https://mcp-api.yourcompany.com"
# External APIs
weather_api_key: Optional[str] = None
openai_api_key: Optional[str] = None
# Monitoring
sentry_dsn: Optional[str] = None
log_level: str = "INFO"
# Performance
worker_processes: int = 4
max_connections: int = 1000
    model_config = SettingsConfigDict(env_file=".env")
settings = Settings()
# production_server.py
from fastapi import FastAPI
from fastapi_mcp import FastApiMCP
from config import settings
import sentry_sdk
from sentry_sdk.integrations.fastapi import FastApiIntegration
import logging
import time
# Configure logging
logging.basicConfig(level=getattr(logging, settings.log_level))
logger = logging.getLogger(__name__)
# Initialize Sentry for error tracking
if settings.sentry_dsn:
sentry_sdk.init(
dsn=settings.sentry_dsn,
integrations=[FastApiIntegration()],
traces_sample_rate=0.1,
)
# Create production FastAPI app
app = FastAPI(
title=settings.mcp_server_name,
description="Production-grade FastAPI MCP server",
docs_url="/docs" if settings.log_level == "DEBUG" else None,
redoc_url="/redoc" if settings.log_level == "DEBUG" else None,
)
# Production middleware
@app.middleware("http")
async def log_requests(request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
logger.info(
f"{request.method} {request.url.path} "
f"completed in {process_time:.3f}s "
f"with status {response.status_code}"
)
return response
# MCP configuration for production
mcp = FastApiMCP(
    app,
    name=settings.mcp_server_name,
    description="Enterprise AI operations platform",
    base_url=settings.mcp_base_url,
    # Security configurations: fastapi-mcp accepts include OR exclude
    # filtering, not both at once; allow-listing tags is the safer default
    include_tags=["ai-tools", "analytics"],
)
mcp.mount()
Advanced MCP Features with FastAPI
1. Custom Tool Filtering and Discovery
# custom_filtering.py
from fastapi import FastAPI, Request
from fastapi_mcp import FastApiMCP
from typing import List, Dict, Any
app = FastAPI()
class CustomMCPServer(FastApiMCP):
    """Illustrative subclass: fastapi-mcp does not invoke these hooks
    automatically; wire them into your own request handling as needed."""
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
async def filter_tools_by_user(self, request: Request) -> List[str]:
"""Custom logic to filter tools based on user permissions"""
# Extract user from request (JWT, session, etc.)
user_role = await self.extract_user_role(request)
if user_role == "admin":
return ["all"] # Admin sees all tools
elif user_role == "analyst":
return ["read_data", "analyze_trends", "generate_report"]
else:
return ["read_public_data"]
async def extract_user_role(self, request: Request) -> str:
# Your authentication logic here
return "analyst" # Example
# Usage
mcp = CustomMCPServer(
app,
name="Role-Based MCP Server",
description="MCP server with role-based tool access"
)
2. Real-time Data Streaming
# streaming_server.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi_mcp import FastApiMCP
from datetime import datetime
from typing import Any, Dict
import asyncio
import json
import random
import uuid
app = FastAPI()
@app.get("/stream-data/{data_source}", operation_id="stream_real_time_data")
async def stream_real_time_data(data_source: str):
"""Stream real-time data for AI analysis"""
async def generate_data_stream():
"""Generate streaming data"""
counter = 0
while counter < 100: # Limit for demo
data = {
"timestamp": datetime.utcnow().isoformat(),
"source": data_source,
"value": random.random() * 100,
"counter": counter
}
yield f"data: {json.dumps(data)}\n\n"
await asyncio.sleep(1)
counter += 1
    return StreamingResponse(
        generate_data_stream(),
        media_type="text/event-stream"  # SSE-style "data:" payloads
    )
@app.post("/analyze-stream", operation_id="analyze_data_stream")
async def analyze_data_stream(
stream_config: Dict[str, Any],
analysis_type: str = "real_time"
):
"""Configure real-time data analysis"""
# Set up streaming analysis pipeline
return {
"analysis_id": f"stream_{uuid.uuid4().hex[:8]}",
"status": "active",
"config": stream_config
}
mcp = FastApiMCP(app, name="Streaming Data Analyzer")
mcp.mount()
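On the consuming side, the stream can be read line by line with httpx's streaming API. A minimal sketch, assuming the server above is running locally:
# consume_stream.py -- read the SSE-style stream emitted above
import asyncio
import httpx

async def consume(data_source: str):
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "GET", f"http://localhost:8000/stream-data/{data_source}"
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    print(line.removeprefix("data: "))

asyncio.run(consume("sensor-1"))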
3. Multi-Agent Coordination
# multi_agent_server.py
from fastapi import FastAPI
from fastapi_mcp import FastApiMCP
from typing import Dict, List
import asyncio
app = FastAPI()
class AgentCoordinator:
def __init__(self):
self.active_agents = {}
self.task_queue = asyncio.Queue()
async def register_agent(self, agent_id: str, capabilities: List[str]):
self.active_agents[agent_id] = {
"capabilities": capabilities,
"status": "ready",
"current_task": None
}
async def delegate_task(self, task: Dict) -> str:
# Find best agent for task
suitable_agents = [
agent_id for agent_id, info in self.active_agents.items()
if task["required_capability"] in info["capabilities"]
and info["status"] == "ready"
]
if suitable_agents:
selected_agent = suitable_agents[0]
self.active_agents[selected_agent]["status"] = "busy"
self.active_agents[selected_agent]["current_task"] = task["task_id"]
return selected_agent
else:
await self.task_queue.put(task)
return "queued"
coordinator = AgentCoordinator()
@app.post("/register-agent", operation_id="register_ai_agent")
async def register_agent(agent_id: str, capabilities: List[str]):
"""Register an AI agent with the coordination system"""
await coordinator.register_agent(agent_id, capabilities)
return {"status": "registered", "agent_id": agent_id}
@app.post("/delegate-task", operation_id="delegate_task_to_agent")
async def delegate_task(task: Dict):
"""Delegate a task to the most suitable agent"""
assigned_agent = await coordinator.delegate_task(task)
return {
"task_id": task["task_id"],
"assigned_agent": assigned_agent,
"status": "assigned" if assigned_agent != "queued" else "queued"
}
@app.get("/agent-status", operation_id="get_agent_status")
async def get_agent_status():
"""Get status of all registered agents"""
return coordinator.active_agents
mcp = FastApiMCP(
app,
name="Multi-Agent Coordinator",
description="Coordinate multiple AI agents for complex tasks"
)
mcp.mount()
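The coordinator's matching logic can be exercised in isolation, without HTTP. A small driver script; the agent IDs and capabilities are made up for illustration:
# coordinator_demo.py -- exercise AgentCoordinator directly
import asyncio
from multi_agent_server import AgentCoordinator

async def main():
    coordinator = AgentCoordinator()
    await coordinator.register_agent("summarizer-1", ["summarize", "translate"])
    await coordinator.register_agent("analyst-1", ["analyze_trends"])

    # Matches analyst-1, the only ready agent with the capability
    print(await coordinator.delegate_task(
        {"task_id": "t-42", "required_capability": "analyze_trends"}
    ))  # -> analyst-1

    # No ready agent has this capability, so the task is queued
    print(await coordinator.delegate_task(
        {"task_id": "t-43", "required_capability": "generate_report"}
    ))  # -> queued

asyncio.run(main())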
Performance Optimization and Monitoring
1. Caching and Performance
# performance_server.py
from fastapi import FastAPI, Depends
from fastapi_mcp import FastApiMCP
from functools import lru_cache
from typing import Any, Optional
import asyncio
import hashlib
import pickle
import redis
app = FastAPI()
# Redis for distributed caching (sync client; swap in redis.asyncio
# to avoid blocking the event loop under load)
redis_client = redis.Redis(host='localhost', port=6379, db=0)
class CacheManager:
def __init__(self, redis_client):
self.redis = redis_client
async def get(self, key: str) -> Optional[Any]:
cached = self.redis.get(key)
if cached:
return pickle.loads(cached)
return None
async def set(self, key: str, value: Any, expire: int = 3600):
self.redis.setex(key, expire, pickle.dumps(value))
cache = CacheManager(redis_client)
@app.get("/expensive-computation/{input_data}", operation_id="cached_computation")
async def expensive_computation(input_data: str):
"""Expensive computation with caching for AI operations"""
    # hash() is randomized per process; use a stable digest for shared caches
    cache_key = f"computation:{hashlib.sha256(input_data.encode()).hexdigest()}"
# Check cache first
cached_result = await cache.get(cache_key)
if cached_result:
return {"result": cached_result, "from_cache": True}
# Simulate expensive computation
await asyncio.sleep(2) # Simulate processing time
result = f"processed_{input_data}"
# Cache the result
await cache.set(cache_key, result, expire=1800) # 30 minutes
return {"result": result, "from_cache": False}
# Memory caching for frequently accessed data
@lru_cache(maxsize=1000)
def get_static_ai_prompt(prompt_type: str) -> str:
"""Get cached AI prompts"""
prompts = {
"analysis": "Analyze the following data...",
"summary": "Provide a summary of...",
"recommendation": "Based on the data, recommend..."
}
return prompts.get(prompt_type, "Default prompt")
mcp = FastApiMCP(app, name="High-Performance MCP Server")
mcp.mount()
2. Monitoring and Observability
# monitoring_server.py
from fastapi import FastAPI, Request, Response
from fastapi_mcp import FastApiMCP
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
from datetime import datetime
from typing import Dict
import time
app = FastAPI()
# Prometheus metrics
REQUEST_COUNT = Counter(
'mcp_requests_total',
'Total MCP requests',
['method', 'endpoint', 'status']
)
REQUEST_DURATION = Histogram(
'mcp_request_duration_seconds',
'MCP request duration',
['method', 'endpoint']
)
TOOL_USAGE = Counter(
'mcp_tool_usage_total',
'MCP tool usage count',
['tool_name', 'status']
)
@app.middleware("http")
async def monitor_requests(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
duration = time.time() - start_time
# Record metrics
REQUEST_COUNT.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code
).inc()
REQUEST_DURATION.labels(
method=request.method,
endpoint=request.url.path
).observe(duration)
return response
@app.get("/metrics")
async def get_metrics():
"""Prometheus metrics endpoint"""
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
@app.get("/health-detailed")
async def detailed_health_check():
"""Detailed health check for monitoring"""
return {
"status": "healthy",
"timestamp": datetime.utcnow().isoformat(),
"version": "1.0.0",
"checks": {
"database": await check_database_health(),
"redis": await check_redis_health(),
"external_apis": await check_external_apis()
}
}
async def check_database_health() -> Dict:
    """Check database connectivity"""
    try:
        # Your database health check logic
        return {"status": "healthy", "response_time_ms": 10}
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}

# check_redis_health() and check_external_apis() follow the same pattern
mcp = FastApiMCP(app, name="Monitored MCP Server")
mcp.mount()
Testing FastAPI MCP Integrations
1. Unit Tests
# test_mcp_server.py
import pytest
from fastapi.testclient import TestClient
from unittest.mock import AsyncMock, MagicMock, patch
from server import app, mcp
@pytest.fixture
def client():
return TestClient(app)
@pytest.fixture
def mcp_client():
    """Test client for MCP endpoints.

    Note: whether plain-HTTP discovery endpoints exist under /mcp depends
    on your fastapi-mcp version; the SSE session test in the integration
    suite below is the more reliable check.
    """
    return TestClient(app, base_url="http://testserver/mcp")
def test_health_endpoint(client):
"""Test basic health endpoint"""
response = client.get("/health")
assert response.status_code == 200
assert response.json()["status"] == "healthy"
def test_mcp_manifest(mcp_client):
"""Test MCP manifest endpoint"""
response = mcp_client.get("/manifest")
assert response.status_code == 200
manifest = response.json()
assert "name" in manifest
assert "endpoints" in manifest
@pytest.mark.asyncio
async def test_weather_tool_with_mock():
"""Test weather tool with mocked external API"""
with patch('httpx.AsyncClient.get') as mock_get:
        # Mock external API response; response.json() is synchronous in httpx,
        # so the response object must be a MagicMock, not an AsyncMock
        mock_response = MagicMock()
        mock_response.json.return_value = {
            "main": {"temp": 22.5, "humidity": 65},
            "weather": [{"description": "sunny"}]
        }
        mock_get.return_value = mock_response
client = TestClient(app)
response = client.post(
"/weather",
json={"city": "San Francisco", "units": "metric"}
)
assert response.status_code == 200
data = response.json()
assert data["temperature"] == 22.5
assert data["description"] == "sunny"
def test_mcp_tool_discovery(mcp_client):
"""Test MCP tool discovery"""
response = mcp_client.get("/tools")
assert response.status_code == 200
tools = response.json()
assert isinstance(tools, list)
assert len(tools) > 0
# Check tool structure
tool = tools[0]
assert "name" in tool
assert "description" in tool
assert "input_schema" in tool
@pytest.mark.asyncio
async def test_authenticated_mcp_tool():
"""Test MCP tool that requires authentication"""
from auth import create_access_token
# Create test token
token = create_access_token({"sub": "test_user"})
client = TestClient(app)
response = client.post(
"/secure-operation",
json={"test": "data"},
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 200
assert "test_user" in response.json()["message"]
2. Integration Tests
# test_mcp_integration.py
import pytest
import asyncio
from mcp import ClientSession
from mcp.client.sse import sse_client

@pytest.mark.asyncio
async def test_full_mcp_integration():
    """Test complete MCP client-server interaction.

    fastapi-mcp serves the MCP protocol over SSE at /mcp, so connect with
    the SSE client; start the server separately first, e.g.
    `uvicorn server:app --port 8001`.
    """
    async with sse_client("http://localhost:8001/mcp") as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Test tool discovery
            tools = await session.list_tools()
            assert len(tools.tools) > 0
            # Test tool execution
            result = await session.call_tool(
                "get_weather",
                {"city": "London", "units": "metric"}
            )
            assert result.content
            assert "temperature" in str(result.content[0])
@pytest.mark.asyncio
async def test_mcp_performance():
"""Test MCP server performance under load"""
import time
start_time = time.time()
tasks = []
for i in range(100):
task = asyncio.create_task(
make_mcp_request(f"test_city_{i}")
)
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
# Assert all requests completed successfully
assert len(results) == 100
assert all(result["status"] == "success" for result in results)
# Assert reasonable performance (adjust threshold as needed)
total_time = end_time - start_time
assert total_time < 30 # All 100 requests in under 30 seconds
async def make_mcp_request(city: str):
    """Helper for load testing: hit the weather endpoint over HTTP.

    A minimal stand-in; a stricter test would call the tool through an
    MCP ClientSession instead of the REST endpoint.
    """
    import httpx
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://localhost:8001/weather", json={"city": city}
        )
        return {"status": "success" if response.status_code == 200 else "error"}
Best Practices and Common Patterns
1. Error Handling
# error_handling.py
from fastapi import FastAPI, HTTPException, Request
from fastapi_mcp import FastApiMCP
from fastapi.responses import JSONResponse
import logging
app = FastAPI()
logger = logging.getLogger(__name__)
class MCPError(Exception):
"""Custom MCP-specific error"""
def __init__(self, message: str, error_code: str = "MCP_ERROR"):
self.message = message
self.error_code = error_code
super().__init__(self.message)
@app.exception_handler(MCPError)
async def mcp_error_handler(request: Request, exc: MCPError):
"""Custom error handler for MCP errors"""
logger.error(f"MCP Error: {exc.message} - Code: {exc.error_code}")
return JSONResponse(
status_code=400,
content={
"error": exc.error_code,
"message": exc.message,
"type": "mcp_error"
}
)
@app.post("/safe-operation", operation_id="safe_ai_operation")
async def safe_ai_operation(data: dict):
"""AI operation with comprehensive error handling"""
try:
# Validate input
if not data.get("required_field"):
raise MCPError(
"Missing required field",
error_code="MISSING_FIELD"
)
# Perform operation
result = await perform_ai_operation(data)
return {"success": True, "result": result}
except MCPError:
raise # Re-raise MCP errors
except ValueError as e:
raise MCPError(f"Invalid input: {str(e)}", "INVALID_INPUT")
except Exception as e:
logger.exception("Unexpected error in AI operation")
raise HTTPException(
status_code=500,
detail="Internal server error"
)
async def perform_ai_operation(data: dict):
"""Simulate AI operation that might fail"""
if data.get("simulate_error"):
raise ValueError("Simulated error")
return "operation_completed"
mcp = FastApiMCP(app, name="Robust MCP Server")
mcp.mount()
2. Configuration Management
# config_management.py
from fastapi import FastAPI
from fastapi_mcp import FastApiMCP
from pydantic import field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import Dict, List, Optional
class MCPSettings(BaseSettings):
"""MCP-specific configuration"""
server_name: str = "FastAPI MCP Server"
server_description: str = "Production MCP server"
base_url: str = "http://localhost:8000"
# Tool filtering
include_operations: Optional[List[str]] = None
exclude_operations: Optional[List[str]] = None
include_tags: Optional[List[str]] = None
exclude_tags: Optional[List[str]] = None
# Features
describe_all_responses: bool = True
describe_full_response_schema: bool = False
    @field_validator("base_url")
    @classmethod
    def validate_base_url(cls, v: str) -> str:
        if not v.startswith(("http://", "https://")):
            raise ValueError("base_url must start with http:// or https://")
        return v.rstrip("/")
class AppSettings(BaseSettings):
"""Application configuration"""
# Environment
environment: str = "development"
debug: bool = False
# MCP Configuration
mcp: MCPSettings = MCPSettings()
# Database
database_url: str = "sqlite:///./app.db"
# Security
secret_key: str
jwt_algorithm: str = "HS256"
access_token_expire_minutes: int = 30
# External APIs
external_apis: Dict[str, str] = {}
# Monitoring
enable_metrics: bool = True
metrics_path: str = "/metrics"
    model_config = SettingsConfigDict(
        env_file=".env",
        env_nested_delimiter="__",
        case_sensitive=False,
    )
# Global settings instance
settings = AppSettings()
# Usage in your FastAPI app
def create_mcp_server(app: FastAPI) -> FastApiMCP:
"""Factory function to create MCP server with configuration"""
return FastApiMCP(
app,
name=settings.mcp.server_name,
description=settings.mcp.server_description,
base_url=settings.mcp.base_url,
include_operations=settings.mcp.include_operations,
exclude_operations=settings.mcp.exclude_operations,
include_tags=settings.mcp.include_tags,
exclude_tags=settings.mcp.exclude_tags,
describe_all_responses=settings.mcp.describe_all_responses,
describe_full_response_schema=settings.mcp.describe_full_response_schema
)
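Wiring it together, a short usage sketch of the factory above:
# main.py -- assemble the app from configuration
from fastapi import FastAPI
from config_management import create_mcp_server, settings

app = FastAPI(title=settings.mcp.server_name, debug=settings.debug)
mcp = create_mcp_server(app)
mcp.mount()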
Real-World Use Case: AI-Powered E-commerce Platform
Let's build a complete e-commerce AI agent using FastAPI and MCP:
# ecommerce_ai_server.py
from fastapi import FastAPI, Depends, HTTPException
from fastapi_mcp import FastApiMCP
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import datetime
from typing import List, Optional, Dict
from database import get_db  # async session dependency defined earlier
import asyncio
app = FastAPI(title="E-commerce AI Platform")
# Product recommendation system
@app.post("/recommendations", operation_id="get_product_recommendations")
async def get_product_recommendations(
user_id: str,
category: Optional[str] = None,
limit: int = 10,
db: AsyncSession = Depends(get_db)
):
"""Get AI-powered product recommendations for a user"""
# Fetch user history
user_history = await get_user_purchase_history(db, user_id)
# Get browsing behavior
browsing_data = await get_user_browsing_data(db, user_id)
# AI recommendation algorithm
recommendations = await generate_recommendations(
user_history, browsing_data, category, limit
)
return {
"user_id": user_id,
"recommendations": recommendations,
"generated_at": datetime.utcnow().isoformat()
}
@app.post("/inventory-optimization", operation_id="optimize_inventory")
async def optimize_inventory(
store_id: str,
prediction_horizon_days: int = 30,
db: AsyncSession = Depends(get_db)
):
"""AI-powered inventory optimization"""
# Get sales history
sales_data = await get_sales_history(db, store_id, days=90)
# Get current inventory
current_inventory = await get_current_inventory(db, store_id)
# Get external factors (seasonality, trends, etc.)
market_factors = await get_market_factors(store_id)
# AI optimization
optimization_plan = await calculate_optimal_inventory(
sales_data, current_inventory, market_factors, prediction_horizon_days
)
return {
"store_id": store_id,
"optimization_plan": optimization_plan,
"estimated_cost_savings": optimization_plan["savings"],
"confidence_score": optimization_plan["confidence"]
}
@app.post("/dynamic-pricing", operation_id="calculate_dynamic_pricing")
async def calculate_dynamic_pricing(
product_ids: List[str],
competitor_data: Optional[Dict] = None,
db: AsyncSession = Depends(get_db)
):
"""AI-driven dynamic pricing optimization"""
pricing_results = []
for product_id in product_ids:
# Get product data
product = await get_product_details(db, product_id)
# Get demand patterns
demand_data = await get_demand_patterns(db, product_id)
# Analyze competitor pricing
competitor_prices = competitor_data or await scrape_competitor_prices(product_id)
# AI pricing algorithm
optimal_price = await calculate_optimal_price(
product, demand_data, competitor_prices
)
pricing_results.append({
"product_id": product_id,
"current_price": product["price"],
"recommended_price": optimal_price["price"],
"expected_revenue_change": optimal_price["revenue_impact"],
"confidence": optimal_price["confidence"]
})
return {
"pricing_results": pricing_results,
"total_expected_revenue_increase": sum(
p["expected_revenue_change"] for p in pricing_results
)
}
@app.post("/customer-service-automation", operation_id="handle_customer_inquiry")
async def handle_customer_inquiry(
inquiry: str,
customer_id: Optional[str] = None,
urgency: str = "normal",
db: AsyncSession = Depends(get_db)
):
"""AI-powered customer service automation"""
# Analyze inquiry intent
intent_analysis = await analyze_inquiry_intent(inquiry)
# Get customer context if available
customer_context = None
if customer_id:
customer_context = await get_customer_context(db, customer_id)
# Generate response
if intent_analysis["can_auto_resolve"]:
response = await generate_automated_response(
inquiry, intent_analysis, customer_context
)
# Log interaction
await log_customer_interaction(
db, customer_id, inquiry, response, "automated"
)
return {
"response": response["message"],
"resolution_type": "automated",
"confidence": response["confidence"],
"suggested_actions": response.get("actions", [])
}
else:
# Escalate to human agent
ticket = await create_support_ticket(
db, customer_id, inquiry, intent_analysis, urgency
)
return {
"response": "Your inquiry has been forwarded to our support team.",
"resolution_type": "escalated",
"ticket_id": ticket["id"],
"estimated_response_time": ticket["eta"]
}
# Initialize MCP server for e-commerce AI
mcp = FastApiMCP(
    app,
    name="E-commerce AI Platform",
    description="Comprehensive AI tools for e-commerce optimization",
    # Tag filtering only exposes routes that declare matching tags,
    # e.g. @app.post(..., tags=["ai"]); add tags to the routes above
    include_tags=["ai", "ecommerce", "optimization"]
)
mcp.mount()
# Helper functions (implementation details). The remaining data-access
# helpers (get_user_purchase_history, get_sales_history, etc.) are assumed
# to be implemented against your own schema.
async def generate_recommendations(history, browsing, category, limit):
"""AI recommendation algorithm"""
# Implementation would include ML model inference
pass
async def calculate_optimal_inventory(sales, inventory, factors, horizon):
"""Inventory optimization algorithm"""
# Implementation would include demand forecasting
pass
async def calculate_optimal_price(product, demand, competitors):
"""Dynamic pricing algorithm"""
# Implementation would include price elasticity analysis
pass
async def analyze_inquiry_intent(inquiry):
"""NLP intent analysis"""
# Implementation would include NLP model inference
pass
Conclusion: FastAPI + MCP Best Practices
Key Takeaways
- Start Simple: Begin with basic MCP integration and gradually add complexity
- Use Type Safety: Leverage Pydantic models for reliable tool contracts
- Plan for Scale: Design with async patterns and proper caching
- Security First: Implement authentication and authorization from the start
- Monitor Everything: Add comprehensive logging and metrics
- Test Thoroughly: Write tests for both FastAPI endpoints and MCP integration
Production Checklist
- [ ] Security: Authentication, authorization, input validation
- [ ] Performance: Async operations, caching, connection pooling
- [ ] Monitoring: Metrics, logging, health checks, alerting
- [ ] Testing: Unit tests, integration tests, load tests
- [ ] Documentation: API docs, MCP tool descriptions, deployment guides
- [ ] Deployment: Containerization, orchestration, CI/CD pipelines
- [ ] Reliability: Error handling, retries, circuit breakers
Future Considerations
As MCP continues to evolve in 2025, keep an eye on:
- New Transport Protocols: WebSocket and gRPC support
- Enhanced Security Features: OAuth2 integration, fine-grained permissions
- Performance Improvements: Better caching, connection pooling
- Tooling Ecosystem: IDE integrations, testing frameworks, monitoring tools
FastAPI and MCP together provide a powerful foundation for building next-generation AI applications. By following the patterns and practices outlined in this guide, you'll be well-equipped to create production-grade AI agents that can scale with your business needs.
For more information about MCP fundamentals, check out our Complete MCP Overview Guide.