FastAPI Application Lifecycle Management 2025 - Startup & Shutdown Events
Master FastAPI's application lifecycle events to build robust, production-ready applications with proper resource management, health monitoring, and graceful shutdowns. This tutorial completes our series by showing how to orchestrate all components for production deployment.
What You'll Learn
By completing this tutorial, you'll master:
- Startup event patterns for resource initialization
- Shutdown event handling for graceful cleanup
- Health check systems with comprehensive monitoring
- Database connection management with connection pooling
- Background service coordination with other systems
- Error handling and recovery during startup/shutdown
- Production deployment with proper lifecycle management
Prerequisites
What you need before starting:
- Completed previous tutorials in this series
- Understanding of async patterns in Python
- Basic knowledge of system administration concepts
- Familiarity with Docker and containerization
Time to complete: 15 minutes
What We're Building
You'll implement comprehensive lifecycle management for the Task Management API with:
- Coordinated startup - Database, Redis, background services
- Health monitoring - Deep health checks and metrics
- Graceful shutdown - Clean resource cleanup and data persistence
- Error recovery - Startup failure handling and retries
- Service coordination - Multi-service dependency management
- Production monitoring - Integration with observability tools
Lifecycle Components:
- Database initialization - Connection pools, migrations
- Cache warm-up - Redis initialization and data preloading
- Background services - Celery workers, schedulers
- Health endpoints - Comprehensive system status
- Cleanup procedures - Resource deallocation and data safety
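All of this builds on FastAPI's lifespan hook: whatever runs before the yield executes at startup, whatever runs after it executes at shutdown. Here is the bare pattern before we flesh it out in Step 1 (the app.state.ready flag is purely illustrative):
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: acquire resources (DB pools, caches, background services)
    app.state.ready = True
    yield
    # Shutdown: release them in reverse order
    app.state.ready = False

app = FastAPI(lifespan=lifespan)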
Step 1: Advanced Startup Event Management
Application Factory Pattern
Create backend/app/core/app_factory.py:
import asyncio
import logging
from datetime import datetime
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from sqlalchemy import text
from ..core.database import engine, AsyncSessionLocal
from ..core.redis_client import redis_client
from ..core.celery_app import celery_app
from ..dependencies.config import get_settings
from ..middleware.security import SecurityMiddleware
from ..routers import auth, tasks, background_tasks
from ..services.health_service import HealthService
logger = logging.getLogger(__name__)
class AppLifecycleManager:
"""Manages application startup and shutdown lifecycle"""
def __init__(self):
self.settings = get_settings()
self.health_service = HealthService()
self._startup_tasks = []
self._shutdown_tasks = []
async def startup_sequence(self):
"""Execute startup sequence with error handling"""
logger.info("Starting application startup sequence...")
startup_tasks = [
("Database", self._initialize_database),
("Redis", self._initialize_redis),
("Background Services", self._initialize_background_services),
("Health Monitoring", self._initialize_health_monitoring),
("Cache Warm-up", self._warm_up_cache),
]
for name, task in startup_tasks:
try:
logger.info(f"Initializing {name}...")
await task()
logger.info(f"✓ {name} initialized successfully")
except Exception as e:
logger.error(f"✗ Failed to initialize {name}: {e}")
raise
logger.info("Application startup completed successfully")
async def shutdown_sequence(self):
"""Execute graceful shutdown sequence"""
logger.info("Starting application shutdown sequence...")
shutdown_tasks = [
("Background Services", self._shutdown_background_services),
("Database Connections", self._shutdown_database),
("Redis Connections", self._shutdown_redis),
("Health Monitoring", self._shutdown_health_monitoring),
]
for name, task in shutdown_tasks:
try:
logger.info(f"Shutting down {name}...")
await task()
logger.info(f"✓ {name} shut down successfully")
except Exception as e:
logger.error(f"✗ Error during {name} shutdown: {e}")
# Continue with other shutdowns even if one fails
logger.info("Application shutdown completed")
async def _initialize_database(self):
"""Initialize database connections and run health checks"""
try:
# Test database connection
async with engine.begin() as conn:
                await conn.execute(text("SELECT 1"))
# Check if migrations are needed (in production, this would be automated)
# await self._check_database_migrations()
# Pre-create some connections for the pool
async with AsyncSessionLocal() as session:
                await session.execute(text("SELECT 1"))
logger.info("Database initialization completed")
except Exception as e:
logger.error(f"Database initialization failed: {e}")
raise
async def _initialize_redis(self):
"""Initialize Redis connection and warm up cache"""
try:
# Test Redis connection
await redis_client.ping()
# Initialize cache keys if needed
            await redis_client.setex("app:startup_time", 3600, datetime.utcnow().isoformat())
logger.info("Redis initialization completed")
except Exception as e:
logger.error(f"Redis initialization failed: {e}")
raise
async def _initialize_background_services(self):
"""Initialize background services and check worker health"""
try:
# Check if Celery workers are available
inspect = celery_app.control.inspect()
active_workers = inspect.active()
if not active_workers:
logger.warning("No Celery workers detected")
else:
logger.info(f"Found {len(active_workers)} active Celery workers")
# Queue a health check task
from ..tasks.health_tasks import system_health_check
health_task = system_health_check.delay()
# Wait for health check completion (with timeout)
try:
result = health_task.get(timeout=10)
logger.info(f"Background service health check: {result}")
except Exception as e:
logger.warning(f"Background service health check timeout: {e}")
except Exception as e:
logger.error(f"Background services initialization failed: {e}")
# Don't raise here - app can function without background services
async def _initialize_health_monitoring(self):
"""Initialize health monitoring systems"""
try:
# Start health check scheduler
await self.health_service.start_monitoring()
# Register health check endpoints
await self.health_service.register_checks()
logger.info("Health monitoring initialized")
except Exception as e:
logger.error(f"Health monitoring initialization failed: {e}")
raise
async def _warm_up_cache(self):
"""Warm up application cache with frequently accessed data"""
try:
# Pre-load common data into cache
# This could include user preferences, configuration data, etc.
# Example: Cache application settings
await redis_client.setex(
"app:settings",
3600,
str(self.settings.dict())
)
logger.info("Cache warm-up completed")
except Exception as e:
logger.warning(f"Cache warm-up failed: {e}")
# Don't raise - app can function without cache
async def _shutdown_background_services(self):
"""Gracefully shutdown background services"""
try:
# Signal workers to finish current tasks
celery_app.control.cancel_consumer('celery')
# Wait for tasks to complete (with timeout)
await asyncio.sleep(5)
logger.info("Background services shutdown completed")
except Exception as e:
logger.error(f"Background services shutdown error: {e}")
async def _shutdown_database(self):
"""Close database connections"""
try:
await engine.dispose()
logger.info("Database connections closed")
except Exception as e:
logger.error(f"Database shutdown error: {e}")
async def _shutdown_redis(self):
"""Close Redis connections"""
try:
await redis_client.close()
logger.info("Redis connections closed")
except Exception as e:
logger.error(f"Redis shutdown error: {e}")
async def _shutdown_health_monitoring(self):
"""Stop health monitoring"""
try:
await self.health_service.stop_monitoring()
logger.info("Health monitoring stopped")
except Exception as e:
logger.error(f"Health monitoring shutdown error: {e}")
# Global lifecycle manager
lifecycle_manager = AppLifecycleManager()
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator:
"""Application lifespan context manager"""
# Startup
try:
await lifecycle_manager.startup_sequence()
yield
finally:
# Shutdown
await lifecycle_manager.shutdown_sequence()
def create_app() -> FastAPI:
"""Application factory with lifecycle management"""
settings = get_settings()
app = FastAPI(
title="Task Management API - Production Ready",
description="A production-ready task management API with comprehensive lifecycle management",
version="5.0.0",
lifespan=lifespan,
docs_url="/docs" if settings.debug else None,
redoc_url="/redoc" if settings.debug else None,
)
# Add middleware
app.add_middleware(SecurityMiddleware)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_credentials=settings.cors_allow_credentials,
allow_methods=["*"],
allow_headers=["*"],
)
if not settings.debug:
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=settings.allowed_hosts
)
# Include routers
app.include_router(auth.router)
app.include_router(tasks.router)
app.include_router(background_tasks.router)
return app
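The startup sequence above stops at the first failure. If you want the retry behaviour promised in the learning goals, one option is a small backoff helper wrapped around each step; the run_with_retries function below is a sketch of that idea, not part of the factory code above:
import asyncio
import logging

logger = logging.getLogger(__name__)

async def run_with_retries(name: str, task, attempts: int = 3, base_delay: float = 1.0):
    """Run a startup step, retrying with exponential backoff before giving up."""
    for attempt in range(1, attempts + 1):
        try:
            return await task()
        except Exception as exc:
            if attempt == attempts:
                logger.error(f"{name} failed after {attempts} attempts: {exc}")
                raise
            delay = base_delay * 2 ** (attempt - 1)
            logger.warning(f"{name} failed (attempt {attempt}/{attempts}), retrying in {delay}s: {exc}")
            await asyncio.sleep(delay)

Inside startup_sequence, the loop body would then call await run_with_retries(name, task) instead of await task().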
Health Check System
Create backend/app/services/health_service.py:
import asyncio
import logging
import time
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
from enum import Enum
from sqlalchemy import text
from ..core.database import AsyncSessionLocal
from ..core.redis_client import redis_client
from ..core.celery_app import celery_app
from ..dependencies.config import get_settings

logger = logging.getLogger(__name__)
class HealthStatus(str, Enum):
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
class HealthCheck:
"""Individual health check"""
def __init__(self, name: str, check_func, timeout: int = 5, critical: bool = True):
self.name = name
self.check_func = check_func
self.timeout = timeout
self.critical = critical
self.last_check = None
self.last_result = None
async def run(self) -> Dict[str, Any]:
"""Run health check with timeout"""
start_time = time.time()
try:
result = await asyncio.wait_for(
self.check_func(),
timeout=self.timeout
)
duration = time.time() - start_time
self.last_check = datetime.utcnow()
self.last_result = {
"status": HealthStatus.HEALTHY,
"duration_ms": round(duration * 1000, 2),
"details": result,
"timestamp": self.last_check.isoformat(),
"critical": self.critical
}
except asyncio.TimeoutError:
duration = time.time() - start_time
self.last_result = {
"status": HealthStatus.UNHEALTHY,
"duration_ms": round(duration * 1000, 2),
"error": f"Health check timeout after {self.timeout}s",
"timestamp": datetime.utcnow().isoformat(),
"critical": self.critical
}
except Exception as e:
duration = time.time() - start_time
self.last_result = {
"status": HealthStatus.UNHEALTHY,
"duration_ms": round(duration * 1000, 2),
"error": str(e),
"timestamp": datetime.utcnow().isoformat(),
"critical": self.critical
}
return self.last_result
class HealthService:
"""Comprehensive health monitoring service"""
def __init__(self):
self.settings = get_settings()
self.checks: List[HealthCheck] = []
        self.monitoring_task = None
        self.start_time = datetime.utcnow()
self._register_default_checks()
def _register_default_checks(self):
"""Register default health checks"""
self.checks = [
HealthCheck("database", self._check_database, timeout=5, critical=True),
HealthCheck("redis", self._check_redis, timeout=3, critical=True),
HealthCheck("celery_workers", self._check_celery_workers, timeout=10, critical=False),
HealthCheck("disk_space", self._check_disk_space, timeout=2, critical=False),
HealthCheck("memory", self._check_memory, timeout=2, critical=False),
]
async def start_monitoring(self):
"""Start background health monitoring"""
if self.monitoring_task is None:
self.monitoring_task = asyncio.create_task(self._monitoring_loop())
async def stop_monitoring(self):
"""Stop background health monitoring"""
if self.monitoring_task:
self.monitoring_task.cancel()
try:
await self.monitoring_task
except asyncio.CancelledError:
pass
self.monitoring_task = None
async def _monitoring_loop(self):
"""Background monitoring loop"""
while True:
try:
await self.run_all_checks()
await asyncio.sleep(30) # Check every 30 seconds
except asyncio.CancelledError:
break
except Exception as e:
print(f"Health monitoring error: {e}")
await asyncio.sleep(10) # Retry after 10 seconds
async def run_all_checks(self) -> Dict[str, Any]:
"""Run all health checks"""
results = {}
overall_status = HealthStatus.HEALTHY
# Run all checks concurrently
check_tasks = [check.run() for check in self.checks]
check_results = await asyncio.gather(*check_tasks, return_exceptions=True)
for check, result in zip(self.checks, check_results):
if isinstance(result, Exception):
result = {
"status": HealthStatus.UNHEALTHY,
"error": str(result),
"critical": check.critical
}
results[check.name] = result
# Determine overall status
if result["status"] == HealthStatus.UNHEALTHY and result["critical"]:
overall_status = HealthStatus.UNHEALTHY
elif result["status"] == HealthStatus.UNHEALTHY and overall_status == HealthStatus.HEALTHY:
overall_status = HealthStatus.DEGRADED
health_report = {
"status": overall_status,
"timestamp": datetime.utcnow().isoformat(),
"checks": results,
"uptime": self._get_uptime(),
"version": "5.0.0"
}
# Store health report in Redis for monitoring
try:
await redis_client.setex(
"health:latest",
60,
str(health_report)
)
        except Exception:
            pass  # Don't fail if Redis is unavailable
return health_report
async def get_health_summary(self) -> Dict[str, Any]:
"""Get current health summary"""
return await self.run_all_checks()
async def _check_database(self) -> Dict[str, Any]:
"""Check database connectivity and performance"""
async with AsyncSessionLocal() as session:
# Test basic connectivity
start_time = time.time()
await session.execute(text("SELECT 1"))
query_time = time.time() - start_time
            # Inspect the connection pool
            pool = session.get_bind().pool
            return {
                "query_time_ms": round(query_time * 1000, 2),
                "pool_size": pool.size(),
                "pool_checked_out": pool.checkedout(),
                "pool_overflow": pool.overflow(),
            }
async def _check_redis(self) -> Dict[str, Any]:
"""Check Redis connectivity and performance"""
start_time = time.time()
# Test ping
await redis_client.ping()
ping_time = time.time() - start_time
# Get Redis info
info = await redis_client.info()
return {
"ping_time_ms": round(ping_time * 1000, 2),
"connected_clients": info.get("connected_clients", 0),
"used_memory": info.get("used_memory_human", "unknown"),
"redis_version": info.get("redis_version", "unknown"),
}
async def _check_celery_workers(self) -> Dict[str, Any]:
"""Check Celery worker status"""
inspect = celery_app.control.inspect()
# Get active workers
active_workers = inspect.active() or {}
# Get worker stats
stats = inspect.stats() or {}
return {
"active_workers": len(active_workers),
"worker_names": list(active_workers.keys()),
"total_tasks": sum(
len(tasks) for tasks in active_workers.values()
),
"worker_stats": stats
}
async def _check_disk_space(self) -> Dict[str, Any]:
"""Check available disk space"""
import shutil
total, used, free = shutil.disk_usage("/")
free_percent = (free / total) * 100
return {
"total_gb": round(total / (1024**3), 2),
"used_gb": round(used / (1024**3), 2),
"free_gb": round(free / (1024**3), 2),
"free_percent": round(free_percent, 2),
"warning": free_percent < 20 # Warn if less than 20% free
}
async def _check_memory(self) -> Dict[str, Any]:
"""Check memory usage"""
import psutil
memory = psutil.virtual_memory()
return {
"total_gb": round(memory.total / (1024**3), 2),
"available_gb": round(memory.available / (1024**3), 2),
"used_percent": memory.percent,
"warning": memory.percent > 80 # Warn if over 80% used
}
    def _get_uptime(self) -> str:
        """Get application uptime since this service instance was created"""
        uptime = datetime.utcnow() - self.start_time
        return str(timedelta(seconds=int(uptime.total_seconds())))
async def register_checks(self):
"""Register health check endpoints"""
# This would register the health endpoints with the FastAPI app
pass
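The default checks cover the infrastructure built in this series, but the HealthCheck wrapper makes it easy to add your own. A sketch for an external dependency, assuming httpx is installed and using an illustrative URL:
import httpx

from app.services.health_service import HealthCheck, HealthService

async def check_payment_gateway() -> dict:
    # Treat a third-party API as a non-critical dependency
    async with httpx.AsyncClient(timeout=2.0) as client:
        response = await client.get("https://payments.example.com/status")
        response.raise_for_status()
        return {"status_code": response.status_code}

health_service = HealthService()
health_service.checks.append(
    HealthCheck("payment_gateway", check_payment_gateway, timeout=3, critical=False)
)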
Background Health Tasks
Create backend/app/tasks/health_tasks.py:
import time
from celery import current_task
from ..core.celery_app import celery_app
from ..services.health_service import HealthService
@celery_app.task(bind=True)
def system_health_check(self):
"""Run system health check as background task"""
try:
current_task.update_state(
state="PROGRESS",
meta={"step": "Running health checks"}
)
# Note: This would need to be adapted for sync execution
# In a real implementation, you'd have sync versions of health checks
return {
"status": "completed",
"message": "Health check completed successfully",
"timestamp": time.time()
}
except Exception as exc:
current_task.update_state(
state="FAILURE",
meta={"error": str(exc)}
)
raise
@celery_app.task
def cleanup_old_health_data():
"""Clean up old health monitoring data"""
try:
# Clean up old health records from Redis
# Clean up old log files
# Archive old metrics
return {"status": "completed", "cleaned_items": 0}
except Exception as e:
return {"status": "failed", "error": str(e)}
Step 2: Health Check Endpoints
Health Router
Create backend/app/routers/health.py:
from fastapi import APIRouter, Depends, HTTPException, status
from typing import Dict, Any
from datetime import datetime
from ..services.health_service import HealthService, HealthStatus
from ..dependencies import get_current_active_user
from ..models import User
router = APIRouter(prefix="/health", tags=["health"])
# Health service instance
health_service = HealthService()
@router.get("/")
async def health_check() -> Dict[str, Any]:
"""Basic health check endpoint"""
return {
"status": "healthy",
"timestamp": "2025-01-01T00:00:00Z",
"service": "task-management-api",
"version": "5.0.0"
}
@router.get("/detailed")
async def detailed_health_check() -> Dict[str, Any]:
"""Detailed health check with all components"""
health_report = await health_service.get_health_summary()
# Set appropriate HTTP status code based on health
if health_report["status"] == HealthStatus.UNHEALTHY:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=health_report
)
return health_report
@router.get("/live")
async def liveness_probe() -> Dict[str, str]:
"""Kubernetes liveness probe endpoint"""
# This should only check if the application is running
# Don't check external dependencies here
return {"status": "alive"}
@router.get("/ready")
async def readiness_probe() -> Dict[str, Any]:
"""Kubernetes readiness probe endpoint"""
# Check if the application is ready to receive traffic
health_report = await health_service.run_all_checks()
# Only check critical dependencies for readiness
critical_checks = {
name: result for name, result in health_report["checks"].items()
if result.get("critical", False)
}
unhealthy_critical = [
name for name, result in critical_checks.items()
if result["status"] == HealthStatus.UNHEALTHY
]
if unhealthy_critical:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail={
"status": "not_ready",
"failed_checks": unhealthy_critical
}
)
return {
"status": "ready",
"critical_checks": len(critical_checks),
"healthy_checks": len(critical_checks) - len(unhealthy_critical)
}
@router.get("/metrics")
async def health_metrics(
current_user: User = Depends(get_current_active_user)
) -> Dict[str, Any]:
"""Health metrics endpoint (requires authentication)"""
if not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin access required"
)
health_report = await health_service.get_health_summary()
# Convert to metrics format
metrics = {
"health_status": health_report["status"],
"total_checks": len(health_report["checks"]),
"healthy_checks": len([
c for c in health_report["checks"].values()
if c["status"] == HealthStatus.HEALTHY
]),
"check_details": health_report["checks"],
"uptime": health_report["uptime"],
"timestamp": health_report["timestamp"]
}
return metrics
@router.post("/checks/run")
async def run_health_checks(
current_user: User = Depends(get_current_active_user)
) -> Dict[str, Any]:
"""Manually trigger health checks"""
if not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin access required"
)
return await health_service.run_all_checks()
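The application factory from Step 1 does not include this router yet, so wire it in alongside the others in create_app():
# backend/app/core/app_factory.py
from ..routers import auth, tasks, background_tasks, health

app.include_router(auth.router)
app.include_router(tasks.router)
app.include_router(background_tasks.router)
app.include_router(health.router)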
Step 3: Production Configuration
Environment-based Configuration
Update backend/app/dependencies/config.py:
from typing import List, Optional
from pydantic import BaseSettings, validator
class Settings(BaseSettings):
# ... existing settings ...
# Application lifecycle
startup_timeout: int = 60 # seconds
shutdown_timeout: int = 30 # seconds
health_check_interval: int = 30 # seconds
# Production settings
allowed_hosts: List[str] = ["localhost", "127.0.0.1"]
enable_health_monitoring: bool = True
enable_metrics: bool = True
# Monitoring integrations
prometheus_enabled: bool = False
prometheus_port: int = 9090
grafana_enabled: bool = False
# Logging
log_level: str = "INFO"
log_format: str = "json" # json or text
log_file: Optional[str] = None
# Performance
worker_connections: int = 1000
keepalive_timeout: int = 5
max_requests: int = 1000
max_requests_jitter: int = 50
@validator("allowed_hosts", pre=True)
def parse_allowed_hosts(cls, v):
if isinstance(v, str):
return [host.strip() for host in v.split(",")]
return v
class Config:
env_file = ".env"
case_sensitive = False
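Because Settings reads from a .env file, each field can be overridden with an upper-case environment variable of the same name, and allowed_hosts accepts a comma-separated string thanks to the validator. An illustrative production snippet:
# .env (illustrative values)
LOG_LEVEL=INFO
LOG_FORMAT=json
ALLOWED_HOSTS=api.example.com,localhost
ENABLE_HEALTH_MONITORING=true
STARTUP_TIMEOUT=60
SHUTDOWN_TIMEOUT=30
HEALTH_CHECK_INTERVAL=30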
Logging Configuration
Create backend/app/core/logging_config.py:
import logging
import logging.config
import sys
from typing import Dict, Any
from ..dependencies.config import get_settings
def setup_logging() -> None:
"""Configure application logging"""
settings = get_settings()
log_config: Dict[str, Any] = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"default": {
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
},
"json": {
"format": '{"timestamp": "%(asctime)s", "name": "%(name)s", "level": "%(levelname)s", "message": "%(message)s"}',
},
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"formatter": settings.log_format,
"stream": sys.stdout,
},
},
"root": {
"level": settings.log_level,
"handlers": ["console"],
},
"loggers": {
"uvicorn": {
"level": "INFO",
"handlers": ["console"],
"propagate": False,
},
"sqlalchemy.engine": {
"level": "WARNING",
"handlers": ["console"],
"propagate": False,
},
"celery": {
"level": "INFO",
"handlers": ["console"],
"propagate": False,
},
},
}
# Add file handler if specified
if settings.log_file:
log_config["handlers"]["file"] = {
"class": "logging.handlers.RotatingFileHandler",
"filename": settings.log_file,
"maxBytes": 10485760, # 10MB
"backupCount": 5,
"formatter": settings.log_format,
}
log_config["root"]["handlers"].append("file")
logging.config.dictConfig(log_config)
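With the factory and logging configuration in place, the application entry point stays small. A minimal sketch of backend/app/main.py, assuming the module paths created in this series:
# backend/app/main.py
from .core.app_factory import create_app
from .core.logging_config import setup_logging

setup_logging()  # configure logging before the app (and its lifespan) starts
app = create_app()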
Step 4: Container and Deployment Configuration
Production Dockerfile
Create backend/Dockerfile.prod:
FROM python:3.11-slim
# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1 \
PIP_DISABLE_PIP_VERSION_CHECK=1
# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    curl \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*
# Create non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser
# Set work directory
WORKDIR /app
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Change ownership to appuser
RUN chown -R appuser:appuser /app
# Switch to non-root user
USER appuser
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
CMD curl -f http://localhost:8000/health/live || exit 1
# Default command
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
Production Docker Compose
Create docker-compose.prod.yml:
version: '3.8'
services:
web:
build:
context: .
dockerfile: Dockerfile.prod
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql+asyncpg://taskuser:${DB_PASSWORD}@db:5432/taskdb
- REDIS_URL=redis://redis:6379/0
- ENVIRONMENT=production
- DEBUG=false
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health/live"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
db:
image: postgres:15
environment:
POSTGRES_DB: taskdb
POSTGRES_USER: taskuser
POSTGRES_PASSWORD: ${DB_PASSWORD}
volumes:
- postgres_data:/var/lib/postgresql/data
restart: unless-stopped
healthcheck:
test: ["CMD-SHELL", "pg_isready -U taskuser -d taskdb"]
interval: 10s
timeout: 5s
retries: 5
redis:
image: redis:7-alpine
command: redis-server --appendonly yes
volumes:
- redis_data:/data
restart: unless-stopped
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 3
celery_worker:
build:
context: .
dockerfile: Dockerfile.prod
command: celery -A app.core.celery_app worker --loglevel=info
environment:
- DATABASE_URL=postgresql+asyncpg://taskuser:${DB_PASSWORD}@db:5432/taskdb
- REDIS_URL=redis://redis:6379/0
- ENVIRONMENT=production
depends_on:
- db
- redis
restart: unless-stopped
celery_beat:
build:
context: .
dockerfile: Dockerfile.prod
command: celery -A app.core.celery_app beat --loglevel=info
environment:
- DATABASE_URL=postgresql+asyncpg://taskuser:${DB_PASSWORD}@db:5432/taskdb
- REDIS_URL=redis://redis:6379/0
- ENVIRONMENT=production
depends_on:
- db
- redis
restart: unless-stopped
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/nginx/ssl
depends_on:
- web
restart: unless-stopped
volumes:
postgres_data:
redis_data:
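Compose stops containers with SIGTERM and, by default, follows up with SIGKILL after 10 seconds. To give the shutdown sequence the 30-second window configured earlier, raise the grace period on the web service (a snippet to merge into the service definition above):
  web:
    # ... existing configuration ...
    stop_grace_period: 30s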
Process Management Script
Create backend/scripts/start_production.sh:
#!/bin/bash
set -e
echo "Starting production deployment..."
# Check environment variables
if [ -z "$DB_PASSWORD" ]; then
echo "Error: DB_PASSWORD environment variable is required"
exit 1
fi
# Start the database and cache first so migrations have something to connect to
echo "Starting database and cache..."
docker-compose -f docker-compose.prod.yml up -d db redis

# Run database migrations inside the application image
echo "Running database migrations..."
docker-compose -f docker-compose.prod.yml run --rm web alembic upgrade head

# Start the remaining services with health checks
echo "Starting services..."
docker-compose -f docker-compose.prod.yml up -d
# Wait for services to be healthy
echo "Waiting for services to be healthy..."
sleep 30
# Check health
echo "Checking application health..."
curl -f http://localhost:8000/health/ready || {
echo "Health check failed"
docker-compose -f docker-compose.prod.yml logs web
exit 1
}
echo "Production deployment completed successfully!"
Troubleshooting
Common Issues & Solutions
Startup Failures:
# Check service logs
docker-compose logs web
# Check specific service health
curl http://localhost:8000/health/detailed
# Debug startup sequence
docker-compose exec web python -c "
from app.core.app_factory import lifecycle_manager
import asyncio
asyncio.run(lifecycle_manager.startup_sequence())
"
Health Check Failures:
# Manual health check
curl -v http://localhost:8000/health/ready
# Check individual components
curl http://localhost:8000/health/metrics
# Redis connectivity
docker-compose exec redis redis-cli ping
# Database connectivity
docker-compose exec db pg_isready -U taskuser -d taskdb
Graceful Shutdown Issues:
# Send SIGTERM to test graceful shutdown
docker-compose kill -s SIGTERM web
# Check shutdown logs
docker-compose logs web | grep shutdown
What You've Accomplished
Congratulations! You've implemented comprehensive application lifecycle management with:
- Coordinated startup sequence with error handling and retries
- Comprehensive health monitoring with detailed component checks
- Graceful shutdown procedures with proper resource cleanup
- Production-ready configuration with environment management
- Container health checks and monitoring integration
- Error recovery patterns for robust deployment
- Observability integration for production monitoring
Next Steps
Production Enhancements:
- Observability stack - Prometheus, Grafana, ELK stack integration
- Circuit breakers - Fault tolerance patterns
- Blue-green deployment - Zero-downtime deployments
- Auto-scaling - Kubernetes HPA and VPA
- Disaster recovery - Backup and restore procedures
Advanced Monitoring:
- Custom metrics - Business metrics and KPIs
- Distributed tracing - Request flow monitoring
- Log aggregation - Centralized logging with search
- Alerting - Real-time notification systems
- Performance profiling - Application performance monitoring
Ready to deploy production-ready applications? You now have a complete application lifecycle management system that can handle real-world production requirements with confidence and reliability!
Series Completion
🎉 Congratulations! You've completed the entire FastAPI Tutorial Series 2025!
You've built a comprehensive, production-ready Task Management API featuring:
- Modern FastAPI patterns with TypeScript-like development experience
- PostgreSQL integration with async operations and migrations
- Clean architecture with dependency injection and service layers
- JWT authentication with comprehensive security features
- Background task processing with Celery and Redis
- Application lifecycle management with health monitoring
Your API is now ready for production deployment with enterprise-grade patterns and best practices!