FastAPI Dependency Injection Mastery 2025 - Build Scalable Applications

Master FastAPI's dependency injection system to build maintainable, testable, and scalable applications. This tutorial builds on our PostgreSQL Task Management app, showing how to implement clean architecture patterns using dependency injection.

What You'll Learn

By completing this tutorial, you'll master:

  • Dependency injection fundamentals and advanced patterns
  • Service layer architecture for business logic separation
  • Authentication dependencies for secure endpoints
  • Configuration management with environment-based dependencies
  • Testing strategies with dependency overrides
  • Performance optimization with dependency caching
  • Clean architecture principles for maintainable code

Prerequisites

What you need before starting:

  • Completed the PostgreSQL Tutorial - We'll refactor that app
  • Understanding of async/await in Python
  • Basic knowledge of design patterns (helpful but not required)
  • Familiarity with testing concepts (unit tests, mocks)

Time to complete: 25 minutes


What We're Building

You'll refactor the Task Management API from the PostgreSQL tutorial with:

  • Service layer pattern - Business logic separated from API layer
  • Authentication dependencies - JWT-based user authentication
  • Configuration dependencies - Environment-based settings
  • Repository pattern - Database access abstraction
  • Testing framework - Dependency overrides for unit tests
  • Caching layer - Performance optimization with Redis
  • Error handling - Centralized exception management

Before vs After:

  • Before - Direct database access in endpoints, mixed concerns
  • After - Clean separation with service layers, dependency injection, and testable architecture


Step 1: Understanding Dependency Injection Patterns

What is Dependency Injection?

Dependency Injection is a design pattern where dependencies are provided to a function or class from external sources rather than being created internally. In FastAPI, this enables:

  • Separation of concerns - API logic separate from business logic
  • Testability - Easy mocking and testing
  • Reusability - Share dependencies across endpoints
  • Performance - Efficient resource management
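
To make the pattern concrete outside of FastAPI, here is a minimal plain-Python sketch. The names build_greeting and fixed_clock are hypothetical and exist only for illustration. The function receives its clock from the caller instead of calling datetime.now() itself, which is what makes it easy to test.

```python
from datetime import datetime
from typing import Callable


def build_greeting(name: str, clock: Callable[[], datetime] = datetime.now) -> str:
    """Return a greeting; the clock is injected rather than created internally."""
    hour = clock().hour
    period = "morning" if hour < 12 else "afternoon" if hour < 18 else "evening"
    return f"Good {period}, {name}"


# Production code relies on the real clock (the default argument).
print(build_greeting("Ada"))


# A test injects a fixed clock, so the result is deterministic.
def fixed_clock() -> datetime:
    return datetime(2025, 1, 1, 9, 0)


print(build_greeting("Ada", clock=fixed_clock))  # Good morning, Ada
```

FastAPI's Depends applies the same idea to endpoint parameters: the framework, not the endpoint, decides which object gets passed in.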

FastAPI DI System Overview

Create backend/app/examples/di_examples.py:

from fastapi import FastAPI, Depends, HTTPException, Request
from typing import Annotated
import asyncio

app = FastAPI()

# 1. Simple Dependency
def get_current_timestamp():
    from datetime import datetime
    return datetime.now().isoformat()

# 2. Async Dependency
async def get_user_agent(request: Request):
    return request.headers.get("user-agent", "unknown")

# 3. Class-based Dependency
class DatabaseService:
    def __init__(self):
        self.connection_pool = "Mock DB Pool"

    async def get_connection(self):
        await asyncio.sleep(0.1)  # Simulate connection time
        return f"Connection from {self.connection_pool}"

# Dependency instances
db_service = DatabaseService()

# 4. Sub-dependencies (dependencies that depend on other dependencies)
async def get_db_connection(service: DatabaseService = Depends(lambda: db_service)):
    return await service.get_connection()

async def get_user_context(
    timestamp: str = Depends(get_current_timestamp),
    user_agent: str = Depends(get_user_agent),
    db_conn: str = Depends(get_db_connection)
):
    return {
        "timestamp": timestamp,
        "user_agent": user_agent,
        "db_connection": db_conn
    }

# Using dependencies in endpoints
@app.get("/simple")
async def simple_endpoint(timestamp: str = Depends(get_current_timestamp)):
    return {"current_time": timestamp}

@app.get("/complex")
async def complex_endpoint(
    context: dict = Depends(get_user_context)
):
    return {"context": context}

# Type annotations for better IDE support
@app.get("/typed")
async def typed_endpoint(
    timestamp: Annotated[str, Depends(get_current_timestamp)]
):
    return {"typed_timestamp": timestamp}

Dependency Scopes and Lifecycle

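FastAPI resolves each dependency once per request by default. The other lifetimes below are patterns you build on top of that:

  • Per request - Resolved for each request and reused within it when several dependants ask for the same dependency (default)
  • Per application - Created once and shared, for example a module-level instance or a provider wrapped in lru_cache (singleton pattern)
  • Per test - Replaced through app.dependency_overrides during testing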
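
The difference between a fresh object per call and a shared singleton can be sketched without FastAPI. Connection, Pool, get_connection, and get_pool are hypothetical names used only for this illustration; the point is how functools.lru_cache changes a provider's lifetime.

```python
from functools import lru_cache


class Connection:
    """Hypothetical stand-in for a per-request resource."""


class Pool:
    """Hypothetical stand-in for an expensive, shared resource."""


def get_connection() -> Connection:
    # No caching: every call (think: every request) builds a new object.
    return Connection()


@lru_cache()
def get_pool() -> Pool:
    # lru_cache on a zero-argument function memoizes the first result,
    # so every later call returns the same instance.
    return Pool()


print(get_connection() is get_connection())  # False
print(get_pool() is get_pool())              # True
```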

Step 2: Service Layer Architecture

Create Service Layer Structure

Refactor our Task Management app with service layers:

Create backend/app/services/__init__.py:

# Service layer exports for clean imports
from .user_service import UserService
from .task_service import TaskService
from .auth_service import AuthService

__all__ = ["UserService", "TaskService", "AuthService"]

Create backend/app/services/base_service.py:

from abc import ABC, abstractmethod
from sqlalchemy.ext.asyncio import AsyncSession
from typing import TypeVar, Generic, Optional, List

T = TypeVar('T')

class BaseService(ABC, Generic[T]):
    """Base service class with common patterns"""

    def __init__(self, session: AsyncSession):
        self.session = session

    @abstractmethod
    async def create(self, data: dict) -> T:
        pass

    @abstractmethod
    async def get_by_id(self, id: int) -> Optional[T]:
        pass

    @abstractmethod
    async def update(self, id: int, data: dict) -> Optional[T]:
        pass

    @abstractmethod
    async def delete(self, id: int) -> bool:
        pass
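
As a rough illustration of how the abstract contract is meant to be used, the sketch below defines a trimmed-down stand-in for BaseService (no database session, only two methods) and a hypothetical InMemoryNoteService that stores rows in a dict. Neither class is part of the tutorial app.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Dict, Generic, Optional, TypeVar

T = TypeVar("T")


class CrudService(ABC, Generic[T]):
    """Trimmed-down stand-in for BaseService (no database session)."""

    @abstractmethod
    async def create(self, data: dict) -> T: ...

    @abstractmethod
    async def get_by_id(self, id: int) -> Optional[T]: ...


class InMemoryNoteService(CrudService[dict]):
    """Hypothetical concrete service that keeps records in a dict."""

    def __init__(self) -> None:
        self._rows: Dict[int, dict] = {}
        self._next_id = 1

    async def create(self, data: dict) -> dict:
        row = {"id": self._next_id, **data}
        self._rows[row["id"]] = row
        self._next_id += 1
        return row

    async def get_by_id(self, id: int) -> Optional[dict]:
        return self._rows.get(id)


async def main() -> Optional[dict]:
    service = InMemoryNoteService()
    created = await service.create({"title": "Write docs"})
    return await service.get_by_id(created["id"])


print(asyncio.run(main()))  # {'id': 1, 'title': 'Write docs'}

# The abstract base itself cannot be instantiated.
try:
    CrudService()
except TypeError as exc:
    print("TypeError:", exc)
```

A subclass that forgets one of the abstract methods fails the same way at instantiation time, which surfaces an incomplete service early.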

User Service Implementation

Create backend/app/services/user_service.py:

from typing import Optional, List
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from passlib.context import CryptContext

from ..models import User
from ..schemas import UserCreate, UserUpdate
from ..crud import UserRepository
from .base_service import BaseService

class UserService(BaseService[User]):
    """User business logic service"""

    def __init__(self, session: AsyncSession):
        super().__init__(session)
        self.user_repo = UserRepository(session)
        self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

    async def create(self, user_data: UserCreate) -> User:
        """Create new user with validation"""
        # Business logic: Check if user already exists
        if await self.user_repo.get_by_email(user_data.email):
            raise ValueError("Email already registered")

        if await self.user_repo.get_by_username(user_data.username):
            raise ValueError("Username already taken")

        # Business logic: Validate username format
        if len(user_data.username) < 3:
            raise ValueError("Username must be at least 3 characters")

        return await self.user_repo.create(user_data)

    async def get_by_id(self, user_id: int) -> Optional[User]:
        """Get user by ID with business logic checks"""
        user = await self.user_repo.get_by_id(user_id)
        if user and not user.is_active:
            return None  # Don't return inactive users
        return user

    async def get_by_email(self, email: str) -> Optional[User]:
        """Get active user by email"""
        user = await self.user_repo.get_by_email(email)
        return user if user and user.is_active else None

    async def update(self, user_id: int, user_data: UserUpdate) -> Optional[User]:
        """Update user with validation"""
        user = await self.get_by_id(user_id)
        if not user:
            return None

        # Business logic: Validate email uniqueness if changing
        if user_data.email and user_data.email != user.email:
            existing = await self.user_repo.get_by_email(user_data.email)
            if existing:
                raise ValueError("Email already in use")

        # Business logic: Validate username uniqueness if changing
        if user_data.username and user_data.username != user.username:
            existing = await self.user_repo.get_by_username(user_data.username)
            if existing:
                raise ValueError("Username already taken")

        return await self.user_repo.update(user_id, user_data)

    async def delete(self, user_id: int) -> bool:
        """Soft delete user (deactivate)"""
        user = await self.get_by_id(user_id)
        if not user:
            return False

        # Business logic: Soft delete by deactivating
        user_data = UserUpdate(is_active=False)
        await self.user_repo.update(user_id, user_data)
        return True

    async def get_user_stats(self, user_id: int) -> dict:
        """Get user statistics with business logic"""
        from datetime import datetime

        user = await self.get_by_id(user_id)
        if not user:
            return {}

        # Use the timestamp's own tzinfo so naive and aware values both work
        now = datetime.now(user.created_at.tzinfo)

        # Business logic could include complex calculations
        return {
            "user_id": user.id,
            "username": user.username,
            "account_age_days": (now - user.created_at).days,
            "is_premium": len(user.username) > 5,  # Example business rule
        }

Task Service Implementation

Create backend/app/services/task_service.py:

from typing import Optional, List
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import datetime, timedelta

from ..models import Task, User
from ..schemas import TaskCreate, TaskUpdate
from ..crud import TaskRepository
from .base_service import BaseService
from .user_service import UserService

class TaskService(BaseService[Task]):
    """Task business logic service"""

    def __init__(self, session: AsyncSession):
        super().__init__(session)
        self.task_repo = TaskRepository(session)
        self.user_service = UserService(session)

    async def create(self, task_data: TaskCreate, owner_id: int) -> Task:
        """Create task with business validation"""
        # Business logic: Verify user exists and is active
        user = await self.user_service.get_by_id(owner_id)
        if not user:
            raise ValueError("User not found or inactive")

        # Business logic: Validate task limits
        user_tasks = await self.task_repo.get_user_tasks(owner_id, limit=1000)
        if len(user_tasks) >= 100:  # Business rule: max 100 tasks per user
            raise ValueError("Task limit reached. Maximum 100 tasks per user.")

        # Business logic: Set due date if high priority
        if task_data.priority == "high" and not task_data.due_date:
            task_data.due_date = datetime.now() + timedelta(days=3)

        return await self.task_repo.create(task_data, owner_id)

    async def get_by_id(self, task_id: int) -> Optional[Task]:
        """Get task by ID"""
        return await self.task_repo.get_by_id(task_id)

    async def get_user_tasks_with_business_logic(
        self,
        owner_id: int,
        skip: int = 0,
        limit: int = 100,
        completed: Optional[bool] = None,
        priority: Optional[str] = None,
        search: Optional[str] = None,
        include_overdue: bool = True
    ) -> List[Task]:
        """Get user tasks with business logic filtering"""

        # Business logic: Verify user access
        user = await self.user_service.get_by_id(owner_id)
        if not user:
            return []

        tasks = await self.task_repo.get_user_tasks(
            owner_id, skip, limit, completed, priority, search
        )

        # Business logic: Filter overdue tasks if requested
        if not include_overdue:
            now = datetime.now()
            tasks = [
                task for task in tasks 
                if not task.due_date or task.due_date > now
            ]

        return tasks

    async def update(self, task_id: int, task_data: TaskUpdate, user_id: int) -> Optional[Task]:
        """Update task with ownership validation"""
        task = await self.get_by_id(task_id)
        if not task:
            return None

        # Business logic: Verify ownership
        if task.owner_id != user_id:
            raise ValueError("Not authorized to update this task")

        # Business logic: Auto-complete subtasks
        if task_data.completed and task_data.completed != task.completed:
            await self._mark_subtasks_completed(task_id)

        return await self.task_repo.update(task_id, task_data)

    async def delete(self, task_id: int, user_id: int) -> bool:
        """Delete task with ownership validation"""
        task = await self.get_by_id(task_id)
        if not task:
            return False

        # Business logic: Verify ownership
        if task.owner_id != user_id:
            raise ValueError("Not authorized to delete this task")

        return await self.task_repo.delete(task_id)

    async def get_task_analytics(self, owner_id: int) -> dict:
        """Get advanced task analytics"""
        user = await self.user_service.get_by_id(owner_id)
        if not user:
            return {}

        stats = await self.task_repo.get_task_stats(owner_id)
        tasks = await self.task_repo.get_user_tasks(owner_id, limit=1000)

        # Business logic: Calculate advanced metrics
        now = datetime.now()
        overdue_tasks = [
            task for task in tasks 
            if task.due_date and task.due_date < now and not task.completed
        ]

        high_priority_tasks = [
            task for task in tasks if task.priority == "high"
        ]

        return {
            **stats,
            "overdue_count": len(overdue_tasks),
            "high_priority_count": len(high_priority_tasks),
            "productivity_score": self._calculate_productivity_score(tasks),
            "avg_completion_time": self._calculate_avg_completion_time(tasks)
        }

    async def _mark_subtasks_completed(self, parent_task_id: int):
        """Business logic helper: Mark related subtasks as completed"""
        # In a real app, you might have task relationships
        pass

    def _calculate_productivity_score(self, tasks: List[Task]) -> float:
        """Business logic helper: Calculate productivity score"""
        if not tasks:
            return 0.0

        completed_tasks = [task for task in tasks if task.completed]
        return len(completed_tasks) / len(tasks) * 100

    def _calculate_avg_completion_time(self, tasks: List[Task]) -> Optional[float]:
        """Business logic helper: Calculate average completion time"""
        completed_tasks = [
            task for task in tasks 
            if task.completed and task.created_at and task.updated_at
        ]

        if not completed_tasks:
            return None

        total_time = sum(
            (task.updated_at - task.created_at).total_seconds()
            for task in completed_tasks
        )

        return total_time / len(completed_tasks) / 3600  # Convert to hours
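
To see what the two analytics helpers return, here is a small worked example. FakeTask is a hypothetical stand-in that carries only the fields the helpers read, and the two functions mirror the logic of _calculate_productivity_score and _calculate_avg_completion_time outside the service class.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class FakeTask:
    """Hypothetical stand-in with only the fields the helpers read."""
    completed: bool
    created_at: datetime
    updated_at: datetime


def productivity_score(tasks: List[FakeTask]) -> float:
    # Share of completed tasks, as a percentage.
    if not tasks:
        return 0.0
    return len([t for t in tasks if t.completed]) / len(tasks) * 100


def avg_completion_hours(tasks: List[FakeTask]) -> Optional[float]:
    # Mean of (updated_at - created_at) over completed tasks, in hours.
    done = [t for t in tasks if t.completed]
    if not done:
        return None
    total = sum((t.updated_at - t.created_at).total_seconds() for t in done)
    return total / len(done) / 3600


start = datetime(2025, 1, 1, 9, 0)
tasks = [
    FakeTask(True, start, start + timedelta(hours=2)),
    FakeTask(True, start, start + timedelta(hours=4)),
    FakeTask(False, start, start + timedelta(hours=1)),
    FakeTask(False, start, start),
]

print(productivity_score(tasks))    # 50.0
print(avg_completion_hours(tasks))  # 3.0
```

Note that updated_at is only a proxy for the completion time: any later edit to a completed task moves it, so the average drifts upward.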


Step 3: Dependency Injection Setup

Create Dependency Providers

Create backend/app/dependencies/__init__.py:

from .database import get_database_session
from .services import get_user_service, get_task_service
from .auth import get_auth_service, get_current_user, get_current_active_user
from .config import get_settings

# get_auth_service is exported because main.py imports it from this package
__all__ = [
    "get_database_session",
    "get_user_service",
    "get_task_service",
    "get_auth_service",
    "get_current_user",
    "get_current_active_user",
    "get_settings"
]

Create backend/app/dependencies/database.py:

from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession
from ..database import AsyncSessionLocal

async def get_database_session() -> AsyncGenerator[AsyncSession, None]:
    """Database session dependency with proper cleanup"""
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()
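
The commit, rollback, and close ordering in this dependency can be traced with a stand-in session. In the sketch below, FakeSession and run_request are hypothetical, and contextlib.asynccontextmanager is used to drive the generator in roughly the way FastAPI drives a yield dependency: run up to yield, hand over the value, then resume it or throw the endpoint's exception into it.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator, List


class FakeSession:
    """Hypothetical stand-in that records which lifecycle methods ran."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")

    async def close(self) -> None:
        self.calls.append("close")


async def get_fake_session(session: FakeSession) -> AsyncGenerator[FakeSession, None]:
    # Same try/except/finally shape as get_database_session.
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()


fake_session_scope = asynccontextmanager(get_fake_session)


async def run_request(fail: bool) -> List[str]:
    session = FakeSession()
    try:
        async with fake_session_scope(session):
            if fail:
                raise RuntimeError("endpoint failed")
    except RuntimeError:
        pass  # the error still propagates after rollback; swallowed for the demo
    return session.calls


print(asyncio.run(run_request(fail=False)))  # ['commit', 'close']
print(asyncio.run(run_request(fail=True)))   # ['rollback', 'close']
```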

Create backend/app/dependencies/services.py:

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from ..services import UserService, TaskService
from .database import get_database_session

def get_user_service(
    session: AsyncSession = Depends(get_database_session)
) -> UserService:
    """User service dependency"""
    return UserService(session)

def get_task_service(
    session: AsyncSession = Depends(get_database_session)
) -> TaskService:
    """Task service dependency"""
    return TaskService(session)

Create backend/app/dependencies/config.py:

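from functools import lru_cache
from typing import Optional

try:
    # Pydantic v2 moved BaseSettings to the separate pydantic-settings package
    from pydantic_settings import BaseSettings
except ImportError:
    # Pydantic v1 ships BaseSettings in the main package
    from pydantic import BaseSettings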

class Settings(BaseSettings):
    """Application settings with environment variable support"""

    # Database
    database_url: str
    database_url_sync: str

    # Security
    secret_key: str
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30

    # Application
    app_name: str = "Task Management API"
    debug: bool = False
    environment: str = "production"

    # Redis (for caching)
    redis_url: Optional[str] = None

    # Email (for notifications)
    smtp_server: Optional[str] = None
    smtp_port: int = 587
    smtp_username: Optional[str] = None
    smtp_password: Optional[str] = None

    class Config:
        env_file = ".env"
        case_sensitive = False

@lru_cache()
def get_settings() -> Settings:
    """Settings dependency with caching"""
    return Settings()
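
One consequence of wrapping the provider in lru_cache is that environment variables are read only once. The sketch below shows this with a hypothetical DemoSettings dataclass and a made-up DEMO_ENVIRONMENT variable, so it runs without Pydantic; the caching behaviour is the same for get_settings.

```python
import os
from dataclasses import dataclass
from functools import lru_cache


@dataclass(frozen=True)
class DemoSettings:
    """Hypothetical stand-in for Settings; reads a single variable."""
    environment: str


@lru_cache()
def get_demo_settings() -> DemoSettings:
    # Runs once; later calls return the cached object.
    return DemoSettings(environment=os.environ.get("DEMO_ENVIRONMENT", "production"))


os.environ["DEMO_ENVIRONMENT"] = "staging"
first = get_demo_settings()

# Changing the environment afterwards has no effect on the cached value...
os.environ["DEMO_ENVIRONMENT"] = "test"
print(get_demo_settings().environment)  # staging
print(get_demo_settings() is first)     # True

# ...until the cache is cleared explicitly (useful between tests).
get_demo_settings.cache_clear()
print(get_demo_settings().environment)  # test
```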


Step 4: Authentication Dependencies

JWT Authentication Service

Create backend/app/services/auth_service.py:

from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.ext.asyncio import AsyncSession

from ..models import User
from ..schemas import UserCreate
from .user_service import UserService

class AuthService:
    """Authentication business logic"""

    def __init__(self, session: AsyncSession, secret_key: str, algorithm: str = "HS256"):
        self.session = session
        self.user_service = UserService(session)
        self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
        self.secret_key = secret_key
        self.algorithm = algorithm

    def verify_password(self, plain_password: str, hashed_password: str) -> bool:
        """Verify password against hash"""
        return self.pwd_context.verify(plain_password, hashed_password)

    def get_password_hash(self, password: str) -> str:
        """Generate password hash"""
        return self.pwd_context.hash(password)

    async def authenticate_user(self, email: str, password: str) -> Optional[User]:
        """Authenticate user with email and password"""
        user = await self.user_service.get_by_email(email)
        if not user:
            return None

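        # Assumption: the User model stores a bcrypt hash in a
        # `hashed_password` column (not shown in this tutorial). A placeholder
        # string such as "mock_hash" is not a valid hash, so passlib cannot
        # verify a password against it.
        hashed_password = getattr(user, "hashed_password", None)
        if not hashed_password or not self.verify_password(password, hashed_password):
            return None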

        return user

    def create_access_token(self, data: dict, expires_delta: Optional[timedelta] = None) -> str:
        """Create JWT access token"""
        to_encode = data.copy()
        if expires_delta:
            expire = datetime.utcnow() + expires_delta
        else:
            expire = datetime.utcnow() + timedelta(minutes=15)

        to_encode.update({"exp": expire})
        encoded_jwt = jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)
        return encoded_jwt

    def verify_token(self, token: str) -> Optional[str]:
        """Verify JWT token and return username"""
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
            username: str = payload.get("sub")
            return username
        except JWTError:
            return None

    async def get_user_from_token(self, token: str) -> Optional[User]:
        """Get user from JWT token"""
        username = self.verify_token(token)
        if username is None:
            return None

        user = await self.user_service.get_by_email(username)
        return user
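
For readers curious what create_access_token and verify_token do under the hood, the following is a standard-library sketch of HS256 signing and verification. It is not python-jose's implementation, and sign_token and read_subject are hypothetical names; in the application itself, keep using the library.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(raw: bytes) -> str:
    # JWTs use URL-safe base64 without "=" padding.
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _signature(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256).digest()


def sign_token(claims: dict, secret: str) -> str:
    """Build header.payload.signature, signed with HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    return f"{signing_input}.{_b64url(_signature(signing_input, secret))}"


def read_subject(token: str, secret: str) -> Optional[str]:
    """Return the "sub" claim if the signature and expiry check out, else None."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        expected = _signature(f"{header_b64}.{payload_b64}", secret)
        if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
            return None
        claims = json.loads(_b64url_decode(payload_b64))
    except ValueError:
        return None  # malformed token, bad base64, or bad JSON
    if claims.get("exp", 0) < time.time():
        return None  # expired, or no expiry set
    return claims.get("sub")


token = sign_token({"sub": "user@example.com", "exp": int(time.time()) + 60}, "demo-secret")
print(read_subject(token, "demo-secret"))   # user@example.com
print(read_subject(token, "wrong-secret"))  # None
```

The payload is only encoded, not encrypted: anyone holding the token can read the claims, and the secret protects them only against tampering.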

Authentication Dependencies

Create backend/app/dependencies/auth.py:

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Optional

from ..models import User
from ..services.auth_service import AuthService
from .database import get_database_session
from .config import get_settings, Settings

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

def get_auth_service(
    session: AsyncSession = Depends(get_database_session),
    settings: Settings = Depends(get_settings)
) -> AuthService:
    """Auth service dependency"""
    return AuthService(session, settings.secret_key, settings.algorithm)

async def get_current_user(
    token: str = Depends(oauth2_scheme),
    auth_service: AuthService = Depends(get_auth_service)
) -> User:
    """Get current authenticated user"""
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    user = await auth_service.get_user_from_token(token)
    if user is None:
        raise credentials_exception

    return user

async def get_current_active_user(
    current_user: User = Depends(get_current_user)
) -> User:
    """Get current active user (not disabled)"""
    if not current_user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")

    return current_user

# Optional dependency for endpoints that can work with or without auth.
# auto_error=False makes the scheme return None instead of raising 401
# when the Authorization header is missing.
optional_oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)

async def get_optional_current_user(
    token: Optional[str] = Depends(optional_oauth2_scheme),
    auth_service: AuthService = Depends(get_auth_service)
) -> Optional[User]:
    """Get current user if token provided, None otherwise"""
    if not token:
        return None

    try:
        user = await auth_service.get_user_from_token(token)
        return user if user and user.is_active else None
    except Exception:
        return None


Step 5: Refactored API Endpoints

Update Main Application

Update backend/app/main.py:

from fastapi import FastAPI, Depends, HTTPException, Query, status
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.middleware.cors import CORSMiddleware
from typing import List, Optional
from datetime import timedelta

from .dependencies import (
    get_user_service, get_task_service, get_auth_service,
    get_current_user, get_current_active_user, get_settings
)
from .services import UserService, TaskService, AuthService
from .schemas import (
    User, UserCreate, UserUpdate,
    Task, TaskCreate, TaskUpdate,
    Token
)
from .models import User as UserModel
from .dependencies.config import Settings

app = FastAPI(
    title="Task Management API with Dependency Injection",
    description="A clean, scalable task management API built with FastAPI and DI patterns",
    version="3.0.0"
)

# Configure CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173", "http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Authentication endpoints
@app.post("/token", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    auth_service: AuthService = Depends(get_auth_service),
    settings: Settings = Depends(get_settings)
):
    """Login endpoint that returns JWT token"""
    user = await auth_service.authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=settings.access_token_expire_minutes)
    access_token = auth_service.create_access_token(
        data={"sub": user.email}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

# User endpoints with service layer
@app.post("/users", response_model=User)
async def create_user(
    user_data: UserCreate,
    user_service: UserService = Depends(get_user_service)
):
    """Create new user using service layer"""
    try:
        return await user_service.create(user_data)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/users/me", response_model=User)
async def get_current_user_info(
    current_user: UserModel = Depends(get_current_active_user)
):
    """Get current user information"""
    return current_user

@app.get("/users/me/stats")
async def get_user_stats(
    current_user: UserModel = Depends(get_current_active_user),
    user_service: UserService = Depends(get_user_service)
):
    """Get current user statistics"""
    return await user_service.get_user_stats(current_user.id)

# Task endpoints with service layer and authentication
@app.post("/tasks", response_model=Task)
async def create_task(
    task_data: TaskCreate,
    current_user: UserModel = Depends(get_current_active_user),
    task_service: TaskService = Depends(get_task_service)
):
    """Create task using service layer with auth"""
    try:
        return await task_service.create(task_data, current_user.id)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/tasks", response_model=List[Task])
async def get_user_tasks(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    completed: Optional[bool] = Query(None),
    priority: Optional[str] = Query(None, regex="^(low|medium|high)$"),
    search: Optional[str] = Query(None),
    include_overdue: bool = Query(True),
    current_user: UserModel = Depends(get_current_active_user),
    task_service: TaskService = Depends(get_task_service)
):
    """Get user tasks with business logic filtering"""
    return await task_service.get_user_tasks_with_business_logic(
        owner_id=current_user.id,
        skip=skip,
        limit=limit,
        completed=completed,
        priority=priority,
        search=search,
        include_overdue=include_overdue
    )

# Declared before /tasks/{task_id}: routes are matched in declaration order,
# and "analytics" would otherwise be parsed as a task_id and rejected with 422
@app.get("/tasks/analytics")
async def get_task_analytics(
    current_user: UserModel = Depends(get_current_active_user),
    task_service: TaskService = Depends(get_task_service)
):
    """Get advanced task analytics"""
    return await task_service.get_task_analytics(current_user.id)

@app.get("/tasks/{task_id}", response_model=Task)
async def get_task(
    task_id: int,
    current_user: UserModel = Depends(get_current_active_user),
    task_service: TaskService = Depends(get_task_service)
):
    """Get specific task with ownership validation"""
    task = await task_service.get_by_id(task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    if task.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="Not authorized to access this task")

    return task

@app.put("/tasks/{task_id}", response_model=Task)
async def update_task(
    task_id: int,
    task_data: TaskUpdate,
    current_user: UserModel = Depends(get_current_active_user),
    task_service: TaskService = Depends(get_task_service)
):
    """Update task using service layer with auth"""
    try:
        task = await task_service.update(task_id, task_data, current_user.id)
        if not task:
            raise HTTPException(status_code=404, detail="Task not found")
        return task
    except ValueError as e:
        raise HTTPException(status_code=403, detail=str(e))

@app.delete("/tasks/{task_id}")
async def delete_task(
    task_id: int,
    current_user: UserModel = Depends(get_current_active_user),
    task_service: TaskService = Depends(get_task_service)
):
    """Delete task using service layer with auth"""
    try:
        deleted = await task_service.delete(task_id, current_user.id)
        if not deleted:
            raise HTTPException(status_code=404, detail="Task not found")
        return {"message": "Task deleted successfully"}
    except ValueError as e:
        raise HTTPException(status_code=403, detail=str(e))

# Health check
@app.get("/health")
async def health_check(settings: Settings = Depends(get_settings)):
    """Health check endpoint"""
    return {
        "status": "healthy",
        "app_name": settings.app_name,
        "environment": settings.environment
    }


Step 6: Testing with Dependency Overrides

Create Testing Infrastructure

Create backend/tests/conftest.py:

import pytest
import asyncio
from typing import AsyncGenerator
from fastapi.testclient import TestClient
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

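from app.main import app
from app.database import Base
# The override key must be the same function object the endpoints depend on,
# which lives in app.dependencies
from app.dependencies import get_database_session, get_settings
from app.dependencies.config import Settings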

# Test database URL
TEST_DATABASE_URL = "sqlite+aiosqlite:///./test.db"

# Test settings
class TestSettings(Settings):
    database_url: str = TEST_DATABASE_URL
    database_url_sync: str = "sqlite:///./test.db"
    secret_key: str = "test-secret-key"
    environment: str = "test"

@pytest.fixture(scope="session")
def event_loop():
    """Create event loop for async tests"""
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()

@pytest.fixture(scope="session")
async def test_engine():
    """Create test database engine"""
    engine = create_async_engine(TEST_DATABASE_URL, echo=True)

    # Create tables
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    yield engine

    # Cleanup
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    await engine.dispose()

@pytest.fixture
async def test_session(test_engine) -> AsyncGenerator[AsyncSession, None]:
    """Create test database session"""
    TestSessionLocal = sessionmaker(
        test_engine, class_=AsyncSession, expire_on_commit=False
    )

    async with TestSessionLocal() as session:
        yield session
        await session.rollback()

@pytest.fixture
def test_settings():
    """Test settings fixture"""
    return TestSettings()

@pytest.fixture
def client(test_session, test_settings):
    """Test client with dependency overrides"""

    def override_get_db():
        yield test_session

    def override_get_settings():
        return test_settings

    app.dependency_overrides[get_database_session] = override_get_db
    app.dependency_overrides[get_settings] = override_get_settings

    with TestClient(app) as test_client:
        yield test_client

    # Cleanup overrides
    app.dependency_overrides.clear()

Service Layer Tests

Create backend/tests/test_services.py:

import pytest
from sqlalchemy.ext.asyncio import AsyncSession

from app.services import UserService, TaskService
from app.schemas import UserCreate, TaskCreate
from app.models import User

@pytest.mark.asyncio
async def test_user_service_create(test_session: AsyncSession):
    """Test user service creation with business logic"""
    user_service = UserService(test_session)

    user_data = UserCreate(
        email="test@example.com",
        username="testuser",
        full_name="Test User"
    )

    user = await user_service.create(user_data)

    assert user.email == "test@example.com"
    assert user.username == "testuser"
    assert user.is_active is True

@pytest.mark.asyncio
async def test_user_service_duplicate_email(test_session: AsyncSession):
    """Test user service duplicate email validation"""
    user_service = UserService(test_session)

    user_data = UserCreate(
        email="test@example.com",
        username="testuser",
        full_name="Test User"
    )

    # Create first user
    await user_service.create(user_data)

    # Try to create second user with same email
    user_data2 = UserCreate(
        email="test@example.com",
        username="testuser2",
        full_name="Test User 2"
    )

    with pytest.raises(ValueError, match="Email already registered"):
        await user_service.create(user_data2)

@pytest.mark.asyncio
async def test_task_service_create_with_user_validation(test_session: AsyncSession):
    """Test task service creation with user validation"""
    user_service = UserService(test_session)
    task_service = TaskService(test_session)

    # Create user first
    user_data = UserCreate(
        email="test@example.com",
        username="testuser",
        full_name="Test User"
    )
    user = await user_service.create(user_data)

    # Create task
    task_data = TaskCreate(
        title="Test Task",
        description="Test Description",
        priority="high"
    )

    task = await task_service.create(task_data, user.id)

    assert task.title == "Test Task"
    assert task.owner_id == user.id
    assert task.priority == "high"
    # Business logic: high priority tasks should have due date set
    assert task.due_date is not None

@pytest.mark.asyncio
async def test_task_service_invalid_user(test_session: AsyncSession):
    """Test task service with invalid user"""
    task_service = TaskService(test_session)

    task_data = TaskCreate(
        title="Test Task",
        description="Test Description"
    )

    # Try to create task for non-existent user
    with pytest.raises(ValueError, match="User not found or inactive"):
        await task_service.create(task_data, 99999)

Dependency Override Tests

Create backend/tests/test_dependencies.py:

import pytest
from unittest.mock import AsyncMock
from fastapi.testclient import TestClient

from app.main import app
from app.dependencies import get_user_service, get_current_active_user
from app.models import User

def test_dependency_override():
    """Test dependency override functionality"""

    # Mock user service
    mock_user_service = AsyncMock()
    mock_user_service.get_user_stats.return_value = {"mock": "stats"}

    # Mock current user
    mock_user = User(
        id=1,
        email="test@example.com",
        username="testuser",
        is_active=True
    )

    # Override dependencies
    app.dependency_overrides[get_user_service] = lambda: mock_user_service
    app.dependency_overrides[get_current_active_user] = lambda: mock_user

    try:
        with TestClient(app) as client:
            response = client.get("/users/me/stats")
            assert response.status_code == 200
            assert response.json() == {"mock": "stats"}
    finally:
        # Cleanup runs even if an assertion above fails
        app.dependency_overrides.clear()
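
The override test relies on how AsyncMock behaves when awaited: calling a mocked method returns a coroutine, and awaiting it yields the configured return_value. The stdlib-only sketch below shows this in isolation. Note that fetch_stats is a hypothetical stand-in for an endpoint that awaits the service, not code from the app.

```python
import asyncio
from unittest.mock import AsyncMock


async def fetch_stats(user_service, user_id: int) -> dict:
    """Hypothetical stand-in for an endpoint body that awaits the service."""
    return await user_service.get_user_stats(user_id)


def run_mocked_call() -> dict:
    # Every attribute of an AsyncMock is itself an awaitable mock method.
    mock_user_service = AsyncMock()
    mock_user_service.get_user_stats.return_value = {"mock": "stats"}

    result = asyncio.run(fetch_stats(mock_user_service, 1))

    # Verify the service was awaited exactly once with the expected argument.
    mock_user_service.get_user_stats.assert_awaited_once_with(1)
    return result
```

Asserting on the await as well as on the response catches the case where an endpoint returns the right shape without ever calling the service.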


Troubleshooting

Common Issues & Solutions

Circular Import Errors:

# Avoid circular imports: import the type only for static type checking,
# then refer to it with a string annotation (forward reference)
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from .services import UserService
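
A forward reference only helps if the annotation is written as a string, since the guarded name does not exist at runtime. The sketch below uses decimal.Decimal as a stand-in for UserService so it runs without the app, and describe is a hypothetical function. It also shows the limit of the trick: anything that resolves annotations at runtime cannot see the guarded import. FastAPI needs to resolve endpoint parameter annotations at runtime, so this pattern likely suits service and helper modules better than endpoint signatures.

```python
from typing import TYPE_CHECKING, get_type_hints

if TYPE_CHECKING:
    # Seen by type checkers only; never executed at runtime.
    # Stand-in for `from .services import UserService`.
    from decimal import Decimal


def describe(amount: "Decimal") -> str:
    """Hypothetical function annotated with a forward reference."""
    return f"amount={amount}"


def annotation_resolves() -> bool:
    """Return True if the annotations can be resolved at runtime."""
    try:
        get_type_hints(describe)
    except NameError:
        # The name was imported only under TYPE_CHECKING, so it is missing here.
        return False
    return True
```

The function itself runs fine because Python never evaluates the string; only tools that look the name up at runtime hit the NameError.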

Dependency Resolution Issues:

# Use Depends() with a type annotation in function signatures
async def endpoint(
    service: UserService = Depends(get_user_service)  # Preferred: annotated
    # service = Depends(get_user_service)  # Still resolves, but loses type hints and editor support
):
    pass

Testing Dependency Overrides:

# Always clear overrides after tests
@pytest.fixture(autouse=True)
def clear_dependency_overrides():
    yield
    app.dependency_overrides.clear()
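
An autouse fixture clears every override, which is convenient but coarse. Where a single test needs a temporary override, a small context manager can restore the previous state instead. This is a sketch, not part of FastAPI: override_dependencies is a hypothetical helper that only assumes the app exposes a dependency_overrides dict, so a SimpleNamespace stands in for the app here.

```python
from contextlib import contextmanager
from types import SimpleNamespace
from typing import Any, Callable, Dict, Iterator


@contextmanager
def override_dependencies(
    app: Any, overrides: Dict[Callable, Callable]
) -> Iterator[None]:
    """Apply overrides for the duration of the block, then restore the old state."""
    previous = dict(app.dependency_overrides)  # snapshot, so nested use is safe
    app.dependency_overrides.update(overrides)
    try:
        yield
    finally:
        # Runs even if the test body raises or an assertion fails.
        app.dependency_overrides.clear()
        app.dependency_overrides.update(previous)


def get_settings() -> str:
    return "real settings"


# Stand-in for a FastAPI app, which also keeps overrides in a plain dict.
fake_app = SimpleNamespace(dependency_overrides={})

with override_dependencies(fake_app, {get_settings: lambda: "test settings"}):
    active_inside = fake_app.dependency_overrides[get_settings]()

active_after = dict(fake_app.dependency_overrides)
```

Restoring a snapshot rather than clearing everything means an override installed by an outer fixture survives the inner block.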

Performance Issues:

# Use lru_cache for expensive synchronous dependencies that take no per-request arguments
from functools import lru_cache

@lru_cache()
def get_expensive_resource():
    return ExpensiveResource()
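
To see what the cache buys you, the sketch below counts constructions. ExpensiveResource here is a hypothetical placeholder class, and the pattern assumes a synchronous factory with no per-request arguments. Applying lru_cache to an async def would cache the coroutine object rather than its result.

```python
from functools import lru_cache


class ExpensiveResource:
    """Hypothetical placeholder for something costly to build, e.g. a client."""

    instances_created = 0

    def __init__(self) -> None:
        ExpensiveResource.instances_created += 1


@lru_cache()
def get_expensive_resource() -> ExpensiveResource:
    # The body runs once; later calls return the cached instance.
    return ExpensiveResource()


first = get_expensive_resource()
second = get_expensive_resource()

# Tests that need a fresh instance can reset the cache explicitly.
get_expensive_resource.cache_clear()
third = get_expensive_resource()
```

Because the cached instance lives for the whole process, tests that depend on a fresh resource should call cache_clear() or override the dependency instead.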


What You've Accomplished

Congratulations! You've mastered FastAPI dependency injection with:

  • Clean architecture with service layers and repository patterns
  • Authentication system with JWT-based security dependencies
  • Configuration management with environment-based settings
  • Testing framework with dependency overrides and mocks
  • Business logic separation from API endpoints
  • Scalable patterns for production applications
  • Error handling and validation at service layer
  • Performance optimization with dependency caching

Next Steps

Advanced Dependency Patterns:

  1. Caching dependencies - Redis integration for performance
  2. Background task dependencies - Celery or RQ integration
  3. External service dependencies - HTTP clients, third-party APIs
  4. Event-driven dependencies - Message queues and pub/sub
  5. Multi-tenant dependencies - Tenant-aware services

Production Considerations:

  1. Dependency monitoring - Track performance and health
  2. Graceful degradation - Fallbacks for failed dependencies
  3. Resource pooling - Efficient connection management
  4. Dependency injection containers - Advanced DI frameworks
  5. Microservice dependencies - Service-to-service communication

Pro Tips for Dependency Injection

Best Practices:

  1. Keep dependencies focused - Single responsibility principle
  2. Use type hints - Better IDE support and documentation
  3. Minimize dependency depth - Avoid complex dependency chains
  4. Test dependencies in isolation - Unit test service layers
  5. Document dependency lifecycles - Clear resource management

Performance Optimization:

  1. Cache expensive dependencies - Use lru_cache for singletons
  2. Lazy load dependencies - Only create when needed
  3. Pool connections - Reuse database and HTTP connections
  4. Monitor dependency performance - Track creation and execution time
  5. Use async dependencies - Maximize concurrency benefits

Ready to build scalable, maintainable applications? You now have the foundation to create clean, testable FastAPI applications using proper dependency injection patterns. Your code will be more modular, testable, and ready for production scale!