JWT Authentication in FastAPI 2025 - Secure Production APIs
Learn to implement production-ready JWT authentication in FastAPI with secure user management, password hashing, and comprehensive security patterns. This tutorial builds on our dependency injection patterns to create a complete authentication system.
What You'll Learn
By completing this tutorial, you'll master:
- JWT token authentication with secure implementation patterns
- User registration and login with password security
- Protected route implementation with role-based access
- Token refresh mechanisms for better user experience
- Password reset functionality with secure token handling
- Security best practices for production applications
- Integration with dependency injection for clean architecture
Prerequisites
What you need before starting:
- Completed the Dependency Injection Tutorial - We'll extend that app
- Basic understanding of JWT (JSON Web Tokens)
- Knowledge of cryptography concepts (hashing, salting)
- Understanding of HTTP security headers and practices
Time to complete: 35 minutes
What We're Building
You'll add complete authentication to the Task Management API with:
- User registration - Secure account creation with validation
- Login system - JWT token generation and validation
- Protected endpoints - Secure access to user resources
- Token refresh - Seamless user experience with token renewal
- Password management - Secure password reset and change
- Role-based access - Different permission levels
- Security headers - Protection against common attacks
Security Features:
- bcrypt password hashing with salts
- JWT tokens with expiration and refresh
- Rate limiting for login attempts
- CORS protection with proper configuration
- Input validation against injection attacks
Step 1: Security Infrastructure Setup
Install Security Dependencies
Add to your project dependencies:
| Bash | 
|---|
|  | cd backend
# Add authentication and security dependencies
poetry add "python-jose[cryptography]" "passlib[bcrypt]" python-multipart
# Add rate limiting and security
poetry add slowapi redis
# Add email support for password reset
poetry add fastapi-mail
 | 
Security Configuration
Update backend/app/dependencies/config.py:
| Python | 
|---|
|  | import secrets
from functools import lru_cache
from typing import Optional, List
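from pydantic import validator, EmailStr
# Assumes Pydantic v2, as the schemas' from_attributes setting implies; BaseSettings then
# lives in the separate pydantic-settings package (on v1, import it from pydantic instead)
from pydantic_settings import BaseSettings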
class Settings(BaseSettings):
    """Enhanced settings with security configuration"""
    # Database
    database_url: str
    database_url_sync: str
    # Security - JWT
    secret_key: str = secrets.token_urlsafe(32)
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30
    refresh_token_expire_days: int = 7
    # Security - Password
    password_min_length: int = 8
    password_require_special: bool = True
    password_require_uppercase: bool = True
    password_require_numbers: bool = True
    # Security - Rate Limiting
    rate_limit_per_minute: int = 60
    login_rate_limit_per_minute: int = 5
    # Security - CORS
    cors_origins: List[str] = ["http://localhost:3000", "http://localhost:5173"]
    cors_allow_credentials: bool = True
    # Application
    app_name: str = "Task Management API"
    debug: bool = False
    environment: str = "production"
    # Redis for rate limiting and caching
    redis_url: str = "redis://localhost:6379"
    # Email configuration
    mail_server: Optional[str] = None
    mail_port: int = 587
    mail_username: Optional[str] = None
    mail_password: Optional[str] = None
    mail_from: Optional[EmailStr] = None
    mail_tls: bool = True
    mail_ssl: bool = False
    # Password reset
    password_reset_expire_minutes: int = 30
    password_reset_secret: str = secrets.token_urlsafe(32)
    @validator("secret_key", pre=True)
    def validate_secret_key(cls, v):
        if not v or len(v) < 32:
            return secrets.token_urlsafe(32)
        return v
    class Config:
        env_file = ".env"
        case_sensitive = False
@lru_cache()
def get_settings() -> Settings:
    """Settings dependency with caching"""
    return Settings()
 | 
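The `secret_key` default above is generated at import time, so a deployment that forgets to set the key would get a different one on every restart and in every worker process, and previously issued tokens would stop validating. One alternative, sketched below with only the standard library, is to require the key from the environment and fail fast. The function name `load_secret_key`, the constant `MIN_SECRET_LENGTH`, and the choice to raise `RuntimeError` are assumptions made for illustration; they are not part of the tutorial's configuration module.

```python
import os
from typing import Mapping, Optional

MIN_SECRET_LENGTH = 32  # assumed minimum, mirroring the length check in the validator above


def load_secret_key(env: Optional[Mapping[str, str]] = None) -> str:
    """Return SECRET_KEY from the environment, or raise if it is missing or too short."""
    source = os.environ if env is None else env
    key = source.get("SECRET_KEY", "")
    if len(key) < MIN_SECRET_LENGTH:
        raise RuntimeError(
            f"SECRET_KEY must be set and at least {MIN_SECRET_LENGTH} characters long"
        )
    return key
```

Calling this once at startup surfaces a missing or weak key immediately instead of masking it with a freshly generated value.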
Enhanced User Model
Update backend/app/models.py to add authentication fields:
| Python | 
|---|
|  | from datetime import datetime
from typing import Optional
from sqlalchemy import Column, Integer, String, Text, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from database import Base
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String(255), unique=True, index=True, nullable=False)
    username = Column(String(100), unique=True, index=True, nullable=False)
    full_name = Column(String(255), nullable=True)
    # Authentication fields
    hashed_password = Column(String(255), nullable=False)
    is_active = Column(Boolean, default=True)
    is_superuser = Column(Boolean, default=False)
    email_verified = Column(Boolean, default=False)
    # Security tracking
    last_login = Column(DateTime(timezone=True), nullable=True)
    failed_login_attempts = Column(Integer, default=0)
    locked_until = Column(DateTime(timezone=True), nullable=True)
    # Password reset
    password_reset_token = Column(String(255), nullable=True)
    password_reset_expires = Column(DateTime(timezone=True), nullable=True)
    # Timestamps
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
    # Relationships
    tasks = relationship("Task", back_populates="owner", cascade="all, delete-orphan")
    def __repr__(self):
        return f"<User(id={self.id}, email='{self.email}', active={self.is_active})>"
# Add RefreshToken model for secure token management
class RefreshToken(Base):
    __tablename__ = "refresh_tokens"
    id = Column(Integer, primary_key=True, index=True)
    token = Column(String(255), unique=True, index=True, nullable=False)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    expires_at = Column(DateTime(timezone=True), nullable=False)
    is_revoked = Column(Boolean, default=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    # Relationship
    user = relationship("User")
 | 
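The `RefreshToken.token` column above holds the raw token, so anyone with read access to the table could replay it. A common hardening step, shown here only as a sketch and not as what this tutorial's service does, is to store a SHA-256 digest and compare digests on lookup. The helper names `new_refresh_token` and `matches_stored_digest` are hypothetical.

```python
import hashlib
import hmac
import secrets
from typing import Tuple


def new_refresh_token() -> Tuple[str, str]:
    """Return (raw_token, digest). The raw token goes to the client; only the digest is stored."""
    raw_token = secrets.token_urlsafe(32)
    digest = hashlib.sha256(raw_token.encode("utf-8")).hexdigest()
    return raw_token, digest


def matches_stored_digest(presented_token: str, stored_digest: str) -> bool:
    """Hash the presented token and compare it to the stored digest in constant time."""
    presented_digest = hashlib.sha256(presented_token.encode("utf-8")).hexdigest()
    return hmac.compare_digest(presented_digest, stored_digest)
```

A SHA-256 hex digest is 64 characters long, so it would fit the existing `String(255)` column. A fast hash is acceptable here because the token is already 32 bytes of random data, unlike a user-chosen password.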
Authentication Schemas
Update backend/app/schemas.py:
| Python | 
|---|
|  | from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel, EmailStr, Field, validator
import re
# Authentication Schemas
class UserRegister(BaseModel):
    email: EmailStr
    username: str = Field(..., min_length=3, max_length=50)
    full_name: Optional[str] = Field(None, max_length=255)
    password: str = Field(..., min_length=8)
    @validator('username')
    def validate_username(cls, v):
        if not re.match(r'^[a-zA-Z0-9_-]+$', v):
            raise ValueError('Username can only contain letters, numbers, underscores, and hyphens')
        return v
    @validator('password')
    def validate_password(cls, v):
        if len(v) < 8:
            raise ValueError('Password must be at least 8 characters long')
        if not re.search(r'[A-Z]', v):
            raise ValueError('Password must contain at least one uppercase letter')
        if not re.search(r'[a-z]', v):
            raise ValueError('Password must contain at least one lowercase letter')
        if not re.search(r'\d', v):
            raise ValueError('Password must contain at least one number')
        if not re.search(r'[!@#$%^&*(),.?":{}|<>]', v):
            raise ValueError('Password must contain at least one special character')
        return v
class UserLogin(BaseModel):
    email: EmailStr
    password: str
class Token(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"
    expires_in: int
class TokenRefresh(BaseModel):
    refresh_token: str
class PasswordReset(BaseModel):
    email: EmailStr
class PasswordResetConfirm(BaseModel):
    token: str
    new_password: str = Field(..., min_length=8)
    @validator('new_password')
    def validate_password(cls, v):
        # Same validation as UserRegister
        if len(v) < 8:
            raise ValueError('Password must be at least 8 characters long')
        if not re.search(r'[A-Z]', v):
            raise ValueError('Password must contain at least one uppercase letter')
        if not re.search(r'[a-z]', v):
            raise ValueError('Password must contain at least one lowercase letter')
        if not re.search(r'\d', v):
            raise ValueError('Password must contain at least one number')
        if not re.search(r'[!@#$%^&*(),.?":{}|<>]', v):
            raise ValueError('Password must contain at least one special character')
        return v
class PasswordChange(BaseModel):
    current_password: str
    new_password: str = Field(..., min_length=8)
    @validator('new_password')
    def validate_password(cls, v):
        # Same validation as UserRegister
        if len(v) < 8:
            raise ValueError('Password must be at least 8 characters long')
        if not re.search(r'[A-Z]', v):
            raise ValueError('Password must contain at least one uppercase letter')
        if not re.search(r'[a-z]', v):
            raise ValueError('Password must contain at least one lowercase letter')
        if not re.search(r'\d', v):
            raise ValueError('Password must contain at least one number')
        if not re.search(r'[!@#$%^&*(),.?":{}|<>]', v):
            raise ValueError('Password must contain at least one special character')
        return v
# Enhanced User Schemas
class User(BaseModel):
    id: int
    email: EmailStr
    username: str
    full_name: Optional[str] = None
    is_active: bool
    is_superuser: bool
    email_verified: bool
    last_login: Optional[datetime] = None
    created_at: datetime
    class Config:
        from_attributes = True
class UserProfile(User):
    """Extended user profile with additional fields"""
    task_count: Optional[int] = None
    completed_task_count: Optional[int] = None
    account_age_days: Optional[int] = None
 | 
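The same five password rules appear three times in the schemas above (`UserRegister`, `PasswordResetConfirm`, `PasswordChange`). A minimal sketch of a shared helper that each validator could delegate to is shown below; the name `validate_password_strength` is an assumption, while the rules and messages are copied from the schemas.

```python
import re

SPECIAL_CHARS = r'[!@#$%^&*(),.?":{}|<>]'  # same character class as in the schemas above


def validate_password_strength(password: str) -> str:
    """Raise ValueError for the first rule that fails; return the password unchanged otherwise."""
    rules = [
        (len(password) >= 8, "Password must be at least 8 characters long"),
        (re.search(r"[A-Z]", password), "Password must contain at least one uppercase letter"),
        (re.search(r"[a-z]", password), "Password must contain at least one lowercase letter"),
        (re.search(r"\d", password), "Password must contain at least one number"),
        (re.search(SPECIAL_CHARS, password), "Password must contain at least one special character"),
    ]
    for passed, message in rules:
        if not passed:
            raise ValueError(message)
    return password
```

Keeping the rules in one place means a later policy change, such as a longer minimum length, cannot leave one schema out of step with the others.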
Step 2: Enhanced Authentication Service
Complete Authentication Service
Create backend/app/services/auth_service.py:
| Python | 
|---|
|  | import secrets
from datetime import datetime, timedelta
from typing import Optional, Tuple
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy import and_, or_
from ..models import User, RefreshToken
from ..schemas import UserRegister, UserLogin, PasswordChange, PasswordResetConfirm
from .user_service import UserService
from .email_service import EmailService
class AuthService:
    """Enhanced authentication service with security features"""
    def __init__(self, session: AsyncSession, settings):
        self.session = session
        self.user_service = UserService(session)
        self.email_service = EmailService(settings)
        self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
        self.settings = settings
    def verify_password(self, plain_password: str, hashed_password: str) -> bool:
        """Verify password against hash"""
        return self.pwd_context.verify(plain_password, hashed_password)
    def get_password_hash(self, password: str) -> str:
        """Generate password hash"""
        return self.pwd_context.hash(password)
    async def register_user(self, user_data: UserRegister) -> User:
        """Register new user with security checks"""
        # Check if user already exists
        existing_user = await self.user_service.get_by_email(user_data.email)
        if existing_user:
            raise ValueError("Email already registered")
        existing_username = await self.user_service.get_by_username(user_data.username)
        if existing_username:
            raise ValueError("Username already taken")
        # Create user with hashed password
        hashed_password = self.get_password_hash(user_data.password)
        user = User(
            email=user_data.email,
            username=user_data.username,
            full_name=user_data.full_name,
            hashed_password=hashed_password,
            is_active=True,
            email_verified=False  # Require email verification
        )
        self.session.add(user)
        await self.session.flush()
        await self.session.refresh(user)
        # Send verification email (optional)
        if self.settings.mail_server:
            await self.email_service.send_verification_email(user)
        return user
    async def authenticate_user(self, email: str, password: str) -> Optional[User]:
        """Authenticate user with security measures"""
        user = await self.user_service.get_by_email(email)
        if not user:
            return None
        # Check if account is locked
        if user.locked_until and user.locked_until > datetime.utcnow():
            raise ValueError(f"Account locked until {user.locked_until}")
        # Check if account is active
        if not user.is_active:
            raise ValueError("Account is deactivated")
        # Verify password
        if not self.verify_password(password, user.hashed_password):
            # Increment failed login attempts
            user.failed_login_attempts += 1
            # Lock account after 5 failed attempts
            if user.failed_login_attempts >= 5:
                user.locked_until = datetime.utcnow() + timedelta(minutes=30)
            await self.session.commit()
            return None
        # Successful login - reset failed attempts and update last login
        user.failed_login_attempts = 0
        user.locked_until = None
        user.last_login = datetime.utcnow()
        await self.session.commit()
        return user
    def create_access_token(self, data: dict, expires_delta: Optional[timedelta] = None) -> str:
        """Create JWT access token"""
        to_encode = data.copy()
        if expires_delta:
            expire = datetime.utcnow() + expires_delta
        else:
            expire = datetime.utcnow() + timedelta(minutes=self.settings.access_token_expire_minutes)
        to_encode.update({"exp": expire, "type": "access"})
        encoded_jwt = jwt.encode(to_encode, self.settings.secret_key, algorithm=self.settings.algorithm)
        return encoded_jwt
    async def create_refresh_token(self, user_id: int) -> str:
        """Create and store refresh token"""
        token = secrets.token_urlsafe(32)
        expires_at = datetime.utcnow() + timedelta(days=self.settings.refresh_token_expire_days)
        refresh_token = RefreshToken(
            token=token,
            user_id=user_id,
            expires_at=expires_at
        )
        self.session.add(refresh_token)
        await self.session.commit()
        return token
    async def create_tokens(self, user: User) -> Tuple[str, str]:
        """Create both access and refresh tokens"""
        access_token = self.create_access_token(
            data={"sub": user.email, "user_id": user.id}
        )
        refresh_token = await self.create_refresh_token(user.id)
        return access_token, refresh_token
    def verify_token(self, token: str) -> Optional[dict]:
        """Verify JWT token and return payload"""
        try:
            payload = jwt.decode(token, self.settings.secret_key, algorithms=[self.settings.algorithm])
            return payload
        except JWTError:
            return None
    async def get_user_from_token(self, token: str) -> Optional[User]:
        """Get user from JWT token"""
        payload = self.verify_token(token)
        if not payload:
            return None
        email: str = payload.get("sub")
        if not email:
            return None
        user = await self.user_service.get_by_email(email)
        return user
    async def refresh_access_token(self, refresh_token: str) -> Optional[Tuple[str, str]]:
        """Refresh access token using refresh token"""
        # Find refresh token in database
        result = await self.session.execute(
            select(RefreshToken).where(
                and_(
                    RefreshToken.token == refresh_token,
                    RefreshToken.is_revoked == False,
                    RefreshToken.expires_at > datetime.utcnow()
                )
            )
        )
        token_record = result.scalar_one_or_none()
        if not token_record:
            return None
        # Get user
        user = await self.user_service.get_by_id(token_record.user_id)
        if not user or not user.is_active:
            return None
        # Create new tokens
        new_access_token, new_refresh_token = await self.create_tokens(user)
        # Revoke old refresh token
        token_record.is_revoked = True
        await self.session.commit()
        return new_access_token, new_refresh_token
    async def revoke_refresh_token(self, refresh_token: str) -> bool:
        """Revoke refresh token (logout)"""
        result = await self.session.execute(
            select(RefreshToken).where(RefreshToken.token == refresh_token)
        )
        token_record = result.scalar_one_or_none()
        if token_record:
            token_record.is_revoked = True
            await self.session.commit()
            return True
        return False
    async def change_password(self, user_id: int, password_data: PasswordChange) -> bool:
        """Change user password"""
        user = await self.user_service.get_by_id(user_id)
        if not user:
            return False
        # Verify current password
        if not self.verify_password(password_data.current_password, user.hashed_password):
            raise ValueError("Current password is incorrect")
        # Update password
        user.hashed_password = self.get_password_hash(password_data.new_password)
        await self.session.commit()
        return True
    async def request_password_reset(self, email: str) -> bool:
        """Request password reset"""
        user = await self.user_service.get_by_email(email)
        if not user:
            # Don't reveal if email exists
            return True
        # Generate reset token
        reset_token = secrets.token_urlsafe(32)
        user.password_reset_token = reset_token
        user.password_reset_expires = datetime.utcnow() + timedelta(
            minutes=self.settings.password_reset_expire_minutes
        )
        await self.session.commit()
        # Send reset email
        if self.settings.mail_server:
            await self.email_service.send_password_reset_email(user, reset_token)
        return True
    async def reset_password(self, reset_data: PasswordResetConfirm) -> bool:
        """Reset password using token"""
        # Find user with valid reset token
        result = await self.session.execute(
            select(User).where(
                and_(
                    User.password_reset_token == reset_data.token,
                    User.password_reset_expires > datetime.utcnow()
                )
            )
        )
        user = result.scalar_one_or_none()
        if not user:
            raise ValueError("Invalid or expired reset token")
        # Update password and clear reset token
        user.hashed_password = self.get_password_hash(reset_data.new_password)
        user.password_reset_token = None
        user.password_reset_expires = None
        user.failed_login_attempts = 0  # Reset failed attempts
        user.locked_until = None  # Unlock account
        await self.session.commit()
        return True
 | 
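The lockout rule in `authenticate_user` (five failures, then a 30-minute lock) can be isolated as a few pure functions, which also makes the time handling explicit. The columns are declared with `DateTime(timezone=True)`, and depending on the database driver such values may come back timezone-aware; Python raises `TypeError` when an aware datetime is compared with the naive value returned by `datetime.utcnow()`. The sketch below therefore passes in an aware "now". `LoginState` is a hypothetical stand-in for the lockout columns on `User`, and the function names are assumptions.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

MAX_FAILED_ATTEMPTS = 5                   # same threshold as authenticate_user
LOCKOUT_DURATION = timedelta(minutes=30)  # same duration as authenticate_user


@dataclass
class LoginState:
    """Hypothetical stand-in for the lockout columns on the User model."""
    failed_login_attempts: int = 0
    locked_until: Optional[datetime] = None


def is_locked(state: LoginState, now: datetime) -> bool:
    """True while the lock expiry lies in the future."""
    return state.locked_until is not None and state.locked_until > now


def record_failure(state: LoginState, now: datetime) -> None:
    """Count a failed attempt and start a lock once the threshold is reached."""
    state.failed_login_attempts += 1
    if state.failed_login_attempts >= MAX_FAILED_ATTEMPTS:
        state.locked_until = now + LOCKOUT_DURATION


def record_success(state: LoginState) -> None:
    """Clear the counter and any lock after a successful login."""
    state.failed_login_attempts = 0
    state.locked_until = None


def utc_now() -> datetime:
    """Timezone-aware current time, safe to compare with aware column values."""
    return datetime.now(timezone.utc)
```

Passing `now` as an argument rather than reading the clock inside each function keeps the policy easy to unit test.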
Step 3: Security Middleware and Rate Limiting
Rate Limiting Service
Create backend/app/services/rate_limit_service.py:
| Python | 
|---|
|  | import redis.asyncio as redis  # asyncio client (redis-py 4.2+) so calls do not block the event loop
from datetime import datetime, timedelta
from typing import Optional
from fastapi import HTTPException, status
class RateLimitService:
    """Rate limiting service using Redis"""
    def __init__(self, redis_url: str):
        self.redis_client = redis.from_url(redis_url, decode_responses=True)
    async def check_rate_limit(
        self, 
        key: str, 
        limit: int, 
        window_minutes: int = 1
    ) -> bool:
        """Check if request is within rate limit"""
        try:
            current_time = datetime.utcnow()
            window_start = current_time - timedelta(minutes=window_minutes)
            # Use sliding window counter
            pipe = self.redis_client.pipeline()
            # Remove old entries
            pipe.zremrangebyscore(key, 0, window_start.timestamp())
            # Count current entries
            pipe.zcard(key)
            # Execute pipeline (queued commands are sent in one round trip)
            results = await pipe.execute()
            current_count = results[1]
            if current_count >= limit:
                return False
            # Add current request
            await self.redis_client.zadd(key, {str(current_time.timestamp()): current_time.timestamp()})
            await self.redis_client.expire(key, window_minutes * 60)
            return True
        except Exception:
            # If Redis is down, allow the request (the limiter fails open)
            return True
    def get_rate_limit_key(self, identifier: str, endpoint: str) -> str:
        """Generate rate limit key"""
        return f"rate_limit:{endpoint}:{identifier}"
 | 
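The sliding-window logic in `check_rate_limit` (drop entries older than the window, count what remains, then record the new request) can be illustrated without Redis. The class below is a hypothetical in-process sketch of the same three steps; it is not a replacement for the Redis-backed service, since its state is not shared between workers.

```python
import time
from collections import defaultdict, deque
from typing import Deque, Dict, Optional


class InMemorySlidingWindow:
    """Hypothetical in-process counterpart to the Redis sorted-set approach."""

    def __init__(self, limit: int, window_seconds: float = 60.0) -> None:
        self.limit = limit
        self.window_seconds = window_seconds
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str, now: Optional[float] = None) -> bool:
        """Return True and record the request if the key is under its limit."""
        now = time.monotonic() if now is None else now
        hits = self._hits[key]
        # Drop timestamps that have left the window (the ZREMRANGEBYSCORE step).
        while hits and hits[0] <= now - self.window_seconds:
            hits.popleft()
        # Reject when the window is already full (the ZCARD check).
        if len(hits) >= self.limit:
            return False
        # Record this request (the ZADD step).
        hits.append(now)
        return True
```

The optional `now` argument exists so tests can drive the clock directly instead of sleeping.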
Security Middleware
Create backend/app/middleware/security.py:
| Python | 
|---|
|  | from fastapi import Request, HTTPException, status
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from ..services.rate_limit_service import RateLimitService
from ..dependencies.config import get_settings
class SecurityMiddleware(BaseHTTPMiddleware):
    """Security middleware for rate limiting and protection"""
    def __init__(self, app, rate_limit_service: RateLimitService):
        super().__init__(app)
        self.rate_limit_service = rate_limit_service
        self.settings = get_settings()
    async def dispatch(self, request: Request, call_next):
        # Get client IP
        client_ip = request.client.host if request.client else "unknown"
        # Rate limiting for sensitive endpoints (paths include the router's /auth prefix)
        if request.url.path in ["/auth/login", "/auth/register", "/auth/password-reset"]:
            limit = self.settings.login_rate_limit_per_minute
            key = self.rate_limit_service.get_rate_limit_key(client_ip, request.url.path)
            if not await self.rate_limit_service.check_rate_limit(key, limit):
                return JSONResponse(
                    status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                    content={"detail": "Rate limit exceeded. Please try again later."}
                )
        # General rate limiting
        general_key = self.rate_limit_service.get_rate_limit_key(client_ip, "general")
        if not await self.rate_limit_service.check_rate_limit(
            general_key, 
            self.settings.rate_limit_per_minute
        ):
            return JSONResponse(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                content={"detail": "Rate limit exceeded. Please try again later."}
            )
        # Add security headers
        response = await call_next(request)
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["X-XSS-Protection"] = "1; mode=block"
        response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
        return response
 | 
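The middleware keys its limits on the connecting peer's address. Behind a reverse proxy or load balancer that address is usually the proxy's, so every client would share one bucket. A minimal sketch of one way to pick the address is shown below. The function name `resolve_client_ip`, the `trusted_proxies` set, and the plain lowercase-keyed header mapping are all assumptions for illustration; the tutorial itself does not configure proxy handling.

```python
from typing import Mapping, Optional, Set


def resolve_client_ip(
    peer_ip: Optional[str],
    headers: Mapping[str, str],
    trusted_proxies: Set[str],
) -> str:
    """Pick the address to rate-limit on.

    X-Forwarded-For is honoured only when the direct peer is a trusted proxy;
    otherwise any client could spoof the header to dodge the limit.
    """
    if peer_ip is None:
        return "unknown"
    if peer_ip in trusted_proxies:
        forwarded = headers.get("x-forwarded-for", "")
        hops = [hop.strip() for hop in forwarded.split(",") if hop.strip()]
        # Walk the chain from the nearest hop outward; the first untrusted hop is the client.
        for hop in reversed(hops):
            if hop not in trusted_proxies:
                return hop
    return peer_ip
```

Many deployments handle this at the server level instead, so treat the function as an illustration of the trust rule rather than a required component.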
Step 4: Authentication Endpoints
Authentication Router
Create backend/app/routers/auth.py:
| Python | 
|---|
|  | from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from ..dependencies import get_database_session, get_auth_service, get_settings, get_current_active_user
from ..services import AuthService
from ..schemas import (
    UserRegister, UserLogin, Token, TokenRefresh, 
    PasswordReset, PasswordResetConfirm, PasswordChange,
    User, UserProfile
)
from ..models import User as UserModel
from ..dependencies.config import Settings
router = APIRouter(prefix="/auth", tags=["authentication"])
@router.post("/register", response_model=User)
async def register(
    user_data: UserRegister,
    auth_service: AuthService = Depends(get_auth_service),
    settings: Settings = Depends(get_settings)
):
    """Register new user"""
    try:
        user = await auth_service.register_user(user_data)
        return user
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
@router.post("/login", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    auth_service: AuthService = Depends(get_auth_service),
    settings: Settings = Depends(get_settings)
):
    """Login and get access token"""
    try:
        user = await auth_service.authenticate_user(form_data.username, form_data.password)
        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Incorrect email or password",
                headers={"WWW-Authenticate": "Bearer"},
            )
        access_token, refresh_token = await auth_service.create_tokens(user)
        return {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "token_type": "bearer",
            "expires_in": settings.access_token_expire_minutes * 60
        }
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
@router.post("/refresh", response_model=Token)
async def refresh_token(
    token_data: TokenRefresh,
    auth_service: AuthService = Depends(get_auth_service),
    settings: Settings = Depends(get_settings)
):
    """Refresh access token"""
    result = await auth_service.refresh_access_token(token_data.refresh_token)
    if not result:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token"
        )
    access_token, refresh_token = result
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer",
        "expires_in": settings.access_token_expire_minutes * 60
    }
@router.post("/logout")
async def logout(
    token_data: TokenRefresh,
    auth_service: AuthService = Depends(get_auth_service)
):
    """Logout and revoke refresh token"""
    await auth_service.revoke_refresh_token(token_data.refresh_token)
    return {"message": "Successfully logged out"}
@router.get("/me", response_model=UserProfile)
async def get_current_user_profile(
    current_user: UserModel = Depends(get_current_active_user),
    session: AsyncSession = Depends(get_database_session)
):
    """Get current user profile with additional data"""
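    # Calculate additional profile data
    from datetime import datetime  # imported here because the module header only imports timedelta
    task_count = len(current_user.tasks) if current_user.tasks else 0
    completed_task_count = len([t for t in current_user.tasks if t.completed]) if current_user.tasks else 0
    # Age runs from creation to now; reusing created_at's tzinfo keeps the subtraction valid
    account_age_days = (datetime.now(tz=current_user.created_at.tzinfo) - current_user.created_at).days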
    return UserProfile(
        **current_user.__dict__,
        task_count=task_count,
        completed_task_count=completed_task_count,
        account_age_days=account_age_days
    )
@router.post("/change-password")
async def change_password(
    password_data: PasswordChange,
    current_user: UserModel = Depends(get_current_active_user),
    auth_service: AuthService = Depends(get_auth_service)
):
    """Change user password"""
    try:
        success = await auth_service.change_password(current_user.id, password_data)
        if success:
            return {"message": "Password changed successfully"}
        else:
            raise HTTPException(status_code=400, detail="Failed to change password")
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
@router.post("/password-reset")
async def request_password_reset(
    reset_data: PasswordReset,
    background_tasks: BackgroundTasks,  # injected by FastAPI; no default instance needed
    auth_service: AuthService = Depends(get_auth_service)
):
    """Request password reset"""
    # Add to background tasks to avoid timing attacks
    background_tasks.add_task(auth_service.request_password_reset, reset_data.email)
    return {"message": "If the email exists, a password reset link has been sent"}
@router.post("/password-reset/confirm")
async def confirm_password_reset(
    reset_data: PasswordResetConfirm,
    auth_service: AuthService = Depends(get_auth_service)
):
    """Confirm password reset with token"""
    try:
        success = await auth_service.reset_password(reset_data)
        if success:
            return {"message": "Password reset successfully"}
        else:
            raise HTTPException(status_code=400, detail="Failed to reset password")
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
 | 
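The protected routes above rely on `get_current_active_user`, which comes from the dependency layer and is not shown in this section. Whatever that dependency looks like, its first step is to pull the token out of the `Authorization` header before handing it to `AuthService.get_user_from_token`. The framework-free sketch below illustrates only that parsing step; `extract_bearer_token` is a hypothetical name, not a FastAPI function.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' value, or None if malformed."""
    if not authorization:
        return None
    scheme, _, token = authorization.partition(" ")
    # The scheme name is case-insensitive; anything other than Bearer is rejected.
    if scheme.lower() != "bearer" or not token.strip():
        return None
    return token.strip()
```

A dependency built on this would respond with 401 and a `WWW-Authenticate: Bearer` header whenever the function returns `None` or the token fails verification, matching the login endpoint's error response.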
Step 5: Integration and Testing
Update Main Application
Update backend/app/main.py:
| Python | 
|---|
|  | from fastapi import FastAPI, Depends, HTTPException, Query, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from typing import List, Optional
from .dependencies import get_settings
from .dependencies.config import Settings
from .middleware.security import SecurityMiddleware
from .services.rate_limit_service import RateLimitService
from .routers import auth
from .routers import tasks  # Your existing task router
# Create app with enhanced security
def create_app() -> FastAPI:
    settings = get_settings()
    app = FastAPI(
        title="Secure Task Management API",
        description="A production-ready task management API with JWT authentication",
        version="4.0.0",
        docs_url="/docs" if settings.debug else None,
        redoc_url="/redoc" if settings.debug else None,
    )
    # Security middleware
    rate_limit_service = RateLimitService(settings.redis_url)
    app.add_middleware(SecurityMiddleware, rate_limit_service=rate_limit_service)
    # CORS middleware
    app.add_middleware(
        CORSMiddleware,
        allow_origins=settings.cors_origins,
        allow_credentials=settings.cors_allow_credentials,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    # Trusted host middleware (production)
    if not settings.debug:
        app.add_middleware(
            TrustedHostMiddleware,
            allowed_hosts=["api.yourdomain.com", "localhost"]
        )
    # Include routers
    app.include_router(auth.router)
    app.include_router(tasks.router)  # Your existing task router
    # Health check
    @app.get("/health")
    async def health_check():
        return {
            "status": "healthy",
            "app_name": settings.app_name,
            "environment": settings.environment
        }
    return app
app = create_app()
 | 
Authentication Tests
Create backend/tests/test_auth.py:
| Python | 
|---|
|  | import pytest
from fastapi.testclient import TestClient
from sqlalchemy.ext.asyncio import AsyncSession
from app.main import app
from app.services import AuthService
from app.schemas import UserRegister
@pytest.mark.asyncio
async def test_user_registration(test_session: AsyncSession, test_settings):
    """Test user registration"""
    auth_service = AuthService(test_session, test_settings)
    user_data = UserRegister(
        email="test@example.com",
        username="testuser",
        password="SecurePass123!"
    )
    user = await auth_service.register_user(user_data)
    assert user.email == "test@example.com"
    assert user.username == "testuser"
    assert user.hashed_password != "SecurePass123!"  # Should be hashed
@pytest.mark.asyncio
async def test_user_authentication(test_session: AsyncSession, test_settings):
    """Test user authentication"""
    auth_service = AuthService(test_session, test_settings)
    # Register user
    user_data = UserRegister(
        email="test@example.com",
        username="testuser",
        password="SecurePass123!"
    )
    await auth_service.register_user(user_data)
    # Authenticate user
    user = await auth_service.authenticate_user("test@example.com", "SecurePass123!")
    assert user is not None
    assert user.email == "test@example.com"
    # Test wrong password
    user = await auth_service.authenticate_user("test@example.com", "wrongpassword")
    assert user is None
def test_auth_endpoints(client):
    """Test authentication endpoints"""
    # Test registration
    response = client.post("/auth/register", json={
        "email": "test@example.com",
        "username": "testuser",
        "password": "SecurePass123!"
    })
    assert response.status_code == 200
    # Test login (the router mounts it at /auth/login; the form's username field carries the email)
    response = client.post("/auth/login", data={
        "username": "test@example.com",
        "password": "SecurePass123!"
    })
    assert response.status_code == 200
    data = response.json()
    assert "access_token" in data
    assert "refresh_token" in data
    # Test protected endpoint
    token = data["access_token"]
    response = client.get("/auth/me", headers={
        "Authorization": f"Bearer {token}"
    })
    assert response.status_code == 200
 | 
Troubleshooting
Common Issues & Solutions
JWT Token Issues:
| Python | 
|---|
|  | # Verify token expiration
from jose import jwt
from datetime import datetime
payload = jwt.decode(token, secret_key, algorithms=["HS256"])
exp = datetime.fromtimestamp(payload["exp"])
print(f"Token expires at: {exp}")
 | 
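One caveat with the snippet above: `jwt.decode` verifies the token first, so for a token that has already expired it raises instead of returning the payload. When the goal is simply to see when a token expired, the payload segment can be read without verification. The sketch below uses only the standard library; `peek_jwt_payload` and `expiry_of` are hypothetical helper names, and the output must never be trusted for authentication decisions.

```python
import base64
import json
from datetime import datetime, timezone


def peek_jwt_payload(token: str) -> dict:
    """Decode the payload segment WITHOUT verifying the signature (debugging only)."""
    try:
        _header, payload_b64, _signature = token.split(".")
    except ValueError:
        raise ValueError("Expected a JWT with three dot-separated segments") from None
    # JWT segments are base64url without padding; restore the padding before decoding.
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def expiry_of(token: str) -> datetime:
    """Return the token's exp claim as a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(peek_jwt_payload(token)["exp"], tz=timezone.utc)
```

Comparing `expiry_of(token)` with `datetime.now(timezone.utc)` shows whether a rejected token had simply expired or failed for another reason, such as a mismatched secret key.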
Password Hashing Problems:
| Python | 
|---|
|  | # Test password hashing
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
hashed = pwd_context.hash("password123")
verified = pwd_context.verify("password123", hashed)
print(f"Verification result: {verified}")
 | 
Rate Limiting Issues:
| Bash | 
|---|
|  | # Check Redis connection
redis-cli ping
# Monitor rate limit keys
redis-cli keys "rate_limit:*"
 | 
What You've Accomplished
Congratulations! You've implemented production-ready JWT authentication with:
- Secure user registration with password validation and hashing
- JWT token authentication with access and refresh tokens
- Rate limiting to prevent abuse and attacks
- Password management with secure reset functionality
- Security middleware with proper headers and protection
- Role groundwork with is_active and is_superuser flags on the user model
- Email integration for verification and password reset
- Comprehensive testing with authentication scenarios
Next Steps
Enhanced Security Features:
- Two-factor authentication - SMS or TOTP integration
- OAuth integration - Google, GitHub, social logins
- API key authentication - For service-to-service calls
- Session management - Track active sessions
- Audit logging - Security event tracking
Production Considerations:
- Token blacklisting - Revoke compromised tokens
- IP whitelisting - Restrict access by location
- Device fingerprinting - Track login devices
- Security monitoring - Real-time threat detection
- Compliance - GDPR, CCPA data protection
Ready to build secure, production-ready APIs? You now have a comprehensive authentication system that can handle real-world security requirements with confidence!