Background Tasks in FastAPI 2025 - Async Job Processing & Email Systems
Master background task processing in FastAPI to handle time-consuming operations without blocking your API responses. This tutorial builds on our authentication system, showing how to implement email notifications, file processing, and scalable task queues.
What You'll Learn
By completing this tutorial, you'll master:
- FastAPI native background tasks for lightweight operations
- Celery integration for heavy-duty job processing
- Email notification systems with templates and queuing
- File processing tasks with upload handling and conversion
- Task monitoring and management with Redis and dashboards
- Error handling and retry logic for reliable processing
- Production deployment with proper scaling and monitoring
Prerequisites
What you need before starting:
- Completed the JWT Authentication Tutorial - We'll extend that app
- Basic understanding of Redis and message queues
- Knowledge of async programming in Python
- Familiarity with email systems (SMTP, templates)
Time to complete: 20 minutes
What We're Building
You'll add comprehensive background processing to the Task Management API with:
- Email notifications - Welcome emails, task reminders, password resets
- File processing - Document uploads, image resizing, PDF generation
- Data export tasks - CSV/Excel reports, data backups
- Scheduled jobs - Daily summaries, cleanup tasks, analytics
- Real-time monitoring - Task status tracking and error handling
- Scalable architecture - Multiple workers and queue management
Task Types:
- Immediate tasks - Email sending, quick notifications
- Scheduled tasks - Daily reports, weekly summaries
- Heavy processing - File conversion, data analysis
- Retry-enabled tasks - Critical operations with fallbacks
Step 1: Background Task Infrastructure
Install Task Processing Dependencies
cd backend
# Add Celery and Redis for background tasks
poetry add "celery[redis]" flower
# Add email and template support
poetry add fastapi-mail jinja2 aiofiles
# Add file processing capabilities
poetry add pillow python-multipart
# Scheduling needs no extra package: Celery Beat, the periodic-task scheduler, ships with Celery itself
Celery Configuration
Create backend/app/core/celery_app.py:
from celery import Celery

from ..dependencies.config import get_settings

settings = get_settings()

# Create Celery instance
celery_app = Celery(
    "task_management",
    broker=settings.redis_url,
    backend=settings.redis_url,
    # Every module listed here must exist, or the worker fails on startup.
    # export_tasks and cleanup_tasks are not shown in this tutorial.
    include=[
        "app.tasks.email_tasks",
        "app.tasks.file_tasks",
        "app.tasks.export_tasks",
        "app.tasks.cleanup_tasks",
    ],
)

# Celery configuration
celery_app.conf.update(
    # Task serialization
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    # Task routing
    task_routes={
        "app.tasks.email_tasks.*": {"queue": "email"},
        "app.tasks.file_tasks.*": {"queue": "files"},
        "app.tasks.export_tasks.*": {"queue": "exports"},
        "app.tasks.cleanup_tasks.*": {"queue": "maintenance"},
    },
    # Worker configuration
    worker_prefetch_multiplier=1,
    task_acks_late=True,
    worker_max_tasks_per_child=1000,
    # Task results
    result_expires=3600,  # 1 hour
    # master_name is only used when the result backend is Redis Sentinel
    result_backend_transport_options={"master_name": "mymaster"},
    # Default retry settings. Celery has no global task_default_retry_delay or
    # task_max_retries options, so apply the task attributes through annotations.
    task_annotations={"*": {"default_retry_delay": 60, "max_retries": 3}},
    # Monitoring
    worker_send_task_events=True,
    task_send_sent_event=True,
)

# Periodic tasks (Celery Beat)
celery_app.conf.beat_schedule = {
    "daily-task-summary": {
        "task": "app.tasks.email_tasks.send_daily_task_summary",
        "schedule": 86400.0,  # 24 hours
    },
    "cleanup-old-files": {
        "task": "app.tasks.cleanup_tasks.cleanup_old_files",
        "schedule": 3600.0,  # 1 hour
    },
    "backup-database": {
        "task": "app.tasks.cleanup_tasks.backup_database",
        "schedule": 21600.0,  # 6 hours
    },
}

# Auto-discover tasks
celery_app.autodiscover_tasks()
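The beat schedule above gives its intervals as raw second counts. Celery Beat also accepts `datetime.timedelta` values for `schedule`, which can be easier to read. The sketch below uses only the standard library; `BEAT_SCHEDULE` and `interval_seconds` are names made up for this illustration, and in the app the dictionary would be assigned to `celery_app.conf.beat_schedule`.

```python
from datetime import timedelta
from typing import Dict

# Same three entries as the schedule above, with intervals written as timedeltas.
BEAT_SCHEDULE = {
    "daily-task-summary": {
        "task": "app.tasks.email_tasks.send_daily_task_summary",
        "schedule": timedelta(hours=24),
    },
    "cleanup-old-files": {
        "task": "app.tasks.cleanup_tasks.cleanup_old_files",
        "schedule": timedelta(hours=1),
    },
    "backup-database": {
        "task": "app.tasks.cleanup_tasks.backup_database",
        "schedule": timedelta(hours=6),
    },
}


def interval_seconds(schedule: Dict[str, dict]) -> Dict[str, float]:
    """Map each entry name to its interval in seconds."""
    return {name: entry["schedule"].total_seconds() for name, entry in schedule.items()}


if __name__ == "__main__":
    for name, seconds in interval_seconds(BEAT_SCHEDULE).items():
        print(f"{name}: every {seconds:.0f} s")
```

An interval schedule is measured from when Beat starts rather than from the clock, so for a job that should run at a fixed time of day, `celery.schedules.crontab` is likely the better fit.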
Enhanced Configuration
Update backend/app/dependencies/config.py:
from typing import List, Optional

from pydantic import EmailStr
# Assumes Pydantic v2, where BaseSettings lives in the pydantic-settings package.
# On Pydantic v1, use "from pydantic import BaseSettings" instead.
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # ... existing settings ...
    # The code in this tutorial also reads app_name, frontend_url,
    # password_reset_expire_minutes, and database_url_sync from these settings.

    # Celery & Redis
    redis_url: str = "redis://localhost:6379/0"
    celery_broker_url: str = "redis://localhost:6379/0"
    celery_result_backend: str = "redis://localhost:6379/0"

    # Email configuration
    mail_server: str = "smtp.gmail.com"
    mail_port: int = 587
    mail_username: Optional[str] = None
    mail_password: Optional[str] = None
    mail_from: Optional[EmailStr] = None
    mail_from_name: str = "Task Management"
    mail_tls: bool = True
    mail_ssl: bool = False

    # File processing
    upload_dir: str = "uploads"
    max_file_size: int = 10 * 1024 * 1024  # 10MB
    allowed_file_types: List[str] = ["jpg", "jpeg", "png", "pdf", "txt", "docx"]

    # Background tasks
    task_result_expire: int = 3600  # 1 hour
    max_task_retries: int = 3
    task_retry_delay: int = 60  # 1 minute

    class Config:
        env_file = ".env"
Step 2: Email Task System
Email Service
Create backend/app/services/email_service.py:
from pathlib import Path
from typing import Any, Dict, List, Optional

from fastapi_mail import ConnectionConfig, FastMail, MessageSchema
from jinja2 import Environment, FileSystemLoader, select_autoescape

from ..dependencies.config import Settings
from ..models import Task, User


class EmailService:
    """Enhanced email service with templates and background processing"""

    def __init__(self, settings: Settings):
        self.settings = settings
        template_dir = Path(__file__).parent.parent / "templates" / "email"
        self.conf = ConnectionConfig(
            MAIL_SERVER=settings.mail_server,
            MAIL_PORT=settings.mail_port,
            MAIL_USERNAME=settings.mail_username,
            MAIL_PASSWORD=settings.mail_password,
            MAIL_FROM=settings.mail_from,
            MAIL_FROM_NAME=settings.mail_from_name,
            # fastapi-mail 1.2+ names these MAIL_STARTTLS / MAIL_SSL_TLS;
            # older releases call them MAIL_TLS / MAIL_SSL
            MAIL_STARTTLS=settings.mail_tls,
            MAIL_SSL_TLS=settings.mail_ssl,
            USE_CREDENTIALS=True,
            VALIDATE_CERTS=True,
            TEMPLATE_FOLDER=template_dir,
        )
        self.fast_mail = FastMail(self.conf)
        # Setup Jinja2 template environment; autoescape keeps user-supplied
        # values such as names from injecting HTML into the email body
        self.template_env = Environment(
            loader=FileSystemLoader(template_dir),
            autoescape=select_autoescape(["html"]),
        )
    def render_template(self, template_name: str, context: Dict[str, Any]) -> str:
        """Render email template with context"""
        template = self.template_env.get_template(template_name)
        return template.render(**context)

    async def send_email(
        self,
        recipients: List[str],
        subject: str,
        template_name: str,
        context: Dict[str, Any],
        attachments: Optional[List[str]] = None
    ) -> bool:
        """Send email with template"""
        try:
            html_body = self.render_template(template_name, context)
            message = MessageSchema(
                subject=subject,
                recipients=recipients,
                body=html_body,
                subtype="html",
                attachments=attachments or []
            )
            await self.fast_mail.send_message(message)
            return True
        except Exception as e:
            print(f"Email sending failed: {e}")
            return False

    async def send_welcome_email(self, user: User) -> bool:
        """Send welcome email to new user"""
        context = {
            "user_name": user.full_name or user.username,
            "username": user.username,
            "app_name": self.settings.app_name,
            "support_email": self.settings.mail_from
        }
        return await self.send_email(
            recipients=[user.email],
            subject=f"Welcome to {self.settings.app_name}!",
            template_name="welcome.html",
            context=context
        )

    async def send_password_reset_email(self, user: User, reset_token: str) -> bool:
        """Send password reset email"""
        reset_url = f"{self.settings.frontend_url}/reset-password?token={reset_token}"
        context = {
            "user_name": user.full_name or user.username,
            "reset_url": reset_url,
            "app_name": self.settings.app_name,
            "expire_minutes": self.settings.password_reset_expire_minutes
        }
        return await self.send_email(
            recipients=[user.email],
            subject="Password Reset Request",
            template_name="password_reset.html",
            context=context
        )

    async def send_task_reminder_email(self, user: User, tasks: List[Task]) -> bool:
        """Send task reminder email"""
        context = {
            "user_name": user.full_name or user.username,
            "tasks": tasks,
            "task_count": len(tasks),
            "app_name": self.settings.app_name,
            "dashboard_url": f"{self.settings.frontend_url}/dashboard"
        }
        return await self.send_email(
            recipients=[user.email],
            subject=f"Task Reminder - {len(tasks)} pending tasks",
            template_name="task_reminder.html",
            context=context
        )
Email Templates
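Create the email templates directory and its files. The email code also references password_reset.html, task_reminder.html, and daily_summary.html, which you will need to create in the same directory. Start with backend/app/templates/email/welcome.html: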
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Welcome to {{ app_name }}</title>
<style>
body { font-family: Arial, sans-serif; line-height: 1.6; color: #333; }
.container { max-width: 600px; margin: 0 auto; padding: 20px; }
.header { background: #007bff; color: white; padding: 20px; text-align: center; }
.content { padding: 20px; background: #f8f9fa; }
.button { display: inline-block; padding: 12px 24px; background: #007bff; color: white; text-decoration: none; border-radius: 4px; }
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>Welcome to {{ app_name }}!</h1>
</div>
<div class="content">
<h2>Hello {{ user_name }}!</h2>
<p>Thank you for joining {{ app_name }}. We're excited to help you manage your tasks more efficiently.</p>
<p>Your username is: <strong>{{ username }}</strong></p>
<p>Here's what you can do next:</p>
<ul>
<li>Create your first task</li>
<li>Set up task reminders</li>
<li>Explore the dashboard features</li>
</ul>
<p>If you have any questions, feel free to contact us at <a href="mailto:{{ support_email }}">{{ support_email }}</a>.</p>
<p>Happy task managing!</p>
<p>The {{ app_name }} Team</p>
</div>
</div>
</body>
</html>
Email Tasks with Celery
Create backend/app/tasks/email_tasks.py:
import asyncio
from datetime import datetime

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from ..core.celery_app import celery_app
from ..dependencies.config import get_settings
from ..models import Task, User
from ..services.email_service import EmailService

# Celery tasks run synchronously, so they use their own sync engine and sessions.
# database_url_sync must be defined in Settings with a synchronous driver.
settings = get_settings()
engine = create_engine(settings.database_url_sync)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


@celery_app.task(bind=True, max_retries=3)
def send_welcome_email_task(self, user_id: int):
    """Send welcome email in background"""
    db = SessionLocal()
    try:
        # Update task status
        self.update_state(state="PROGRESS", meta={"step": "Preparing email"})
        # Get user from database
        user = db.query(User).filter(User.id == user_id).first()
        if not user:
            raise ValueError(f"User {user_id} not found")
        # EmailService methods are coroutines; run them to completion in this sync task
        email_service = EmailService(settings)
        success = asyncio.run(email_service.send_welcome_email(user))
        if not success:
            raise RuntimeError("Failed to send email")
        return {"status": "completed", "message": f"Welcome email sent to {user.email}"}
    except Exception as exc:
        # Retry with exponential backoff. Do not set FAILURE by hand here:
        # Celery records it, with the exception, once the retries run out.
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
    finally:
        db.close()


@celery_app.task(bind=True, max_retries=3)
def send_password_reset_email_task(self, user_id: int, reset_token: str):
    """Send password reset email in background"""
    db = SessionLocal()
    try:
        self.update_state(state="PROGRESS", meta={"step": "Sending password reset"})
        user = db.query(User).filter(User.id == user_id).first()
        if not user:
            raise ValueError(f"User {user_id} not found")
        email_service = EmailService(settings)
        success = asyncio.run(email_service.send_password_reset_email(user, reset_token))
        if not success:
            raise RuntimeError("Failed to send reset email")
        return {"status": "completed", "message": f"Reset email sent to {user.email}"}
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
    finally:
        db.close()


@celery_app.task
def send_task_reminder_emails():
    """Send daily task reminder emails to all users"""
    db = SessionLocal()
    try:
        # Get users with pending tasks
        users_with_tasks = db.query(User).join(Task).filter(
            Task.completed == False,
            User.is_active == True
        ).distinct().all()
        email_service = EmailService(settings)
        sent_count = 0
        for user in users_with_tasks:
            pending_tasks = db.query(Task).filter(
                Task.owner_id == user.id,
                Task.completed == False
            ).all()
            if pending_tasks:
                success = asyncio.run(
                    email_service.send_task_reminder_email(user, pending_tasks)
                )
                if success:
                    sent_count += 1
        return {
            "status": "completed",
            "users_processed": len(users_with_tasks),
            "emails_sent": sent_count
        }
    finally:
        db.close()


@celery_app.task
def send_daily_task_summary():
    """Send daily task summary to all active users"""
    db = SessionLocal()
    try:
        users = db.query(User).filter(User.is_active == True).all()
        email_service = EmailService(settings)
        # Midnight UTC today, as a datetime so it compares cleanly with updated_at
        today_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
        for user in users:
            # Get user's task statistics
            total_tasks = db.query(Task).filter(Task.owner_id == user.id).count()
            completed_today = db.query(Task).filter(
                Task.owner_id == user.id,
                Task.completed == True,
                Task.updated_at >= today_start
            ).count()
            # Send summary email
            context = {
                "user_name": user.full_name or user.username,
                "total_tasks": total_tasks,
                "completed_today": completed_today,
                "app_name": settings.app_name
            }
            asyncio.run(email_service.send_email(
                recipients=[user.email],
                subject="Daily Task Summary",
                template_name="daily_summary.html",
                context=context
            ))
        return {"status": "completed", "users_processed": len(users)}
    finally:
        db.close()
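The retry countdown in the email tasks, `60 * (2 ** self.request.retries)`, doubles the wait before each new attempt. The helper below is a standalone illustration of that arithmetic, not part of Celery; `retry_countdowns` is a hypothetical name.

```python
from typing import List


def retry_countdowns(base_delay: int = 60, max_retries: int = 3) -> List[int]:
    """Return the wait in seconds before each retry attempt.

    Mirrors 60 * (2 ** self.request.retries), where self.request.retries
    is 0 before the first retry, 1 before the second, and so on.
    """
    return [base_delay * (2 ** attempt) for attempt in range(max_retries)]


if __name__ == "__main__":
    delays = retry_countdowns()
    print(delays)       # [60, 120, 240]
    print(sum(delays))  # 420
```

With `max_retries=3`, a task that keeps failing therefore gives up about seven minutes after its first attempt, not counting the time each attempt takes to run.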
Step 3: File Processing Tasks
File Processing Service
Create backend/app/services/file_service.py:
import uuid
from pathlib import Path
from typing import Tuple

import aiofiles
from fastapi import HTTPException, UploadFile
from PIL import Image

from ..dependencies.config import Settings


class FileService:
    """File processing service with background task support"""

    def __init__(self, settings: Settings):
        self.settings = settings
        self.upload_dir = Path(settings.upload_dir)
        # parents=True also creates missing parent directories
        self.upload_dir.mkdir(parents=True, exist_ok=True)

    def _get_file_path(self, filename: str) -> Path:
        """Get full file path"""
        return self.upload_dir / filename

    def _generate_filename(self, original_filename: str) -> str:
        """Generate unique filename"""
        file_ext = Path(original_filename).suffix
        unique_id = str(uuid.uuid4())
        return f"{unique_id}{file_ext}"

    def _validate_file(self, file: UploadFile) -> bool:
        """Validate file type and size"""
        file_ext = Path(file.filename or "").suffix.lower().lstrip('.')
        if file_ext not in self.settings.allowed_file_types:
            raise HTTPException(
                status_code=400,
                detail=f"File type {file_ext} not allowed"
            )
        # UploadFile.size can be None when the size is not known
        if file.size is not None and file.size > self.settings.max_file_size:
            raise HTTPException(
                status_code=400,
                detail=f"File too large. Max size: {self.settings.max_file_size} bytes"
            )
        return True

    async def save_file(self, file: UploadFile) -> Tuple[str, str]:
        """Save uploaded file"""
        self._validate_file(file)
        filename = self._generate_filename(file.filename)
        file_path = self._get_file_path(filename)
        async with aiofiles.open(file_path, 'wb') as f:
            content = await file.read()
            await f.write(content)
        return filename, str(file_path)

    def resize_image(self, input_path: str, output_path: str, max_size: Tuple[int, int]) -> bool:
        """Resize image to maximum dimensions"""
        try:
            with Image.open(input_path) as img:
                # thumbnail() resizes in place and maintains the aspect ratio
                img.thumbnail(max_size, Image.Resampling.LANCZOS)
                # Save resized image
                img.save(output_path, optimize=True, quality=85)
            return True
        except Exception as e:
            print(f"Image resize failed: {e}")
            return False

    def delete_file(self, filename: str) -> bool:
        """Delete file"""
        try:
            file_path = self._get_file_path(filename)
            if file_path.exists():
                file_path.unlink()
                return True
            return False
        except Exception:
            return False
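To make the validation rules in `_validate_file` concrete, here is a standalone sketch of the same two checks using only the standard library. `validation_error` is a hypothetical helper written for this illustration; the constants copy the defaults from the settings shown earlier.

```python
from pathlib import Path
from typing import Optional

ALLOWED_FILE_TYPES = ["jpg", "jpeg", "png", "pdf", "txt", "docx"]
MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB


def validation_error(filename: str, size: Optional[int]) -> Optional[str]:
    """Return an error message, or None when the file passes both checks."""
    # Only the last suffix counts, lowercased and without the leading dot
    file_ext = Path(filename).suffix.lower().lstrip(".")
    if file_ext not in ALLOWED_FILE_TYPES:
        return f"File type {file_ext} not allowed"
    # A size of None means "unknown", so the size check is skipped
    if size is not None and size > MAX_FILE_SIZE:
        return f"File too large. Max size: {MAX_FILE_SIZE} bytes"
    return None


if __name__ == "__main__":
    print(validation_error("Photo.JPG", 2048))             # None
    print(validation_error("archive.tar.gz", 2048))        # File type gz not allowed
    print(validation_error("scan.pdf", 11 * 1024 * 1024))  # File too large. Max size: 10485760 bytes
```

Note that this check trusts the client-supplied filename. It does not inspect the file contents, so a renamed file would still pass.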
File Processing Tasks
Create backend/app/tasks/file_tasks.py:
import time
from pathlib import Path

from ..core.celery_app import celery_app
from ..dependencies.config import get_settings
from ..services.file_service import FileService

settings = get_settings()


# Neither task sets FAILURE by hand: when a task raises, Celery records the
# FAILURE state together with the exception and traceback.
@celery_app.task(bind=True)
def process_image_upload(self, filename: str, user_id: int):
    """Process uploaded image in background"""
    self.update_state(
        state="PROGRESS",
        meta={"step": "Processing image", "progress": 25}
    )
    file_service = FileService(settings)
    input_path = file_service._get_file_path(filename)
    if not input_path.exists():
        raise FileNotFoundError(f"File {filename} not found")
    # Create thumbnails
    thumbnail_sizes = [
        (150, 150, "thumb"),
        (300, 300, "medium"),
        (800, 800, "large")
    ]
    processed_files = []
    for width, height, size_name in thumbnail_sizes:
        self.update_state(
            state="PROGRESS",
            meta={
                "step": f"Creating {size_name} thumbnail",
                "progress": 25 + (len(processed_files) * 20)
            }
        )
        # Generate output filename
        file_stem = Path(filename).stem
        file_ext = Path(filename).suffix
        output_filename = f"{file_stem}_{size_name}{file_ext}"
        output_path = file_service._get_file_path(output_filename)
        # Resize image
        success = file_service.resize_image(
            str(input_path),
            str(output_path),
            (width, height)
        )
        if success:
            processed_files.append({
                "size": size_name,
                "filename": output_filename,
                "dimensions": f"{width}x{height}"
            })
    self.update_state(
        state="PROGRESS",
        meta={"step": "Finalizing", "progress": 90}
    )
    return {
        "status": "completed",
        "original_file": filename,
        "processed_files": processed_files,
        "user_id": user_id
    }


@celery_app.task(bind=True)
def generate_pdf_report(self, user_id: int, report_type: str):
    """Generate PDF report for user"""
    self.update_state(
        state="PROGRESS",
        meta={"step": "Gathering data", "progress": 20}
    )
    # In a real implementation, you would:
    # 1. Query user data
    # 2. Generate PDF using reportlab or weasyprint
    # 3. Save to file
    # 4. Optionally email to user
    time.sleep(2)  # Simulate processing time
    self.update_state(
        state="PROGRESS",
        meta={"step": "Generating PDF", "progress": 60}
    )
    time.sleep(2)  # Simulate more processing
    self.update_state(
        state="PROGRESS",
        meta={"step": "Finalizing report", "progress": 90}
    )
    # Generate filename
    report_filename = f"report_{user_id}_{report_type}_{int(time.time())}.pdf"
    return {
        "status": "completed",
        "report_file": report_filename,
        "report_type": report_type,
        "user_id": user_id
    }
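The image task requests three bounding boxes, and `Image.thumbnail` shrinks the image to fit inside each one while keeping its aspect ratio. It never enlarges an image. The sketch below approximates that sizing arithmetic in plain Python so the resulting dimensions are easier to predict. `fit_within` is a hypothetical helper, and Pillow's own rounding may differ by a pixel in some cases.

```python
from typing import Tuple

# Same bounding boxes as the thumbnail_sizes list in the task above
THUMBNAIL_SIZES = [(150, 150, "thumb"), (300, 300, "medium"), (800, 800, "large")]


def fit_within(size: Tuple[int, int], box: Tuple[int, int]) -> Tuple[int, int]:
    """Scale size down to fit inside box, preserving aspect ratio."""
    width, height = size
    max_width, max_height = box
    scale = min(max_width / width, max_height / height)
    if scale >= 1:
        # Already fits: thumbnails are never scaled up
        return size
    return max(1, round(width * scale)), max(1, round(height * scale))


if __name__ == "__main__":
    original = (2000, 1000)
    for max_w, max_h, name in THUMBNAIL_SIZES:
        print(name, fit_within(original, (max_w, max_h)))
    # thumb (150, 75)
    # medium (300, 150)
    # large (800, 400)
```

This also shows that the `dimensions` value stored by the task (`"150x150"` and so on) is the requested box, not necessarily the size of the saved file.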
Step 4: Task Monitoring and Management
Task Status Service
Create backend/app/services/task_status_service.py:
from typing import Any, Dict, List

from celery.result import AsyncResult

from ..core.celery_app import celery_app


class TaskStatusService:
    """Service for monitoring and managing background tasks"""

    def __init__(self):
        self.celery_app = celery_app
        self.control = celery_app.control

    def get_task_status(self, task_id: str) -> Dict[str, Any]:
        """Get detailed task status"""
        result = AsyncResult(task_id, app=self.celery_app)
        # On failure, result.result holds the raised exception, which cannot
        # be serialized to JSON, so report its message instead
        task_result = str(result.result) if result.failed() else result.result
        response = {
            "task_id": task_id,
            "status": result.status,
            "result": task_result,
            "traceback": result.traceback,
            "date_done": result.date_done,
            "successful": result.successful(),
            "failed": result.failed(),
        }
        # Add progress information if available
        if result.status == "PROGRESS" and isinstance(result.result, dict):
            response.update(result.result)
        return response

    def cancel_task(self, task_id: str) -> bool:
        """Cancel a running task"""
        try:
            self.control.revoke(task_id, terminate=True)
            return True
        except Exception:
            return False

    def get_active_tasks(self) -> List[Dict[str, Any]]:
        """Get list of active tasks"""
        try:
            # active() returns None when no worker replies
            active_tasks = self.control.inspect().active() or {}
            tasks = []
            for worker, worker_tasks in active_tasks.items():
                for task in worker_tasks:
                    tasks.append({
                        "task_id": task["id"],
                        "name": task["name"],
                        "worker": worker,
                        "args": task["args"],
                        "kwargs": task["kwargs"],
                    })
            return tasks
        except Exception:
            return []

    def get_worker_stats(self) -> Dict[str, Any]:
        """Get worker statistics"""
        try:
            stats = self.control.inspect().stats()
            return stats or {}
        except Exception:
            return {}

    def get_queue_lengths(self) -> Dict[str, int]:
        """Get queue lengths"""
        # This would require Redis inspection in a real implementation
        # For now, return mock data
        return {
            "email": 0,
            "files": 0,
            "exports": 0,
            "maintenance": 0
        }
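`get_queue_lengths` above returns mock data. With the Redis broker, Celery keeps each queue's pending messages in a Redis list named after the queue, so one way to get real numbers is to call `LLEN` on each queue name. The sketch below takes any client object with an `llen` method. In the app that would presumably be `redis.Redis.from_url(settings.redis_url)`, while the `FakeRedis` class here is a made-up stand-in so the example runs without a server.

```python
from typing import Dict, Iterable, List

QUEUE_NAMES = ("email", "files", "exports", "maintenance")


def get_queue_lengths(client, queues: Iterable[str] = QUEUE_NAMES) -> Dict[str, int]:
    """Return the number of pending messages per queue.

    client is anything exposing llen(name), such as a redis.Redis instance.
    """
    return {queue: int(client.llen(queue)) for queue in queues}


class FakeRedis:
    """In-memory stand-in for a Redis client (illustration only)."""

    def __init__(self, lists: Dict[str, List[str]]):
        self._lists = lists

    def llen(self, name: str) -> int:
        # Like Redis, report 0 for a key that does not exist
        return len(self._lists.get(name, []))


if __name__ == "__main__":
    fake = FakeRedis({"email": ["msg1", "msg2"], "files": ["msg3"]})
    print(get_queue_lengths(fake))
    # {'email': 2, 'files': 1, 'exports': 0, 'maintenance': 0}
```

These counts cover only messages still waiting in Redis. Tasks a worker has already reserved do not appear, and queues that use broker priorities may be spread across more than one list.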
Task Management Endpoints
Create backend/app/routers/tasks_bg.py:
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile

from ..dependencies import get_current_active_user
from ..dependencies.config import get_settings  # used by the upload endpoint
from ..models import User
from ..services.file_service import FileService
from ..services.task_status_service import TaskStatusService
from ..tasks.email_tasks import (
    send_password_reset_email_task,
    send_task_reminder_emails,
    send_welcome_email_task,
)
from ..tasks.file_tasks import generate_pdf_report, process_image_upload

router = APIRouter(prefix="/background-tasks", tags=["background-tasks"])

# Task status service
task_status_service = TaskStatusService()


@router.get("/status/{task_id}")
async def get_task_status(
    task_id: str,
    current_user: User = Depends(get_current_active_user)
):
    """Get background task status"""
    status = task_status_service.get_task_status(task_id)
    return status


@router.post("/cancel/{task_id}")
async def cancel_task(
    task_id: str,
    current_user: User = Depends(get_current_active_user)
):
    """Cancel a background task"""
    success = task_status_service.cancel_task(task_id)
    if success:
        return {"message": "Task cancelled successfully"}
    else:
        raise HTTPException(status_code=400, detail="Failed to cancel task")


@router.get("/active")
async def get_active_tasks(
    current_user: User = Depends(get_current_active_user)
):
    """Get list of active background tasks"""
    if not current_user.is_superuser:
        raise HTTPException(status_code=403, detail="Admin access required")
    tasks = task_status_service.get_active_tasks()
    return {"active_tasks": tasks}


@router.get("/stats")
async def get_worker_stats(
    current_user: User = Depends(get_current_active_user)
):
    """Get worker statistics"""
    if not current_user.is_superuser:
        raise HTTPException(status_code=403, detail="Admin access required")
    stats = task_status_service.get_worker_stats()
    queue_lengths = task_status_service.get_queue_lengths()
    return {
        "worker_stats": stats,
        "queue_lengths": queue_lengths
    }


# Email task endpoints
@router.post("/email/welcome")
async def trigger_welcome_email(
    current_user: User = Depends(get_current_active_user)
):
    """Trigger welcome email for current user"""
    task = send_welcome_email_task.delay(current_user.id)
    return {"task_id": task.id, "message": "Welcome email task queued"}


@router.post("/email/reminders")
async def trigger_task_reminders(
    current_user: User = Depends(get_current_active_user)
):
    """Trigger task reminder emails (admin only)"""
    if not current_user.is_superuser:
        raise HTTPException(status_code=403, detail="Admin access required")
    task = send_task_reminder_emails.delay()
    return {"task_id": task.id, "message": "Task reminder emails queued"}


# File processing endpoints
@router.post("/file/upload")
async def upload_and_process_file(
    file: UploadFile = File(...),
    current_user: User = Depends(get_current_active_user),
    file_service: FileService = Depends(lambda: FileService(get_settings()))
):
    """Upload and process file in background"""
    try:
        # Save file immediately
        filename, file_path = await file_service.save_file(file)
        # Queue background processing
        task = process_image_upload.delay(filename, current_user.id)
        return {
            "task_id": task.id,
            "filename": filename,
            "message": "File uploaded and queued for processing"
        }
    except HTTPException:
        # Pass validation errors from FileService through unchanged
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))


@router.post("/reports/generate")
async def generate_report(
    report_type: str,
    current_user: User = Depends(get_current_active_user)
):
    """Generate user report in background"""
    valid_types = ["tasks", "analytics", "export"]
    if report_type not in valid_types:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid report type. Must be one of: {valid_types}"
        )
    task = generate_pdf_report.delay(current_user.id, report_type)
    return {
        "task_id": task.id,
        "report_type": report_type,
        "message": "Report generation queued"
    }
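Each endpoint above returns a `task_id`, which a client can poll through `/background-tasks/status/{task_id}` until the task reaches a terminal Celery state. The sketch below shows one way to write that loop. `wait_for_task` and `make_fake_fetcher` are hypothetical names, and the HTTP call is abstracted behind a `fetch_status` callable so the example runs without a server.

```python
import time
from typing import Any, Callable, Dict, Iterable

# Celery states after which a task's status no longer changes
TERMINAL_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def wait_for_task(
    fetch_status: Callable[[str], Dict[str, Any]],
    task_id: str,
    interval: float = 1.0,
    timeout: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Poll fetch_status(task_id) until a terminal state or the timeout."""
    deadline = time.monotonic() + timeout
    while True:
        payload = fetch_status(task_id)
        if payload["status"] in TERMINAL_STATES:
            return payload
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Task {task_id} still {payload['status']} after {timeout} s")
        sleep(interval)


def make_fake_fetcher(statuses: Iterable[str]) -> Callable[[str], Dict[str, Any]]:
    """Build a fetcher that replays canned statuses (stands in for the HTTP call)."""
    remaining = iter(statuses)

    def fetch(task_id: str) -> Dict[str, Any]:
        return {"task_id": task_id, "status": next(remaining)}

    return fetch


if __name__ == "__main__":
    fetch = make_fake_fetcher(["PENDING", "PROGRESS", "SUCCESS"])
    final = wait_for_task(fetch, "abc123", interval=0.0)
    print(final["status"])  # SUCCESS
```

The timeout matters because Celery also reports `PENDING` for task IDs it has never seen, so a mistyped ID would otherwise be polled forever.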
Step 5: Production Deployment
Docker Configuration
Create docker-compose.yml for development:
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
  db:
    image: postgres:15
    environment:
      POSTGRES_DB: taskdb
      POSTGRES_USER: taskuser
      POSTGRES_PASSWORD: taskpass
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
  web:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql+asyncpg://taskuser:taskpass@db:5432/taskdb
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
    volumes:
      - ./uploads:/app/uploads
  celery_worker:
    build: .
    command: celery -A app.core.celery_app worker --loglevel=info --queues=email,files,exports,maintenance
    environment:
      - DATABASE_URL=postgresql+asyncpg://taskuser:taskpass@db:5432/taskdb
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
    volumes:
      - ./uploads:/app/uploads
  celery_beat:
    build: .
    command: celery -A app.core.celery_app beat --loglevel=info
    environment:
      - DATABASE_URL=postgresql+asyncpg://taskuser:taskpass@db:5432/taskdb
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
  flower:
    build: .
    command: celery -A app.core.celery_app flower --port=5555
    ports:
      - "5555:5555"
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis
volumes:
  postgres_data:
  redis_data:
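The compose file passes the workers an async `DATABASE_URL` (`postgresql+asyncpg://...`), while the Celery tasks build a synchronous engine from `settings.database_url_sync`. One way to bridge the two is to derive the sync URL from the async one by swapping the driver. This is a sketch under assumptions: `to_sync_url` is a hypothetical helper, and `psycopg2` is an assumed driver that is not among the dependencies installed earlier.

```python
def to_sync_url(async_url: str, sync_driver: str = "psycopg2") -> str:
    """Swap the driver in a SQLAlchemy URL such as 'postgresql+asyncpg://...'."""
    scheme, separator, rest = async_url.partition("://")
    if not separator:
        raise ValueError(f"Not a database URL: {async_url!r}")
    # The scheme is 'dialect' or 'dialect+driver'; keep only the dialect
    dialect = scheme.split("+", 1)[0]
    return f"{dialect}+{sync_driver}://{rest}"


if __name__ == "__main__":
    url = "postgresql+asyncpg://taskuser:taskpass@db:5432/taskdb"
    print(to_sync_url(url))
    # postgresql+psycopg2://taskuser:taskpass@db:5432/taskdb
```

Alternatively, setting a separate environment variable for the sync URL on the worker services avoids the string manipulation altogether.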
Process Management Script
Create backend/scripts/start_workers.sh:
#!/bin/bash
# Start Celery workers for different queues
celery -A app.core.celery_app worker --loglevel=info --queues=email --concurrency=4 --hostname=email@%h &
celery -A app.core.celery_app worker --loglevel=info --queues=files --concurrency=2 --hostname=files@%h &
celery -A app.core.celery_app worker --loglevel=info --queues=exports --concurrency=2 --hostname=exports@%h &
celery -A app.core.celery_app worker --loglevel=info --queues=maintenance --concurrency=1 --hostname=maintenance@%h &
# Start Celery Beat scheduler
celery -A app.core.celery_app beat --loglevel=info &
# Start Flower monitoring
celery -A app.core.celery_app flower --port=5555 &
wait
Troubleshooting
Common Issues & Solutions
Celery Worker Connection Issues:
# Check Redis connection
redis-cli ping
# Check Celery workers
celery -A app.core.celery_app status
# Restart workers
pkill -f celery
./scripts/start_workers.sh
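Task Failures:
# stats() reports per-worker statistics; it does not list failed tasks
from app.core.celery_app import celery_app
worker_stats = celery_app.control.inspect().stats()
# To inspect one failed task, look it up by ID in the result backend
from celery.result import AsyncResult
print(AsyncResult("<task-id>", app=celery_app).traceback)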
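Email Delivery Issues:
# Test email configuration
import asyncio
from app.services.email_service import EmailService
from app.dependencies.config import get_settings
email_service = EmailService(get_settings())
# Test with a simple email (use a recipient address you control)
context = {"user_name": "Test", "username": "test", "app_name": "Task Management", "support_email": "support@example.com"}
sent = asyncio.run(email_service.send_email(["you@example.com"], "Test email", "welcome.html", context))
print("sent" if sent else "failed")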
What You've Accomplished
Congratulations! You've implemented comprehensive background task processing with:
- FastAPI native tasks for lightweight operations
- Celery integration with Redis for heavy-duty processing
- Email notification system with templates and queuing
- File processing capabilities with image resizing and PDF generation
- Task monitoring with status tracking and worker management
- Production deployment with Docker and process management
- Error handling with retry logic and failure recovery
Next Steps
Advanced Features:
- Task chaining - Complex workflows with dependent tasks
- Distributed processing - Multiple worker nodes
- Real-time notifications - WebSocket updates for task progress
- Advanced monitoring - Grafana dashboards and alerting
- Task prioritization - Priority queues and SLA management
Production Enhancements:
- High availability - Redis Sentinel and worker redundancy
- Auto-scaling - Dynamic worker scaling based on load
- Task archival - Long-term storage of task results
- Performance optimization - Task batching and bulk processing
- Security - Task authentication and data encryption
Ready to build scalable, asynchronous applications? You now have a robust background task system that can handle real-world processing requirements efficiently and reliably!