FastAPI Dependency Injection Mastery 2025 - Build Scalable Applications
Master FastAPI's dependency injection system to build maintainable, testable, and scalable applications. This tutorial builds on our PostgreSQL Task Management app, showing how to implement clean architecture patterns using dependency injection.
What You'll Learn
By completing this tutorial, you'll master:
- Dependency injection fundamentals and advanced patterns
- Service layer architecture for business logic separation
- Authentication dependencies for secure endpoints
- Configuration management with environment-based dependencies
- Testing strategies with dependency overrides
- Performance optimization with dependency caching
- Clean architecture principles for maintainable code
Prerequisites
What you need before starting:
- Completed the PostgreSQL Tutorial - We'll refactor that app
- Understanding of async/await in Python
- Basic knowledge of design patterns (helpful but not required)
- Familiarity with testing concepts (unit tests, mocks)
Time to complete: 25 minutes
What We're Building
You'll refactor the Task Management API from the PostgreSQL tutorial with:
- Service layer pattern - Business logic separated from API layer
- Authentication dependencies - JWT-based user authentication
- Configuration dependencies - Environment-based settings
- Repository pattern - Database access abstraction
- Testing framework - Dependency overrides for unit tests
- Caching layer - Performance optimization with Redis
- Error handling - Centralized exception management
Before vs After: - Before: Direct database access in endpoints, mixed concerns - After: Clean separation with service layers, dependency injection, and testable architecture
Step 1: Understanding Dependency Injection Patterns
What is Dependency Injection?
Dependency Injection is a design pattern where dependencies are provided to a function or class from external sources rather than being created internally. In FastAPI, this enables:
- Separation of concerns - API logic separate from business logic
- Testability - Easy mocking and testing
- Reusability - Share dependencies across endpoints
- Performance - Efficient resource management
FastAPI DI System Overview
Create backend/app/examples/di_examples.py
:
from fastapi import FastAPI, Depends, HTTPException
from typing import Annotated
import asyncio
app = FastAPI()
# 1. Simple Dependency
def get_current_timestamp():
from datetime import datetime
return datetime.now().isoformat()
# 2. Async Dependency
async def get_user_agent(request: Request):
return request.headers.get("user-agent", "unknown")
# 3. Class-based Dependency
class DatabaseService:
def __init__(self):
self.connection_pool = "Mock DB Pool"
async def get_connection(self):
await asyncio.sleep(0.1) # Simulate connection time
return f"Connection from {self.connection_pool}"
# Dependency instances
db_service = DatabaseService()
# 4. Sub-dependencies (dependencies that depend on other dependencies)
async def get_db_connection(service: DatabaseService = Depends(lambda: db_service)):
return await service.get_connection()
async def get_user_context(
timestamp: str = Depends(get_current_timestamp),
user_agent: str = Depends(get_user_agent),
db_conn: str = Depends(get_db_connection)
):
return {
"timestamp": timestamp,
"user_agent": user_agent,
"db_connection": db_conn
}
# Using dependencies in endpoints
@app.get("/simple")
async def simple_endpoint(timestamp: str = Depends(get_current_timestamp)):
return {"current_time": timestamp}
@app.get("/complex")
async def complex_endpoint(
context: dict = Depends(get_user_context)
):
return {"context": context}
# Type annotations for better IDE support
@app.get("/typed")
async def typed_endpoint(
timestamp: Annotated[str, Depends(get_current_timestamp)]
):
return {"typed_timestamp": timestamp}
Dependency Scopes and Lifecycle
Dependencies in FastAPI have different scopes:
- Per request - Created fresh for each request (default)
- Per application - Created once at startup (singleton pattern)
- Per test - Overridden during testing
Step 2: Service Layer Architecture
Create Service Layer Structure
Refactor our Task Management app with service layers:
Create backend/app/services/__init__.py
:
# Service layer exports for clean imports
from .user_service import UserService
from .task_service import TaskService
from .auth_service import AuthService
__all__ = ["UserService", "TaskService", "AuthService"]
Create backend/app/services/base_service.py
:
from abc import ABC, abstractmethod
from sqlalchemy.ext.asyncio import AsyncSession
from typing import TypeVar, Generic, Optional, List
T = TypeVar('T')
class BaseService(ABC, Generic[T]):
"""Base service class with common patterns"""
def __init__(self, session: AsyncSession):
self.session = session
@abstractmethod
async def create(self, data: dict) -> T:
pass
@abstractmethod
async def get_by_id(self, id: int) -> Optional[T]:
pass
@abstractmethod
async def update(self, id: int, data: dict) -> Optional[T]:
pass
@abstractmethod
async def delete(self, id: int) -> bool:
pass
User Service Implementation
Create backend/app/services/user_service.py
:
from typing import Optional, List
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from passlib.context import CryptContext
from ..models import User
from ..schemas import UserCreate, UserUpdate
from ..crud import UserRepository
from .base_service import BaseService
class UserService(BaseService[User]):
"""User business logic service"""
def __init__(self, session: AsyncSession):
super().__init__(session)
self.user_repo = UserRepository(session)
self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
async def create(self, user_data: UserCreate) -> User:
"""Create new user with validation"""
# Business logic: Check if user already exists
if await self.user_repo.get_by_email(user_data.email):
raise ValueError("Email already registered")
if await self.user_repo.get_by_username(user_data.username):
raise ValueError("Username already taken")
# Business logic: Validate username format
if len(user_data.username) < 3:
raise ValueError("Username must be at least 3 characters")
return await self.user_repo.create(user_data)
async def get_by_id(self, user_id: int) -> Optional[User]:
"""Get user by ID with business logic checks"""
user = await self.user_repo.get_by_id(user_id)
if user and not user.is_active:
return None # Don't return inactive users
return user
async def get_by_email(self, email: str) -> Optional[User]:
"""Get active user by email"""
user = await self.user_repo.get_by_email(email)
return user if user and user.is_active else None
async def update(self, user_id: int, user_data: UserUpdate) -> Optional[User]:
"""Update user with validation"""
user = await self.get_by_id(user_id)
if not user:
return None
# Business logic: Validate email uniqueness if changing
if user_data.email and user_data.email != user.email:
existing = await self.user_repo.get_by_email(user_data.email)
if existing:
raise ValueError("Email already in use")
# Business logic: Validate username uniqueness if changing
if user_data.username and user_data.username != user.username:
existing = await self.user_repo.get_by_username(user_data.username)
if existing:
raise ValueError("Username already taken")
return await self.user_repo.update(user_id, user_data)
async def delete(self, user_id: int) -> bool:
"""Soft delete user (deactivate)"""
user = await self.get_by_id(user_id)
if not user:
return False
# Business logic: Soft delete by deactivating
user_data = UserUpdate(is_active=False)
await self.user_repo.update(user_id, user_data)
return True
async def get_user_stats(self, user_id: int) -> dict:
"""Get user statistics with business logic"""
user = await self.get_by_id(user_id)
if not user:
return {}
# Business logic could include complex calculations
return {
"user_id": user.id,
"username": user.username,
"account_age_days": (user.created_at - user.created_at).days,
"is_premium": len(user.username) > 5, # Example business rule
}
Task Service Implementation
Create backend/app/services/task_service.py
:
from typing import Optional, List
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import datetime, timedelta
from ..models import Task, User
from ..schemas import TaskCreate, TaskUpdate
from ..crud import TaskRepository
from .base_service import BaseService
from .user_service import UserService
class TaskService(BaseService[Task]):
"""Task business logic service"""
def __init__(self, session: AsyncSession):
super().__init__(session)
self.task_repo = TaskRepository(session)
self.user_service = UserService(session)
async def create(self, task_data: TaskCreate, owner_id: int) -> Task:
"""Create task with business validation"""
# Business logic: Verify user exists and is active
user = await self.user_service.get_by_id(owner_id)
if not user:
raise ValueError("User not found or inactive")
# Business logic: Validate task limits
user_tasks = await self.task_repo.get_user_tasks(owner_id, limit=1000)
if len(user_tasks) >= 100: # Business rule: max 100 tasks per user
raise ValueError("Task limit reached. Maximum 100 tasks per user.")
# Business logic: Set due date if high priority
if task_data.priority == "high" and not task_data.due_date:
task_data.due_date = datetime.now() + timedelta(days=3)
return await self.task_repo.create(task_data, owner_id)
async def get_by_id(self, task_id: int) -> Optional[Task]:
"""Get task by ID"""
return await self.task_repo.get_by_id(task_id)
async def get_user_tasks_with_business_logic(
self,
owner_id: int,
skip: int = 0,
limit: int = 100,
completed: Optional[bool] = None,
priority: Optional[str] = None,
search: Optional[str] = None,
include_overdue: bool = True
) -> List[Task]:
"""Get user tasks with business logic filtering"""
# Business logic: Verify user access
user = await self.user_service.get_by_id(owner_id)
if not user:
return []
tasks = await self.task_repo.get_user_tasks(
owner_id, skip, limit, completed, priority, search
)
# Business logic: Filter overdue tasks if requested
if not include_overdue:
now = datetime.now()
tasks = [
task for task in tasks
if not task.due_date or task.due_date > now
]
return tasks
async def update(self, task_id: int, task_data: TaskUpdate, user_id: int) -> Optional[Task]:
"""Update task with ownership validation"""
task = await self.get_by_id(task_id)
if not task:
return None
# Business logic: Verify ownership
if task.owner_id != user_id:
raise ValueError("Not authorized to update this task")
# Business logic: Auto-complete subtasks
if task_data.completed and task_data.completed != task.completed:
await self._mark_subtasks_completed(task_id)
return await self.task_repo.update(task_id, task_data)
async def delete(self, task_id: int, user_id: int) -> bool:
"""Delete task with ownership validation"""
task = await self.get_by_id(task_id)
if not task:
return False
# Business logic: Verify ownership
if task.owner_id != user_id:
raise ValueError("Not authorized to delete this task")
return await self.task_repo.delete(task_id)
async def get_task_analytics(self, owner_id: int) -> dict:
"""Get advanced task analytics"""
user = await self.user_service.get_by_id(owner_id)
if not user:
return {}
stats = await self.task_repo.get_task_stats(owner_id)
tasks = await self.task_repo.get_user_tasks(owner_id, limit=1000)
# Business logic: Calculate advanced metrics
now = datetime.now()
overdue_tasks = [
task for task in tasks
if task.due_date and task.due_date < now and not task.completed
]
high_priority_tasks = [
task for task in tasks if task.priority == "high"
]
return {
**stats,
"overdue_count": len(overdue_tasks),
"high_priority_count": len(high_priority_tasks),
"productivity_score": self._calculate_productivity_score(tasks),
"avg_completion_time": self._calculate_avg_completion_time(tasks)
}
async def _mark_subtasks_completed(self, parent_task_id: int):
"""Business logic helper: Mark related subtasks as completed"""
# In a real app, you might have task relationships
pass
def _calculate_productivity_score(self, tasks: List[Task]) -> float:
"""Business logic helper: Calculate productivity score"""
if not tasks:
return 0.0
completed_tasks = [task for task in tasks if task.completed]
return len(completed_tasks) / len(tasks) * 100
def _calculate_avg_completion_time(self, tasks: List[Task]) -> Optional[float]:
"""Business logic helper: Calculate average completion time"""
completed_tasks = [
task for task in tasks
if task.completed and task.created_at and task.updated_at
]
if not completed_tasks:
return None
total_time = sum(
(task.updated_at - task.created_at).total_seconds()
for task in completed_tasks
)
return total_time / len(completed_tasks) / 3600 # Convert to hours
Step 3: Dependency Injection Setup
Create Dependency Providers
Create backend/app/dependencies/__init__.py
:
from .database import get_database_session
from .services import get_user_service, get_task_service
from .auth import get_current_user, get_current_active_user
from .config import get_settings
__all__ = [
"get_database_session",
"get_user_service",
"get_task_service",
"get_current_user",
"get_current_active_user",
"get_settings"
]
Create backend/app/dependencies/database.py
:
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession
from ..database import AsyncSessionLocal
async def get_database_session() -> AsyncGenerator[AsyncSession, None]:
"""Database session dependency with proper cleanup"""
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
Create backend/app/dependencies/services.py
:
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from ..services import UserService, TaskService
from .database import get_database_session
def get_user_service(
session: AsyncSession = Depends(get_database_session)
) -> UserService:
"""User service dependency"""
return UserService(session)
def get_task_service(
session: AsyncSession = Depends(get_database_session)
) -> TaskService:
"""Task service dependency"""
return TaskService(session)
Create backend/app/dependencies/config.py
:
import os
from functools import lru_cache
from typing import Optional
from pydantic import BaseSettings
class Settings(BaseSettings):
"""Application settings with environment variable support"""
# Database
database_url: str
database_url_sync: str
# Security
secret_key: str
algorithm: str = "HS256"
access_token_expire_minutes: int = 30
# Application
app_name: str = "Task Management API"
debug: bool = False
environment: str = "production"
# Redis (for caching)
redis_url: Optional[str] = None
# Email (for notifications)
smtp_server: Optional[str] = None
smtp_port: int = 587
smtp_username: Optional[str] = None
smtp_password: Optional[str] = None
class Config:
env_file = ".env"
case_sensitive = False
@lru_cache()
def get_settings() -> Settings:
"""Settings dependency with caching"""
return Settings()
Step 4: Authentication Dependencies
JWT Authentication Service
Create backend/app/services/auth_service.py
:
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.ext.asyncio import AsyncSession
from ..models import User
from ..schemas import UserCreate
from .user_service import UserService
class AuthService:
"""Authentication business logic"""
def __init__(self, session: AsyncSession, secret_key: str, algorithm: str = "HS256"):
self.session = session
self.user_service = UserService(session)
self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
self.secret_key = secret_key
self.algorithm = algorithm
def verify_password(self, plain_password: str, hashed_password: str) -> bool:
"""Verify password against hash"""
return self.pwd_context.verify(plain_password, hashed_password)
def get_password_hash(self, password: str) -> str:
"""Generate password hash"""
return self.pwd_context.hash(password)
async def authenticate_user(self, email: str, password: str) -> Optional[User]:
"""Authenticate user with email and password"""
user = await self.user_service.get_by_email(email)
if not user:
return None
# In a real app, users would have password hashes
# For now, we'll create a simple mock verification
if not self.verify_password(password, "mock_hash"):
return None
return user
def create_access_token(self, data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Create JWT access token"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)
return encoded_jwt
def verify_token(self, token: str) -> Optional[str]:
"""Verify JWT token and return username"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
username: str = payload.get("sub")
return username
except JWTError:
return None
async def get_user_from_token(self, token: str) -> Optional[User]:
"""Get user from JWT token"""
username = self.verify_token(token)
if username is None:
return None
user = await self.user_service.get_by_email(username)
return user
Authentication Dependencies
Create backend/app/dependencies/auth.py
:
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Optional
from ..models import User
from ..services.auth_service import AuthService
from .database import get_database_session
from .config import get_settings, Settings
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def get_auth_service(
session: AsyncSession = Depends(get_database_session),
settings: Settings = Depends(get_settings)
) -> AuthService:
"""Auth service dependency"""
return AuthService(session, settings.secret_key, settings.algorithm)
async def get_current_user(
token: str = Depends(oauth2_scheme),
auth_service: AuthService = Depends(get_auth_service)
) -> User:
"""Get current authenticated user"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
user = await auth_service.get_user_from_token(token)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user)
) -> User:
"""Get current active user (not disabled)"""
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
# Optional dependency for endpoints that can work with or without auth
async def get_optional_current_user(
token: Optional[str] = Depends(oauth2_scheme),
auth_service: AuthService = Depends(get_auth_service)
) -> Optional[User]:
"""Get current user if token provided, None otherwise"""
if not token:
return None
try:
user = await auth_service.get_user_from_token(token)
return user if user and user.is_active else None
except:
return None
Step 5: Refactored API Endpoints
Update Main Application
Update backend/app/main.py
:
from fastapi import FastAPI, Depends, HTTPException, Query, status
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.middleware.cors import CORSMiddleware
from typing import List, Optional
from datetime import timedelta
from .dependencies import (
get_user_service, get_task_service, get_auth_service,
get_current_user, get_current_active_user, get_settings
)
from .services import UserService, TaskService, AuthService
from .schemas import (
User, UserCreate, UserUpdate,
Task, TaskCreate, TaskUpdate,
Token
)
from .models import User as UserModel
from .dependencies.config import Settings
app = FastAPI(
title="Task Management API with Dependency Injection",
description="A clean, scalable task management API built with FastAPI and DI patterns",
version="3.0.0"
)
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:5173", "http://localhost:3000"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Authentication endpoints
@app.post("/token", response_model=Token)
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
auth_service: AuthService = Depends(get_auth_service),
settings: Settings = Depends(get_settings)
):
"""Login endpoint that returns JWT token"""
user = await auth_service.authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=settings.access_token_expire_minutes)
access_token = auth_service.create_access_token(
data={"sub": user.email}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
# User endpoints with service layer
@app.post("/users", response_model=User)
async def create_user(
user_data: UserCreate,
user_service: UserService = Depends(get_user_service)
):
"""Create new user using service layer"""
try:
return await user_service.create(user_data)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/users/me", response_model=User)
async def get_current_user_info(
current_user: UserModel = Depends(get_current_active_user)
):
"""Get current user information"""
return current_user
@app.get("/users/me/stats")
async def get_user_stats(
current_user: UserModel = Depends(get_current_active_user),
user_service: UserService = Depends(get_user_service)
):
"""Get current user statistics"""
return await user_service.get_user_stats(current_user.id)
# Task endpoints with service layer and authentication
@app.post("/tasks", response_model=Task)
async def create_task(
task_data: TaskCreate,
current_user: UserModel = Depends(get_current_active_user),
task_service: TaskService = Depends(get_task_service)
):
"""Create task using service layer with auth"""
try:
return await task_service.create(task_data, current_user.id)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/tasks", response_model=List[Task])
async def get_user_tasks(
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=1000),
completed: Optional[bool] = Query(None),
priority: Optional[str] = Query(None, regex="^(low|medium|high)$"),
search: Optional[str] = Query(None),
include_overdue: bool = Query(True),
current_user: UserModel = Depends(get_current_active_user),
task_service: TaskService = Depends(get_task_service)
):
"""Get user tasks with business logic filtering"""
return await task_service.get_user_tasks_with_business_logic(
owner_id=current_user.id,
skip=skip,
limit=limit,
completed=completed,
priority=priority,
search=search,
include_overdue=include_overdue
)
@app.get("/tasks/{task_id}", response_model=Task)
async def get_task(
task_id: int,
current_user: UserModel = Depends(get_current_active_user),
task_service: TaskService = Depends(get_task_service)
):
"""Get specific task with ownership validation"""
task = await task_service.get_by_id(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
if task.owner_id != current_user.id:
raise HTTPException(status_code=403, detail="Not authorized to access this task")
return task
@app.put("/tasks/{task_id}", response_model=Task)
async def update_task(
task_id: int,
task_data: TaskUpdate,
current_user: UserModel = Depends(get_current_active_user),
task_service: TaskService = Depends(get_task_service)
):
"""Update task using service layer with auth"""
try:
task = await task_service.update(task_id, task_data, current_user.id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
except ValueError as e:
raise HTTPException(status_code=403, detail=str(e))
@app.delete("/tasks/{task_id}")
async def delete_task(
task_id: int,
current_user: UserModel = Depends(get_current_active_user),
task_service: TaskService = Depends(get_task_service)
):
"""Delete task using service layer with auth"""
try:
deleted = await task_service.delete(task_id, current_user.id)
if not deleted:
raise HTTPException(status_code=404, detail="Task not found")
return {"message": "Task deleted successfully"}
except ValueError as e:
raise HTTPException(status_code=403, detail=str(e))
@app.get("/tasks/analytics")
async def get_task_analytics(
current_user: UserModel = Depends(get_current_active_user),
task_service: TaskService = Depends(get_task_service)
):
"""Get advanced task analytics"""
return await task_service.get_task_analytics(current_user.id)
# Health check
@app.get("/health")
async def health_check(settings: Settings = Depends(get_settings)):
"""Health check endpoint"""
return {
"status": "healthy",
"app_name": settings.app_name,
"environment": settings.environment
}
Step 6: Testing with Dependency Overrides
Create Testing Infrastructure
Create backend/tests/conftest.py
:
import pytest
import asyncio
from typing import AsyncGenerator
from fastapi.testclient import TestClient
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from app.main import app
from app.database import Base, get_database_session
from app.dependencies import get_settings
from app.dependencies.config import Settings
# Test database URL
TEST_DATABASE_URL = "sqlite+aiosqlite:///./test.db"
# Test settings
class TestSettings(Settings):
database_url: str = TEST_DATABASE_URL
database_url_sync: str = "sqlite:///./test.db"
secret_key: str = "test-secret-key"
environment: str = "test"
@pytest.fixture(scope="session")
def event_loop():
"""Create event loop for async tests"""
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="session")
async def test_engine():
"""Create test database engine"""
engine = create_async_engine(TEST_DATABASE_URL, echo=True)
# Create tables
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield engine
# Cleanup
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await engine.dispose()
@pytest.fixture
async def test_session(test_engine) -> AsyncGenerator[AsyncSession, None]:
"""Create test database session"""
TestSessionLocal = sessionmaker(
test_engine, class_=AsyncSession, expire_on_commit=False
)
async with TestSessionLocal() as session:
yield session
await session.rollback()
@pytest.fixture
def test_settings():
"""Test settings fixture"""
return TestSettings()
@pytest.fixture
def client(test_session, test_settings):
"""Test client with dependency overrides"""
def override_get_db():
yield test_session
def override_get_settings():
return test_settings
app.dependency_overrides[get_database_session] = override_get_db
app.dependency_overrides[get_settings] = override_get_settings
with TestClient(app) as test_client:
yield test_client
# Cleanup overrides
app.dependency_overrides.clear()
Service Layer Tests
Create backend/tests/test_services.py
:
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from app.services import UserService, TaskService
from app.schemas import UserCreate, TaskCreate
from app.models import User
@pytest.mark.asyncio
async def test_user_service_create(test_session: AsyncSession):
"""Test user service creation with business logic"""
user_service = UserService(test_session)
user_data = UserCreate(
email="[email protected]",
username="testuser",
full_name="Test User"
)
user = await user_service.create(user_data)
assert user.email == "[email protected]"
assert user.username == "testuser"
assert user.is_active is True
@pytest.mark.asyncio
async def test_user_service_duplicate_email(test_session: AsyncSession):
"""Test user service duplicate email validation"""
user_service = UserService(test_session)
user_data = UserCreate(
email="[email protected]",
username="testuser",
full_name="Test User"
)
# Create first user
await user_service.create(user_data)
# Try to create second user with same email
user_data2 = UserCreate(
email="[email protected]",
username="testuser2",
full_name="Test User 2"
)
with pytest.raises(ValueError, match="Email already registered"):
await user_service.create(user_data2)
@pytest.mark.asyncio
async def test_task_service_create_with_user_validation(test_session: AsyncSession):
"""Test task service creation with user validation"""
user_service = UserService(test_session)
task_service = TaskService(test_session)
# Create user first
user_data = UserCreate(
email="[email protected]",
username="testuser",
full_name="Test User"
)
user = await user_service.create(user_data)
# Create task
task_data = TaskCreate(
title="Test Task",
description="Test Description",
priority="high"
)
task = await task_service.create(task_data, user.id)
assert task.title == "Test Task"
assert task.owner_id == user.id
assert task.priority == "high"
# Business logic: high priority tasks should have due date set
assert task.due_date is not None
@pytest.mark.asyncio
async def test_task_service_invalid_user(test_session: AsyncSession):
"""Test task service with invalid user"""
task_service = TaskService(test_session)
task_data = TaskCreate(
title="Test Task",
description="Test Description"
)
# Try to create task for non-existent user
with pytest.raises(ValueError, match="User not found or inactive"):
await task_service.create(task_data, 99999)
Dependency Override Tests
Create backend/tests/test_dependencies.py
:
import pytest
from unittest.mock import AsyncMock
from fastapi.testclient import TestClient
from app.main import app
from app.dependencies import get_user_service, get_current_active_user
from app.models import User
def test_dependency_override():
"""Test dependency override functionality"""
# Mock user service
mock_user_service = AsyncMock()
mock_user_service.get_user_stats.return_value = {"mock": "stats"}
# Mock current user
mock_user = User(
id=1,
email="[email protected]",
username="testuser",
is_active=True
)
# Override dependencies
app.dependency_overrides[get_user_service] = lambda: mock_user_service
app.dependency_overrides[get_current_active_user] = lambda: mock_user
with TestClient(app) as client:
response = client.get("/users/me/stats")
assert response.status_code == 200
assert response.json() == {"mock": "stats"}
# Cleanup
app.dependency_overrides.clear()
Troubleshooting
Common Issues & Solutions
Circular Import Errors:
# Avoid circular imports by using forward references
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .services import UserService
Dependency Resolution Issues:
# Use Depends() properly in function signatures
async def endpoint(
service: UserService = Depends(get_user_service) # Correct
# service = Depends(get_user_service) # Incorrect
):
pass
Testing Dependency Overrides:
# Always clear overrides after tests
@pytest.fixture(autouse=True)
def clear_dependency_overrides():
yield
app.dependency_overrides.clear()
Performance Issues:
# Use lru_cache for expensive dependencies
from functools import lru_cache
@lru_cache()
def get_expensive_resource():
return ExpensiveResource()
What You've Accomplished
Congratulations! You've mastered FastAPI dependency injection with:
- Clean architecture with service layers and repository patterns
- Authentication system with JWT-based security dependencies
- Configuration management with environment-based settings
- Testing framework with dependency overrides and mocks
- Business logic separation from API endpoints
- Scalable patterns for production applications
- Error handling and validation at service layer
- Performance optimization with dependency caching
Next Steps
Advanced Dependency Patterns:
- Caching dependencies - Redis integration for performance
- Background task dependencies - Celery or RQ integration
- External service dependencies - HTTP clients, third-party APIs
- Event-driven dependencies - Message queues and pub/sub
- Multi-tenant dependencies - Tenant-aware services
Production Considerations:
- Dependency monitoring - Track performance and health
- Graceful degradation - Fallbacks for failed dependencies
- Resource pooling - Efficient connection management
- Dependency injection containers - Advanced DI frameworks
- Microservice dependencies - Service-to-service communication
Pro Tips for Dependency Injection
Best Practices:
- Keep dependencies focused - Single responsibility principle
- Use type hints - Better IDE support and documentation
- Minimize dependency depth - Avoid complex dependency chains
- Test dependencies in isolation - Unit test service layers
- Document dependency lifecycles - Clear resource management
Performance Optimization:
- Cache expensive dependencies - Use lru_cache for singletons
- Lazy load dependencies - Only create when needed
- Pool connections - Reuse database and HTTP connections
- Monitor dependency performance - Track creation and execution time
- Use async dependencies - Maximize concurrency benefits
Ready to build scalable, maintainable applications? You now have the foundation to create clean, testable FastAPI applications using proper dependency injection patterns. Your code will be more modular, testable, and ready for production scale!