FastAPI Application Lifecycle Management 2025 - Startup & Shutdown Events

Master FastAPI's application lifecycle events to build robust, production-ready applications with proper resource management, health monitoring, and graceful shutdowns. This tutorial completes our series by showing how to orchestrate all components for production deployment.

What You'll Learn

By completing this tutorial, you'll master:

  • Startup event patterns for resource initialization
  • Shutdown event handling for graceful cleanup
  • Health check systems with comprehensive monitoring
  • Database connection management with connection pooling
  • Background service coordination with other systems
  • Error handling and recovery during startup/shutdown
  • Production deployment with proper lifecycle management

Prerequisites

What you need before starting:

  • Completed previous tutorials in this series
  • Understanding of async patterns in Python
  • Basic knowledge of system administration concepts
  • Familiarity with Docker and containerization

Time to complete: 15 minutes


What We're Building

You'll implement comprehensive lifecycle management for the Task Management API with:

  • Coordinated startup - Database, Redis, background services
  • Health monitoring - Deep health checks and metrics
  • Graceful shutdown - Clean resource cleanup and data persistence
  • Error recovery - Startup failure handling and retries
  • Service coordination - Multi-service dependency management
  • Production monitoring - Integration with observability tools

Lifecycle Components:

  • Database initialization - Connection pools, migrations
  • Cache warm-up - Redis initialization and data preloading
  • Background services - Celery workers, schedulers
  • Health endpoints - Comprehensive system status
  • Cleanup procedures - Resource deallocation and data safety


Step 1: Advanced Startup Event Management

Application Factory Pattern

Create backend/app/core/app_factory.py:

import asyncio
import logging
from contextlib import asynccontextmanager
from datetime import datetime
from typing import AsyncGenerator

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from sqlalchemy import text

from ..core.database import engine, AsyncSessionLocal
from ..core.redis_client import redis_client
from ..core.celery_app import celery_app
from ..dependencies.config import get_settings
from ..middleware.security import SecurityMiddleware
from ..routers import auth, tasks, background_tasks
from ..services.health_service import HealthService

logger = logging.getLogger(__name__)

class AppLifecycleManager:
    """Manages application startup and shutdown lifecycle"""

    def __init__(self):
        self.settings = get_settings()
        self.health_service = HealthService()
        self._startup_tasks = []
        self._shutdown_tasks = []

    async def startup_sequence(self):
        """Execute startup sequence with error handling"""
        logger.info("Starting application startup sequence...")

        startup_tasks = [
            ("Database", self._initialize_database),
            ("Redis", self._initialize_redis),
            ("Background Services", self._initialize_background_services),
            ("Health Monitoring", self._initialize_health_monitoring),
            ("Cache Warm-up", self._warm_up_cache),
        ]

        for name, task in startup_tasks:
            try:
                logger.info(f"Initializing {name}...")
                await task()
                logger.info(f"✓ {name} initialized successfully")
            except Exception as e:
                logger.error(f"✗ Failed to initialize {name}: {e}")
                raise

        logger.info("Application startup completed successfully")

    async def shutdown_sequence(self):
        """Execute graceful shutdown sequence"""
        logger.info("Starting application shutdown sequence...")

        # Stop health monitoring first so its checks don't hit
        # connections we are about to close
        shutdown_tasks = [
            ("Health Monitoring", self._shutdown_health_monitoring),
            ("Background Services", self._shutdown_background_services),
            ("Database Connections", self._shutdown_database),
            ("Redis Connections", self._shutdown_redis),
        ]

        for name, task in shutdown_tasks:
            try:
                logger.info(f"Shutting down {name}...")
                await task()
                logger.info(f"✓ {name} shut down successfully")
            except Exception as e:
                logger.error(f"✗ Error during {name} shutdown: {e}")
                # Continue with other shutdowns even if one fails

        logger.info("Application shutdown completed")

    async def _initialize_database(self):
        """Initialize database connections and run health checks"""
        try:
            # Test database connection (SQLAlchemy 2.x requires text() for raw SQL)
            async with engine.begin() as conn:
                await conn.execute(text("SELECT 1"))

            # Check if migrations are needed (in production, this would be automated)
            # await self._check_database_migrations()

            # Verify the session factory hands out a working session
            async with AsyncSessionLocal() as session:
                await session.execute(text("SELECT 1"))

            logger.info("Database initialization completed")

        except Exception as e:
            logger.error(f"Database initialization failed: {e}")
            raise

    async def _initialize_redis(self):
        """Initialize Redis connection and warm up cache"""
        try:
            # Test Redis connection
            await redis_client.ping()

            # Record the startup time for diagnostics
            await redis_client.setex("app:startup_time", 3600, datetime.utcnow().isoformat())

            logger.info("Redis initialization completed")

        except Exception as e:
            logger.error(f"Redis initialization failed: {e}")
            raise

    async def _initialize_background_services(self):
        """Initialize background services and check worker health"""
        try:
            # Check if Celery workers are available
            # (control.inspect() is a blocking call; acceptable during startup)
            inspect = celery_app.control.inspect()
            active_workers = inspect.active()

            if not active_workers:
                logger.warning("No Celery workers detected")
            else:
                logger.info(f"Found {len(active_workers)} active Celery workers")

            # Queue a health check task
            from ..tasks.health_tasks import system_health_check
            health_task = system_health_check.delay()

            # Wait for the health check to complete (blocks for up to 10s)
            try:
                result = health_task.get(timeout=10)
                logger.info(f"Background service health check: {result}")
            except Exception as e:
                logger.warning(f"Background service health check did not complete: {e}")

        except Exception as e:
            logger.error(f"Background services initialization failed: {e}")
            # Don't raise here - app can function without background services

    async def _initialize_health_monitoring(self):
        """Initialize health monitoring systems"""
        try:
            # Start health check scheduler
            await self.health_service.start_monitoring()

            # Register health check endpoints
            await self.health_service.register_checks()

            logger.info("Health monitoring initialized")

        except Exception as e:
            logger.error(f"Health monitoring initialization failed: {e}")
            raise

    async def _warm_up_cache(self):
        """Warm up application cache with frequently accessed data"""
        try:
            # Pre-load common data into cache
            # This could include user preferences, configuration data, etc.

            # Example: Cache application settings
            # (take care not to cache secrets in a real deployment)
            await redis_client.setex(
                "app:settings",
                3600,
                str(self.settings.dict())
            )

            logger.info("Cache warm-up completed")

        except Exception as e:
            logger.warning(f"Cache warm-up failed: {e}")
            # Don't raise - app can function without cache

    async def _shutdown_background_services(self):
        """Gracefully shutdown background services"""
        try:
            # Signal workers to finish current tasks
            celery_app.control.cancel_consumer('celery')

            # Give in-flight tasks a grace period to finish
            await asyncio.sleep(5)

            logger.info("Background services shutdown completed")

        except Exception as e:
            logger.error(f"Background services shutdown error: {e}")

    async def _shutdown_database(self):
        """Close database connections"""
        try:
            await engine.dispose()
            logger.info("Database connections closed")

        except Exception as e:
            logger.error(f"Database shutdown error: {e}")

    async def _shutdown_redis(self):
        """Close Redis connections"""
        try:
            await redis_client.close()
            logger.info("Redis connections closed")

        except Exception as e:
            logger.error(f"Redis shutdown error: {e}")

    async def _shutdown_health_monitoring(self):
        """Stop health monitoring"""
        try:
            await self.health_service.stop_monitoring()
            logger.info("Health monitoring stopped")

        except Exception as e:
            logger.error(f"Health monitoring shutdown error: {e}")

# Global lifecycle manager
lifecycle_manager = AppLifecycleManager()

@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator:
    """Application lifespan context manager"""
    # Startup
    try:
        await lifecycle_manager.startup_sequence()
        yield
    finally:
        # Shutdown
        await lifecycle_manager.shutdown_sequence()

def create_app() -> FastAPI:
    """Application factory with lifecycle management"""
    settings = get_settings()

    app = FastAPI(
        title="Task Management API - Production Ready",
        description="A production-ready task management API with comprehensive lifecycle management",
        version="5.0.0",
        lifespan=lifespan,
        docs_url="/docs" if settings.debug else None,
        redoc_url="/redoc" if settings.debug else None,
    )

    # Add middleware
    app.add_middleware(SecurityMiddleware)
    app.add_middleware(
        CORSMiddleware,
        allow_origins=settings.cors_origins,
        allow_credentials=settings.cors_allow_credentials,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    if not settings.debug:
        app.add_middleware(
            TrustedHostMiddleware,
            allowed_hosts=settings.allowed_hosts
        )

    # Include routers
    app.include_router(auth.router)
    app.include_router(tasks.router)
    app.include_router(background_tasks.router)

    return app
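
The Dockerfile and service commands later in this tutorial point uvicorn at app.main:app, so the factory needs a thin entry point. A minimal sketch of backend/app/main.py (setup_logging comes from the logging configuration created in Step 3):

# backend/app/main.py - entry point wiring the factory together
from .core.app_factory import create_app
from .core.logging_config import setup_logging

setup_logging()  # configure logging before the app starts serving
app = create_app()  # "app.main:app" is the target for uvicorn and the Dockerfile CMD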

Health Check System

Create backend/app/services/health_service.py:

import asyncio
import json
import logging
import time
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List

from sqlalchemy import text
from ..core.database import AsyncSessionLocal
from ..core.redis_client import redis_client
from ..core.celery_app import celery_app
from ..dependencies.config import get_settings

logger = logging.getLogger(__name__)

class HealthStatus(str, Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"

class HealthCheck:
    """Individual health check"""

    def __init__(self, name: str, check_func, timeout: int = 5, critical: bool = True):
        self.name = name
        self.check_func = check_func
        self.timeout = timeout
        self.critical = critical
        self.last_check = None
        self.last_result = None

    async def run(self) -> Dict[str, Any]:
        """Run health check with timeout"""
        start_time = time.time()

        try:
            result = await asyncio.wait_for(
                self.check_func(),
                timeout=self.timeout
            )

            duration = time.time() - start_time
            self.last_check = datetime.utcnow()
            self.last_result = {
                "status": HealthStatus.HEALTHY,
                "duration_ms": round(duration * 1000, 2),
                "details": result,
                "timestamp": self.last_check.isoformat(),
                "critical": self.critical
            }

        except asyncio.TimeoutError:
            duration = time.time() - start_time
            self.last_result = {
                "status": HealthStatus.UNHEALTHY,
                "duration_ms": round(duration * 1000, 2),
                "error": f"Health check timeout after {self.timeout}s",
                "timestamp": datetime.utcnow().isoformat(),
                "critical": self.critical
            }

        except Exception as e:
            duration = time.time() - start_time
            self.last_result = {
                "status": HealthStatus.UNHEALTHY,
                "duration_ms": round(duration * 1000, 2),
                "error": str(e),
                "timestamp": datetime.utcnow().isoformat(),
                "critical": self.critical
            }

        return self.last_result

class HealthService:
    """Comprehensive health monitoring service"""

    def __init__(self):
        self.settings = get_settings()
        self.checks: List[HealthCheck] = []
        self.monitoring_task = None
        self.start_time = datetime.utcnow()  # recorded for uptime reporting
        self._register_default_checks()

    def _register_default_checks(self):
        """Register default health checks"""
        self.checks = [
            HealthCheck("database", self._check_database, timeout=5, critical=True),
            HealthCheck("redis", self._check_redis, timeout=3, critical=True),
            HealthCheck("celery_workers", self._check_celery_workers, timeout=10, critical=False),
            HealthCheck("disk_space", self._check_disk_space, timeout=2, critical=False),
            HealthCheck("memory", self._check_memory, timeout=2, critical=False),
        ]

    async def start_monitoring(self):
        """Start background health monitoring"""
        if self.monitoring_task is None:
            self.monitoring_task = asyncio.create_task(self._monitoring_loop())

    async def stop_monitoring(self):
        """Stop background health monitoring"""
        if self.monitoring_task:
            self.monitoring_task.cancel()
            try:
                await self.monitoring_task
            except asyncio.CancelledError:
                pass
            self.monitoring_task = None

    async def _monitoring_loop(self):
        """Background monitoring loop"""
        while True:
            try:
                await self.run_all_checks()
                await asyncio.sleep(30)  # Check every 30 seconds
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Health monitoring error: {e}")
                await asyncio.sleep(10)  # Retry after 10 seconds

    async def run_all_checks(self) -> Dict[str, Any]:
        """Run all health checks"""
        results = {}
        overall_status = HealthStatus.HEALTHY

        # Run all checks concurrently
        check_tasks = [check.run() for check in self.checks]
        check_results = await asyncio.gather(*check_tasks, return_exceptions=True)

        for check, result in zip(self.checks, check_results):
            if isinstance(result, Exception):
                result = {
                    "status": HealthStatus.UNHEALTHY,
                    "error": str(result),
                    "critical": check.critical
                }

            results[check.name] = result

            # Determine overall status
            if result["status"] == HealthStatus.UNHEALTHY and result["critical"]:
                overall_status = HealthStatus.UNHEALTHY
            elif result["status"] == HealthStatus.UNHEALTHY and overall_status == HealthStatus.HEALTHY:
                overall_status = HealthStatus.DEGRADED

        health_report = {
            "status": overall_status,
            "timestamp": datetime.utcnow().isoformat(),
            "checks": results,
            "uptime": self._get_uptime(),
            "version": "5.0.0"
        }

        # Store the latest report in Redis for external monitoring
        try:
            await redis_client.setex(
                "health:latest",
                60,
                json.dumps(health_report, default=str)
            )
        except Exception:
            pass  # Don't fail health reporting if Redis is down

        return health_report

    async def get_health_summary(self) -> Dict[str, Any]:
        """Get current health summary"""
        return await self.run_all_checks()

    async def _check_database(self) -> Dict[str, Any]:
        """Check database connectivity and performance"""
        async with AsyncSessionLocal() as session:
            # Test basic connectivity
            start_time = time.time()
            await session.execute(text("SELECT 1"))
            query_time = time.time() - start_time

            # Inspect the connection pool
            pool = session.get_bind().pool

            return {
                "query_time_ms": round(query_time * 1000, 2),
                "pool_size": pool.size(),
                "pool_checked_out": pool.checkedout(),
                "pool_overflow": pool.overflow(),
            }

    async def _check_redis(self) -> Dict[str, Any]:
        """Check Redis connectivity and performance"""
        start_time = time.time()

        # Test ping
        await redis_client.ping()
        ping_time = time.time() - start_time

        # Get Redis info
        info = await redis_client.info()

        return {
            "ping_time_ms": round(ping_time * 1000, 2),
            "connected_clients": info.get("connected_clients", 0),
            "used_memory": info.get("used_memory_human", "unknown"),
            "redis_version": info.get("redis_version", "unknown"),
        }

    async def _check_celery_workers(self) -> Dict[str, Any]:
        """Check Celery worker status (control.inspect() blocks briefly)"""
        inspect = celery_app.control.inspect()

        # Get active workers
        active_workers = inspect.active() or {}

        # Get worker stats
        stats = inspect.stats() or {}

        return {
            "active_workers": len(active_workers),
            "worker_names": list(active_workers.keys()),
            "total_tasks": sum(
                len(tasks) for tasks in active_workers.values()
            ),
            "worker_stats": stats
        }

    async def _check_disk_space(self) -> Dict[str, Any]:
        """Check available disk space"""
        import shutil

        total, used, free = shutil.disk_usage("/")

        free_percent = (free / total) * 100

        return {
            "total_gb": round(total / (1024**3), 2),
            "used_gb": round(used / (1024**3), 2),
            "free_gb": round(free / (1024**3), 2),
            "free_percent": round(free_percent, 2),
            "warning": free_percent < 20  # Warn if less than 20% free
        }

    async def _check_memory(self) -> Dict[str, Any]:
        """Check memory usage (requires the psutil package)"""
        import psutil

        memory = psutil.virtual_memory()

        return {
            "total_gb": round(memory.total / (1024**3), 2),
            "available_gb": round(memory.available / (1024**3), 2),
            "used_percent": memory.percent,
            "warning": memory.percent > 80  # Warn if over 80% used
        }

    def _get_uptime(self) -> str:
        """Get application uptime since this service was created"""
        return str(datetime.utcnow() - self.start_time)

    async def register_checks(self):
        """Register health check endpoints"""
        # This would register the health endpoints with the FastAPI app
        pass
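
To sanity-check the service outside the request cycle, you can run the checks directly from a Python shell (a sketch; it assumes the database and Redis are reachable):

import asyncio
from app.services.health_service import HealthService

report = asyncio.run(HealthService().run_all_checks())
print(report["status"], list(report["checks"]))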

Background Health Tasks

Create backend/app/tasks/health_tasks.py:

import time

from ..core.celery_app import celery_app

@celery_app.task(bind=True)
def system_health_check(self):
    """Run system health check as background task"""
    try:
        self.update_state(
            state="PROGRESS",
            meta={"step": "Running health checks"}
        )

        # Note: Celery tasks run synchronously, so a real implementation
        # would call sync equivalents of the async health checks.

        return {
            "status": "completed",
            "message": "Health check completed successfully",
            "timestamp": time.time()
        }

    except Exception as exc:
        self.update_state(
            state="FAILURE",
            meta={"error": str(exc)}
        )
        raise

@celery_app.task
def cleanup_old_health_data():
    """Clean up old health monitoring data"""
    try:
        # Clean up old health records from Redis
        # Clean up old log files
        # Archive old metrics

        return {"status": "completed", "cleaned_items": 0}

    except Exception as e:
        return {"status": "failed", "error": str(e)}

Step 2: Health Check Endpoints

Health Router

Create backend/app/routers/health.py:

from datetime import datetime
from typing import Dict, Any

from fastapi import APIRouter, Depends, HTTPException, status

from ..services.health_service import HealthService, HealthStatus
from ..dependencies import get_current_active_user
from ..models import User

router = APIRouter(prefix="/health", tags=["health"])

# Health service instance
# (in production you would share the lifecycle manager's instance, e.g. via a dependency)
health_service = HealthService()

@router.get("/")
async def health_check() -> Dict[str, Any]:
    """Basic health check endpoint"""
    return {
        "status": "healthy",
        "timestamp": datetime.utcnow().isoformat(),
        "service": "task-management-api",
        "version": "5.0.0"
    }

@router.get("/detailed")
async def detailed_health_check() -> Dict[str, Any]:
    """Detailed health check with all components"""
    health_report = await health_service.get_health_summary()

    # Set appropriate HTTP status code based on health
    if health_report["status"] == HealthStatus.UNHEALTHY:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=health_report
        )

    return health_report

@router.get("/live")
async def liveness_probe() -> Dict[str, str]:
    """Kubernetes liveness probe endpoint"""
    # This should only check if the application is running
    # Don't check external dependencies here
    return {"status": "alive"}

@router.get("/ready")
async def readiness_probe() -> Dict[str, Any]:
    """Kubernetes readiness probe endpoint"""
    # Check if the application is ready to receive traffic
    health_report = await health_service.run_all_checks()

    # Only check critical dependencies for readiness
    critical_checks = {
        name: result for name, result in health_report["checks"].items()
        if result.get("critical", False)
    }

    unhealthy_critical = [
        name for name, result in critical_checks.items()
        if result["status"] == HealthStatus.UNHEALTHY
    ]

    if unhealthy_critical:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail={
                "status": "not_ready",
                "failed_checks": unhealthy_critical
            }
        )

    return {
        "status": "ready",
        "critical_checks": len(critical_checks),
        "healthy_checks": len(critical_checks) - len(unhealthy_critical)
    }

@router.get("/metrics")
async def health_metrics(
    current_user: User = Depends(get_current_active_user)
) -> Dict[str, Any]:
    """Health metrics endpoint (requires authentication)"""
    if not current_user.is_superuser:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin access required"
        )

    health_report = await health_service.get_health_summary()

    # Convert to metrics format
    metrics = {
        "health_status": health_report["status"],
        "total_checks": len(health_report["checks"]),
        "healthy_checks": len([
            c for c in health_report["checks"].values()
            if c["status"] == HealthStatus.HEALTHY
        ]),
        "check_details": health_report["checks"],
        "uptime": health_report["uptime"],
        "timestamp": health_report["timestamp"]
    }

    return metrics

@router.post("/checks/run")
async def run_health_checks(
    current_user: User = Depends(get_current_active_user)
) -> Dict[str, Any]:
    """Manually trigger health checks"""
    if not current_user.is_superuser:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin access required"
        )

    return await health_service.run_all_checks()
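
These endpoints are only served once the router is registered; the factory from Step 1 predates this router, so update create_app() accordingly:

# In backend/app/core/app_factory.py, extend the router import:
from ..routers import auth, tasks, background_tasks, health

# ...then register it inside create_app():
app.include_router(health.router)  # serves the liveness/readiness probes and metrics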

Step 3: Production Configuration

Environment-based Configuration

Update backend/app/dependencies/config.py:

from typing import List, Optional
from pydantic import BaseSettings, validator  # Pydantic v1 style; on v2, import BaseSettings from pydantic-settings

class Settings(BaseSettings):
    # ... existing settings ...

    # Application lifecycle
    startup_timeout: int = 60  # seconds
    shutdown_timeout: int = 30  # seconds
    health_check_interval: int = 30  # seconds

    # Production settings
    allowed_hosts: List[str] = ["localhost", "127.0.0.1"]
    enable_health_monitoring: bool = True
    enable_metrics: bool = True

    # Monitoring integrations
    prometheus_enabled: bool = False
    prometheus_port: int = 9090
    grafana_enabled: bool = False

    # Logging
    log_level: str = "INFO"
    log_format: str = "json"  # json or text
    log_file: Optional[str] = None

    # Performance
    worker_connections: int = 1000
    keepalive_timeout: int = 5
    max_requests: int = 1000
    max_requests_jitter: int = 50

    @validator("allowed_hosts", pre=True)
    def parse_allowed_hosts(cls, v):
        if isinstance(v, str):
            return [host.strip() for host in v.split(",")]
        return v

    class Config:
        env_file = ".env"
        case_sensitive = False
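
Because env_file = ".env" and case_sensitive = False, these values can be supplied per environment. A sample .env fragment for production (hostnames are placeholders; ALLOWED_HOSTS is comma-separated to match the parse_allowed_hosts validator):

# .env - sample production values (hostnames are placeholders)
LOG_LEVEL=INFO
LOG_FORMAT=json
ALLOWED_HOSTS=api.example.com,localhost
ENABLE_HEALTH_MONITORING=true
HEALTH_CHECK_INTERVAL=30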

Logging Configuration

Create backend/app/core/logging_config.py:

import logging
import logging.config
import sys
from typing import Dict, Any

from ..dependencies.config import get_settings

def setup_logging() -> None:
    """Configure application logging"""
    settings = get_settings()

    log_config: Dict[str, Any] = {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "default": {
                "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            },
            "json": {
                "format": '{"timestamp": "%(asctime)s", "name": "%(name)s", "level": "%(levelname)s", "message": "%(message)s"}',
            },
        },
        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
                "formatter": settings.log_format,
                "stream": sys.stdout,
            },
        },
        "root": {
            "level": settings.log_level,
            "handlers": ["console"],
        },
        "loggers": {
            "uvicorn": {
                "level": "INFO",
                "handlers": ["console"],
                "propagate": False,
            },
            "sqlalchemy.engine": {
                "level": "WARNING",
                "handlers": ["console"],
                "propagate": False,
            },
            "celery": {
                "level": "INFO",
                "handlers": ["console"],
                "propagate": False,
            },
        },
    }

    # Add file handler if specified
    if settings.log_file:
        log_config["handlers"]["file"] = {
            "class": "logging.handlers.RotatingFileHandler",
            "filename": settings.log_file,
            "maxBytes": 10485760,  # 10MB
            "backupCount": 5,
            "formatter": settings.log_format,
        }
        log_config["root"]["handlers"].append("file")

    logging.config.dictConfig(log_config)

Step 4: Container and Deployment Configuration

Production Dockerfile

Create backend/Dockerfile.prod:

FROM python:3.11-slim

# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PIP_NO_CACHE_DIR=1 \
    PIP_DISABLE_PIP_VERSION_CHECK=1

# Install system dependencies (curl is required by the HEALTHCHECK below)
RUN apt-get update && apt-get install -y \
    gcc \
    curl \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*

# Create non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser

# Set work directory
WORKDIR /app

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Change ownership to appuser
RUN chown -R appuser:appuser /app

# Switch to non-root user
USER appuser

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
    CMD curl -f http://localhost:8000/health/live || exit 1

# Default command
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

Production Docker Compose

Create docker-compose.prod.yml:

version: '3.8'

services:
  web:
    build:
      context: .
      dockerfile: Dockerfile.prod
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql+asyncpg://taskuser:${DB_PASSWORD}@db:5432/taskdb
      - REDIS_URL=redis://redis:6379/0
      - ENVIRONMENT=production
      - DEBUG=false
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_healthy
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health/live"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s

  db:
    image: postgres:15
    environment:
      POSTGRES_DB: taskdb
      POSTGRES_USER: taskuser
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    restart: unless-stopped
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U taskuser -d taskdb"]
      interval: 10s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis_data:/data
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 3

  celery_worker:
    build:
      context: .
      dockerfile: Dockerfile.prod
    command: celery -A app.core.celery_app worker --loglevel=info
    environment:
      - DATABASE_URL=postgresql+asyncpg://taskuser:${DB_PASSWORD}@db:5432/taskdb
      - REDIS_URL=redis://redis:6379/0
      - ENVIRONMENT=production
    depends_on:
      - db
      - redis
    restart: unless-stopped

  celery_beat:
    build:
      context: .
      dockerfile: Dockerfile.prod
    command: celery -A app.core.celery_app beat --loglevel=info
    environment:
      - DATABASE_URL=postgresql+asyncpg://taskuser:${DB_PASSWORD}@db:5432/taskdb
      - REDIS_URL=redis://redis:6379/0
      - ENVIRONMENT=production
    depends_on:
      - db
      - redis
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - web
    restart: unless-stopped

volumes:
  postgres_data:
  redis_data:

Process Management Script

Create backend/scripts/start_production.sh:

#!/bin/bash

set -e

echo "Starting production deployment..."

# Check environment variables
if [ -z "$DB_PASSWORD" ]; then
    echo "Error: DB_PASSWORD environment variable is required"
    exit 1
fi

# Start services with health checks
echo "Starting services..."
docker-compose -f docker-compose.prod.yml up -d

# Wait for services to be healthy
echo "Waiting for services to be healthy..."
sleep 30

# Run database migrations inside the web container (the db service must be up first)
echo "Running database migrations..."
docker-compose -f docker-compose.prod.yml exec -T web alembic upgrade head

# Check health
echo "Checking application health..."
curl -f http://localhost:8000/health/ready || {
    echo "Health check failed"
    docker-compose -f docker-compose.prod.yml logs web
    exit 1
}

echo "Production deployment completed successfully!"

Troubleshooting

Common Issues & Solutions

Startup Failures:

# Check service logs
docker-compose logs web

# Check specific service health
curl http://localhost:8000/health/detailed

# Debug startup sequence
docker-compose exec web python -c "
from app.core.app_factory import lifecycle_manager
import asyncio
asyncio.run(lifecycle_manager.startup_sequence())
"

Health Check Failures:

# Manual health check
curl -v http://localhost:8000/health/ready

# Check individual components
curl http://localhost:8000/health/metrics

# Redis connectivity
docker-compose exec redis redis-cli ping

# Database connectivity
docker-compose exec db pg_isready -U taskuser -d taskdb

Graceful Shutdown Issues:

# Send SIGTERM to test graceful shutdown
docker-compose kill -s SIGTERM web

# Check shutdown logs
docker-compose logs web | grep shutdown


What You've Accomplished

Congratulations! You've implemented comprehensive application lifecycle management with:

  • Coordinated startup sequence with per-component error handling
  • Comprehensive health monitoring with detailed component checks
  • Graceful shutdown procedures with proper resource cleanup
  • Production-ready configuration with environment management
  • Container health checks and monitoring integration
  • Error recovery patterns for robust deployment
  • Observability integration for production monitoring

Next Steps

Production Enhancements:

  1. Observability stack - Prometheus, Grafana, ELK stack integration
  2. Circuit breakers - Fault tolerance patterns
  3. Blue-green deployment - Zero-downtime deployments
  4. Auto-scaling - Kubernetes HPA and VPA
  5. Disaster recovery - Backup and restore procedures

Advanced Monitoring:

  1. Custom metrics - Business metrics and KPIs
  2. Distributed tracing - Request flow monitoring
  3. Log aggregation - Centralized logging with search
  4. Alerting - Real-time notification systems
  5. Performance profiling - Application performance monitoring

Ready to deploy production-ready applications? You now have a complete application lifecycle management system that can handle real-world production requirements with confidence and reliability!

Series Completion

🎉 Congratulations! You've completed the entire FastAPI Tutorial Series 2025!

You've built a comprehensive, production-ready Task Management API featuring:

  • Modern FastAPI patterns with TypeScript-like development experience
  • PostgreSQL integration with async operations and migrations
  • Clean architecture with dependency injection and service layers
  • JWT authentication with comprehensive security features
  • Background task processing with Celery and Redis
  • Application lifecycle management with health monitoring

Your API is now ready for production deployment with enterprise-grade patterns and best practices!