Background Tasks in FastAPI 2025 - Async Job Processing & Email Systems

Master background task processing in FastAPI to handle time-consuming operations without blocking your API responses. This tutorial builds on our authentication system, showing how to implement email notifications, file processing, and scalable task queues.

What You'll Learn

By completing this tutorial, you'll master:

  • FastAPI native background tasks for lightweight operations
  • Celery integration for heavy-duty job processing
  • Email notification systems with templates and queuing
  • File processing tasks with upload handling and conversion
  • Task monitoring and management with Redis and dashboards
  • Error handling and retry logic for reliable processing
  • Production deployment with proper scaling and monitoring

Prerequisites

What you need before starting:

  • Completed the JWT Authentication Tutorial - We'll extend that app
  • Basic understanding of Redis and message queues
  • Knowledge of async programming in Python
  • Familiarity with email systems (SMTP, templates)

Time to complete: 20 minutes


What We're Building

You'll add comprehensive background processing to the Task Management API with:

  • Email notifications - Welcome emails, task reminders, password resets
  • File processing - Document uploads, image resizing, PDF generation
  • Data export tasks - CSV/Excel reports, data backups
  • Scheduled jobs - Daily summaries, cleanup tasks, analytics
  • Real-time monitoring - Task status tracking and error handling
  • Scalable architecture - Multiple workers and queue management

Task Types:

  • Immediate tasks - Email sending, quick notifications
  • Scheduled tasks - Daily reports, weekly summaries
  • Heavy processing - File conversion, data analysis
  • Retry-enabled tasks - Critical operations with fallbacks


Step 1: Background Task Infrastructure

Install Task Processing Dependencies

Bash
cd backend

# Add Celery and Redis for background tasks
poetry add "celery[redis]" flower  # quotes keep shells such as zsh from expanding the brackets

# Add email and template support
poetry add fastapi-mail jinja2 aiofiles

# Add file processing capabilities
poetry add pillow python-multipart

# Scheduling needs no extra package: Celery Beat ships with Celery itself,
# and Flower (installed above) covers monitoring

Celery Configuration

Create backend/app/core/celery_app.py:

Python
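from celery import Celery
from ..dependencies.config import get_settings

# Settings live in app/dependencies/config.py, the same module the tasks import from
settings = get_settings()

# Create Celery instance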
celery_app = Celery(
    "task_management",
    broker=settings.redis_url,
    backend=settings.redis_url,
    include=[
        "app.tasks.email_tasks",
        "app.tasks.file_tasks",
        "app.tasks.export_tasks",
        "app.tasks.cleanup_tasks"
    ]
)

# Celery configuration
celery_app.conf.update(
    # Task serialization
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,

    # Task routing
    task_routes={
        "app.tasks.email_tasks.*": {"queue": "email"},
        "app.tasks.file_tasks.*": {"queue": "files"},
        "app.tasks.export_tasks.*": {"queue": "exports"},
        "app.tasks.cleanup_tasks.*": {"queue": "maintenance"},
    },

    # Worker configuration
    worker_prefetch_multiplier=1,
    task_acks_late=True,
    worker_max_tasks_per_child=1000,

    # Task results
    result_expires=3600,  # 1 hour
    result_backend_transport_options={
        "master_name": "mymaster"  # Redis Sentinel support
    },

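    # Task retry defaults: max_retries and default_retry_delay are Task
    # attributes, so apply them to every task through task_annotations
    task_annotations={
        "*": {"default_retry_delay": 60, "max_retries": 3},  # 1 minute, 3 retries
    },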

    # Monitoring
    worker_send_task_events=True,
    task_send_sent_event=True,
)

# Periodic tasks (Celery Beat)
celery_app.conf.beat_schedule = {
    "daily-task-summary": {
        "task": "app.tasks.email_tasks.send_daily_task_summary",
        "schedule": 86400.0,  # 24 hours
    },
    "cleanup-old-files": {
        "task": "app.tasks.cleanup_tasks.cleanup_old_files",
        "schedule": 3600.0,  # 1 hour
    },
    "backup-database": {
        "task": "app.tasks.cleanup_tasks.backup_database",
        "schedule": 21600.0,  # 6 hours
    },
}

# Auto-discover tasks
celery_app.autodiscover_tasks()
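
The task_routes keys above are glob patterns matched against each task's dotted name. The following is a rough stdlib illustration of that matching, not Celery's own router; `queue_for` is a hypothetical helper, and `"celery"` is Celery's default queue name when no route matches.

```python
from fnmatch import fnmatch

# Same patterns as the task_routes setting, reduced to pattern -> queue name
TASK_ROUTES = {
    "app.tasks.email_tasks.*": "email",
    "app.tasks.file_tasks.*": "files",
    "app.tasks.export_tasks.*": "exports",
    "app.tasks.cleanup_tasks.*": "maintenance",
}

DEFAULT_QUEUE = "celery"  # Celery's default queue when no route matches


def queue_for(task_name: str) -> str:
    """Return the queue whose glob pattern matches the task name (illustrative only)."""
    for pattern, queue in TASK_ROUTES.items():
        if fnmatch(task_name, pattern):
            return queue
    return DEFAULT_QUEUE


print(queue_for("app.tasks.email_tasks.send_welcome_email_task"))
print(queue_for("app.tasks.file_tasks.process_image_upload"))
```

One consequence worth keeping in mind: a task whose name matches none of the patterns lands on the default queue, so a worker started with only `--queues=email,files,exports,maintenance` would never pick it up.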

Enhanced Configuration

Update backend/app/dependencies/config.py:

Python
from typing import Optional, List
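# Pydantic v2 moved BaseSettings into the separate pydantic-settings package;
# on Pydantic v1, use `from pydantic import BaseSettings, EmailStr` instead.
from pydantic import EmailStr
from pydantic_settings import BaseSettings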

class Settings(BaseSettings):
    # ... existing settings ...

    # Celery & Redis
    redis_url: str = "redis://localhost:6379/0"
    celery_broker_url: str = "redis://localhost:6379/0"
    celery_result_backend: str = "redis://localhost:6379/0"

    # Email configuration
    mail_server: str = "smtp.gmail.com"
    mail_port: int = 587
    mail_username: Optional[str] = None
    mail_password: Optional[str] = None
    mail_from: Optional[EmailStr] = None
    mail_from_name: str = "Task Management"
    mail_tls: bool = True
    mail_ssl: bool = False

    # File processing
    upload_dir: str = "uploads"
    max_file_size: int = 10 * 1024 * 1024  # 10MB
    allowed_file_types: List[str] = ["jpg", "jpeg", "png", "pdf", "txt", "docx"]

    # Background tasks
    task_result_expire: int = 3600  # 1 hour
    max_task_retries: int = 3
    task_retry_delay: int = 60  # 1 minute

    class Config:
        env_file = ".env"
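
The Celery tasks later in this tutorial read `settings.database_url_sync`, which the snippet above does not define. One possible way to derive it is sketched below, assuming the existing settings expose an async URL such as the `postgresql+asyncpg://...` value used in the Docker setup. The helper name `to_sync_database_url` and the choice of the `psycopg2` driver are assumptions, not part of the original app.

```python
def to_sync_database_url(async_url: str, sync_driver: str = "psycopg2") -> str:
    """Swap the driver part of a SQLAlchemy URL for a synchronous one.

    Hypothetical helper: "postgresql+asyncpg://..." becomes
    "postgresql+psycopg2://...". The sync driver must be installed separately.
    """
    scheme, separator, rest = async_url.partition("://")
    if not separator:
        raise ValueError(f"Not a database URL: {async_url!r}")
    dialect = scheme.split("+", 1)[0]  # drop any "+driver" suffix
    return f"{dialect}+{sync_driver}://{rest}"


print(to_sync_database_url("postgresql+asyncpg://taskuser:taskpass@db:5432/taskdb"))
```

Inside `Settings`, this could back a `database_url_sync` property so that the web app and the workers share a single `DATABASE_URL` environment variable.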

Step 2: Email Task System

Email Service

Create backend/app/services/email_service.py:

Python
import asyncio
from typing import List, Optional, Dict, Any
from fastapi_mail import FastMail, MessageSchema, ConnectionConfig
from jinja2 import Environment, FileSystemLoader
from pathlib import Path

from ..models import User, Task
from ..dependencies.config import Settings

class EmailService:
    """Enhanced email service with templates and background processing"""

    def __init__(self, settings: Settings):
        self.settings = settings
        self.conf = ConnectionConfig(
            MAIL_SERVER=settings.mail_server,
            MAIL_PORT=settings.mail_port,
            MAIL_USERNAME=settings.mail_username,
            MAIL_PASSWORD=settings.mail_password,
            MAIL_FROM=settings.mail_from,
            MAIL_FROM_NAME=settings.mail_from_name,
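            # Recent fastapi-mail releases use these two option names;
            # older releases called them MAIL_TLS and MAIL_SSL
            MAIL_STARTTLS=settings.mail_tls,
            MAIL_SSL_TLS=settings.mail_ssl,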
            USE_CREDENTIALS=True,
            VALIDATE_CERTS=True,
            TEMPLATE_FOLDER=Path(__file__).parent.parent / "templates" / "email"
        )
        self.fast_mail = FastMail(self.conf)

        # Setup Jinja2 template environment
        template_dir = Path(__file__).parent.parent / "templates" / "email"
        self.template_env = Environment(loader=FileSystemLoader(template_dir))

    def render_template(self, template_name: str, context: Dict[str, Any]) -> str:
        """Render email template with context"""
        template = self.template_env.get_template(template_name)
        return template.render(**context)

    async def send_email(
        self,
        recipients: List[str],
        subject: str,
        template_name: str,
        context: Dict[str, Any],
        attachments: Optional[List[str]] = None
    ) -> bool:
        """Send email with template"""
        try:
            html_body = self.render_template(template_name, context)

            message = MessageSchema(
                subject=subject,
                recipients=recipients,
                body=html_body,
                subtype="html",
                attachments=attachments or []
            )

            await self.fast_mail.send_message(message)
            return True

        except Exception as e:
            print(f"Email sending failed: {e}")
            return False

    async def send_welcome_email(self, user: User) -> bool:
        """Send welcome email to new user"""
        context = {
            "user_name": user.full_name or user.username,
            "username": user.username,
            "app_name": self.settings.app_name,
            "support_email": self.settings.mail_from
        }

        return await self.send_email(
            recipients=[user.email],
            subject=f"Welcome to {self.settings.app_name}!",
            template_name="welcome.html",
            context=context
        )

    async def send_password_reset_email(self, user: User, reset_token: str) -> bool:
        """Send password reset email"""
        reset_url = f"{self.settings.frontend_url}/reset-password?token={reset_token}"

        context = {
            "user_name": user.full_name or user.username,
            "reset_url": reset_url,
            "app_name": self.settings.app_name,
            "expire_minutes": self.settings.password_reset_expire_minutes
        }

        return await self.send_email(
            recipients=[user.email],
            subject="Password Reset Request",
            template_name="password_reset.html",
            context=context
        )

    async def send_task_reminder_email(self, user: User, tasks: List[Task]) -> bool:
        """Send task reminder email"""
        context = {
            "user_name": user.full_name or user.username,
            "tasks": tasks,
            "task_count": len(tasks),
            "app_name": self.settings.app_name,
            "dashboard_url": f"{self.settings.frontend_url}/dashboard"
        }

        return await self.send_email(
            recipients=[user.email],
            subject=f"Task Reminder - {len(tasks)} pending tasks",
            template_name="task_reminder.html",
            context=context
        )
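
Every send method on EmailService is a coroutine, so synchronous callers (a Celery task, a script, a shell session) need to run it on an event loop to get a result. A minimal stdlib sketch of that bridge follows; `send_email_stub` is a hypothetical stand-in that sends nothing.

```python
import asyncio


async def send_email_stub(recipient: str) -> bool:
    """Stand-in for an async EmailService method (hypothetical, sends nothing)."""
    await asyncio.sleep(0)  # placeholder for the SMTP round trip
    return "@" in recipient


def send_email_sync(recipient: str) -> bool:
    """Run the coroutine to completion from synchronous code."""
    return asyncio.run(send_email_stub(recipient))


# Calling the coroutine function without running it only creates a coroutine object
pending = send_email_stub("user@example.com")
is_coroutine = asyncio.iscoroutine(pending)
pending.close()  # avoid a "coroutine was never awaited" warning

delivered = send_email_sync("user@example.com")
print(is_coroutine, delivered)
```

The un-run coroutine object is truthy, which is why a check such as `if success:` can pass even though no email was ever sent.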

Email Templates

Create email templates directory and files:

backend/app/templates/email/welcome.html:

HTML
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Welcome to {{ app_name }}</title>
    <style>
        body { font-family: Arial, sans-serif; line-height: 1.6; color: #333; }
        .container { max-width: 600px; margin: 0 auto; padding: 20px; }
        .header { background: #007bff; color: white; padding: 20px; text-align: center; }
        .content { padding: 20px; background: #f8f9fa; }
        .button { display: inline-block; padding: 12px 24px; background: #007bff; color: white; text-decoration: none; border-radius: 4px; }
    </style>
</head>
<body>
    <div class="container">
        <div class="header">
            <h1>Welcome to {{ app_name }}!</h1>
        </div>
        <div class="content">
            <h2>Hello {{ user_name }}!</h2>
            <p>Thank you for joining {{ app_name }}. We're excited to help you manage your tasks more efficiently.</p>

            <p>Your username is: <strong>{{ username }}</strong></p>

            <p>Here's what you can do next:</p>
            <ul>
                <li>Create your first task</li>
                <li>Set up task reminders</li>
                <li>Explore the dashboard features</li>
            </ul>

            <p>If you have any questions, feel free to contact us at <a href="mailto:{{ support_email }}">{{ support_email }}</a>.</p>

            <p>Happy task managing!</p>
            <p>The {{ app_name }} Team</p>
        </div>
    </div>
</body>
</html>

Email Tasks with Celery

Create backend/app/tasks/email_tasks.py:

Python
import asyncio
from datetime import datetime
from typing import List, Dict, Any

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

from ..core.celery_app import celery_app
from ..services.email_service import EmailService
from ..models import User, Task
from ..dependencies.config import get_settings

# Database session for Celery tasks (sync engine: Celery tasks are plain functions)
settings = get_settings()
engine = create_engine(settings.database_url_sync)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

@celery_app.task(bind=True, max_retries=3)
def send_welcome_email_task(self, user_id: int):
    """Send welcome email in background"""
    db = SessionLocal()
    try:
        # Update task status
        self.update_state(state="PROGRESS", meta={"step": "Preparing email"})

        # Get user from database
        user = db.query(User).filter(User.id == user_id).first()

        if not user:
            raise ValueError(f"User {user_id} not found")

        # EmailService methods are coroutines, so run them to completion here
        email_service = EmailService(settings)
        success = asyncio.run(email_service.send_welcome_email(user))

        if not success:
            raise RuntimeError("Failed to send email")

        return {"status": "completed", "message": f"Welcome email sent to {user.email}"}

    except Exception as exc:
        # Retry with exponential backoff (60s, 120s, 240s). Celery records the
        # RETRY state itself and marks the task FAILURE once retries run out.
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

    finally:
        # Always release the connection, including on errors and retries
        db.close()

@celery_app.task(bind=True, max_retries=3)
def send_password_reset_email_task(self, user_id: int, reset_token: str):
    """Send password reset email in background"""
    db = SessionLocal()
    try:
        self.update_state(state="PROGRESS", meta={"step": "Sending password reset"})

        user = db.query(User).filter(User.id == user_id).first()

        if not user:
            raise ValueError(f"User {user_id} not found")

        email_service = EmailService(settings)
        success = asyncio.run(
            email_service.send_password_reset_email(user, reset_token)
        )

        if not success:
            raise RuntimeError("Failed to send reset email")

        return {"status": "completed", "message": f"Reset email sent to {user.email}"}

    except Exception as exc:
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

    finally:
        db.close()

@celery_app.task
def send_task_reminder_emails():
    """Send daily task reminder emails to all users"""
    db = SessionLocal()

    try:
        # Get users with pending tasks
        users_with_tasks = db.query(User).join(Task).filter(
            Task.completed == False,
            User.is_active == True
        ).distinct().all()

        email_service = EmailService(settings)
        sent_count = 0

        for user in users_with_tasks:
            pending_tasks = db.query(Task).filter(
                Task.owner_id == user.id,
                Task.completed == False
            ).all()

            if pending_tasks:
                success = asyncio.run(
                    email_service.send_task_reminder_email(user, pending_tasks)
                )
                if success:
                    sent_count += 1

        return {
            "status": "completed",
            "users_processed": len(users_with_tasks),
            "emails_sent": sent_count
        }

    finally:
        db.close()

@celery_app.task
def send_daily_task_summary():
    """Send daily task summary to all active users"""
    db = SessionLocal()

    try:
        users = db.query(User).filter(User.is_active == True).all()
        email_service = EmailService(settings)

        for user in users:
            # Get user's task statistics
            total_tasks = db.query(Task).filter(Task.owner_id == user.id).count()
            completed_today = db.query(Task).filter(
                Task.owner_id == user.id,
                Task.completed == True,
                Task.updated_at >= datetime.utcnow().date()
            ).count()

            # Send summary email
            context = {
                "user_name": user.full_name or user.username,
                "total_tasks": total_tasks,
                "completed_today": completed_today,
                "app_name": settings.app_name
            }

            asyncio.run(email_service.send_email(
                recipients=[user.email],
                subject="Daily Task Summary",
                template_name="daily_summary.html",
                context=context
            ))

        return {"status": "completed", "users_processed": len(users)}

    finally:
        db.close()
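
The retry calls above use `countdown=60 * (2 ** self.request.retries)`, which doubles the wait after each failed attempt. The arithmetic can be checked in isolation; `retry_countdown` and the `max_delay` cap are illustrative additions, not part of the tasks above.

```python
def retry_countdown(retries: int, base_delay: int = 60, max_delay: int = 3600) -> int:
    """Seconds to wait before the next attempt, doubling each time up to max_delay."""
    return min(base_delay * (2 ** retries), max_delay)


# With max_retries=3, self.request.retries takes the values 0, 1 and 2
schedule = [retry_countdown(attempt) for attempt in range(3)]
print(schedule)  # [60, 120, 240]
```

Celery can also compute a similar schedule on its own through the task options `autoretry_for`, `retry_backoff`, `retry_backoff_max`, and `retry_jitter`, which may be worth considering instead of a hand-written `self.retry` call.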

Step 3: File Processing Tasks

File Processing Service

Create backend/app/services/file_service.py:

Python
import os
import uuid
from pathlib import Path
from typing import Optional, Tuple
from PIL import Image
import aiofiles
from fastapi import UploadFile, HTTPException

from ..dependencies.config import Settings

class FileService:
    """File processing service with background task support"""

    def __init__(self, settings: Settings):
        self.settings = settings
        self.upload_dir = Path(settings.upload_dir)
        # parents=True also covers nested upload_dir values such as "data/uploads"
        self.upload_dir.mkdir(parents=True, exist_ok=True)

    def _get_file_path(self, filename: str) -> Path:
        """Get full file path"""
        return self.upload_dir / filename

    def _generate_filename(self, original_filename: str) -> str:
        """Generate unique filename"""
        file_ext = Path(original_filename).suffix
        unique_id = str(uuid.uuid4())
        return f"{unique_id}{file_ext}"

    def _validate_file(self, file: UploadFile) -> bool:
        """Validate file type and size"""
        file_ext = Path(file.filename).suffix.lower().lstrip('.')

        if file_ext not in self.settings.allowed_file_types:
            raise HTTPException(
                status_code=400,
                detail=f"File type {file_ext} not allowed"
            )

        # UploadFile.size can be None when the client does not report a length
        if file.size is not None and file.size > self.settings.max_file_size:
            raise HTTPException(
                status_code=400,
                detail=f"File too large. Max size: {self.settings.max_file_size} bytes"
            )

        return True

    async def save_file(self, file: UploadFile) -> Tuple[str, str]:
        """Save uploaded file"""
        self._validate_file(file)

        filename = self._generate_filename(file.filename)
        file_path = self._get_file_path(filename)

        async with aiofiles.open(file_path, 'wb') as f:
            content = await file.read()
            await f.write(content)

        return filename, str(file_path)

    def resize_image(self, input_path: str, output_path: str, max_size: Tuple[int, int]) -> bool:
        """Resize image to maximum dimensions"""
        try:
            with Image.open(input_path) as img:
                # Calculate new size maintaining aspect ratio
                img.thumbnail(max_size, Image.Resampling.LANCZOS)

                # Save resized image
                img.save(output_path, optimize=True, quality=85)

            return True
        except Exception as e:
            print(f"Image resize failed: {e}")
            return False

    def delete_file(self, filename: str) -> bool:
        """Delete file"""
        try:
            file_path = self._get_file_path(filename)
            if file_path.exists():
                file_path.unlink()
                return True
            return False
        except Exception:
            return False
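
`save_file` reads the whole upload into memory before writing it, and the size check depends on a size being reported at all. A possible alternative is to copy in chunks and stop as soon as the limit is exceeded. The sketch below uses plain file-like objects from the standard library; `copy_with_limit` and `FileTooLargeError` are hypothetical names, and adapting the loop to `UploadFile.read` and `aiofiles` is left as an assumption.

```python
import io
from typing import BinaryIO

CHUNK_SIZE = 64 * 1024  # 64 KiB per read


class FileTooLargeError(ValueError):
    """Raised when the source holds more bytes than allowed."""


def copy_with_limit(
    source: BinaryIO,
    destination: BinaryIO,
    max_bytes: int,
    chunk_size: int = CHUNK_SIZE,
) -> int:
    """Copy source to destination in chunks, aborting once max_bytes is exceeded."""
    written = 0
    while True:
        chunk = source.read(chunk_size)
        if not chunk:
            break
        written += len(chunk)
        if written > max_bytes:
            raise FileTooLargeError(f"Upload exceeds {max_bytes} bytes")
        destination.write(chunk)
    return written


# Usage with in-memory streams standing in for the upload and the target file
upload = io.BytesIO(b"x" * 100)
target = io.BytesIO()
bytes_written = copy_with_limit(upload, target, max_bytes=200, chunk_size=32)
print(bytes_written)
```

When the limit is hit, the destination already holds a partial file, so the caller would still need to delete it.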

File Processing Tasks

Create backend/app/tasks/file_tasks.py:

Python
import os
from pathlib import Path
from celery import current_task

from ..core.celery_app import celery_app
from ..services.file_service import FileService
from ..dependencies.config import get_settings

settings = get_settings()

@celery_app.task(bind=True)
def process_image_upload(self, filename: str, user_id: int):
    """Process uploaded image in background"""
    try:
        current_task.update_state(
            state="PROGRESS",
            meta={"step": "Processing image", "progress": 25}
        )

        file_service = FileService(settings)
        input_path = file_service._get_file_path(filename)

        if not input_path.exists():
            raise FileNotFoundError(f"File {filename} not found")

        # Create thumbnails
        thumbnail_sizes = [
            (150, 150, "thumb"),
            (300, 300, "medium"),
            (800, 800, "large")
        ]

        processed_files = []

        for width, height, size_name in thumbnail_sizes:
            current_task.update_state(
                state="PROGRESS",
                meta={
                    "step": f"Creating {size_name} thumbnail",
                    "progress": 25 + (len(processed_files) * 20)
                }
            )

            # Generate output filename
            file_stem = Path(filename).stem
            file_ext = Path(filename).suffix
            output_filename = f"{file_stem}_{size_name}{file_ext}"
            output_path = file_service._get_file_path(output_filename)

            # Resize image
            success = file_service.resize_image(
                str(input_path),
                str(output_path),
                (width, height)
            )

            if success:
                processed_files.append({
                    "size": size_name,
                    "filename": output_filename,
                    "dimensions": f"{width}x{height}"
                })

        current_task.update_state(
            state="PROGRESS",
            meta={"step": "Finalizing", "progress": 90}
        )

        return {
            "status": "completed",
            "original_file": filename,
            "processed_files": processed_files,
            "user_id": user_id
        }

    except Exception as exc:
        current_task.update_state(
            state="FAILURE",
            meta={"error": str(exc)}
        )
        raise

@celery_app.task(bind=True)
def generate_pdf_report(self, user_id: int, report_type: str):
    """Generate PDF report for user"""
    try:
        current_task.update_state(
            state="PROGRESS",
            meta={"step": "Gathering data", "progress": 20}
        )

        # In a real implementation, you would:
        # 1. Query user data
        # 2. Generate PDF using reportlab or weasyprint
        # 3. Save to file
        # 4. Optionally email to user

        import time
        time.sleep(2)  # Simulate processing time

        current_task.update_state(
            state="PROGRESS",
            meta={"step": "Generating PDF", "progress": 60}
        )

        time.sleep(2)  # Simulate more processing

        current_task.update_state(
            state="PROGRESS",
            meta={"step": "Finalizing report", "progress": 90}
        )

        # Generate filename
        report_filename = f"report_{user_id}_{report_type}_{int(time.time())}.pdf"

        return {
            "status": "completed",
            "report_file": report_filename,
            "report_type": report_type,
            "user_id": user_id
        }

    except Exception as exc:
        current_task.update_state(
            state="FAILURE",
            meta={"error": str(exc)}
        )
        raise
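
`process_image_upload` reports each thumbnail's "dimensions" as the bounding box (for example 800x800), while `Image.thumbnail` keeps the aspect ratio and never enlarges, so the saved file is usually smaller on one side. The following approximates that size calculation without Pillow; `fit_within` is a hypothetical helper, and Pillow's own rounding may differ by a pixel.

```python
from typing import Tuple


def fit_within(size: Tuple[int, int], max_size: Tuple[int, int]) -> Tuple[int, int]:
    """Scale size down to fit inside max_size, keeping aspect ratio and never upscaling."""
    width, height = size
    max_width, max_height = max_size
    scale = min(max_width / width, max_height / height, 1.0)
    return max(1, round(width * scale)), max(1, round(height * scale))


print(fit_within((1600, 1200), (800, 800)))  # (800, 600)
print(fit_within((100, 50), (150, 150)))     # (100, 50), small images are left as-is
```

If clients need the real size, the task could record `img.size` after the resize instead of the requested box.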

Step 4: Task Monitoring and Management

Task Status Service

Create backend/app/services/task_status_service.py:

Python
from typing import Optional, Dict, Any, List
from celery.result import AsyncResult
from celery.app.control import Control

from ..core.celery_app import celery_app

class TaskStatusService:
    """Service for monitoring and managing background tasks"""

    def __init__(self):
        self.celery_app = celery_app
        self.control = Control(celery_app)

    def get_task_status(self, task_id: str) -> Dict[str, Any]:
        """Get detailed task status"""
        result = AsyncResult(task_id, app=self.celery_app)

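        # A failed task stores the raised exception in result.result; convert it
        # to a string so the response stays JSON-serializable
        task_result = str(result.result) if result.failed() else result.result

        response = {
            "task_id": task_id,
            "status": result.status,
            "result": task_result,
            "traceback": result.traceback,
            "date_done": result.date_done,
            "successful": result.successful(),
            "failed": result.failed(),
        }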

        # Add progress information if available
        if result.status == "PROGRESS":
            if isinstance(result.result, dict):
                response.update(result.result)

        return response

    def cancel_task(self, task_id: str) -> bool:
        """Cancel a running task"""
        try:
            self.celery_app.control.revoke(task_id, terminate=True)
            return True
        except Exception:
            return False

    def get_active_tasks(self) -> List[Dict[str, Any]]:
        """Get list of active tasks"""
        try:
            # inspect().active() returns None when no worker replies
            active_tasks = self.control.inspect().active() or {}
            tasks = []

            for worker, worker_tasks in active_tasks.items():
                for task in worker_tasks:
                    tasks.append({
                        "task_id": task["id"],
                        "name": task["name"],
                        "worker": worker,
                        "args": task["args"],
                        "kwargs": task["kwargs"],
                    })

            return tasks
        except Exception:
            return []

    def get_worker_stats(self) -> Dict[str, Any]:
        """Get worker statistics"""
        try:
            stats = self.control.inspect().stats()
            return stats or {}
        except Exception:
            return {}

    def get_queue_lengths(self) -> Dict[str, int]:
        """Get queue lengths"""
        try:
            # This would require Redis inspection in a real implementation
            # For now, return mock data
            return {
                "email": 0,
                "files": 0,
                "exports": 0,
                "maintenance": 0
            }
        except Exception:
            return {}
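
`get_queue_lengths` above returns mock data. With Redis as the broker, one way to fill it in is to ask Redis for the length of the list named after each queue. This is a sketch under assumptions: it relies on the default Redis transport layout, where each queue's pending messages sit in a list keyed by the queue name, and it does not count messages that workers have already reserved. The function takes any client with an `llen` method, so it can be tried without a Redis server.

```python
from typing import Dict, Iterable, Protocol


class SupportsLlen(Protocol):
    """Anything with a Redis-style llen method, such as redis.Redis."""

    def llen(self, name: str) -> int: ...


QUEUE_NAMES = ("email", "files", "exports", "maintenance")


def get_queue_lengths(
    client: SupportsLlen, queue_names: Iterable[str] = QUEUE_NAMES
) -> Dict[str, int]:
    """Return the number of pending messages per queue."""
    return {name: int(client.llen(name)) for name in queue_names}


# With the redis package installed, the client could be created like this:
#   import redis
#   client = redis.Redis.from_url("redis://localhost:6379/0")
#   print(get_queue_lengths(client))
```

Passing the client in, rather than creating it inside the service, keeps the method easy to test and lets the connection be reused across requests.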

Task Management Endpoints

Create backend/app/routers/tasks_bg.py:

Python
from typing import Dict, Any, List
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, UploadFile, File
from sqlalchemy.ext.asyncio import AsyncSession

from ..dependencies import get_current_active_user, get_database_session
from ..dependencies.config import get_settings
from ..services.task_status_service import TaskStatusService
from ..services.file_service import FileService
from ..tasks.email_tasks import (
    send_welcome_email_task,
    send_task_reminder_emails,
    send_password_reset_email_task
)
from ..tasks.file_tasks import process_image_upload, generate_pdf_report
from ..models import User

router = APIRouter(prefix="/background-tasks", tags=["background-tasks"])

# Task status service
task_status_service = TaskStatusService()

@router.get("/status/{task_id}")
async def get_task_status(
    task_id: str,
    current_user: User = Depends(get_current_active_user)
):
    """Get background task status"""
    status = task_status_service.get_task_status(task_id)
    return status

@router.post("/cancel/{task_id}")
async def cancel_task(
    task_id: str,
    current_user: User = Depends(get_current_active_user)
):
    """Cancel a background task"""
    success = task_status_service.cancel_task(task_id)
    if success:
        return {"message": "Task cancelled successfully"}
    else:
        raise HTTPException(status_code=400, detail="Failed to cancel task")

@router.get("/active")
async def get_active_tasks(
    current_user: User = Depends(get_current_active_user)
):
    """Get list of active background tasks"""
    if not current_user.is_superuser:
        raise HTTPException(status_code=403, detail="Admin access required")

    tasks = task_status_service.get_active_tasks()
    return {"active_tasks": tasks}

@router.get("/stats")
async def get_worker_stats(
    current_user: User = Depends(get_current_active_user)
):
    """Get worker statistics"""
    if not current_user.is_superuser:
        raise HTTPException(status_code=403, detail="Admin access required")

    stats = task_status_service.get_worker_stats()
    queue_lengths = task_status_service.get_queue_lengths()

    return {
        "worker_stats": stats,
        "queue_lengths": queue_lengths
    }

# Email task endpoints
@router.post("/email/welcome")
async def trigger_welcome_email(
    current_user: User = Depends(get_current_active_user)
):
    """Trigger welcome email for current user"""
    task = send_welcome_email_task.delay(current_user.id)
    return {"task_id": task.id, "message": "Welcome email task queued"}

@router.post("/email/reminders")
async def trigger_task_reminders(
    current_user: User = Depends(get_current_active_user)
):
    """Trigger task reminder emails (admin only)"""
    if not current_user.is_superuser:
        raise HTTPException(status_code=403, detail="Admin access required")

    task = send_task_reminder_emails.delay()
    return {"task_id": task.id, "message": "Task reminder emails queued"}

# File processing endpoints
@router.post("/file/upload")
async def upload_and_process_file(
    file: UploadFile = File(...),
    current_user: User = Depends(get_current_active_user),
    file_service: FileService = Depends(lambda: FileService(get_settings()))
):
    """Upload and process file in background"""
    try:
        # Save file immediately
        filename, file_path = await file_service.save_file(file)

        # Queue background processing
        task = process_image_upload.delay(filename, current_user.id)

        return {
            "task_id": task.id,
            "filename": filename,
            "message": "File uploaded and queued for processing"
        }
    except HTTPException:
        # Keep the status code and detail raised by FileService validation
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@router.post("/reports/generate")
async def generate_report(
    report_type: str,
    current_user: User = Depends(get_current_active_user)
):
    """Generate user report in background"""
    valid_types = ["tasks", "analytics", "export"]
    if report_type not in valid_types:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid report type. Must be one of: {valid_types}"
        )

    task = generate_pdf_report.delay(current_user.id, report_type)

    return {
        "task_id": task.id,
        "report_type": report_type,
        "message": "Report generation queued"
    }

Step 5: Production Deployment

Docker Configuration

Create docker-compose.yml for development:

YAML
version: '3.8'

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  db:
    image: postgres:15
    environment:
      POSTGRES_DB: taskdb
      POSTGRES_USER: taskuser
      POSTGRES_PASSWORD: taskpass
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  web:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql+asyncpg://taskuser:taskpass@db:5432/taskdb
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
    volumes:
      - ./uploads:/app/uploads

  celery_worker:
    build: .
    command: celery -A app.core.celery_app worker --loglevel=info --queues=email,files,exports,maintenance
    environment:
      - DATABASE_URL=postgresql+asyncpg://taskuser:taskpass@db:5432/taskdb
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
    volumes:
      - ./uploads:/app/uploads

  celery_beat:
    build: .
    command: celery -A app.core.celery_app beat --loglevel=info
    environment:
      - DATABASE_URL=postgresql+asyncpg://taskuser:taskpass@db:5432/taskdb
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis

  flower:
    build: .
    command: celery -A app.core.celery_app flower --port=5555
    ports:
      - "5555:5555"
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis

volumes:
  postgres_data:
  redis_data:

Process Management Script

Create backend/scripts/start_workers.sh:

Bash
#!/bin/bash

# Start Celery workers for different queues
celery -A app.core.celery_app worker --loglevel=info --queues=email --concurrency=4 --hostname=email@%h &
celery -A app.core.celery_app worker --loglevel=info --queues=files --concurrency=2 --hostname=files@%h &
celery -A app.core.celery_app worker --loglevel=info --queues=exports --concurrency=2 --hostname=exports@%h &
celery -A app.core.celery_app worker --loglevel=info --queues=maintenance --concurrency=1 --hostname=maintenance@%h &

# Start Celery Beat scheduler
celery -A app.core.celery_app beat --loglevel=info &

# Start Flower monitoring
celery -A app.core.celery_app flower --port=5555 &

wait

Troubleshooting

Common Issues & Solutions

Celery Worker Connection Issues:

Bash
# Check Redis connection
redis-cli ping

# Check Celery workers
celery -A app.core.celery_app status

# Restart workers
pkill -f celery
./scripts/start_workers.sh

Task Failures:

Python
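# Inspect per-worker statistics. Individual failures are easier to review in
# Flower (port 5555) or through AsyncResult(task_id).traceback
from app.core.celery_app import celery_app
worker_stats = celery_app.control.inspect().stats()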

Email Delivery Issues:

Python
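# Test email configuration
import asyncio
from app.services.email_service import EmailService
from app.dependencies.config import get_settings

email_service = EmailService(get_settings())
# Test with a simple email (replace the recipient with an address you control)
sent = asyncio.run(email_service.send_email(
    recipients=["you@example.com"],
    subject="SMTP test",
    template_name="welcome.html",
    context={"app_name": "Task Management", "user_name": "Test", "username": "test"},
))
print("Email sent" if sent else "Email failed")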


What You've Accomplished

Congratulations! You've implemented comprehensive background task processing with:

  • FastAPI native tasks for lightweight operations
  • Celery integration with Redis for heavy-duty processing
  • Email notification system with templates and queuing
  • File processing capabilities with image resizing and PDF generation
  • Task monitoring with status tracking and worker management
  • Production deployment with Docker and process management
  • Error handling with retry logic and failure recovery

Next Steps

Advanced Features:

  1. Task chaining - Complex workflows with dependent tasks
  2. Distributed processing - Multiple worker nodes
  3. Real-time notifications - WebSocket updates for task progress
  4. Advanced monitoring - Grafana dashboards and alerting
  5. Task prioritization - Priority queues and SLA management

Production Enhancements:

  1. High availability - Redis Sentinel and worker redundancy
  2. Auto-scaling - Dynamic worker scaling based on load
  3. Task archival - Long-term storage of task results
  4. Performance optimization - Task batching and bulk processing
  5. Security - Task authentication and data encryption

Ready to build scalable, asynchronous applications? You now have a robust background task system that can handle real-world processing requirements efficiently and reliably!