FastAPI + MCP Integration Guide: Building Production AI Agents in 2025

FastAPI is a strong fit for MCP integration in 2025: it combines Python's AI ecosystem with high-performance async request handling. This guide shows how to build production-grade AI agents with FastAPI and the Model Context Protocol (MCP).

Why FastAPI + MCP is a Winning Combination

FastAPI Advantages for MCP

  • Native Async Support: Well suited to I/O-heavy AI operations
  • Automatic API Documentation: OpenAPI specs that integrate with MCP discovery
  • Type Safety: Pydantic models ensure reliable tool contracts
  • High Performance: Built on Starlette and Uvicorn for production speed
  • Easy Testing: Built-in test client for MCP server validation
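
To make the async point in the list above concrete, here is a minimal sketch that uses only the standard library. `fake_tool_call` is a hypothetical stand-in for an I/O-bound tool (an HTTP call or a database query); it is not part of FastAPI or MCP. The three simulated calls overlap, so the batch takes roughly as long as the slowest call rather than the sum of all three.

```python
import asyncio
import time
from typing import List, Tuple


async def fake_tool_call(name: str, delay: float) -> str:
    """Hypothetical stand-in for an I/O-bound tool call."""
    await asyncio.sleep(delay)  # yields to the event loop while "waiting on I/O"
    return f"{name}: done"


async def run_tools_concurrently() -> Tuple[List[str], float]:
    """Run three simulated tool calls at once and time the whole batch."""
    start = time.perf_counter()
    results = await asyncio.gather(
        fake_tool_call("weather", 0.2),
        fake_tool_call("database", 0.2),
        fake_tool_call("search", 0.2),
    )
    return list(results), time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(run_tools_concurrently())
    print(results, f"{elapsed:.2f}s")
```

An `async def` FastAPI handler benefits from the same mechanism: while one request awaits an upstream service, the event loop can serve others.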

MCP Benefits for FastAPI

  • Universal Tool Access: Connect to any MCP-compatible service
  • Standardized Integration: No more custom API clients
  • Real-time Context: Live data access for AI decision-making
  • Scalable Architecture: Handle thousands of tools with minimal overhead

Architecture Overview: FastAPI MCP Integration

graph TB
    A[AI Agent] --> B[MCP Client]
    B --> C[FastAPI MCP Server]
    C --> D[FastAPI Application]
    D --> E[Database]
    D --> F[External APIs]
    D --> G[File System]
    D --> H[Business Logic]

    subgraph "FastAPI MCP Stack"
        C
        D
        I[Pydantic Models]
        J[Async Handlers]
        K[Authentication]
        L[Monitoring]
    end

Quick Start: Your First FastAPI MCP Server

Installation

# Install FastAPI MCP dependencies
pip install fastapi-mcp uvicorn httpx pydantic python-dotenv

# Or use uv (recommended for 2025)
uv add fastapi-mcp uvicorn httpx pydantic python-dotenv

Basic MCP Server Setup

# server.py
from fastapi import FastAPI
from fastapi_mcp import FastApiMCP
from pydantic import BaseModel
from typing import List, Optional
import httpx
import os

# Create FastAPI app
app = FastAPI(
    title="My AI Agent API",
    description="FastAPI server with MCP integration",
    version="1.0.0"
)

# Pydantic models for type safety
class WeatherRequest(BaseModel):
    city: str
    units: str = "metric"

class WeatherResponse(BaseModel):
    temperature: float
    description: str
    humidity: int

# Standard FastAPI endpoints
@app.get("/health")
async def health_check():
    return {"status": "healthy", "service": "FastAPI MCP Server"}

@app.post("/weather", response_model=WeatherResponse)
async def get_weather(request: WeatherRequest):
    """Get current weather for a city"""
    # Your weather API logic here
    api_key = os.getenv("WEATHER_API_KEY")
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://api.openweathermap.org/data/2.5/weather",
            params={
                "q": request.city,
                "appid": api_key,
                "units": request.units
            }
        )
        # Fail loudly on upstream errors instead of hitting a KeyError below
        response.raise_for_status()
        data = response.json()

    return WeatherResponse(
        temperature=data["main"]["temp"],
        description=data["weather"][0]["description"],
        humidity=data["main"]["humidity"]
    )

# Initialize MCP integration
mcp = FastApiMCP(
    app,
    name="Weather Agent API",
    description="AI-powered weather information service",
    base_url="http://localhost:8000"
)

# Mount MCP server to FastAPI app
mcp.mount()

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
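
The handler above indexes straight into the upstream JSON, so an unexpected body surfaces as a `KeyError`. The following is one possible sketch of a more defensive parse, using only the standard library. The names `Weather`, `WeatherParseError`, and `parse_weather_payload` are hypothetical and exist only for this illustration.

```python
from dataclasses import dataclass


class WeatherParseError(ValueError):
    """Raised when the upstream payload lacks the expected fields."""


@dataclass(frozen=True)
class Weather:
    temperature: float
    description: str
    humidity: int


def parse_weather_payload(data: dict) -> Weather:
    """Extract the three fields the endpoint returns, or raise a clear error."""
    try:
        return Weather(
            temperature=float(data["main"]["temp"]),
            description=str(data["weather"][0]["description"]),
            humidity=int(data["main"]["humidity"]),
        )
    except (KeyError, IndexError, TypeError, ValueError) as exc:
        raise WeatherParseError(f"unexpected weather payload: {exc!r}") from exc
```

In the endpoint, a `WeatherParseError` could then be translated into an `HTTPException` with a 502 status, so that the MCP client sees an upstream failure rather than a generic server error.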

Testing Your MCP Server

# Run the server
uvicorn server:app --reload

# Test standard FastAPI endpoint
curl -X POST "http://localhost:8000/weather" \
     -H "Content-Type: application/json" \
     -d '{"city": "San Francisco"}'

# Access MCP manifest
curl "http://localhost:8000/mcp/manifest"

# MCP tools are now available at http://localhost:8000/mcp
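
MCP clients do not call the REST routes directly; they exchange JSON-RPC 2.0 messages with the MCP endpoint. As a rough sketch of what travels over the wire, the snippet below builds the two requests a client typically sends: `tools/list` for discovery and `tools/call` for execution. The tool name `get_weather` and the argument layout are assumptions; the real values depend on the operation IDs and schemas your server exposes.

```python
import json
from itertools import count
from typing import Optional

_request_ids = count(1)  # JSON-RPC ids only need to be unique within a session


def jsonrpc_request(method: str, params: Optional[dict] = None) -> dict:
    """Build a JSON-RPC 2.0 request envelope of the kind MCP uses."""
    message = {"jsonrpc": "2.0", "id": next(_request_ids), "method": method}
    if params is not None:
        message["params"] = params
    return message


# Ask the server which tools it exposes.
list_tools = jsonrpc_request("tools/list")

# Invoke one tool by name; "get_weather" is an assumed tool name.
call_weather = jsonrpc_request(
    "tools/call",
    {"name": "get_weather", "arguments": {"city": "San Francisco", "units": "metric"}},
)

print(json.dumps(call_weather, indent=2))
```

In practice an MCP client library builds and sends these messages for you; the sketch is only meant to show why a plain `curl` against a REST path is not the same as an MCP tool call.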

Advanced Integration Patterns

1. Database Integration with SQLAlchemy

# models.py
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.orm import declarative_base  # its home since SQLAlchemy 1.4
from datetime import datetime

Base = declarative_base()

class UserActivity(Base):
    __tablename__ = "user_activities"

    id = Column(Integer, primary_key=True)
    user_id = Column(String, nullable=False)
    activity = Column(String, nullable=False)
    timestamp = Column(DateTime, default=datetime.utcnow)

# database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"

engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
        finally:
            await session.close()

# enhanced_server.py
from fastapi import FastAPI, Depends, HTTPException
from fastapi_mcp import FastApiMCP
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from database import get_db
from models import UserActivity
from typing import List

app = FastAPI()

@app.post("/user-activity", operation_id="log_user_activity")
async def log_user_activity(
    user_id: str,
    activity: str,
    db: AsyncSession = Depends(get_db)
):
    """Log user activity for AI analysis"""
    new_activity = UserActivity(user_id=user_id, activity=activity)
    db.add(new_activity)
    await db.commit()
    await db.refresh(new_activity)

    return {
        "id": new_activity.id,
        "user_id": user_id,
        "activity": activity,
        "timestamp": new_activity.timestamp
    }

@app.get("/user-activities/{user_id}", operation_id="get_user_activities")
async def get_user_activities(
    user_id: str,
    limit: int = 10,
    db: AsyncSession = Depends(get_db)
) -> List[dict]:
    """Retrieve user activities for AI context"""
    query = (
        select(UserActivity)
        .where(UserActivity.user_id == user_id)
        .order_by(UserActivity.timestamp.desc())
        .limit(limit)
    )

    result = await db.execute(query)
    activities = result.scalars().all()

    return [
        {
            "id": activity.id,
            "activity": activity.activity,
            "timestamp": activity.timestamp.isoformat()
        }
        for activity in activities
    ]

# MCP Integration with explicit operation IDs
mcp = FastApiMCP(
    app,
    name="User Activity Tracker",
    description="Track and analyze user activities for AI insights",
    include_operations=["log_user_activity", "get_user_activities"]
)
mcp.mount()
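
The query pattern behind `get_user_activities` (filter by user, newest first, capped by `limit`) can be tried without PostgreSQL. The sketch below reproduces it with the standard library's `sqlite3` and an in-memory database. The helper names are hypothetical, and timestamps are stored as ISO 8601 strings, which sort chronologically as long as they share one format and timezone.

```python
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import List


def open_activity_db() -> sqlite3.Connection:
    """Create an in-memory table shaped like the UserActivity model."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE user_activities (
            id INTEGER PRIMARY KEY,
            user_id TEXT NOT NULL,
            activity TEXT NOT NULL,
            timestamp TEXT NOT NULL
        )
        """
    )
    return conn


def log_activity(conn: sqlite3.Connection, user_id: str, activity: str, when: datetime) -> int:
    """Insert one activity row and return its generated id."""
    cursor = conn.execute(
        "INSERT INTO user_activities (user_id, activity, timestamp) VALUES (?, ?, ?)",
        (user_id, activity, when.isoformat()),
    )
    conn.commit()
    return cursor.lastrowid


def recent_activities(conn: sqlite3.Connection, user_id: str, limit: int = 10) -> List[str]:
    """Return the user's most recent activities, newest first."""
    rows = conn.execute(
        "SELECT activity FROM user_activities "
        "WHERE user_id = ? ORDER BY timestamp DESC LIMIT ?",
        (user_id, limit),
    ).fetchall()
    return [row[0] for row in rows]


if __name__ == "__main__":
    db = open_activity_db()
    start = datetime(2025, 1, 1, tzinfo=timezone.utc)
    for minute, name in enumerate(["view_product", "add_to_cart", "checkout"]):
        log_activity(db, "u1", name, start + timedelta(minutes=minute))
    print(recent_activities(db, "u1", limit=2))
```

Parameterized queries (`?` placeholders) matter here because the `user_id` value ultimately comes from an AI agent's tool call.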

2. Authentication and Security

# auth.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from datetime import datetime, timedelta
import os

security = HTTPBearer()

SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

def create_access_token(data: dict):
    to_encode = data.copy()
    expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    try:
        payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
        user_id: str = payload.get("sub")
        if user_id is None:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid authentication credentials"
            )
        return user_id
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials"
        )

# secure_server.py
from datetime import datetime
from fastapi import FastAPI, Depends, HTTPException
from fastapi_mcp import FastApiMCP
from auth import get_current_user
from typing import Dict, Any

app = FastAPI()

@app.post("/secure-operation", operation_id="secure_ai_operation")
async def secure_ai_operation(
    data: Dict[str, Any],
    current_user: str = Depends(get_current_user)
):
    """Secure operation that requires authentication"""
    # Only authenticated users can access this MCP tool
    return {
        "message": f"Secure operation completed for user {current_user}",
        "data": data,
        "timestamp": datetime.utcnow().isoformat()
    }

# MCP with authentication dependency
mcp = FastApiMCP(
    app,
    name="Secure AI Operations",
    description="Authenticated MCP tools for enterprise AI",
    # FastAPI-MCP will respect your dependency injection
)
mcp.mount()
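
For readers who want to see what the HS256 signing and verification in `auth.py` amount to, here is a standard-library sketch of the same idea. It is an illustration only, not a replacement for a maintained JWT library: it skips header validation and most claim checks, and the function names are hypothetical.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _signature(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256).digest()


def sign_hs256(payload: dict, secret: str) -> str:
    """Produce header.payload.signature for the given claims."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{_b64url(_signature(signing_input, secret))}"


def verify_hs256(token: str, secret: str, now: Optional[float] = None) -> dict:
    """Check the signature and the exp claim, then return the claims."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token") from None
    expected = _signature(f"{header_b64}.{payload_b64}", secret)
    # compare_digest avoids leaking how many leading bytes matched
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    payload = json.loads(_b64url_decode(payload_b64))
    current = time.time() if now is None else now
    if "exp" in payload and payload["exp"] < current:
        raise ValueError("token expired")
    return payload
```

The key property for MCP tools is that the claims are only trusted after the signature check passes, which is also the order `jwt.decode` follows in `get_current_user`.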

3. Background Tasks and Async Processing

# async_server.py
from fastapi import FastAPI, BackgroundTasks
from fastapi_mcp import FastApiMCP
from celery import Celery
import asyncio
from typing import Dict, List

app = FastAPI()

# Celery for heavy background processing
celery_app = Celery(
    "mcp_tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

@celery_app.task
def process_large_dataset(data: List[Dict]):
    """Heavy processing task for AI analysis"""
    # Simulate data processing
    import time
    time.sleep(10)  # Simulate heavy computation
    return {"processed_items": len(data), "status": "completed"}

@app.post("/analyze-dataset", operation_id="analyze_large_dataset")
async def analyze_dataset(
    dataset: List[Dict],
    background_tasks: BackgroundTasks
):
    """Trigger analysis of large dataset for AI insights"""
    # Start background processing
    task = process_large_dataset.delay(dataset)

    # Add monitoring task
    background_tasks.add_task(monitor_task_progress, task.id)

    return {
        "task_id": task.id,
        "status": "processing",
        "message": "Dataset analysis started"
    }

async def monitor_task_progress(task_id: str):
    """Monitor background task progress"""
    # Implementation for task monitoring
    pass

@app.get("/task-status/{task_id}", operation_id="check_task_status")
async def check_task_status(task_id: str):
    """Check status of background processing task"""
    task = process_large_dataset.AsyncResult(task_id)

    return {
        "task_id": task_id,
        "status": task.status,
        "result": task.result if task.ready() else None
    }

mcp = FastApiMCP(
    app,
    name="Async Data Processor",
    description="Handle large-scale data processing for AI analysis"
)
mcp.mount()
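
`monitor_task_progress` is left as a stub above. One way it might be filled in is a polling loop with exponential backoff, sketched here with the standard library only. `wait_for_task` and its parameters are hypothetical; `get_status` is any coroutine function returning a Celery-style state string, for example a thin wrapper around `process_large_dataset.AsyncResult(task_id).status`.

```python
import asyncio
from typing import Awaitable, Callable

# Celery's terminal task states
READY_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


async def wait_for_task(
    get_status: Callable[[], Awaitable[str]],
    *,
    interval: float = 0.05,
    max_interval: float = 1.0,
    timeout: float = 5.0,
) -> str:
    """Poll until the task reaches a terminal state or the timeout elapses."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        status = await get_status()
        if status in READY_STATES:
            return status
        if loop.time() + interval > deadline:
            raise asyncio.TimeoutError(f"task still {status} after {timeout}s")
        await asyncio.sleep(interval)
        # Back off so long-running tasks are not polled in a tight loop
        interval = min(interval * 2, max_interval)
```

Because the loop only awaits, it does not block the event loop and can run as a FastAPI background task alongside normal request handling.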

Production Deployment Strategies

1. Docker Containerization

# Dockerfile
FROM python:3.12-slim

WORKDIR /app

# Install system dependencies
# (curl is required by the HEALTHCHECK below; slim images do not include it)
RUN apt-get update && apt-get install -y \
    gcc \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements first for better caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Expose MCP port
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Run with Gunicorn for production
CMD ["gunicorn", "server:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]

2. Kubernetes Deployment

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fastapi-mcp-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: fastapi-mcp-server
  template:
    metadata:
      labels:
        app: fastapi-mcp-server
    spec:
      containers:
      - name: fastapi-mcp
        image: your-registry/fastapi-mcp:latest
        ports:
        - containerPort: 8000
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: db-secret
              key: url
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5

---
apiVersion: v1
kind: Service
metadata:
  name: fastapi-mcp-service
spec:
  selector:
    app: fastapi-mcp-server
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: LoadBalancer

---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: fastapi-mcp-ingress
  annotations:
    nginx.ingress.kubernetes.io/proxy-buffering: "off"
    nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
spec:
  rules:
  - host: mcp-api.yourcompany.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: fastapi-mcp-service
            port:
              number: 80

3. Environment Configuration

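# config.py
# Pydantic v2 moved BaseSettings into the separate pydantic-settings package
# (pip install pydantic-settings); on Pydantic v1, import it from pydantic instead.
from pydantic_settings import BaseSettings
from typing import Optional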

class Settings(BaseSettings):
    # Database
    database_url: str = "postgresql+asyncpg://localhost/mcpdb"

    # Authentication
    secret_key: str
    jwt_algorithm: str = "HS256"
    access_token_expire_minutes: int = 30

    # MCP Configuration
    mcp_server_name: str = "Production MCP Server"
    mcp_base_url: str = "https://mcp-api.yourcompany.com"

    # External APIs
    weather_api_key: Optional[str] = None
    openai_api_key: Optional[str] = None

    # Monitoring
    sentry_dsn: Optional[str] = None
    log_level: str = "INFO"

    # Performance
    worker_processes: int = 4
    max_connections: int = 1000

    class Config:
        env_file = ".env"

settings = Settings()

# production_server.py
from fastapi import FastAPI
from fastapi_mcp import FastApiMCP
from config import settings
import sentry_sdk
from sentry_sdk.integrations.fastapi import FastApiIntegration
import logging
import time

# Configure logging
logging.basicConfig(level=getattr(logging, settings.log_level))
logger = logging.getLogger(__name__)

# Initialize Sentry for error tracking
if settings.sentry_dsn:
    sentry_sdk.init(
        dsn=settings.sentry_dsn,
        integrations=[FastApiIntegration()],
        traces_sample_rate=0.1,
    )

# Create production FastAPI app
app = FastAPI(
    title=settings.mcp_server_name,
    description="Production-grade FastAPI MCP server",
    docs_url="/docs" if settings.log_level == "DEBUG" else None,
    redoc_url="/redoc" if settings.log_level == "DEBUG" else None,
)

# Production middleware
@app.middleware("http")
async def log_requests(request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time

    logger.info(
        f"{request.method} {request.url.path} "
        f"completed in {process_time:.3f}s "
        f"with status {response.status_code}"
    )
    return response

# MCP configuration for production
mcp = FastApiMCP(
    app,
    name=settings.mcp_server_name,
    description="Enterprise AI operations platform",
    base_url=settings.mcp_base_url,
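    # Security configuration: expose only routes tagged for AI use.
    # fastapi-mcp treats include_tags and exclude_tags as alternatives,
    # so pass one or the other, not both.
    include_tags=["ai-tools", "analytics"]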
)

mcp.mount()

Advanced MCP Features with FastAPI

1. Custom Tool Filtering and Discovery

# custom_filtering.py
from fastapi import FastAPI, Request
from fastapi_mcp import FastApiMCP
from typing import List, Dict, Any

app = FastAPI()

class CustomMCPServer(FastApiMCP):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    async def filter_tools_by_user(self, request: Request) -> List[str]:
        """Custom logic to filter tools based on user permissions"""
        # Extract user from request (JWT, session, etc.)
        user_role = await self.extract_user_role(request)

        if user_role == "admin":
            return ["all"]  # Admin sees all tools
        elif user_role == "analyst":
            return ["read_data", "analyze_trends", "generate_report"]
        else:
            return ["read_public_data"]

    async def extract_user_role(self, request: Request) -> str:
        # Your authentication logic here
        return "analyst"  # Example

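# Usage
# Note: filter_tools_by_user is an application-level helper; nothing in this
# listing calls it automatically, so invoke it from your own request handling.
mcp = CustomMCPServer(
    app,
    name="Role-Based MCP Server",
    description="MCP server with role-based tool access"
)
mcp.mount()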
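
The role check in `filter_tools_by_user` boils down to an allowlist lookup. A framework-free sketch of that logic is shown below; `ROLE_TOOL_ALLOWLIST`, `DEFAULT_TOOLS`, and `visible_tools` are hypothetical names, and the tool names are the ones used in the listing above.

```python
from typing import Dict, FrozenSet, Iterable, List, Optional

# None means "no restriction"; a frozenset lists the permitted tool names.
ROLE_TOOL_ALLOWLIST: Dict[str, Optional[FrozenSet[str]]] = {
    "admin": None,
    "analyst": frozenset({"read_data", "analyze_trends", "generate_report"}),
}

# Unknown roles fall back to the most restrictive set.
DEFAULT_TOOLS: FrozenSet[str] = frozenset({"read_public_data"})


def visible_tools(role: str, registered_tools: Iterable[str]) -> List[str]:
    """Return the registered tools this role may see, preserving order."""
    allowed = ROLE_TOOL_ALLOWLIST.get(role, DEFAULT_TOOLS)
    if allowed is None:
        return list(registered_tools)
    return [name for name in registered_tools if name in allowed]
```

Filtering against the list of registered tools, rather than returning the allowlist itself, avoids advertising tools that the server does not actually expose.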

2. Real-time Data Streaming

# streaming_server.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi_mcp import FastApiMCP
from datetime import datetime
from typing import Any, Dict
import asyncio
import json
import random
import uuid

app = FastAPI()

@app.get("/stream-data/{data_source}", operation_id="stream_real_time_data")
async def stream_real_time_data(data_source: str):
    """Stream real-time data for AI analysis"""

    async def generate_data_stream():
        """Generate streaming data"""
        counter = 0
        while counter < 100:  # Limit for demo
            data = {
                "timestamp": datetime.utcnow().isoformat(),
                "source": data_source,
                "value": random.random() * 100,
                "counter": counter
            }
            yield f"data: {json.dumps(data)}\n\n"
            await asyncio.sleep(1)
            counter += 1

    return StreamingResponse(
        generate_data_stream(),
        # The "data: ..." frames above follow the Server-Sent Events format
        media_type="text/event-stream"
    )

@app.post("/analyze-stream", operation_id="analyze_data_stream")
async def analyze_data_stream(
    stream_config: Dict[str, Any],
    analysis_type: str = "real_time"
):
    """Configure real-time data analysis"""
    # Set up streaming analysis pipeline
    return {
        "analysis_id": f"stream_{uuid.uuid4().hex[:8]}",
        "status": "active",
        "config": stream_config
    }

mcp = FastApiMCP(app, name="Streaming Data Analyzer")
mcp.mount()
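
The stream above emits Server-Sent Events frames: one or more `data:` lines followed by a blank line. The sketch below shows that framing and a matching parser in isolation, using only the standard library. `format_sse` and `parse_sse` are hypothetical helpers, and the parser handles only the `data` field, which is all the endpoint above produces.

```python
import json
from typing import List, Optional


def format_sse(data: dict, event: Optional[str] = None) -> str:
    """Serialize one event as an SSE frame terminated by a blank line."""
    lines = []
    if event:
        lines.append(f"event: {event}")
    lines.append(f"data: {json.dumps(data)}")
    return "\n".join(lines) + "\n\n"


def parse_sse(stream: str) -> List[dict]:
    """Decode the JSON payload of every frame in a buffered SSE stream."""
    events = []
    for frame in stream.split("\n\n"):
        data_lines = [
            line[len("data:"):].strip()
            for line in frame.splitlines()
            if line.startswith("data:")
        ]
        if data_lines:
            # Multiple data lines in one frame are joined with newlines
            events.append(json.loads("\n".join(data_lines)))
    return events
```

A consumer that reads the response incrementally would apply the same splitting rule to its buffer, keeping any trailing partial frame until the next chunk arrives.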

3. Multi-Agent Coordination

# multi_agent_server.py
from fastapi import FastAPI
from fastapi_mcp import FastApiMCP
from typing import Dict, List
import asyncio

app = FastAPI()

class AgentCoordinator:
    def __init__(self):
        self.active_agents = {}
        self.task_queue = asyncio.Queue()

    async def register_agent(self, agent_id: str, capabilities: List[str]):
        self.active_agents[agent_id] = {
            "capabilities": capabilities,
            "status": "ready",
            "current_task": None
        }

    async def delegate_task(self, task: Dict) -> str:
        # Find best agent for task
        suitable_agents = [
            agent_id for agent_id, info in self.active_agents.items()
            if task["required_capability"] in info["capabilities"]
            and info["status"] == "ready"
        ]

        if suitable_agents:
            selected_agent = suitable_agents[0]
            self.active_agents[selected_agent]["status"] = "busy"
            self.active_agents[selected_agent]["current_task"] = task["task_id"]
            return selected_agent
        else:
            await self.task_queue.put(task)
            return "queued"

coordinator = AgentCoordinator()

@app.post("/register-agent", operation_id="register_ai_agent")
async def register_agent(agent_id: str, capabilities: List[str]):
    """Register an AI agent with the coordination system"""
    await coordinator.register_agent(agent_id, capabilities)
    return {"status": "registered", "agent_id": agent_id}

@app.post("/delegate-task", operation_id="delegate_task_to_agent")
async def delegate_task(task: Dict):
    """Delegate a task to the most suitable agent"""
    assigned_agent = await coordinator.delegate_task(task)
    return {
        "task_id": task["task_id"],
        "assigned_agent": assigned_agent,
        "status": "assigned" if assigned_agent != "queued" else "queued"
    }

@app.get("/agent-status", operation_id="get_agent_status")
async def get_agent_status():
    """Get status of all registered agents"""
    return coordinator.active_agents

mcp = FastApiMCP(
    app,
    name="Multi-Agent Coordinator",
    description="Coordinate multiple AI agents for complex tasks"
)
mcp.mount()
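
The coordinator above marks agents as busy and queues tasks, but nothing ever frees an agent or drains the queue. One possible extension is sketched below with hypothetical names. It swaps `asyncio.Queue` for a `collections.deque`, an assumption made so that pending tasks can be scanned by required capability, and adds a `release_agent` step that hands a freed agent the oldest task it can handle.

```python
from collections import deque
from typing import Deque, Dict, List, Optional


class QueueingCoordinator:
    """Sketch of a coordinator that reassigns queued tasks when agents free up."""

    def __init__(self) -> None:
        self.agents: Dict[str, dict] = {}
        self.pending: Deque[dict] = deque()

    def register_agent(self, agent_id: str, capabilities: List[str]) -> None:
        self.agents[agent_id] = {
            "capabilities": set(capabilities),
            "status": "ready",
            "current_task": None,
        }

    def _assign(self, agent_id: str, task: dict) -> None:
        self.agents[agent_id].update(status="busy", current_task=task["task_id"])

    def delegate_task(self, task: dict) -> str:
        """Assign to the first ready, capable agent, or queue the task."""
        for agent_id, info in self.agents.items():
            if info["status"] == "ready" and task["required_capability"] in info["capabilities"]:
                self._assign(agent_id, task)
                return agent_id
        self.pending.append(task)
        return "queued"

    def release_agent(self, agent_id: str) -> Optional[str]:
        """Mark the agent ready, then give it the oldest pending task it can do."""
        info = self.agents[agent_id]
        info.update(status="ready", current_task=None)
        for task in list(self.pending):
            if task["required_capability"] in info["capabilities"]:
                self.pending.remove(task)
                self._assign(agent_id, task)
                return task["task_id"]
        return None
```

Like the original, this keeps all state in process memory, so it only coordinates agents within a single worker; a multi-worker deployment would need shared storage.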

Performance Optimization and Monitoring

1. Caching and Performance

# performance_server.py
from fastapi import FastAPI, Depends
from fastapi_mcp import FastApiMCP
from functools import lru_cache
import asyncio
import redis
import pickle
from typing import Any, Optional

app = FastAPI()

# Redis for distributed caching
redis_client = redis.Redis(host='localhost', port=6379, db=0)

class CacheManager:
    def __init__(self, redis_client):
        self.redis = redis_client

    async def get(self, key: str) -> Optional[Any]:
        cached = self.redis.get(key)
        if cached:
            return pickle.loads(cached)
        return None

    async def set(self, key: str, value: Any, expire: int = 3600):
        self.redis.setex(key, expire, pickle.dumps(value))

cache = CacheManager(redis_client)

@app.get("/expensive-computation/{input_data}", operation_id="cached_computation")
async def expensive_computation(input_data: str):
    """Expensive computation with caching for AI operations"""
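    # Built-in hash() is salted per process for strings, so each worker would
    # compute a different key; key on the input itself to keep the shared cache consistent.
    cache_key = f"computation:{input_data}"

    # Check cache first
    cached_result = await cache.get(cache_key)
    if cached_result is not None:
        return {"result": cached_result, "from_cache": True}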

    # Simulate expensive computation
    await asyncio.sleep(2)  # Simulate processing time
    result = f"processed_{input_data}"

    # Cache the result
    await cache.set(cache_key, result, expire=1800)  # 30 minutes

    return {"result": result, "from_cache": False}

# Memory caching for frequently accessed data
@lru_cache(maxsize=1000)
def get_static_ai_prompt(prompt_type: str) -> str:
    """Get cached AI prompts"""
    prompts = {
        "analysis": "Analyze the following data...",
        "summary": "Provide a summary of...",
        "recommendation": "Based on the data, recommend..."
    }
    return prompts.get(prompt_type, "Default prompt")

mcp = FastApiMCP(app, name="High-Performance MCP Server")
mcp.mount()
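
Two details of the caching pattern are easy to get wrong: cache keys must be identical across worker processes, and entries must expire. The sketch below illustrates both with the standard library. `stable_cache_key` and `TTLCache` are hypothetical; the in-memory cache stands in for Redis `SETEX` semantics and is not a substitute for a shared cache.

```python
import hashlib
import json
import time
from typing import Any, Callable, Dict, Optional, Tuple


def stable_cache_key(prefix: str, payload: Any) -> str:
    """Derive a key that is the same in every process for equal payloads."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"{prefix}:{digest[:32]}"


class TTLCache:
    """Minimal in-memory cache whose entries expire after a set time."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._items: Dict[str, Tuple[float, Any]] = {}

    def set(self, key: str, value: Any, expire: float) -> None:
        self._items[key] = (self._clock() + expire, value)

    def get(self, key: str) -> Optional[Any]:
        entry = self._items.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._items[key]  # drop stale entries lazily on read
            return None
        return value
```

Serializing with `sort_keys=True` means two dictionaries with the same contents map to the same key regardless of insertion order.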

2. Monitoring and Observability

# monitoring_server.py
from fastapi import FastAPI, Request, Response
from fastapi_mcp import FastApiMCP
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
from datetime import datetime
import time
from typing import Dict

app = FastAPI()

# Prometheus metrics
REQUEST_COUNT = Counter(
    'mcp_requests_total',
    'Total MCP requests',
    ['method', 'endpoint', 'status']
)

REQUEST_DURATION = Histogram(
    'mcp_request_duration_seconds',
    'MCP request duration',
    ['method', 'endpoint']
)

TOOL_USAGE = Counter(
    'mcp_tool_usage_total',
    'MCP tool usage count',
    ['tool_name', 'status']
)

@app.middleware("http")
async def monitor_requests(request: Request, call_next):
    start_time = time.time()

    response = await call_next(request)

    duration = time.time() - start_time

    # Record metrics
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()

    REQUEST_DURATION.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(duration)

    return response

@app.get("/metrics")
async def get_metrics():
    """Prometheus metrics endpoint"""
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

@app.get("/health-detailed")
async def detailed_health_check():
    """Detailed health check for monitoring"""
    return {
        "status": "healthy",
        "timestamp": datetime.utcnow().isoformat(),
        "version": "1.0.0",
        "checks": {
            "database": await check_database_health(),
            "redis": await check_redis_health(),
            "external_apis": await check_external_apis()
        }
    }

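async def check_database_health() -> Dict:
    """Check database connectivity"""
    try:
        # Your database health check logic
        return {"status": "healthy", "response_time_ms": 10}
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}

async def check_redis_health() -> Dict:
    """Check Redis connectivity (placeholder; add your own logic)"""
    return {"status": "healthy"}

async def check_external_apis() -> Dict:
    """Check external API reachability (placeholder; add your own logic)"""
    return {"status": "healthy"}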

mcp = FastApiMCP(app, name="Monitored MCP Server")
mcp.mount()
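
The detailed health endpoint awaits its checks one after another and reports a fixed response time. A possible refinement, sketched below with the standard library, runs the checks concurrently, measures each one, and applies a per-check timeout. `timed_check` and `run_health_checks` are hypothetical names; each check is a coroutine function that raises on failure.

```python
import asyncio
import time
from typing import Awaitable, Callable, Dict


async def timed_check(check: Callable[[], Awaitable[None]], timeout: float = 1.0) -> dict:
    """Run one check, recording its outcome and measured duration."""
    start = time.perf_counter()
    error = None
    try:
        await asyncio.wait_for(check(), timeout=timeout)
    except asyncio.TimeoutError:
        error = "timed out"
    except Exception as exc:
        error = str(exc)
    result = {
        "status": "healthy" if error is None else "unhealthy",
        "response_time_ms": round((time.perf_counter() - start) * 1000, 1),
    }
    if error is not None:
        result["error"] = error
    return result


async def run_health_checks(checks: Dict[str, Callable[[], Awaitable[None]]]) -> dict:
    """Run all checks concurrently and summarize the overall status."""
    names = list(checks)
    results = await asyncio.gather(*(timed_check(checks[name]) for name in names))
    overall = "healthy" if all(r["status"] == "healthy" for r in results) else "degraded"
    return {"status": overall, "checks": dict(zip(names, results))}
```

Because failures are captured per check, one slow or broken dependency degrades the report without hiding the state of the others.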

Testing FastAPI MCP Integrations

1. Unit Tests

# test_mcp_server.py
import pytest
from fastapi.testclient import TestClient
from unittest.mock import AsyncMock, patch
from server import app, mcp

@pytest.fixture
def client():
    return TestClient(app)

@pytest.fixture
def mcp_client():
    """Test client specifically for MCP endpoints"""
    return TestClient(app, base_url="http://testserver/mcp")

def test_health_endpoint(client):
    """Test basic health endpoint"""
    response = client.get("/health")
    assert response.status_code == 200
    assert response.json()["status"] == "healthy"

def test_mcp_manifest(mcp_client):
    """Test MCP manifest endpoint"""
    response = mcp_client.get("/manifest")
    assert response.status_code == 200

    manifest = response.json()
    assert "name" in manifest
    assert "endpoints" in manifest

@pytest.mark.asyncio
async def test_weather_tool_with_mock():
    """Test weather tool with mocked external API"""
    from unittest.mock import MagicMock

    with patch('httpx.AsyncClient.get', new_callable=AsyncMock) as mock_get:
        # Mock external API response. get() is awaited, so it is an AsyncMock,
        # but response.json() is a plain synchronous call and needs a MagicMock.
        mock_response = MagicMock()
        mock_response.json.return_value = {
            "main": {"temp": 22.5, "humidity": 65},
            "weather": [{"description": "sunny"}]
        }
        mock_get.return_value = mock_response

        client = TestClient(app)
        response = client.post(
            "/weather",
            json={"city": "San Francisco", "units": "metric"}
        )

        assert response.status_code == 200
        data = response.json()
        assert data["temperature"] == 22.5
        assert data["description"] == "sunny"

def test_mcp_tool_discovery(mcp_client):
    """Test MCP tool discovery"""
    response = mcp_client.get("/tools")
    assert response.status_code == 200

    tools = response.json()
    assert isinstance(tools, list)
    assert len(tools) > 0

    # Check tool structure
    tool = tools[0]
    assert "name" in tool
    assert "description" in tool
    assert "input_schema" in tool

@pytest.mark.asyncio
async def test_authenticated_mcp_tool():
    """Test MCP tool that requires authentication"""
    from auth import create_access_token

    # Create test token
    token = create_access_token({"sub": "test_user"})

    client = TestClient(app)
    response = client.post(
        "/secure-operation",
        json={"test": "data"},
        headers={"Authorization": f"Bearer {token}"}
    )

    assert response.status_code == 200
    assert "test_user" in response.json()["message"]
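
When mocking an async HTTP client, it matters which calls are awaited. The awaited `get()` should be an `AsyncMock`, while the response object whose `json()` is called synchronously should be a `MagicMock`; if the response were an `AsyncMock`, `json()` would return a coroutine instead of a dictionary. The sketch below shows the arrangement in isolation. `fetch_temperature` and `build_mock_client` are hypothetical, and the URL is a placeholder that is never contacted.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


async def fetch_temperature(client) -> float:
    """Code under test: awaits get(), then calls json() synchronously."""
    response = await client.get("https://example.invalid/weather")
    return response.json()["main"]["temp"]


def build_mock_client(temperature: float) -> MagicMock:
    """Build a client double with an awaitable get() and a sync json()."""
    response = MagicMock()
    response.json.return_value = {"main": {"temp": temperature}}
    client = MagicMock()
    client.get = AsyncMock(return_value=response)
    return client


if __name__ == "__main__":
    print(asyncio.run(fetch_temperature(build_mock_client(22.5))))
```

`AsyncMock` also records awaits, so a test can assert that the upstream call happened exactly once with `client.get.assert_awaited_once()`.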

2. Integration Tests

# test_mcp_integration.py
import pytest
import asyncio
from mcp import ClientSession
from mcp.client.sse import sse_client

@pytest.mark.asyncio
async def test_full_mcp_integration():
    """Test complete MCP client-server interaction"""
    # uvicorn serves HTTP rather than stdio, so connect over the SSE transport.
    # Assumes the server is already running (uvicorn server:app --port 8001)
    # and that the MCP endpoint is mounted at /mcp.
    async with sse_client("http://localhost:8001/mcp") as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # Test tool discovery
            tools = await session.list_tools()
            assert len(tools.tools) > 0

            # Test tool execution
            result = await session.call_tool(
                "get_weather",
                {"city": "London", "units": "metric"}
            )

            assert result.content
            assert "temperature" in str(result.content[0])

@pytest.mark.asyncio
async def test_mcp_performance():
    """Test MCP server performance under load"""
    import time

    start_time = time.time()
    tasks = []

    for i in range(100):
        task = asyncio.create_task(
            make_mcp_request(f"test_city_{i}")
        )
        tasks.append(task)

    results = await asyncio.gather(*tasks)
    end_time = time.time()

    # Assert all requests completed successfully
    assert len(results) == 100
    assert all(result["status"] == "success" for result in results)

    # Assert reasonable performance (adjust threshold as needed)
    total_time = end_time - start_time
    assert total_time < 30  # All 100 requests in under 30 seconds

async def make_mcp_request(city: str):
    """Helper function for load testing"""
    # Implementation for making MCP requests
    pass
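
The load test above leaves `make_mcp_request` unimplemented and fires all 100 requests at once. A sketch of a bounded-concurrency runner, using only the standard library, is given below. `run_load` is a hypothetical helper; `request` is any coroutine function that performs one call and returns a dictionary with a `status` key, matching what the test asserts on.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_load(
    request: Callable[[int], Awaitable[dict]],
    total: int,
    max_concurrency: int = 10,
) -> List[dict]:
    """Issue `total` requests with at most `max_concurrency` in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(index: int) -> dict:
        async with semaphore:
            try:
                return await request(index)
            except Exception as exc:
                # Record the failure instead of aborting the whole run
                return {"status": "error", "error": str(exc)}

    # gather preserves input order, so results[i] belongs to request i
    return list(await asyncio.gather(*(guarded(i) for i in range(total))))
```

Capping concurrency keeps the test from measuring client-side connection exhaustion instead of server performance, and per-request error capture makes the failure rate visible in the results.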

Best Practices and Common Patterns

1. Error Handling

# error_handling.py
from fastapi import FastAPI, HTTPException, Request
from fastapi_mcp import FastApiMCP
from fastapi.responses import JSONResponse
import logging

app = FastAPI()
logger = logging.getLogger(__name__)

class MCPError(Exception):
    """Custom MCP-specific error"""
    def __init__(self, message: str, error_code: str = "MCP_ERROR"):
        self.message = message
        self.error_code = error_code
        super().__init__(self.message)

@app.exception_handler(MCPError)
async def mcp_error_handler(request: Request, exc: MCPError):
    """Custom error handler for MCP errors"""
    logger.error(f"MCP Error: {exc.message} - Code: {exc.error_code}")

    return JSONResponse(
        status_code=400,
        content={
            "error": exc.error_code,
            "message": exc.message,
            "type": "mcp_error"
        }
    )

@app.post("/safe-operation", operation_id="safe_ai_operation")
async def safe_ai_operation(data: dict):
    """AI operation with comprehensive error handling"""
    try:
        # Validate input
        if not data.get("required_field"):
            raise MCPError(
                "Missing required field",
                error_code="MISSING_FIELD"
            )

        # Perform operation
        result = await perform_ai_operation(data)

        return {"success": True, "result": result}

    except MCPError:
        raise  # Re-raise MCP errors
    except ValueError as e:
        raise MCPError(f"Invalid input: {str(e)}", "INVALID_INPUT")
    except Exception as e:
        logger.exception("Unexpected error in AI operation")
        raise HTTPException(
            status_code=500,
            detail="Internal server error"
        )

async def perform_ai_operation(data: dict):
    """Simulate AI operation that might fail"""
    if data.get("simulate_error"):
        raise ValueError("Simulated error")
    return "operation_completed"

mcp = FastApiMCP(app, name="Robust MCP Server")
mcp.mount()

2. Configuration Management

# config_management.py
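from fastapi import FastAPI
from fastapi_mcp import FastApiMCP
# Pydantic v2 ships BaseSettings in the separate pydantic-settings package
from pydantic_settings import BaseSettings
from pydantic import validator  # still works in v2; field_validator is the newer API
from typing import Dict, List, Optional
import os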

class MCPSettings(BaseSettings):
    """MCP-specific configuration"""
    server_name: str = "FastAPI MCP Server"
    server_description: str = "Production MCP server"
    base_url: str = "http://localhost:8000"

    # Tool filtering
    include_operations: Optional[List[str]] = None
    exclude_operations: Optional[List[str]] = None
    include_tags: Optional[List[str]] = None
    exclude_tags: Optional[List[str]] = None

    # Features
    describe_all_responses: bool = True
    describe_full_response_schema: bool = False

    @validator('base_url')
    def validate_base_url(cls, v):
        if not v.startswith(('http://', 'https://')):
            raise ValueError('base_url must start with http:// or https://')
        return v.rstrip('/')

class AppSettings(BaseSettings):
    """Application configuration"""
    # Environment
    environment: str = "development"
    debug: bool = False

    # MCP Configuration
    mcp: MCPSettings = MCPSettings()

    # Database
    database_url: str = "sqlite:///./app.db"

    # Security
    secret_key: str
    jwt_algorithm: str = "HS256"
    access_token_expire_minutes: int = 30

    # External APIs
    external_apis: Dict[str, str] = {}

    # Monitoring
    enable_metrics: bool = True
    metrics_path: str = "/metrics"

    class Config:
        env_file = ".env"
        env_nested_delimiter = "__"
        case_sensitive = False

# Global settings instance
settings = AppSettings()

# Usage in your FastAPI app
def create_mcp_server(app: FastAPI) -> FastApiMCP:
    """Factory function to create MCP server with configuration"""
    return FastApiMCP(
        app,
        name=settings.mcp.server_name,
        description=settings.mcp.server_description,
        base_url=settings.mcp.base_url,
        include_operations=settings.mcp.include_operations,
        exclude_operations=settings.mcp.exclude_operations,
        include_tags=settings.mcp.include_tags,
        exclude_tags=settings.mcp.exclude_tags,
        describe_all_responses=settings.mcp.describe_all_responses,
        describe_full_response_schema=settings.mcp.describe_full_response_schema
    )
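
The `env_nested_delimiter = "__"` option in the settings class above means nested fields such as `settings.mcp.server_name` can be overridden with environment variables like `MCP__SERVER_NAME`. The sketch below illustrates that naming convention using only the standard library. `nest_env` and the example values are hypothetical and exist purely for illustration; this is not how the settings library parses the environment internally.

```python
from typing import Any, Dict, Mapping


def nest_env(environ: Mapping[str, str], delimiter: str = "__") -> Dict[str, Any]:
    """Group flat environment variables into a nested dict (illustrative only)."""
    nested: Dict[str, Any] = {}
    for key, value in environ.items():
        # Lowercase the key to mirror case-insensitive matching
        parts = key.lower().split(delimiter)
        node = nested
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        node[parts[-1]] = value
    return nested


# Hypothetical environment, as it might appear in a .env file
example_env = {
    "ENVIRONMENT": "production",
    "MCP__SERVER_NAME": "Orders MCP Server",
    "MCP__BASE_URL": "https://api.example.com",
}
nested_config = nest_env(example_env)
print(nested_config)
```

Under this convention, the top-level `ENVIRONMENT` variable maps to `settings.environment`, while the two `MCP__` variables land under the nested `mcp` settings.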

Real-World Use Case: AI-Powered E-commerce Platform

Let's build a complete e-commerce AI agent using FastAPI and MCP:

# ecommerce_ai_server.py
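from datetime import datetime
from fastapi import FastAPI, Depends, HTTPException
from fastapi_mcp import FastApiMCP
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List, Optional, Dict

# get_db and the data-access helpers used below (get_user_purchase_history,
# get_sales_history, and so on) are assumed to be defined elsewhere in your project.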

app = FastAPI(title="E-commerce AI Platform")

# Product recommendation system
@app.post("/recommendations", operation_id="get_product_recommendations")
async def get_product_recommendations(
    user_id: str,
    category: Optional[str] = None,
    limit: int = 10,
    db: AsyncSession = Depends(get_db)
):
    """Get AI-powered product recommendations for a user"""
    # Fetch user history
    user_history = await get_user_purchase_history(db, user_id)

    # Get browsing behavior
    browsing_data = await get_user_browsing_data(db, user_id)

    # AI recommendation algorithm
    recommendations = await generate_recommendations(
        user_history, browsing_data, category, limit
    )

    return {
        "user_id": user_id,
        "recommendations": recommendations,
        "generated_at": datetime.utcnow().isoformat()
    }

@app.post("/inventory-optimization", operation_id="optimize_inventory")
async def optimize_inventory(
    store_id: str,
    prediction_horizon_days: int = 30,
    db: AsyncSession = Depends(get_db)
):
    """AI-powered inventory optimization"""
    # Get sales history
    sales_data = await get_sales_history(db, store_id, days=90)

    # Get current inventory
    current_inventory = await get_current_inventory(db, store_id)

    # Get external factors (seasonality, trends, etc.)
    market_factors = await get_market_factors(store_id)

    # AI optimization
    optimization_plan = await calculate_optimal_inventory(
        sales_data, current_inventory, market_factors, prediction_horizon_days
    )

    return {
        "store_id": store_id,
        "optimization_plan": optimization_plan,
        "estimated_cost_savings": optimization_plan["savings"],
        "confidence_score": optimization_plan["confidence"]
    }

@app.post("/dynamic-pricing", operation_id="calculate_dynamic_pricing")
async def calculate_dynamic_pricing(
    product_ids: List[str],
    competitor_data: Optional[Dict] = None,
    db: AsyncSession = Depends(get_db)
):
    """AI-driven dynamic pricing optimization"""
    pricing_results = []

    for product_id in product_ids:
        # Get product data
        product = await get_product_details(db, product_id)

        # Get demand patterns
        demand_data = await get_demand_patterns(db, product_id)

        # Analyze competitor pricing
        competitor_prices = competitor_data or await scrape_competitor_prices(product_id)

        # AI pricing algorithm
        optimal_price = await calculate_optimal_price(
            product, demand_data, competitor_prices
        )

        pricing_results.append({
            "product_id": product_id,
            "current_price": product["price"],
            "recommended_price": optimal_price["price"],
            "expected_revenue_change": optimal_price["revenue_impact"],
            "confidence": optimal_price["confidence"]
        })

    return {
        "pricing_results": pricing_results,
        "total_expected_revenue_increase": sum(
            p["expected_revenue_change"] for p in pricing_results
        )
    }

@app.post("/customer-service-automation", operation_id="handle_customer_inquiry")
async def handle_customer_inquiry(
    inquiry: str,
    customer_id: Optional[str] = None,
    urgency: str = "normal",
    db: AsyncSession = Depends(get_db)
):
    """AI-powered customer service automation"""
    # Analyze inquiry intent
    intent_analysis = await analyze_inquiry_intent(inquiry)

    # Get customer context if available
    customer_context = None
    if customer_id:
        customer_context = await get_customer_context(db, customer_id)

    # Generate response
    if intent_analysis["can_auto_resolve"]:
        response = await generate_automated_response(
            inquiry, intent_analysis, customer_context
        )

        # Log interaction
        await log_customer_interaction(
            db, customer_id, inquiry, response, "automated"
        )

        return {
            "response": response["message"],
            "resolution_type": "automated",
            "confidence": response["confidence"],
            "suggested_actions": response.get("actions", [])
        }
    else:
        # Escalate to human agent
        ticket = await create_support_ticket(
            db, customer_id, inquiry, intent_analysis, urgency
        )

        return {
            "response": "Your inquiry has been forwarded to our support team.",
            "resolution_type": "escalated",
            "ticket_id": ticket["id"],
            "estimated_response_time": ticket["eta"]
        }

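# Initialize MCP server for e-commerce AI.
# Note: include_tags exposes only routes that carry one of the listed tags.
# The routes above declare no tags, so either add tags=[...] to each decorator
# or omit the filter, as done here, to expose every endpoint as a tool.
mcp = FastApiMCP(
    app,
    name="E-commerce AI Platform",
    description="Comprehensive AI tools for e-commerce optimization"
)
mcp.mount()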

# Helper functions (placeholders: replace with real implementations).
# They raise NotImplementedError so a missing implementation fails loudly
# instead of returning None to the endpoints above.
async def generate_recommendations(history, browsing, category, limit):
    """AI recommendation algorithm"""
    # Implementation would include ML model inference
    raise NotImplementedError

async def calculate_optimal_inventory(sales, inventory, factors, horizon):
    """Inventory optimization algorithm"""
    # Implementation would include demand forecasting
    raise NotImplementedError

async def calculate_optimal_price(product, demand, competitors):
    """Dynamic pricing algorithm"""
    # Implementation would include price elasticity analysis
    raise NotImplementedError

async def analyze_inquiry_intent(inquiry):
    """NLP intent analysis"""
    # Implementation would include NLP model inference
    raise NotImplementedError
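
The `calculate_dynamic_pricing` endpoint above handles products one at a time, so its latency grows linearly with the number of product IDs. One possible refinement, sketched below, is to run the per-product work concurrently with `asyncio.gather` while capping the number of in-flight tasks with a semaphore. `price_one` is a hypothetical stand-in for the per-product lookups, and the concurrency limit of 5 is an arbitrary assumption.

```python
import asyncio
from typing import Dict, List


async def price_one(product_id: str) -> Dict[str, object]:
    """Hypothetical stand-in for the per-product database and model calls."""
    await asyncio.sleep(0.01)  # simulate I/O latency
    return {"product_id": product_id, "expected_revenue_change": 1.5}


async def price_many(
    product_ids: List[str], max_concurrency: int = 5
) -> List[Dict[str, object]]:
    """Price products concurrently, with at most max_concurrency in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(product_id: str) -> Dict[str, object]:
        async with semaphore:
            return await price_one(product_id)

    # gather returns results in the same order as the inputs
    return await asyncio.gather(*(bounded(pid) for pid in product_ids))


results = asyncio.run(price_many(["sku-1", "sku-2", "sku-3"]))
print([r["product_id"] for r in results])
```

One caveat if you adapt this to the endpoint: a single SQLAlchemy `AsyncSession` should not be shared across concurrent tasks, so each task would need its own session.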

Conclusion: FastAPI + MCP Best Practices

Key Takeaways

  1. Start Simple: Begin with basic MCP integration and gradually add complexity
  2. Use Type Safety: Leverage Pydantic models for reliable tool contracts
  3. Plan for Scale: Design with async patterns and proper caching
  4. Security First: Implement authentication and authorization from the start
  5. Monitor Everything: Add comprehensive logging and metrics
  6. Test Thoroughly: Write tests for both FastAPI endpoints and MCP integration

Production Checklist

  • [ ] Security: Authentication, authorization, input validation
  • [ ] Performance: Async operations, caching, connection pooling
  • [ ] Monitoring: Metrics, logging, health checks, alerting
  • [ ] Testing: Unit tests, integration tests, load tests
  • [ ] Documentation: API docs, MCP tool descriptions, deployment guides
  • [ ] Deployment: Containerization, orchestration, CI/CD pipelines
  • [ ] Reliability: Error handling, retries, circuit breakers
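
As one illustration of the reliability item in the checklist above, the sketch below retries an async operation with exponential backoff and a little jitter. It uses only the standard library. `retry_async` and `flaky_call` are hypothetical names, and retrying only on `ConnectionError` is an assumption you would adapt to the errors your upstream services actually raise.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.1,
) -> T:
    """Retry an async operation, doubling the delay after each failure."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except ConnectionError:
            if attempt == attempts:
                raise  # out of retries: surface the last error
            # Exponential backoff plus jitter to avoid synchronized retries
            delay = base_delay * 2 ** (attempt - 1) + random.uniform(0, base_delay)
            await asyncio.sleep(delay)
    raise RuntimeError("attempts must be at least 1")


call_count = 0


async def flaky_call() -> str:
    """Hypothetical upstream call that fails twice, then succeeds."""
    global call_count
    call_count += 1
    if call_count < 3:
        raise ConnectionError("temporary failure")
    return "ok"


result = asyncio.run(retry_async(flaky_call, base_delay=0.01))
print(result, call_count)
```

Retries suit transient failures only. For an upstream service that stays down, a circuit breaker that stops calling it for a cooling-off period is usually the better fit.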

Future Considerations

As MCP continues to evolve in 2025, keep an eye on:

  • New Transport Protocols: possible options beyond the current stdio and HTTP-based transports, such as WebSocket or gRPC
  • Enhanced Security Features: broader OAuth2 integration, fine-grained permissions
  • Performance Improvements: Better caching, connection pooling
  • Tooling Ecosystem: IDE integrations, testing frameworks, monitoring tools

FastAPI and MCP together provide a powerful foundation for building next-generation AI applications. By following the patterns and practices outlined in this guide, you'll be well-equipped to create production-grade AI agents that can scale with your business needs.

For more information about MCP fundamentals, check out our Complete MCP Overview Guide.