JWT Authentication in FastAPI 2025 - Secure Production APIs
Learn to implement production-ready JWT authentication in FastAPI with secure user management, password hashing, and comprehensive security patterns. This tutorial builds on our dependency injection patterns to create a complete authentication system.
What You'll Learn
By completing this tutorial, you'll master:
- JWT token authentication with secure implementation patterns
- User registration and login with password security
- Protected route implementation with role-based access
- Token refresh mechanisms for better user experience
- Password reset functionality with secure token handling
- Security best practices for production applications
- Integration with dependency injection for clean architecture
Prerequisites
What you need before starting:
- Completed the Dependency Injection Tutorial - We'll extend that app
- Basic understanding of JWT (JSON Web Tokens)
- Knowledge of cryptography concepts (hashing, salting)
- Understanding of HTTP security headers and practices
Time to complete: 35 minutes
What We're Building
You'll add complete authentication to the Task Management API with:
- User registration - Secure account creation with validation
- Login system - JWT token generation and validation
- Protected endpoints - Secure access to user resources
- Token refresh - Seamless user experience with token renewal
- Password management - Secure password reset and change
- Role-based access - Different permission levels
- Security headers - Protection against common attacks
Security Features:
- bcrypt password hashing with salts
- JWT tokens with expiration and refresh
- Rate limiting for login attempts
- CORS protection with proper configuration
- Input validation against injection attacks
Step 1: Security Infrastructure Setup
Install Security Dependencies
Add to your project dependencies:
cd backend
# Add authentication and security dependencies
poetry add "python-jose[cryptography]" "passlib[bcrypt]" python-multipart
# Add rate limiting and security
poetry add slowapi redis
# Add email support for password reset
poetry add fastapi-mail
Security Configuration
Update backend/app/dependencies/config.py:
import secrets
from functools import lru_cache
from typing import Optional, List

from pydantic import validator, EmailStr
# Pydantic v2 ships BaseSettings in the separate pydantic-settings package
# (poetry add pydantic-settings); on Pydantic v1, import it from pydantic instead.
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    """Enhanced settings with security configuration"""

    # Database
    database_url: str
    database_url_sync: str

    # Security - JWT
    # Set SECRET_KEY in the environment for real deployments: this random
    # fallback changes on every restart, which invalidates issued tokens.
    secret_key: str = secrets.token_urlsafe(32)
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30
    refresh_token_expire_days: int = 7

    # Security - Password
    password_min_length: int = 8
    password_require_special: bool = True
    password_require_uppercase: bool = True
    password_require_numbers: bool = True

    # Security - Rate Limiting
    rate_limit_per_minute: int = 60
    login_rate_limit_per_minute: int = 5

    # Security - CORS
    cors_origins: List[str] = ["http://localhost:3000", "http://localhost:5173"]
    cors_allow_credentials: bool = True

    # Application
    app_name: str = "Task Management API"
    debug: bool = False
    environment: str = "production"

    # Redis for rate limiting and caching
    redis_url: str = "redis://localhost:6379"

    # Email configuration
    mail_server: Optional[str] = None
    mail_port: int = 587
    mail_username: Optional[str] = None
    mail_password: Optional[str] = None
    mail_from: Optional[EmailStr] = None
    mail_tls: bool = True
    mail_ssl: bool = False

    # Password reset
    password_reset_expire_minutes: int = 30
    password_reset_secret: str = secrets.token_urlsafe(32)

    @validator("secret_key", pre=True)
    def validate_secret_key(cls, v):
        # Reject a weak key instead of silently swapping in a random one
        if not v or len(v) < 32:
            raise ValueError("SECRET_KEY must be at least 32 characters long")
        return v

    class Config:
        env_file = ".env"
        case_sensitive = False


@lru_cache()
def get_settings() -> Settings:
    """Settings dependency with caching"""
    return Settings()
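The default for secret_key is generated with secrets.token_urlsafe(32) each time the process starts, so tokens signed before a restart stop verifying unless SECRET_KEY is set in .env. As a minimal sketch of that idea, the hypothetical helpers below (generate_secret_key and require_secret_key are illustrative names, not part of the tutorial code) generate a key once and fail fast when a deployment forgets to provide one:

```python
import secrets

MIN_KEY_LENGTH = 32  # same threshold as the length check in validate_secret_key


def generate_secret_key() -> str:
    """Return a URL-safe random key to paste into .env as SECRET_KEY."""
    return secrets.token_urlsafe(32)


def require_secret_key(env: dict) -> str:
    """Hypothetical helper: read SECRET_KEY from a mapping and reject weak values."""
    key = env.get("SECRET_KEY", "")
    if len(key) < MIN_KEY_LENGTH:
        raise ValueError("SECRET_KEY must be set and at least 32 characters long")
    return key


if __name__ == "__main__":
    # Run once, then copy the printed line into backend/.env
    print(f"SECRET_KEY={generate_secret_key()}")
```

Passing os.environ to require_secret_key at startup would surface a missing key immediately rather than after the first restart logs everyone out.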
Enhanced User Model
Update backend/app/models.py to add authentication fields:
from datetime import datetime
from typing import Optional
from sqlalchemy import Column, Integer, String, Text, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String(255), unique=True, index=True, nullable=False)
    username = Column(String(100), unique=True, index=True, nullable=False)
    full_name = Column(String(255), nullable=True)

    # Authentication fields
    hashed_password = Column(String(255), nullable=False)
    is_active = Column(Boolean, default=True)
    is_superuser = Column(Boolean, default=False)
    email_verified = Column(Boolean, default=False)

    # Security tracking
    last_login = Column(DateTime(timezone=True), nullable=True)
    failed_login_attempts = Column(Integer, default=0)
    locked_until = Column(DateTime(timezone=True), nullable=True)

    # Password reset
    password_reset_token = Column(String(255), nullable=True)
    password_reset_expires = Column(DateTime(timezone=True), nullable=True)

    # Timestamps
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

    # Relationships
    tasks = relationship("Task", back_populates="owner", cascade="all, delete-orphan")

    def __repr__(self):
        return f"<User(id={self.id}, email='{self.email}', active={self.is_active})>"


# Add RefreshToken model for secure token management
class RefreshToken(Base):
    __tablename__ = "refresh_tokens"

    id = Column(Integer, primary_key=True, index=True)
    token = Column(String(255), unique=True, index=True, nullable=False)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    expires_at = Column(DateTime(timezone=True), nullable=False)
    is_revoked = Column(Boolean, default=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    # Relationship
    user = relationship("User")
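The failed_login_attempts and locked_until columns carry the account lockout state. The sketch below isolates that rule as plain functions so it can be reasoned about without a database; the threshold of 5 attempts and the 30 minute lock are the values the authentication service in Step 2 uses, while the function names are illustrative assumptions:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

MAX_FAILED_ATTEMPTS = 5            # lock after this many consecutive failures
LOCKOUT = timedelta(minutes=30)    # how long the account stays locked


def record_failed_login(failed_attempts: int, now: datetime) -> Tuple[int, Optional[datetime]]:
    """Return the new attempt count and, once the limit is reached, the lock expiry."""
    failed_attempts += 1
    locked_until = now + LOCKOUT if failed_attempts >= MAX_FAILED_ATTEMPTS else None
    return failed_attempts, locked_until


def is_locked(locked_until: Optional[datetime], now: datetime) -> bool:
    """An account is locked only while locked_until lies in the future."""
    return locked_until is not None and locked_until > now


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    count, until = record_failed_login(4, now)
    print(count, is_locked(until, now))
```

Using timezone-aware datetimes here matches the DateTime(timezone=True) columns, which avoids comparing naive and aware values.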
Authentication Schemas
Update backend/app/schemas.py:
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, EmailStr, Field, validator
import re


def validate_password_strength(v: str) -> str:
    """Shared password rules for every schema that accepts a new password"""
    if len(v) < 8:
        raise ValueError('Password must be at least 8 characters long')
    if not re.search(r'[A-Z]', v):
        raise ValueError('Password must contain at least one uppercase letter')
    if not re.search(r'[a-z]', v):
        raise ValueError('Password must contain at least one lowercase letter')
    if not re.search(r'\d', v):
        raise ValueError('Password must contain at least one number')
    if not re.search(r'[!@#$%^&*(),.?":{}|<>]', v):
        raise ValueError('Password must contain at least one special character')
    return v


# Authentication Schemas
class UserRegister(BaseModel):
    email: EmailStr
    username: str = Field(..., min_length=3, max_length=50)
    full_name: Optional[str] = Field(None, max_length=255)
    password: str = Field(..., min_length=8)

    @validator('username')
    def validate_username(cls, v):
        if not re.match(r'^[a-zA-Z0-9_-]+$', v):
            raise ValueError('Username can only contain letters, numbers, underscores, and hyphens')
        return v

    @validator('password')
    def validate_password(cls, v):
        return validate_password_strength(v)


class UserLogin(BaseModel):
    email: EmailStr
    password: str


class Token(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"
    expires_in: int


class TokenRefresh(BaseModel):
    refresh_token: str


class PasswordReset(BaseModel):
    email: EmailStr


class PasswordResetConfirm(BaseModel):
    token: str
    new_password: str = Field(..., min_length=8)

    @validator('new_password')
    def validate_password(cls, v):
        # Same validation as UserRegister
        return validate_password_strength(v)


class PasswordChange(BaseModel):
    current_password: str
    new_password: str = Field(..., min_length=8)

    @validator('new_password')
    def validate_password(cls, v):
        # Same validation as UserRegister
        return validate_password_strength(v)


# Enhanced User Schemas
class User(BaseModel):
    id: int
    email: EmailStr
    username: str
    full_name: Optional[str] = None
    is_active: bool
    is_superuser: bool
    email_verified: bool
    last_login: Optional[datetime] = None
    created_at: datetime

    class Config:
        from_attributes = True


class UserProfile(User):
    """Extended user profile with additional fields"""
    task_count: Optional[int] = None
    completed_task_count: Optional[int] = None
    account_age_days: Optional[int] = None
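The schema validators stop at the first failed password rule. For a registration form it can be friendlier to report every unmet rule at once; the sketch below shows one way to do that with the same five rules. The function name password_problems and the wording of the messages are assumptions for illustration, not part of the schemas above:

```python
import re
from typing import List

# Character-class rules taken from the schema validators
CHARACTER_RULES = [
    (r"[A-Z]", "an uppercase letter"),
    (r"[a-z]", "a lowercase letter"),
    (r"\d", "a number"),
    (r'[!@#$%^&*(),.?":{}|<>]', "a special character"),
]


def password_problems(password: str) -> List[str]:
    """Return a description of every rule the password fails (empty list means valid)."""
    problems = []
    if len(password) < 8:
        problems.append("at least 8 characters")
    problems += [msg for pattern, msg in CHARACTER_RULES if not re.search(pattern, password)]
    return problems


if __name__ == "__main__":
    for candidate in ("SecurePass123!", "password123"):
        print(candidate, password_problems(candidate))
```

An empty list corresponds to a password the schema validators would accept.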
Step 2: Enhanced Authentication Service
Complete Authentication Service
Create backend/app/services/auth_service.py:
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy import and_, select
from sqlalchemy.ext.asyncio import AsyncSession

from ..models import User, RefreshToken
from ..schemas import UserRegister, PasswordChange, PasswordResetConfirm
from .user_service import UserService
from .email_service import EmailService


def utc_now() -> datetime:
    """Timezone-aware current time, comparable with DateTime(timezone=True) columns"""
    return datetime.now(timezone.utc)


class AuthService:
    """Enhanced authentication service with security features"""

    def __init__(self, session: AsyncSession, settings):
        self.session = session
        self.user_service = UserService(session)
        self.email_service = EmailService(settings)
        self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
        self.settings = settings

    def verify_password(self, plain_password: str, hashed_password: str) -> bool:
        """Verify password against hash"""
        return self.pwd_context.verify(plain_password, hashed_password)

    def get_password_hash(self, password: str) -> str:
        """Generate password hash"""
        return self.pwd_context.hash(password)

    async def register_user(self, user_data: UserRegister) -> User:
        """Register new user with security checks"""
        # Check if user already exists
        existing_user = await self.user_service.get_by_email(user_data.email)
        if existing_user:
            raise ValueError("Email already registered")
        existing_username = await self.user_service.get_by_username(user_data.username)
        if existing_username:
            raise ValueError("Username already taken")

        # Create user with hashed password
        hashed_password = self.get_password_hash(user_data.password)
        user = User(
            email=user_data.email,
            username=user_data.username,
            full_name=user_data.full_name,
            hashed_password=hashed_password,
            is_active=True,
            email_verified=False  # Require email verification
        )
        self.session.add(user)
        await self.session.flush()
        await self.session.refresh(user)

        # Send verification email (optional)
        if self.settings.mail_server:
            await self.email_service.send_verification_email(user)
        return user

    async def authenticate_user(self, email: str, password: str) -> Optional[User]:
        """Authenticate user with security measures"""
        user = await self.user_service.get_by_email(email)
        if not user:
            return None

        # Check if account is locked
        if user.locked_until and user.locked_until > utc_now():
            raise ValueError(f"Account locked until {user.locked_until}")

        # Check if account is active
        if not user.is_active:
            raise ValueError("Account is deactivated")

        # Verify password
        if not self.verify_password(password, user.hashed_password):
            # Increment failed login attempts
            user.failed_login_attempts += 1
            # Lock account after 5 failed attempts
            if user.failed_login_attempts >= 5:
                user.locked_until = utc_now() + timedelta(minutes=30)
            await self.session.commit()
            return None

        # Successful login - reset failed attempts and update last login
        user.failed_login_attempts = 0
        user.locked_until = None
        user.last_login = utc_now()
        await self.session.commit()
        return user

    def create_access_token(self, data: dict, expires_delta: Optional[timedelta] = None) -> str:
        """Create JWT access token"""
        to_encode = data.copy()
        if expires_delta:
            expire = utc_now() + expires_delta
        else:
            expire = utc_now() + timedelta(minutes=self.settings.access_token_expire_minutes)
        to_encode.update({"exp": expire, "type": "access"})
        encoded_jwt = jwt.encode(to_encode, self.settings.secret_key, algorithm=self.settings.algorithm)
        return encoded_jwt

    async def create_refresh_token(self, user_id: int) -> str:
        """Create and store refresh token"""
        token = secrets.token_urlsafe(32)
        expires_at = utc_now() + timedelta(days=self.settings.refresh_token_expire_days)
        refresh_token = RefreshToken(
            token=token,
            user_id=user_id,
            expires_at=expires_at
        )
        self.session.add(refresh_token)
        await self.session.commit()
        return token

    async def create_tokens(self, user: User) -> Tuple[str, str]:
        """Create both access and refresh tokens"""
        access_token = self.create_access_token(
            data={"sub": user.email, "user_id": user.id}
        )
        refresh_token = await self.create_refresh_token(user.id)
        return access_token, refresh_token

    def verify_token(self, token: str) -> Optional[dict]:
        """Verify JWT token and return payload"""
        try:
            payload = jwt.decode(token, self.settings.secret_key, algorithms=[self.settings.algorithm])
            return payload
        except JWTError:
            return None

    async def get_user_from_token(self, token: str) -> Optional[User]:
        """Get user from JWT token"""
        payload = self.verify_token(token)
        if not payload:
            return None
        email: str = payload.get("sub")
        if not email:
            return None
        user = await self.user_service.get_by_email(email)
        return user

    async def refresh_access_token(self, refresh_token: str) -> Optional[Tuple[str, str]]:
        """Refresh access token using refresh token"""
        # Find refresh token in database
        result = await self.session.execute(
            select(RefreshToken).where(
                and_(
                    RefreshToken.token == refresh_token,
                    RefreshToken.is_revoked.is_(False),
                    RefreshToken.expires_at > utc_now()
                )
            )
        )
        token_record = result.scalar_one_or_none()
        if not token_record:
            return None

        # Get user
        user = await self.user_service.get_by_id(token_record.user_id)
        if not user or not user.is_active:
            return None

        # Create new tokens
        new_access_token, new_refresh_token = await self.create_tokens(user)

        # Revoke old refresh token (rotation: each refresh token is single-use)
        token_record.is_revoked = True
        await self.session.commit()
        return new_access_token, new_refresh_token

    async def revoke_refresh_token(self, refresh_token: str) -> bool:
        """Revoke refresh token (logout)"""
        result = await self.session.execute(
            select(RefreshToken).where(RefreshToken.token == refresh_token)
        )
        token_record = result.scalar_one_or_none()
        if token_record:
            token_record.is_revoked = True
            await self.session.commit()
            return True
        return False

    async def change_password(self, user_id: int, password_data: PasswordChange) -> bool:
        """Change user password"""
        user = await self.user_service.get_by_id(user_id)
        if not user:
            return False

        # Verify current password
        if not self.verify_password(password_data.current_password, user.hashed_password):
            raise ValueError("Current password is incorrect")

        # Update password
        user.hashed_password = self.get_password_hash(password_data.new_password)
        await self.session.commit()
        return True

    async def request_password_reset(self, email: str) -> bool:
        """Request password reset"""
        user = await self.user_service.get_by_email(email)
        if not user:
            # Don't reveal if email exists
            return True

        # Generate reset token
        reset_token = secrets.token_urlsafe(32)
        user.password_reset_token = reset_token
        user.password_reset_expires = utc_now() + timedelta(
            minutes=self.settings.password_reset_expire_minutes
        )
        await self.session.commit()

        # Send reset email
        if self.settings.mail_server:
            await self.email_service.send_password_reset_email(user, reset_token)
        return True

    async def reset_password(self, reset_data: PasswordResetConfirm) -> bool:
        """Reset password using token"""
        # Find user with valid reset token
        result = await self.session.execute(
            select(User).where(
                and_(
                    User.password_reset_token == reset_data.token,
                    User.password_reset_expires > utc_now()
                )
            )
        )
        user = result.scalar_one_or_none()
        if not user:
            raise ValueError("Invalid or expired reset token")

        # Update password and clear reset token
        user.hashed_password = self.get_password_hash(reset_data.new_password)
        user.password_reset_token = None
        user.password_reset_expires = None
        user.failed_login_attempts = 0  # Reset failed attempts
        user.locked_until = None  # Unlock account
        await self.session.commit()
        return True
Step 3: Security Middleware and Rate Limiting
Rate Limiting Service
Create backend/app/services/rate_limit_service.py:
import time

# Async client (redis-py 4.2+), so Redis calls do not block the event loop
import redis.asyncio as redis


class RateLimitService:
    """Rate limiting service using Redis"""

    def __init__(self, redis_url: str):
        self.redis_client = redis.from_url(redis_url, decode_responses=True)

    async def check_rate_limit(
        self,
        key: str,
        limit: int,
        window_minutes: int = 1
    ) -> bool:
        """Check if request is within rate limit"""
        try:
            current_time = time.time()
            window_start = current_time - window_minutes * 60

            # Use sliding window counter (sorted set scored by request timestamp)
            pipe = self.redis_client.pipeline()
            # Remove old entries
            pipe.zremrangebyscore(key, 0, window_start)
            # Count current entries
            pipe.zcard(key)
            # Execute pipeline
            results = await pipe.execute()
            current_count = results[1]
            if current_count >= limit:
                return False

            # Add current request
            await self.redis_client.zadd(key, {str(current_time): current_time})
            await self.redis_client.expire(key, window_minutes * 60)
            return True
        except Exception:
            # If Redis is down, allow the request
            return True

    def get_rate_limit_key(self, identifier: str, endpoint: str) -> str:
        """Generate rate limit key"""
        return f"rate_limit:{endpoint}:{identifier}"
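The Redis calls in check_rate_limit implement a sliding window: drop timestamps older than the window, count what is left, and record the new request only if the count is under the limit. The in-memory sketch below mirrors those three steps with a deque so the behavior can be tried without Redis. The class name InMemorySlidingWindow is an assumption for illustration and is not meant to replace the Redis-backed service, since it does not share state between processes.

```python
from collections import deque
from typing import Deque, Dict


class InMemorySlidingWindow:
    """Single-process stand-in for the Redis sorted-set rate limiter."""

    def __init__(self, limit: int, window_seconds: float):
        self.limit = limit
        self.window_seconds = window_seconds
        self._hits: Dict[str, Deque[float]] = {}

    def allow(self, key: str, now: float) -> bool:
        hits = self._hits.setdefault(key, deque())
        # Step 1: drop timestamps outside the window (ZREMRANGEBYSCORE in Redis)
        while hits and hits[0] <= now - self.window_seconds:
            hits.popleft()
        # Step 2: count what remains (ZCARD in Redis)
        if len(hits) >= self.limit:
            return False
        # Step 3: record this request (ZADD in Redis)
        hits.append(now)
        return True


if __name__ == "__main__":
    limiter = InMemorySlidingWindow(limit=5, window_seconds=60)
    results = [limiter.allow("127.0.0.1", now=float(second)) for second in range(7)]
    print(results)
```

Rejected requests are not recorded, so a client that keeps hammering the endpoint is not penalized beyond the window, which matches the Redis version.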
Security Middleware
Create backend/app/middleware/security.py:
from fastapi import Request, status
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from ..services.rate_limit_service import RateLimitService
from ..dependencies.config import get_settings


class SecurityMiddleware(BaseHTTPMiddleware):
    """Security middleware for rate limiting and protection"""

    def __init__(self, app, rate_limit_service: RateLimitService):
        super().__init__(app)
        self.rate_limit_service = rate_limit_service
        self.settings = get_settings()

    async def dispatch(self, request: Request, call_next):
        # Get client IP (request.client can be None, e.g. in some test setups)
        client_ip = request.client.host if request.client else "unknown"

        # Rate limiting for sensitive endpoints (paths match the /auth router)
        if request.url.path in ["/auth/login", "/auth/register", "/auth/password-reset"]:
            limit = self.settings.login_rate_limit_per_minute
            key = self.rate_limit_service.get_rate_limit_key(client_ip, request.url.path)
            if not await self.rate_limit_service.check_rate_limit(key, limit):
                return JSONResponse(
                    status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                    content={"detail": "Rate limit exceeded. Please try again later."}
                )

        # General rate limiting
        general_key = self.rate_limit_service.get_rate_limit_key(client_ip, "general")
        if not await self.rate_limit_service.check_rate_limit(
            general_key,
            self.settings.rate_limit_per_minute
        ):
            return JSONResponse(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                content={"detail": "Rate limit exceeded. Please try again later."}
            )

        # Add security headers
        response = await call_next(request)
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["X-XSS-Protection"] = "1; mode=block"
        response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
        return response
Step 4: Authentication Endpoints
Authentication Router
Create backend/app/routers/auth.py:
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from ..dependencies import get_database_session, get_auth_service, get_settings, get_current_active_user
from ..services import AuthService
from ..schemas import (
    UserRegister, UserLogin, Token, TokenRefresh,
    PasswordReset, PasswordResetConfirm, PasswordChange,
    User, UserProfile
)
from ..models import User as UserModel
from ..dependencies.config import Settings

router = APIRouter(prefix="/auth", tags=["authentication"])


@router.post("/register", response_model=User)
async def register(
    user_data: UserRegister,
    auth_service: AuthService = Depends(get_auth_service),
    settings: Settings = Depends(get_settings)
):
    """Register new user"""
    try:
        user = await auth_service.register_user(user_data)
        return user
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))


@router.post("/login", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    auth_service: AuthService = Depends(get_auth_service),
    settings: Settings = Depends(get_settings)
):
    """Login and get access token"""
    try:
        # The OAuth2 form calls the field "username"; this API expects the email there
        user = await auth_service.authenticate_user(form_data.username, form_data.password)
        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Incorrect email or password",
                headers={"WWW-Authenticate": "Bearer"},
            )
        access_token, refresh_token = await auth_service.create_tokens(user)
        return {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "token_type": "bearer",
            "expires_in": settings.access_token_expire_minutes * 60
        }
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))


@router.post("/refresh", response_model=Token)
async def refresh_token(
    token_data: TokenRefresh,
    auth_service: AuthService = Depends(get_auth_service),
    settings: Settings = Depends(get_settings)
):
    """Refresh access token"""
    result = await auth_service.refresh_access_token(token_data.refresh_token)
    if not result:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token"
        )
    access_token, refresh_token = result
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer",
        "expires_in": settings.access_token_expire_minutes * 60
    }


@router.post("/logout")
async def logout(
    token_data: TokenRefresh,
    auth_service: AuthService = Depends(get_auth_service)
):
    """Logout and revoke refresh token"""
    await auth_service.revoke_refresh_token(token_data.refresh_token)
    return {"message": "Successfully logged out"}


@router.get("/me", response_model=UserProfile)
async def get_current_user_profile(
    current_user: UserModel = Depends(get_current_active_user),
    session: AsyncSession = Depends(get_database_session)
):
    """Get current user profile with additional data"""
    # Calculate additional profile data
    task_count = len(current_user.tasks) if current_user.tasks else 0
    completed_task_count = len([t for t in current_user.tasks if t.completed]) if current_user.tasks else 0
    # Age is "now minus creation time"; created_at is stored timezone-aware
    account_age_days = (datetime.now(timezone.utc) - current_user.created_at).days
    return UserProfile(
        **current_user.__dict__,
        task_count=task_count,
        completed_task_count=completed_task_count,
        account_age_days=account_age_days
    )


@router.post("/change-password")
async def change_password(
    password_data: PasswordChange,
    current_user: UserModel = Depends(get_current_active_user),
    auth_service: AuthService = Depends(get_auth_service)
):
    """Change user password"""
    try:
        success = await auth_service.change_password(current_user.id, password_data)
        if success:
            return {"message": "Password changed successfully"}
        else:
            raise HTTPException(status_code=400, detail="Failed to change password")
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))


@router.post("/password-reset")
async def request_password_reset(
    reset_data: PasswordReset,
    # Declared without a default so FastAPI injects the per-request instance
    background_tasks: BackgroundTasks,
    auth_service: AuthService = Depends(get_auth_service)
):
    """Request password reset"""
    # Add to background tasks to avoid timing attacks
    background_tasks.add_task(auth_service.request_password_reset, reset_data.email)
    return {"message": "If the email exists, a password reset link has been sent"}


@router.post("/password-reset/confirm")
async def confirm_password_reset(
    reset_data: PasswordResetConfirm,
    auth_service: AuthService = Depends(get_auth_service)
):
    """Confirm password reset with token"""
    try:
        success = await auth_service.reset_password(reset_data)
        if success:
            return {"message": "Password reset successfully"}
        else:
            raise HTTPException(status_code=400, detail="Failed to reset password")
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
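The password reset endpoints hand the user a token by email and later look it up in the password_reset_token column, where the tutorial stores it as-is. One common hardening, sketched here as an optional variation rather than what the service above does, is to store only a SHA-256 digest so that a leaked database row cannot be used to reset a password. The function names are hypothetical:

```python
import hashlib
import hmac
import secrets
from typing import Tuple


def issue_reset_token() -> Tuple[str, str]:
    """Return (token to email to the user, digest to store in the database)."""
    token = secrets.token_urlsafe(32)
    digest = hashlib.sha256(token.encode()).hexdigest()
    return token, digest


def reset_token_matches(submitted: str, stored_digest: str) -> bool:
    """Hash the submitted token and compare digests in constant time."""
    digest = hashlib.sha256(submitted.encode()).hexdigest()
    return hmac.compare_digest(digest, stored_digest)


if __name__ == "__main__":
    token, digest = issue_reset_token()
    print(reset_token_matches(token, digest))
```

A fast hash is acceptable here, unlike for passwords, because the token is 32 random bytes and cannot be guessed by brute force. With this variation the lookup in reset_password would filter on the digest of the submitted token instead of the raw value.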
Step 5: Integration and Testing
Update Main Application
Update backend/app/main.py:
from fastapi import FastAPI, Depends, HTTPException, Query, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from typing import List, Optional
from .dependencies import get_settings
from .dependencies.config import Settings
from .middleware.security import SecurityMiddleware
from .services.rate_limit_service import RateLimitService
from .routers import auth
from .routers import tasks  # Your existing task router


# Create app with enhanced security
def create_app() -> FastAPI:
    settings = get_settings()
    app = FastAPI(
        title="Secure Task Management API",
        description="A production-ready task management API with JWT authentication",
        version="4.0.0",
        docs_url="/docs" if settings.debug else None,
        redoc_url="/redoc" if settings.debug else None,
    )

    # Security middleware
    rate_limit_service = RateLimitService(settings.redis_url)
    app.add_middleware(SecurityMiddleware, rate_limit_service=rate_limit_service)

    # CORS middleware
    app.add_middleware(
        CORSMiddleware,
        allow_origins=settings.cors_origins,
        allow_credentials=settings.cors_allow_credentials,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # Trusted host middleware (production)
    if not settings.debug:
        app.add_middleware(
            TrustedHostMiddleware,
            allowed_hosts=["api.yourdomain.com", "localhost"]
        )

    # Include routers
    app.include_router(auth.router)
    app.include_router(tasks.router)  # Your existing task router

    # Health check
    @app.get("/health")
    async def health_check():
        return {
            "status": "healthy",
            "app_name": settings.app_name,
            "environment": settings.environment
        }

    return app


app = create_app()
Authentication Tests
Create backend/tests/test_auth.py:
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.ext.asyncio import AsyncSession
from app.main import app
from app.services import AuthService
from app.schemas import UserRegister


@pytest.mark.asyncio
async def test_user_registration(test_session: AsyncSession, test_settings):
    """Test user registration"""
    auth_service = AuthService(test_session, test_settings)
    user_data = UserRegister(
        email="test@example.com",
        username="testuser",
        password="SecurePass123!"
    )
    user = await auth_service.register_user(user_data)
    assert user.email == "test@example.com"
    assert user.username == "testuser"
    assert user.hashed_password != "SecurePass123!"  # Should be hashed


@pytest.mark.asyncio
async def test_user_authentication(test_session: AsyncSession, test_settings):
    """Test user authentication"""
    auth_service = AuthService(test_session, test_settings)

    # Register user
    user_data = UserRegister(
        email="test@example.com",
        username="testuser",
        password="SecurePass123!"
    )
    await auth_service.register_user(user_data)

    # Authenticate user
    user = await auth_service.authenticate_user("test@example.com", "SecurePass123!")
    assert user is not None
    assert user.email == "test@example.com"

    # Test wrong password
    user = await auth_service.authenticate_user("test@example.com", "wrongpassword")
    assert user is None


def test_auth_endpoints(client):
    """Test authentication endpoints"""
    # Test registration
    response = client.post("/auth/register", json={
        "email": "test@example.com",
        "username": "testuser",
        "password": "SecurePass123!"
    })
    assert response.status_code == 200

    # Test login (form-encoded; the email goes in the "username" field)
    response = client.post("/auth/login", data={
        "username": "test@example.com",
        "password": "SecurePass123!"
    })
    assert response.status_code == 200
    data = response.json()
    assert "access_token" in data
    assert "refresh_token" in data

    # Test protected endpoint
    token = data["access_token"]
    response = client.get("/auth/me", headers={
        "Authorization": f"Bearer {token}"
    })
    assert response.status_code == 200
Troubleshooting
Common Issues & Solutions
JWT Token Issues:
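# Inspect token expiration (expiry check disabled so an expired token can still be decoded)
from jose import jwt
from datetime import datetime, timezone
payload = jwt.decode(token, secret_key, algorithms=["HS256"], options={"verify_exp": False})
exp = datetime.fromtimestamp(payload["exp"], tz=timezone.utc)
print(f"Token expires at: {exp}")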
Password Hashing Problems:
# Test password hashing
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
hashed = pwd_context.hash("password123")
verified = pwd_context.verify("password123", hashed)
print(f"Verification result: {verified}")
Rate Limiting Issues:
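When requests are rejected with 429 unexpectedly, it helps to look at what is stored under the rate limit key. The sketch below assumes the key layout produced by get_rate_limit_key (rate_limit:{endpoint}:{identifier}) and a synchronous Redis client object; describe_rate_limit is a hypothetical helper name, and it only relies on the client's zrange and ttl methods:

```python
import time
from typing import Any, Dict, Optional


def describe_rate_limit(
    client: Any,
    key: str,
    window_seconds: int = 60,
    now: Optional[float] = None,
) -> Dict[str, Any]:
    """Summarize the sorted set behind one rate limit key."""
    current = time.time() if now is None else now
    # Each entry is (member, score); the score is the request timestamp
    entries = client.zrange(key, 0, -1, withscores=True)
    in_window = [score for _, score in entries if score > current - window_seconds]
    return {
        "key": key,
        "stored": len(entries),
        "in_window": len(in_window),
        "ttl_seconds": client.ttl(key),
    }


# Usage against a local Redis (requires the redis package and a running server):
#   import redis
#   client = redis.from_url("redis://localhost:6379", decode_responses=True)
#   print(describe_rate_limit(client, "rate_limit:general:127.0.0.1"))
```

If in_window is at or above the configured limit, the rejections are expected; if stored is 0 while requests are still flowing, the service is probably falling back to allowing everything because Redis is unreachable.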
What You've Accomplished
Congratulations! You've implemented production-ready JWT authentication with:
- Secure user registration with password validation and hashing
- JWT token authentication with access and refresh tokens
- Rate limiting to prevent abuse and attacks
- Password management with secure reset functionality
- Security middleware with proper headers and protection
- Role-based access groundwork with the is_superuser flag on the user model
- Email integration for verification and password reset
- Comprehensive testing with authentication scenarios
Next Steps
Enhanced Security Features:
- Two-factor authentication - SMS or TOTP integration
- OAuth integration - Google, GitHub, social logins
- API key authentication - For service-to-service calls
- Session management - Track active sessions
- Audit logging - Security event tracking
Production Considerations:
- Token blacklisting - Revoke compromised tokens
- IP whitelisting - Restrict access by location
- Device fingerprinting - Track login devices
- Security monitoring - Real-time threat detection
- Compliance - GDPR, CCPA data protection
Ready to build secure, production-ready APIs? You now have a comprehensive authentication system that can handle real-world security requirements with confidence!