PostgreSQL with FastAPI Tutorial 2025 - Build Production-Ready Database Apps
Learn to integrate PostgreSQL with FastAPI using modern async patterns, SQLAlchemy 2.0, and production-ready database management. This tutorial converts our Task Management app from in-memory storage to a persistent PostgreSQL database.
What You'll Learn
By completing this tutorial, you'll master:
- Async PostgreSQL integration with FastAPI and SQLAlchemy 2.0
- Database modeling with relationships and constraints
- Migration management using Alembic for schema changes
- Connection pooling and performance optimization
- Environment configuration for development and production
- Error handling and database transaction management
- Production deployment with proper database security
Prerequisites
What you need before starting:
- Completed the Full-Stack Tutorial - We'll convert that app
- PostgreSQL 13+ installed locally or via Docker
- Python 3.8+ with Poetry for dependency management
- Basic SQL knowledge (CREATE, SELECT, INSERT, UPDATE, DELETE)
Time to complete: 30 minutes
What We're Building
You'll upgrade the Task Management Dashboard from the previous tutorial with:
- Persistent PostgreSQL storage - Tasks survive server restarts
- Database relationships - Users can own multiple tasks
- Advanced queries - Search, filtering, and pagination
- Database migrations - Schema versioning and updates
- Production configuration - Connection pooling and security
- Backup and recovery - Data protection strategies
Before vs After:
- Before: Tasks stored in memory, lost on restart
- After: Tasks permanently stored in PostgreSQL with full CRUD operations
Step 1: Database Setup and Configuration
Install PostgreSQL
Option A: Docker (Recommended for Development)
# Start PostgreSQL container
docker run --name task-postgres \
-e POSTGRES_USER=taskuser \
-e POSTGRES_PASSWORD=taskpass \
-e POSTGRES_DB=taskdb \
-p 5432:5432 \
-d postgres:15
# Verify it's running
docker ps
Option B: Local Installation
# macOS
brew install postgresql@15
brew services start postgresql@15
# Ubuntu/Debian
sudo apt update
sudo apt install postgresql postgresql-contrib
# Create database and user
sudo -u postgres psql
CREATE DATABASE taskdb;
CREATE USER taskuser WITH PASSWORD 'taskpass';
GRANT ALL PRIVILEGES ON DATABASE taskdb TO taskuser;
\q
Install Python Dependencies
Navigate to your full-stack project from the previous tutorial:
cd fullstack-task-app/backend
# Add database dependencies
poetry add "sqlalchemy[asyncio]" asyncpg alembic
# Add settings management, email validation, and .env loading (used in later steps)
poetry add pydantic-settings "pydantic[email]" python-dotenv
# Add development dependencies
poetry add --group dev pytest-asyncio
Environment Configuration
Create backend/.env:
# Database Configuration
DATABASE_URL=postgresql+asyncpg://taskuser:taskpass@localhost:5432/taskdb
DATABASE_URL_SYNC=postgresql://taskuser:taskpass@localhost:5432/taskdb
# Application Settings
ENV=development
DEBUG=true
SECRET_KEY=your-secret-key-here
# Connection Pool Settings
DB_POOL_SIZE=10
DB_MAX_OVERFLOW=20
DB_POOL_TIMEOUT=30
Create backend/.env.example for team sharing:
DATABASE_URL=postgresql+asyncpg://username:password@localhost:5432/dbname
DATABASE_URL_SYNC=postgresql://username:password@localhost:5432/dbname
ENV=development
DEBUG=true
SECRET_KEY=change-this-in-production
DB_POOL_SIZE=10
DB_MAX_OVERFLOW=20
DB_POOL_TIMEOUT=30
Step 2: Database Models and Schema
Create Database Configuration
Create backend/app/database.py:
import os
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base
from sqlalchemy.pool import NullPool
# Load environment variables
DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
raise ValueError("DATABASE_URL environment variable is required")
# Create the async engine. NullPool takes no sizing arguments, so pool
# settings are only passed when a real connection pool is in use.
pool_kwargs = (
    {"poolclass": NullPool}
    if os.getenv("ENV") == "test"
    else {
        "pool_size": int(os.getenv("DB_POOL_SIZE", "10")),
        "max_overflow": int(os.getenv("DB_MAX_OVERFLOW", "20")),
        "pool_timeout": int(os.getenv("DB_POOL_TIMEOUT", "30")),
    }
)
engine = create_async_engine(
    DATABASE_URL,
    echo=os.getenv("DEBUG", "false").lower() == "true",
    **pool_kwargs,
)
# Create async session factory
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=False,
autocommit=False,
)
# Database base class
Base = declarative_base()
# Dependency that yields a session: commit on success, roll back on error.
# The async context manager closes the session automatically.
async def get_database_session() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
Create Database Models
Update backend/app/models.py:
from sqlalchemy import Column, Integer, String, Text, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func

from app.database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String(255), unique=True, index=True, nullable=False)
username = Column(String(100), unique=True, index=True, nullable=False)
full_name = Column(String(255), nullable=True)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
# Relationship to tasks
tasks = relationship("Task", back_populates="owner", cascade="all, delete-orphan")
class Task(Base):
__tablename__ = "tasks"
id = Column(Integer, primary_key=True, index=True)
title = Column(String(255), nullable=False, index=True)
description = Column(Text, nullable=True)
completed = Column(Boolean, default=False, index=True)
priority = Column(String(20), default="medium", index=True) # low, medium, high
due_date = Column(DateTime(timezone=True), nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
# Foreign key to users
    owner_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True)
# Relationship to user
owner = relationship("User", back_populates="tasks")
def __repr__(self):
return f"<Task(id={self.id}, title='{self.title}', completed={self.completed})>"
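If you want to sanity-check these models before wiring up Alembic in Step 3, a one-off script can create the tables straight from the metadata. This is a development-only shortcut (the script name is an assumption); migrations remain the source of truth for schema changes:
# create_tables.py - dev-only sketch; run with: poetry run python create_tables.py
import asyncio
from app.database import engine, Base
from app.models import User, Task  # noqa: F401 - importing registers models on Base.metadata

async def create_tables():
    async with engine.begin() as conn:
        # run_sync bridges the synchronous create_all() onto the async connection
        await conn.run_sync(Base.metadata.create_all)

asyncio.run(create_tables())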
Create Pydantic Schemas
Create backend/app/schemas.py:
from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel, EmailStr, Field
# User Schemas
class UserBase(BaseModel):
email: EmailStr
username: str = Field(..., min_length=3, max_length=100)
full_name: Optional[str] = None
class UserCreate(UserBase):
pass
class UserUpdate(BaseModel):
email: Optional[EmailStr] = None
username: Optional[str] = Field(None, min_length=3, max_length=100)
full_name: Optional[str] = None
is_active: Optional[bool] = None
class User(UserBase):
id: int
is_active: bool
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
# Task Schemas
class TaskBase(BaseModel):
title: str = Field(..., min_length=1, max_length=255)
description: Optional[str] = None
    priority: Optional[str] = Field("medium", pattern="^(low|medium|high)$")
due_date: Optional[datetime] = None
class TaskCreate(TaskBase):
pass
class TaskUpdate(BaseModel):
title: Optional[str] = Field(None, min_length=1, max_length=255)
description: Optional[str] = None
completed: Optional[bool] = None
    priority: Optional[str] = Field(None, pattern="^(low|medium|high)$")
due_date: Optional[datetime] = None
class Task(TaskBase):
id: int
completed: bool
created_at: datetime
updated_at: datetime
owner_id: int
class Config:
from_attributes = True
class TaskWithOwner(Task):
owner: User
class UserWithTasks(User):
tasks: List[Task] = []
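The Field constraints above validate input before it ever reaches the database. A quick sketch of the behavior (illustrative, not part of the app):
from pydantic import ValidationError
from app.schemas import TaskCreate

# Accepted: priority matches the ^(low|medium|high)$ pattern
task = TaskCreate(title="Write the report", priority="high")

# Rejected: FastAPI would return a 422 for this payload
try:
    TaskCreate(title="Write the report", priority="urgent")
except ValidationError as exc:
    print(exc)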
Step 3: Database Migrations with Alembic
Initialize Alembic
cd backend
# Initialize Alembic in the project
poetry run alembic init alembic
# This creates an alembic/ directory with configuration
Configure Alembic
Update backend/alembic.ini:
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = alembic
# template used to generate migration file names; the default is %%(rev)s_%%(slug)s
# this template prepends the date and time to each migration file name
file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
timezone =
# max length of characters to apply to the
# "slug" field
truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
sourceless = false
# version number format
version_num_format = %04d
# version path separator; default is 'os.pathsep'
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
output_encoding = utf-8
# placeholder only; env.py overrides this with the DATABASE_URL environment variable
sqlalchemy.url = driver://user:pass@localhost/dbname
Update backend/alembic/env.py:
import os
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
# Import your models
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
from app.database import Base
from app.models import User, Task # Import all models
# This is the Alembic Config object
config = context.config
# Set the database URL from the environment. These migrations run through an
# async engine, so use the asyncpg URL, not the sync one.
database_url = os.getenv("DATABASE_URL")
if database_url:
    config.set_main_option("sqlalchemy.url", database_url)
# Interpret the config file for Python logging
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# Set target metadata for autogenerate support
target_metadata = Base.metadata
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode."""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection: Connection) -> None:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
"""Run migrations in 'online' mode."""
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode."""
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
Create Initial Migration
# Load environment variables (if using direnv or manual export).
# env.py runs async migrations, so export the asyncpg URL:
export DATABASE_URL="postgresql+asyncpg://taskuser:taskpass@localhost:5432/taskdb"
# Generate initial migration
poetry run alembic revision --autogenerate -m "Initial migration: users and tasks tables"
# Apply migration
poetry run alembic upgrade head
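Autogenerate inspects Base.metadata and writes a revision file under alembic/versions/. Abridged, it should look roughly like the following (the revision ID here is illustrative; yours will differ):
"""Initial migration: users and tasks tables"""
from alembic import op
import sqlalchemy as sa

revision = "a1b2c3d4e5f6"  # generated value; yours will differ
down_revision = None
branch_labels = None
depends_on = None

def upgrade() -> None:
    op.create_table(
        "users",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("email", sa.String(length=255), nullable=False),
        # ... remaining columns and indexes ...
        sa.PrimaryKeyConstraint("id"),
    )
    # ... op.create_table("tasks") with its foreign key to users.id ...

def downgrade() -> None:
    op.drop_table("tasks")
    op.drop_table("users")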
Step 4: Database Service Layer
Create Repository Pattern
Create backend/app/crud.py:
from typing import List, Optional

from sqlalchemy import select, and_, or_, desc, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.models import User, Task
from app.schemas import UserCreate, UserUpdate, TaskCreate, TaskUpdate
class UserRepository:
def __init__(self, session: AsyncSession):
self.session = session
async def create(self, user_data: UserCreate) -> User:
        user = User(**user_data.model_dump())
self.session.add(user)
await self.session.flush()
await self.session.refresh(user)
return user
async def get_by_id(self, user_id: int) -> Optional[User]:
result = await self.session.execute(
select(User).where(User.id == user_id)
)
return result.scalar_one_or_none()
async def get_by_email(self, email: str) -> Optional[User]:
result = await self.session.execute(
select(User).where(User.email == email)
)
return result.scalar_one_or_none()
async def get_by_username(self, username: str) -> Optional[User]:
result = await self.session.execute(
select(User).where(User.username == username)
)
return result.scalar_one_or_none()
async def update(self, user_id: int, user_data: UserUpdate) -> Optional[User]:
user = await self.get_by_id(user_id)
if not user:
return None
        update_data = user_data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(user, field, value)
await self.session.flush()
await self.session.refresh(user)
return user
async def delete(self, user_id: int) -> bool:
user = await self.get_by_id(user_id)
if not user:
return False
await self.session.delete(user)
await self.session.flush()
return True
class TaskRepository:
def __init__(self, session: AsyncSession):
self.session = session
async def create(self, task_data: TaskCreate, owner_id: int) -> Task:
        task = Task(**task_data.model_dump(), owner_id=owner_id)
self.session.add(task)
await self.session.flush()
await self.session.refresh(task)
return task
async def get_by_id(self, task_id: int) -> Optional[Task]:
result = await self.session.execute(
select(Task)
.options(selectinload(Task.owner))
.where(Task.id == task_id)
)
return result.scalar_one_or_none()
async def get_user_tasks(
self,
owner_id: int,
skip: int = 0,
limit: int = 100,
completed: Optional[bool] = None,
priority: Optional[str] = None,
search: Optional[str] = None
) -> List[Task]:
query = select(Task).where(Task.owner_id == owner_id)
# Add filters
if completed is not None:
query = query.where(Task.completed == completed)
if priority:
query = query.where(Task.priority == priority)
if search:
query = query.where(
or_(
Task.title.ilike(f"%{search}%"),
Task.description.ilike(f"%{search}%")
)
)
# Add ordering and pagination
query = query.order_by(desc(Task.created_at)).offset(skip).limit(limit)
result = await self.session.execute(query)
return result.scalars().all()
async def update(self, task_id: int, task_data: TaskUpdate) -> Optional[Task]:
task = await self.get_by_id(task_id)
if not task:
return None
        update_data = task_data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(task, field, value)
await self.session.flush()
await self.session.refresh(task)
return task
async def delete(self, task_id: int) -> bool:
task = await self.get_by_id(task_id)
if not task:
return False
await self.session.delete(task)
await self.session.flush()
return True
    async def get_task_stats(self, owner_id: int) -> dict:
        """Get task statistics for a user via COUNT queries (avoids loading every row)"""
        total_tasks = await self.session.scalar(
            select(func.count()).select_from(Task).where(Task.owner_id == owner_id)
        )
        completed_tasks = await self.session.scalar(
            select(func.count()).select_from(Task).where(
                and_(Task.owner_id == owner_id, Task.completed == True)
            )
        )
        return {
            "total_tasks": total_tasks,
            "completed_tasks": completed_tasks,
            "pending_tasks": total_tasks - completed_tasks,
            "completion_rate": completed_tasks / total_tasks if total_tasks > 0 else 0,
        }
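Note the design choice: the repositories flush so generated IDs are available, but they never commit; the caller owns the transaction (in the API, that's the get_database_session dependency). A standalone usage sketch:
import asyncio
from app.database import AsyncSessionLocal
from app.crud import UserRepository, TaskRepository
from app.schemas import UserCreate, TaskCreate

async def demo():
    async with AsyncSessionLocal() as session:
        users = UserRepository(session)
        tasks = TaskRepository(session)
        user = await users.create(UserCreate(email="[email protected]", username="alice"))
        await tasks.create(TaskCreate(title="Try the repository layer"), owner_id=user.id)
        await session.commit()  # outside FastAPI, the caller commits explicitly

asyncio.run(demo())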
Step 5: Update FastAPI Application
Update Main Application
Update backend/app/main.py:
from typing import List, Optional

# Load environment variables before importing modules that read them at import time
from dotenv import load_dotenv
load_dotenv()

from fastapi import FastAPI, Depends, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import get_database_session
from app.crud import UserRepository, TaskRepository
from app.schemas import (
    User, UserCreate, UserUpdate, UserWithTasks,
    Task, TaskCreate, TaskUpdate, TaskWithOwner,
)
app = FastAPI(
title="Task Management API with PostgreSQL",
description="A production-ready task management API built with FastAPI and PostgreSQL",
version="2.0.0"
)
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:5173", "http://localhost:3000"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Health check endpoint
@app.get("/health")
async def health_check():
return {"status": "healthy", "message": "Task Management API is running"}
# User endpoints
@app.post("/users", response_model=User)
async def create_user(
user_data: UserCreate,
session: AsyncSession = Depends(get_database_session)
):
user_repo = UserRepository(session)
# Check if user already exists
existing_user = await user_repo.get_by_email(user_data.email)
if existing_user:
raise HTTPException(status_code=400, detail="Email already registered")
existing_username = await user_repo.get_by_username(user_data.username)
if existing_username:
raise HTTPException(status_code=400, detail="Username already taken")
return await user_repo.create(user_data)
@app.get("/users/{user_id}", response_model=UserWithTasks)
async def get_user(
user_id: int,
session: AsyncSession = Depends(get_database_session)
):
user_repo = UserRepository(session)
user = await user_repo.get_by_id(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
# Task endpoints
@app.post("/users/{user_id}/tasks", response_model=Task)
async def create_task(
user_id: int,
task_data: TaskCreate,
session: AsyncSession = Depends(get_database_session)
):
user_repo = UserRepository(session)
task_repo = TaskRepository(session)
# Verify user exists
user = await user_repo.get_by_id(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return await task_repo.create(task_data, user_id)
@app.get("/users/{user_id}/tasks", response_model=List[Task])
async def get_user_tasks(
user_id: int,
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=1000),
completed: Optional[bool] = Query(None),
    priority: Optional[str] = Query(None, pattern="^(low|medium|high)$"),
search: Optional[str] = Query(None),
session: AsyncSession = Depends(get_database_session)
):
user_repo = UserRepository(session)
task_repo = TaskRepository(session)
# Verify user exists
user = await user_repo.get_by_id(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return await task_repo.get_user_tasks(
owner_id=user_id,
skip=skip,
limit=limit,
completed=completed,
priority=priority,
search=search
)
@app.get("/tasks/{task_id}", response_model=TaskWithOwner)
async def get_task(
task_id: int,
session: AsyncSession = Depends(get_database_session)
):
task_repo = TaskRepository(session)
task = await task_repo.get_by_id(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
@app.put("/tasks/{task_id}", response_model=Task)
async def update_task(
task_id: int,
task_data: TaskUpdate,
session: AsyncSession = Depends(get_database_session)
):
task_repo = TaskRepository(session)
task = await task_repo.update(task_id, task_data)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
@app.delete("/tasks/{task_id}")
async def delete_task(
task_id: int,
session: AsyncSession = Depends(get_database_session)
):
task_repo = TaskRepository(session)
deleted = await task_repo.delete(task_id)
if not deleted:
raise HTTPException(status_code=404, detail="Task not found")
return {"message": "Task deleted successfully"}
@app.get("/users/{user_id}/tasks/stats")
async def get_task_stats(
user_id: int,
session: AsyncSession = Depends(get_database_session)
):
user_repo = UserRepository(session)
task_repo = TaskRepository(session)
# Verify user exists
user = await user_repo.get_by_id(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return await task_repo.get_task_stats(user_id)
# Legacy endpoints for compatibility with the existing frontend
@app.get("/", response_model=dict)
async def root():
return {"message": "Task Management API with PostgreSQL", "version": "2.0.0"}
# For demo purposes - create a default user and return their tasks
@app.get("/tasks", response_model=List[Task])
async def get_demo_tasks(
session: AsyncSession = Depends(get_database_session)
):
user_repo = UserRepository(session)
task_repo = TaskRepository(session)
# Get or create demo user
demo_user = await user_repo.get_by_username("demo")
if not demo_user:
demo_user_data = UserCreate(
email="[email protected]",
username="demo",
full_name="Demo User"
)
demo_user = await user_repo.create(demo_user_data)
return await task_repo.get_user_tasks(demo_user.id)
@app.post("/tasks", response_model=Task)
async def create_demo_task(
task_data: TaskCreate,
session: AsyncSession = Depends(get_database_session)
):
user_repo = UserRepository(session)
task_repo = TaskRepository(session)
# Get or create demo user
demo_user = await user_repo.get_by_username("demo")
if not demo_user:
demo_user_data = UserCreate(
email="[email protected]",
username="demo",
full_name="Demo User"
)
demo_user = await user_repo.create(demo_user_data)
return await task_repo.create(task_data, demo_user.id)
Step 6: Test the Database Integration
Start the Application
# Make sure PostgreSQL is running
docker ps # Check if container is running
# Start FastAPI with database
cd backend
poetry run uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
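This is also a good moment to put the pytest-asyncio dependency from Step 1 to work. A minimal connectivity test might look like this (the file path is an assumption):
# tests/test_database.py
import pytest
from sqlalchemy import text
from app.database import engine

@pytest.mark.asyncio
async def test_database_connection():
    async with engine.connect() as conn:
        result = await conn.execute(text("SELECT 1"))
        assert result.scalar() == 1
Run it with poetry run pytest; with DATABASE_URL exported and ENV=test set, the engine uses NullPool as configured in database.py.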
Test Database Operations
- Check API Documentation: Visit http://localhost:8000/docs
- Test User Creation:
curl -X POST "http://localhost:8000/users" \
  -H "Content-Type: application/json" \
  -d '{"email": "[email protected]", "username": "john_doe", "full_name": "John Doe"}'
- Test Task Creation:
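A request like the following should work against the endpoint defined above (assuming the user created in the previous step got ID 1):
curl -X POST "http://localhost:8000/users/1/tasks" \
  -H "Content-Type: application/json" \
  -d '{"title": "Learn PostgreSQL", "description": "Work through this tutorial", "priority": "high"}'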
- Test Task Filtering:
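The list endpoint accepts the query parameters defined earlier; for example (again assuming user ID 1):
curl "http://localhost:8000/users/1/tasks?completed=false&priority=high&search=postgres"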
Verify Database State
# Connect to PostgreSQL
docker exec -it task-postgres psql -U taskuser -d taskdb
-- Check tables (\dt is a psql meta-command; use -- for comments inside psql)
\dt
-- View users
SELECT * FROM users;
-- View tasks with owners
SELECT t.title, t.priority, u.username
FROM tasks t
JOIN users u ON t.owner_id = u.id;
-- Exit
\q
Step 7: Production Configuration
Environment Management
Create backend/app/config.py:
from typing import List

# Requires the pydantic-settings package (installed in Step 1)
from pydantic import field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    # Application
    app_name: str = "Task Management API"
    debug: bool = False
    environment: str = "production"

    # Database
    database_url: str
    database_url_sync: str

    # Connection Pool
    db_pool_size: int = 10
    db_max_overflow: int = 20
    db_pool_timeout: int = 30
    db_pool_recycle: int = 3600

    # Security
    secret_key: str
    access_token_expire_minutes: int = 30

    # CORS
    cors_origins: List[str] = ["http://localhost:3000", "http://localhost:5173"]

    @field_validator("database_url", mode="before")
    @classmethod
    def validate_database_url(cls, v):
        if not v:
            raise ValueError("DATABASE_URL is required")
        return v

    model_config = SettingsConfigDict(env_file=".env", case_sensitive=False)

settings = Settings()
Production Database Optimizations
Update backend/app/database.py for production:
import logging
from typing import AsyncGenerator

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base

from app.config import settings

logger = logging.getLogger(__name__)

# Create production-optimized engine. Async engines use AsyncAdaptedQueuePool
# by default; the synchronous QueuePool cannot be used with asyncio engines.
engine = create_async_engine(
    settings.database_url,
    echo=settings.debug,
    # Connection pooling
    pool_size=settings.db_pool_size,
    max_overflow=settings.db_max_overflow,
    pool_timeout=settings.db_pool_timeout,
    pool_recycle=settings.db_pool_recycle,
    # Performance optimizations
    pool_pre_ping=True,  # Validate connections before handing them out
    pool_reset_on_return="commit",  # Reset connection state on return to the pool
    # Connection arguments passed through to asyncpg
    connect_args={
        "server_settings": {
            "application_name": settings.app_name,
            "jit": "off",  # Disable JIT for faster connection startup
        }
    },
)
# Session factory with optimizations
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=False,
autocommit=False,
)
Base = declarative_base()
async def get_database_session() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception as e:
            await session.rollback()
            # Log the error in production
            if settings.environment == "production":
                logger.error(f"Database error: {e}")
            raise
Database Backup Strategy
Create backend/scripts/backup_db.sh:
#!/bin/bash
# Database backup script
DB_NAME="taskdb"
DB_USER="taskuser"
DB_HOST="localhost"
DB_PORT="5432"
BACKUP_DIR="/backups"
TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
# Create backup directory
mkdir -p $BACKUP_DIR
# Create backup (--no-password expects PGPASSWORD or ~/.pgpass to supply credentials)
pg_dump -h $DB_HOST -p $DB_PORT -U $DB_USER -d $DB_NAME \
  --no-password --clean --if-exists \
  -f "${BACKUP_DIR}/taskdb_backup_${TIMESTAMP}.sql"
# Compress backup
gzip "${BACKUP_DIR}/taskdb_backup_${TIMESTAMP}.sql"
# Remove backups older than 7 days
find $BACKUP_DIR -name "taskdb_backup_*.sql.gz" -mtime +7 -delete
echo "Backup completed: taskdb_backup_${TIMESTAMP}.sql.gz"
Troubleshooting
Common Issues & Solutions
Connection Errors:
# Check PostgreSQL status
docker ps
# Check logs
docker logs task-postgres
# Test connection
poetry run python -c "
import asyncio
from app.database import engine
async def test():
async with engine.begin() as conn:
result = await conn.execute('SELECT 1')
print('Connection successful:', result.scalar())
asyncio.run(test())
"
Migration Issues:
# Check current migration status
poetry run alembic current
# View migration history
poetry run alembic history
# Rollback migration
poetry run alembic downgrade -1
# Reset database (development only)
poetry run alembic downgrade base
poetry run alembic upgrade head
Performance Issues:
-- Check active connections
SELECT count(*) FROM pg_stat_activity WHERE datname = 'taskdb';
-- Check slow queries (requires the pg_stat_statements extension to be enabled)
SELECT query, mean_exec_time, calls
FROM pg_stat_statements
ORDER BY mean_exec_time DESC
LIMIT 5;
-- Create indexes for common filters. The models in this tutorial already
-- declare these via index=True, so this applies to schemas created without them.
CREATE INDEX IF NOT EXISTS idx_tasks_owner_id ON tasks(owner_id);
CREATE INDEX IF NOT EXISTS idx_tasks_completed ON tasks(completed);
CREATE INDEX IF NOT EXISTS idx_tasks_priority ON tasks(priority);
Environment Issues:
# Load environment variables
export DATABASE_URL="postgresql+asyncpg://taskuser:taskpass@localhost:5432/taskdb"
export DATABASE_URL_SYNC="postgresql://taskuser:taskpass@localhost:5432/taskdb"
# Test environment loading
poetry run python -c "from app.config import settings; print(settings.database_url)"
What You've Accomplished
Congratulations! You've successfully integrated PostgreSQL with FastAPI featuring:
- Async PostgreSQL integration with SQLAlchemy 2.0 and asyncpg driver
- Database modeling with relationships, constraints, and proper schema design
- Migration management using Alembic for version control
- Repository pattern for clean separation of database logic
- Advanced querying with filtering, searching, and pagination
- Production configuration with connection pooling and optimization
- Error handling and transaction management
- Database testing and debugging capabilities
Next Steps
Immediate Enhancements:
- Add authentication - Secure your endpoints with JWT tokens
- Implement caching - Use Redis for better performance
- Add full-text search - PostgreSQL FTS or Elasticsearch
- Create database indexes - Optimize query performance
- Add database monitoring - Track performance metrics
Advanced Features:
- Database replication - Master-slave setup for high availability
- Connection pooling optimization - PgBouncer or built-in pooling
- Database sharding - Horizontal scaling strategies
- Backup automation - Scheduled backups and point-in-time recovery
- Query optimization - Analyze and optimize slow queries
Production Deployment:
- Managed PostgreSQL: AWS RDS, Google Cloud SQL, or Railway PostgreSQL
- Container orchestration: Docker Compose or Kubernetes
- Environment management: Separate dev/staging/production databases
- Monitoring: Database performance and health monitoring
Pro Tips for Production PostgreSQL
Performance Optimization:
- Use connection pooling - Reduce connection overhead
- Create proper indexes - Speed up common queries
- Monitor query performance - Identify and fix slow queries
- Use EXPLAIN ANALYZE - Understand query execution plans (see the example after this list)
- Configure PostgreSQL settings - Tune for your workload
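For example, to confirm the task-list query uses the owner_id index instead of a sequential scan:
EXPLAIN ANALYZE
SELECT * FROM tasks
WHERE owner_id = 1 AND completed = false
ORDER BY created_at DESC
LIMIT 20;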
Security Best Practices:
- Use environment variables - Never hardcode credentials
- Create limited-privilege users - Principle of least privilege
- Enable SSL connections - Encrypt data in transit
- Regular security updates - Keep PostgreSQL updated
- Audit database access - Log and monitor connections
Ready to build production-ready database applications? You now have the foundation to create scalable, performant applications with PostgreSQL and FastAPI. Your apps can handle real-world data persistence with confidence!