PostgreSQL with FastAPI Tutorial 2025 - Build Production-Ready Database Apps

Learn to integrate PostgreSQL with FastAPI using modern async patterns, SQLAlchemy 2.0, and production-ready database management. This tutorial converts our Task Management app from in-memory storage to a persistent PostgreSQL database.

What You'll Learn

By completing this tutorial, you'll master:

  • Async PostgreSQL integration with FastAPI and SQLAlchemy 2.0
  • Database modeling with relationships and constraints
  • Migration management using Alembic for schema changes
  • Connection pooling and performance optimization
  • Environment configuration for development and production
  • Error handling and database transaction management
  • Production deployment with proper database security

Prerequisites

What you need before starting:

  • Completed the Full-Stack Tutorial - We'll convert that app
  • PostgreSQL 13+ installed locally or via Docker
  • Python 3.8+ with Poetry dependency management
  • Basic SQL knowledge (CREATE, SELECT, INSERT, UPDATE, DELETE)

Time to complete: 30 minutes


What We're Building

You'll upgrade the Task Management Dashboard from the previous tutorial with:

  • Persistent PostgreSQL storage - Tasks survive server restarts
  • Database relationships - Users can own multiple tasks
  • Advanced queries - Search, filtering, and pagination
  • Database migrations - Schema versioning and updates
  • Production configuration - Connection pooling and security
  • Backup and recovery - Data protection strategies

Before vs After:

  • Before: Tasks stored in memory, lost on restart
  • After: Tasks permanently stored in PostgreSQL with full CRUD operations


Step 1: Database Setup and Configuration

Install PostgreSQL

Option A: Docker (Recommended for Development)

# Start PostgreSQL container
docker run --name task-postgres \
  -e POSTGRES_USER=taskuser \
  -e POSTGRES_PASSWORD=taskpass \
  -e POSTGRES_DB=taskdb \
  -p 5432:5432 \
  -d postgres:15

# Verify it's running
docker ps

Option B: Local Installation

# macOS
brew install postgresql
brew services start postgresql

# Ubuntu/Debian
sudo apt update
sudo apt install postgresql postgresql-contrib

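# Create user and database
sudo -u postgres psql
CREATE USER taskuser WITH PASSWORD 'taskpass';
-- Make taskuser the owner: on PostgreSQL 15+ a plain GRANT on the database
-- no longer allows creating tables in the public schema
CREATE DATABASE taskdb OWNER taskuser;
GRANT ALL PRIVILEGES ON DATABASE taskdb TO taskuser;
\q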

Install Python Dependencies

Navigate to your full-stack project from the previous tutorial:

cd fullstack-task-app/backend

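# Add database dependencies (quotes keep shells such as zsh from expanding the brackets)
poetry add "sqlalchemy[asyncio]" asyncpg alembic

# Add packages imported later in this tutorial: .env loading, EmailStr validation, settings
poetry add python-dotenv "pydantic[email]" pydantic-settings

# Add development dependencies
poetry add --group dev pytest-asyncio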

Environment Configuration

Create backend/.env:

# Database Configuration
DATABASE_URL=postgresql+asyncpg://taskuser:taskpass@localhost:5432/taskdb
DATABASE_URL_SYNC=postgresql://taskuser:taskpass@localhost:5432/taskdb

# Application Settings
ENV=development
DEBUG=true
SECRET_KEY=your-secret-key-here

# Connection Pool Settings
DB_POOL_SIZE=10
DB_MAX_OVERFLOW=20
DB_POOL_TIMEOUT=30

Create backend/.env.example for team sharing:

DATABASE_URL=postgresql+asyncpg://username:password@localhost:5432/dbname
DATABASE_URL_SYNC=postgresql://username:password@localhost:5432/dbname
ENV=development
DEBUG=true
SECRET_KEY=change-this-in-production
DB_POOL_SIZE=10
DB_MAX_OVERFLOW=20
DB_POOL_TIMEOUT=30
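
Because the connection URL embeds the password, it is worth masking it before it reaches a log line. The following is a minimal sketch using only the standard library; `describe_database_url` is a hypothetical helper name and is not part of SQLAlchemy or FastAPI.

```python
from urllib.parse import urlsplit


def describe_database_url(url: str) -> dict:
    """Split a SQLAlchemy-style URL into parts, masking the password."""
    parts = urlsplit(url)
    # The scheme has the form "dialect+driver" (for example "postgresql+asyncpg");
    # a URL without "+driver" leaves the driver empty.
    dialect, _, driver = parts.scheme.partition("+")
    return {
        "dialect": dialect,
        "driver": driver or None,
        "user": parts.username,
        "password": "***" if parts.password else None,
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/"),
    }


if __name__ == "__main__":
    print(describe_database_url(
        "postgresql+asyncpg://taskuser:taskpass@localhost:5432/taskdb"
    ))
```

The two URLs in the .env file differ only in the driver part, which is what this helper reports in the "driver" key.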


Step 2: Database Models and Schema

Create Database Configuration

Create backend/app/database.py:

import os
from typing import AsyncGenerator

from dotenv import load_dotenv
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base
from sqlalchemy.pool import NullPool

# Load variables from .env before reading them (does nothing if the file is missing)
load_dotenv()

DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
    raise ValueError("DATABASE_URL environment variable is required")

engine_kwargs = {"echo": os.getenv("DEBUG", "false").lower() == "true"}

if os.getenv("ENV") == "test":
    # NullPool opens a fresh connection per checkout and rejects pool sizing arguments
    engine_kwargs["poolclass"] = NullPool
else:
    # Outside tests, keep the default async queue pool and size it from the environment
    engine_kwargs.update(
        pool_size=int(os.getenv("DB_POOL_SIZE", "10")),
        max_overflow=int(os.getenv("DB_MAX_OVERFLOW", "20")),
        pool_timeout=int(os.getenv("DB_POOL_TIMEOUT", "30")),
    )

# Create async engine
engine = create_async_engine(DATABASE_URL, **engine_kwargs)

# Create async session factory
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autoflush=False,
    autocommit=False,
)

# Database base class
Base = declarative_base()

# Dependency for getting database session
async def get_database_session() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()
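
The dependency above commits when the request handler finishes normally and rolls back when it raises. The sketch below reproduces that control flow with a stand-in session, so the ordering of calls can be observed without a database. `FakeSession`, `session_scope`, and `run_request` are illustrative names, not SQLAlchemy or FastAPI APIs.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List


class FakeSession:
    """Stand-in that records calls instead of talking to PostgreSQL."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")

    async def close(self) -> None:
        self.calls.append("close")


@asynccontextmanager
async def session_scope(session: FakeSession) -> AsyncIterator[FakeSession]:
    # Same try/except/finally shape as get_database_session
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()


async def run_request(fail: bool) -> List[str]:
    """Simulate one request and return the session calls it produced."""
    session = FakeSession()
    try:
        async with session_scope(session):
            if fail:
                raise RuntimeError("handler failed")
    except RuntimeError:
        pass  # a web framework would turn this into an error response
    return session.calls


if __name__ == "__main__":
    print(asyncio.run(run_request(fail=False)))  # ['commit', 'close']
    print(asyncio.run(run_request(fail=True)))   # ['rollback', 'close']
```

One consequence of this design is that repository methods only need to flush; the single commit per request happens in the dependency.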

Create Database Models

Update backend/app/models.py:

from datetime import datetime
from typing import Optional
from sqlalchemy import Column, Integer, String, Text, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.database import Base  # package-qualified import, matching alembic/env.py and "uvicorn app.main:app"

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String(255), unique=True, index=True, nullable=False)
    username = Column(String(100), unique=True, index=True, nullable=False)
    full_name = Column(String(255), nullable=True)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

    # Relationship to tasks
    tasks = relationship("Task", back_populates="owner", cascade="all, delete-orphan")

class Task(Base):
    __tablename__ = "tasks"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(255), nullable=False, index=True)
    description = Column(Text, nullable=True)
    completed = Column(Boolean, default=False, index=True)
    priority = Column(String(20), default="medium", index=True)  # low, medium, high
    due_date = Column(DateTime(timezone=True), nullable=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

    # Foreign key to users
    owner_id = Column(Integer, ForeignKey("users.id"), nullable=False)

    # Relationship to user
    owner = relationship("User", back_populates="tasks")

    def __repr__(self):
        return f"<Task(id={self.id}, title='{self.title}', completed={self.completed})>"

Create Pydantic Schemas

Create backend/app/schemas.py:

from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel, EmailStr, Field

# User Schemas
class UserBase(BaseModel):
    email: EmailStr
    username: str = Field(..., min_length=3, max_length=100)
    full_name: Optional[str] = None

class UserCreate(UserBase):
    pass

class UserUpdate(BaseModel):
    email: Optional[EmailStr] = None
    username: Optional[str] = Field(None, min_length=3, max_length=100)
    full_name: Optional[str] = None
    is_active: Optional[bool] = None

class User(UserBase):
    id: int
    is_active: bool
    created_at: datetime
    updated_at: datetime

    class Config:
        from_attributes = True

# Task Schemas
class TaskBase(BaseModel):
    title: str = Field(..., min_length=1, max_length=255)
    description: Optional[str] = None
    priority: Optional[str] = Field("medium", pattern="^(low|medium|high)$")
    due_date: Optional[datetime] = None

class TaskCreate(TaskBase):
    pass

class TaskUpdate(BaseModel):
    title: Optional[str] = Field(None, min_length=1, max_length=255)
    description: Optional[str] = None
    completed: Optional[bool] = None
    priority: Optional[str] = Field(None, pattern="^(low|medium|high)$")
    due_date: Optional[datetime] = None

class Task(TaskBase):
    id: int
    completed: bool
    created_at: datetime
    updated_at: datetime
    owner_id: int

    class Config:
        from_attributes = True

class TaskWithOwner(Task):
    owner: User

class UserWithTasks(User):
    tasks: List[Task] = []
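
The schemas validate priority against a regular expression. An alternative some projects prefer is a string enum, which Pydantic accepts as a field type and which documents the allowed values in one place. The sketch below uses only the standard library; `Priority` and `parse_priority` are hypothetical names that do not appear in the app.

```python
from enum import Enum


class Priority(str, Enum):
    """Allowed task priorities, mirroring the low/medium/high pattern."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"


def parse_priority(raw: str) -> Priority:
    """Convert user input to a Priority, raising ValueError on anything else."""
    return Priority(raw)


if __name__ == "__main__":
    print(parse_priority("high").value)
    try:
        parse_priority("urgent")
    except ValueError as exc:
        print("rejected:", exc)
```

Because the enum subclasses str, its members compare equal to the plain strings stored in the priority column.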


Step 3: Database Migrations with Alembic

Initialize Alembic

cd backend

# Initialize Alembic in the project
poetry run alembic init alembic

# This creates an alembic/ directory with configuration

Configure Alembic

Update backend/alembic.ini:

# A generic, single database configuration.

[alembic]
# path to migration scripts
script_location = alembic

# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s

# sys.path path, will be prepended to sys.path if present.
prepend_sys_path = .

# timezone to use when rendering the date within the migration file
# as well as the filename.
timezone =

# max length of characters to apply to the
# "slug" field
truncate_slug_length = 40

# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
revision_environment = false

# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
sourceless = false

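# version number format
# (a literal percent sign must be doubled in this file; a bare % breaks
# configparser interpolation when env.py reads this section)
version_num_format = %%04d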

# version path separator; default is 'os.pathsep'
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os

# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
recursive_version_locations = false

# the output encoding used when revision files
# are written from script.py.mako
output_encoding = utf-8

sqlalchemy.url = driver://user:pass@localhost/dbname
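
The doubled percent signs in file_template exist because the .ini file is read with Python's configparser, which treats a single % as the start of an interpolation. The sketch below is an illustration with the standard library only: it reads the same template line, shows the %-format string that results, and renders it with made-up revision values. The function names and token values are assumptions for the example, not Alembic internals.

```python
import configparser

INI_TEXT = """
[alembic]
file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
"""


def load_template(ini_text: str) -> str:
    """Read file_template; configparser collapses each "%%" to "%"."""
    parser = configparser.ConfigParser()
    parser.read_string(ini_text)
    return parser.get("alembic", "file_template")


def render_filename(template: str, tokens: dict) -> str:
    """Fill an ordinary %-format template with revision tokens."""
    return template % tokens


def bare_percent_fails() -> bool:
    """Show that an unescaped % raises when the value is read."""
    parser = configparser.ConfigParser()
    parser.read_string("[alembic]\nexample_option = %04d\n")
    try:
        parser.get("alembic", "example_option")
    except configparser.InterpolationSyntaxError:
        return True
    return False


if __name__ == "__main__":
    template = load_template(INI_TEXT)
    # Hypothetical revision values, chosen only for illustration
    tokens = {
        "year": 2025, "month": 1, "day": 5, "hour": 9, "minute": 30,
        "rev": "ab12cd34ef56", "slug": "initial_migration",
    }
    print(render_filename(template, tokens) + ".py")
    print("bare % fails:", bare_percent_fails())
```

With this template, migration files sort chronologically in the versions/ directory, which makes the history easier to scan.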

Update backend/alembic/env.py:

import os
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context

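# Make backend/ importable so the "app" package resolves
import sys
BACKEND_DIR = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.append(BACKEND_DIR)

# Load backend/.env first: app.database reads DATABASE_URL at import time
from dotenv import load_dotenv
load_dotenv(os.path.join(BACKEND_DIR, ".env"))

from app.database import Base
from app.models import User, Task  # noqa: F401 - imported so autogenerate sees every table

# This is the Alembic Config object
config = context.config

# async_engine_from_config needs an async driver, so use the asyncpg URL
# (DATABASE_URL) rather than the driverless DATABASE_URL_SYNC
database_url = os.getenv("DATABASE_URL")
if database_url:
    config.set_main_option("sqlalchemy.url", database_url)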

# Interpret the config file for Python logging
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Set target metadata for autogenerate support
target_metadata = Base.metadata

def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode."""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()

def do_run_migrations(connection: Connection) -> None:
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()

async def run_async_migrations() -> None:
    """Run migrations in 'online' mode."""
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()

def run_migrations_online() -> None:
    """Run migrations in 'online' mode."""
    asyncio.run(run_async_migrations())

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

Create Initial Migration

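# Export the connection URLs for this shell (or load them with direnv).
# DATABASE_URL is needed too: app.database reads it as soon as env.py imports the models.
export DATABASE_URL="postgresql+asyncpg://taskuser:taskpass@localhost:5432/taskdb"
export DATABASE_URL_SYNC="postgresql://taskuser:taskpass@localhost:5432/taskdb"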

# Generate initial migration
poetry run alembic revision --autogenerate -m "Initial migration: users and tasks tables"

# Apply migration
poetry run alembic upgrade head

Step 4: Database Service Layer

Create Repository Pattern

Create backend/app/crud.py:

from typing import List, Optional
from sqlalchemy import select, and_, or_, desc
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models import User, Task
from app.schemas import UserCreate, UserUpdate, TaskCreate, TaskUpdate

class UserRepository:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def create(self, user_data: UserCreate) -> User:
        user = User(**user_data.model_dump())
        self.session.add(user)
        await self.session.flush()
        await self.session.refresh(user)
        return user

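    async def get_by_id(self, user_id: int) -> Optional[User]:
        # Eager-load tasks: a relationship cannot be lazy-loaded implicitly on an
        # AsyncSession, and GET /users/{user_id} serializes user.tasks
        result = await self.session.execute(
            select(User)
            .options(selectinload(User.tasks))
            .where(User.id == user_id)
        )
        return result.scalar_one_or_none()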

    async def get_by_email(self, email: str) -> Optional[User]:
        result = await self.session.execute(
            select(User).where(User.email == email)
        )
        return result.scalar_one_or_none()

    async def get_by_username(self, username: str) -> Optional[User]:
        result = await self.session.execute(
            select(User).where(User.username == username)
        )
        return result.scalar_one_or_none()

    async def update(self, user_id: int, user_data: UserUpdate) -> Optional[User]:
        user = await self.get_by_id(user_id)
        if not user:
            return None

        update_data = user_data.model_dump(exclude_unset=True)
        for field, value in update_data.items():
            setattr(user, field, value)

        await self.session.flush()
        await self.session.refresh(user)
        return user

    async def delete(self, user_id: int) -> bool:
        user = await self.get_by_id(user_id)
        if not user:
            return False

        await self.session.delete(user)
        await self.session.flush()
        return True

class TaskRepository:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def create(self, task_data: TaskCreate, owner_id: int) -> Task:
        task = Task(**task_data.model_dump(), owner_id=owner_id)
        self.session.add(task)
        await self.session.flush()
        await self.session.refresh(task)
        return task

    async def get_by_id(self, task_id: int) -> Optional[Task]:
        result = await self.session.execute(
            select(Task)
            .options(selectinload(Task.owner))
            .where(Task.id == task_id)
        )
        return result.scalar_one_or_none()

    async def get_user_tasks(
        self, 
        owner_id: int, 
        skip: int = 0, 
        limit: int = 100,
        completed: Optional[bool] = None,
        priority: Optional[str] = None,
        search: Optional[str] = None
    ) -> List[Task]:
        query = select(Task).where(Task.owner_id == owner_id)

        # Add filters
        if completed is not None:
            query = query.where(Task.completed == completed)

        if priority:
            query = query.where(Task.priority == priority)

        if search:
            query = query.where(
                or_(
                    Task.title.ilike(f"%{search}%"),
                    Task.description.ilike(f"%{search}%")
                )
            )

        # Add ordering and pagination
        query = query.order_by(desc(Task.created_at)).offset(skip).limit(limit)

        result = await self.session.execute(query)
        return result.scalars().all()

    async def update(self, task_id: int, task_data: TaskUpdate) -> Optional[Task]:
        task = await self.get_by_id(task_id)
        if not task:
            return None

        update_data = task_data.model_dump(exclude_unset=True)
        for field, value in update_data.items():
            setattr(task, field, value)

        await self.session.flush()
        await self.session.refresh(task)
        return task

    async def delete(self, task_id: int) -> bool:
        task = await self.get_by_id(task_id)
        if not task:
            return False

        await self.session.delete(task)
        await self.session.flush()
        return True

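    async def get_task_stats(self, owner_id: int) -> dict:
        """Get task statistics for a user"""
        # Note: both queries load full rows and count them in Python, which is
        # fine for a demo; on large tables prefer a COUNT(*) aggregate in SQL.
        total_query = select(Task).where(Task.owner_id == owner_id)
        completed_query = select(Task).where(
            and_(Task.owner_id == owner_id, Task.completed.is_(True))
        )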

        total_result = await self.session.execute(total_query)
        completed_result = await self.session.execute(completed_query)

        total_tasks = len(total_result.scalars().all())
        completed_tasks = len(completed_result.scalars().all())

        return {
            "total_tasks": total_tasks,
            "completed_tasks": completed_tasks,
            "pending_tasks": total_tasks - completed_tasks,
            "completion_rate": completed_tasks / total_tasks if total_tasks > 0 else 0
        }
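
The statistics method above fetches every task row in order to count them. A sketch of pushing the counting into SQL is shown here with the standard library's sqlite3 module, so it runs without a server; the COUNT and SUM(CASE ...) shape is plain SQL that PostgreSQL also accepts. The table layout is a reduced stand-in for the tasks table, and `task_stats` and `demo_connection` are names invented for this example.

```python
import sqlite3


def task_stats(conn: sqlite3.Connection, owner_id: int) -> dict:
    """Compute the same statistics with a single aggregate query."""
    total, completed = conn.execute(
        """
        SELECT COUNT(*),
               COALESCE(SUM(CASE WHEN completed THEN 1 ELSE 0 END), 0)
        FROM tasks
        WHERE owner_id = ?
        """,
        (owner_id,),
    ).fetchone()
    return {
        "total_tasks": total,
        "completed_tasks": completed,
        "pending_tasks": total - completed,
        "completion_rate": completed / total if total > 0 else 0,
    }


def demo_connection() -> sqlite3.Connection:
    """Build an in-memory table with a few sample rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE tasks (id INTEGER PRIMARY KEY, owner_id INTEGER, completed BOOLEAN)"
    )
    conn.executemany(
        "INSERT INTO tasks (owner_id, completed) VALUES (?, ?)",
        [(1, True), (1, False), (1, True), (2, False)],
    )
    return conn


if __name__ == "__main__":
    print(task_stats(demo_connection(), owner_id=1))
```

Only two numbers cross the wire regardless of how many tasks the user owns, which is the main reason to prefer the aggregate form.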


Step 5: Update FastAPI Application

Update Main Application

Update backend/app/main.py:

import os
from typing import List, Optional

# Load .env before importing app modules that read the environment at import time
from dotenv import load_dotenv
load_dotenv()

from fastapi import FastAPI, Depends, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import get_database_session
from app.crud import UserRepository, TaskRepository
from app.schemas import (
    User, UserCreate, UserUpdate, UserWithTasks,
    Task, TaskCreate, TaskUpdate, TaskWithOwner
)

app = FastAPI(
    title="Task Management API with PostgreSQL",
    description="A production-ready task management API built with FastAPI and PostgreSQL",
    version="2.0.0"
)

# Configure CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173", "http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Health check endpoint
@app.get("/health")
async def health_check():
    return {"status": "healthy", "message": "Task Management API is running"}

# User endpoints
@app.post("/users", response_model=User)
async def create_user(
    user_data: UserCreate,
    session: AsyncSession = Depends(get_database_session)
):
    user_repo = UserRepository(session)

    # Check if user already exists
    existing_user = await user_repo.get_by_email(user_data.email)
    if existing_user:
        raise HTTPException(status_code=400, detail="Email already registered")

    existing_username = await user_repo.get_by_username(user_data.username)
    if existing_username:
        raise HTTPException(status_code=400, detail="Username already taken")

    return await user_repo.create(user_data)

@app.get("/users/{user_id}", response_model=UserWithTasks)
async def get_user(
    user_id: int,
    session: AsyncSession = Depends(get_database_session)
):
    user_repo = UserRepository(session)
    user = await user_repo.get_by_id(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

# Task endpoints
@app.post("/users/{user_id}/tasks", response_model=Task)
async def create_task(
    user_id: int,
    task_data: TaskCreate,
    session: AsyncSession = Depends(get_database_session)
):
    user_repo = UserRepository(session)
    task_repo = TaskRepository(session)

    # Verify user exists
    user = await user_repo.get_by_id(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    return await task_repo.create(task_data, user_id)

@app.get("/users/{user_id}/tasks", response_model=List[Task])
async def get_user_tasks(
    user_id: int,
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    completed: Optional[bool] = Query(None),
    priority: Optional[str] = Query(None, pattern="^(low|medium|high)$"),
    search: Optional[str] = Query(None),
    session: AsyncSession = Depends(get_database_session)
):
    user_repo = UserRepository(session)
    task_repo = TaskRepository(session)

    # Verify user exists
    user = await user_repo.get_by_id(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    return await task_repo.get_user_tasks(
        owner_id=user_id,
        skip=skip,
        limit=limit,
        completed=completed,
        priority=priority,
        search=search
    )

@app.get("/tasks/{task_id}", response_model=TaskWithOwner)
async def get_task(
    task_id: int,
    session: AsyncSession = Depends(get_database_session)
):
    task_repo = TaskRepository(session)
    task = await task_repo.get_by_id(task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return task

@app.put("/tasks/{task_id}", response_model=Task)
async def update_task(
    task_id: int,
    task_data: TaskUpdate,
    session: AsyncSession = Depends(get_database_session)
):
    task_repo = TaskRepository(session)
    task = await task_repo.update(task_id, task_data)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return task

@app.delete("/tasks/{task_id}")
async def delete_task(
    task_id: int,
    session: AsyncSession = Depends(get_database_session)
):
    task_repo = TaskRepository(session)
    deleted = await task_repo.delete(task_id)
    if not deleted:
        raise HTTPException(status_code=404, detail="Task not found")
    return {"message": "Task deleted successfully"}

@app.get("/users/{user_id}/tasks/stats")
async def get_task_stats(
    user_id: int,
    session: AsyncSession = Depends(get_database_session)
):
    user_repo = UserRepository(session)
    task_repo = TaskRepository(session)

    # Verify user exists
    user = await user_repo.get_by_id(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    return await task_repo.get_task_stats(user_id)

# Legacy endpoints for compatibility with frontend
@app.get("/", response_model=dict)
async def root():
    return {"message": "Task Management API with PostgreSQL", "version": "2.0.0"}

# For demo purposes - create a default user and return their tasks
@app.get("/tasks", response_model=List[Task])
async def get_demo_tasks(
    session: AsyncSession = Depends(get_database_session)
):
    user_repo = UserRepository(session)
    task_repo = TaskRepository(session)

    # Get or create demo user
    demo_user = await user_repo.get_by_username("demo")
    if not demo_user:
        demo_user_data = UserCreate(
            email="demo@example.com",  # placeholder address
            username="demo",
            full_name="Demo User"
        )
        demo_user = await user_repo.create(demo_user_data)

    return await task_repo.get_user_tasks(demo_user.id)

@app.post("/tasks", response_model=Task)
async def create_demo_task(
    task_data: TaskCreate,
    session: AsyncSession = Depends(get_database_session)
):
    user_repo = UserRepository(session)
    task_repo = TaskRepository(session)

    # Get or create demo user
    demo_user = await user_repo.get_by_username("demo")
    if not demo_user:
        demo_user_data = UserCreate(
            email="demo@example.com",  # placeholder address
            username="demo",
            full_name="Demo User"
        )
        demo_user = await user_repo.create(demo_user_data)

    return await task_repo.create(task_data, demo_user.id)
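
The search parameter accepted by GET /users/{user_id}/tasks is placed inside an ILIKE pattern, so a user who types % or _ gets wildcard behaviour instead of a literal match. A small sketch of escaping those characters with the standard library follows; `escape_like` and `contains_pattern` are hypothetical helpers, and the backslash is assumed as the escape character, which is PostgreSQL's default for LIKE.

```python
def escape_like(term: str, escape: str = "\\") -> str:
    """Escape LIKE/ILIKE wildcards so the term matches literally."""
    return (
        term.replace(escape, escape + escape)  # escape the escape character first
        .replace("%", escape + "%")
        .replace("_", escape + "_")
    )


def contains_pattern(term: str) -> str:
    """Build a 'contains' pattern around the escaped term."""
    return f"%{escape_like(term)}%"


if __name__ == "__main__":
    print(contains_pattern("50%_off"))
```

In the repository, the result could be passed as `Task.title.ilike(contains_pattern(search), escape="\\")`, since SQLAlchemy's `ilike()` accepts an `escape` argument.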


Step 6: Test the Database Integration

Start the Application

# Make sure PostgreSQL is running
docker ps  # Check if container is running

# Start FastAPI with database
cd backend
poetry run uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

Test Database Operations

  1. Check API Documentation: Visit http://localhost:8000/docs

  2. Test User Creation:

    curl -X POST "http://localhost:8000/users" \
      -H "Content-Type: application/json" \
      -d '{
        "email": "john@example.com",
        "username": "john_doe",
        "full_name": "John Doe"
      }'
    

  3. Test Task Creation:

    curl -X POST "http://localhost:8000/users/1/tasks" \
      -H "Content-Type: application/json" \
      -d '{
        "title": "Complete PostgreSQL Tutorial",
        "description": "Learn async database operations",
        "priority": "high"
      }'
    

  4. Test Task Filtering:

    # Get high priority tasks
    curl "http://localhost:8000/users/1/tasks?priority=high"
    
    # Search tasks
    curl "http://localhost:8000/users/1/tasks?search=PostgreSQL"
    

Verify Database State

# Connect to PostgreSQL
docker exec -it task-postgres psql -U taskuser -d taskdb

-- Check tables
\dt

-- View users
SELECT * FROM users;

-- View tasks with owners
SELECT t.title, t.priority, u.username 
FROM tasks t 
JOIN users u ON t.owner_id = u.id;

-- Exit
\q

Step 7: Production Configuration

Environment Management

Create backend/app/config.py:

from typing import List
from pydantic import Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict  # poetry add pydantic-settings

class Settings(BaseSettings):
    # Pydantic v2 settings: read .env, match names case-insensitively, and
    # ignore .env keys that have no matching field
    model_config = SettingsConfigDict(
        env_file=".env",
        case_sensitive=False,
        extra="ignore",
    )

    # Application
    app_name: str = "Task Management API"
    debug: bool = False
    # The .env file names this variable ENV
    environment: str = Field("production", validation_alias="ENV")

    # Database
    database_url: str
    database_url_sync: str

    # Connection Pool
    db_pool_size: int = 10
    db_max_overflow: int = 20
    db_pool_timeout: int = 30
    db_pool_recycle: int = 3600

    # Security
    secret_key: str
    access_token_expire_minutes: int = 30

    # CORS
    cors_origins: List[str] = ["http://localhost:3000", "http://localhost:5173"]

    @field_validator("database_url", mode="before")
    @classmethod
    def validate_database_url(cls, v):
        if not v:
            raise ValueError("DATABASE_URL is required")
        return v

settings = Settings()

Production Database Optimizations

Update backend/app/database.py for production:

import os
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base
from app.config import settings

# Create production-optimized engine
engine = create_async_engine(
    settings.database_url,
    echo=settings.debug,

    # Connection pooling. No poolclass is passed: async engines use an
    # asyncio-compatible queue pool by default and reject the sync QueuePool
    pool_size=settings.db_pool_size,
    max_overflow=settings.db_max_overflow,
    pool_timeout=settings.db_pool_timeout,
    pool_recycle=settings.db_pool_recycle,

    # Reliability
    pool_pre_ping=True,  # Validate connections before handing them out
    pool_reset_on_return="rollback",  # Discard uncommitted work on return (the default)

    # Connection arguments
    connect_args={
        "server_settings": {
            "application_name": settings.app_name,
            "jit": "off",  # Disable JIT for better connection times
        }
    }
)

# Session factory with optimizations
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autoflush=False,
    autocommit=False,
)

Base = declarative_base()

async def get_database_session() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception as e:
            await session.rollback()
            # Log error in production
            if settings.environment == "production":
                import logging
                logging.error(f"Database error: {e}")
            raise
        finally:
            await session.close()
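
When choosing pool_size and max_overflow, keep in mind that each server process builds its own engine and therefore its own pool. The arithmetic below is a rough sketch of that budget. The helper names are made up for illustration, and the defaults of 100 for max_connections and 3 reserved superuser connections are PostgreSQL's stock values, which a managed service may change.

```python
def max_connections_needed(workers: int, pool_size: int, max_overflow: int) -> int:
    """Upper bound on connections the app can open across all worker processes."""
    return workers * (pool_size + max_overflow)


def fits_server_limit(
    workers: int,
    pool_size: int,
    max_overflow: int,
    server_max_connections: int = 100,  # PostgreSQL default
    reserved: int = 3,                  # superuser_reserved_connections default
) -> bool:
    """Check the worst case against the connections ordinary roles can use."""
    available = server_max_connections - reserved
    return max_connections_needed(workers, pool_size, max_overflow) <= available


if __name__ == "__main__":
    # Tutorial values: pool_size=10, max_overflow=20
    for workers in (1, 3, 4):
        needed = max_connections_needed(workers, 10, 20)
        print(workers, needed, fits_server_limit(workers, 10, 20))
```

With the tutorial's values, four workers could ask for up to 120 connections, more than a default server allows, so either the pool settings or the worker count would need to come down.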

Database Backup Strategy

Create backend/scripts/backup_db.sh:

#!/bin/bash
# Stop on the first error, on unset variables, and on failures inside pipelines
set -euo pipefail

# Database backup script
DB_NAME="taskdb"
DB_USER="taskuser"
DB_HOST="localhost"
DB_PORT="5432"
BACKUP_DIR="/backups"
TIMESTAMP=$(date +"%Y%m%d_%H%M%S")

# Create backup directory
mkdir -p "$BACKUP_DIR"

# Create backup. --no-password never prompts, so supply the password
# through ~/.pgpass or the PGPASSWORD environment variable.
pg_dump -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" \
  --no-password --clean --if-exists \
  -f "${BACKUP_DIR}/taskdb_backup_${TIMESTAMP}.sql"

# Compress backup
gzip "${BACKUP_DIR}/taskdb_backup_${TIMESTAMP}.sql"

# Remove backups older than 7 days
find "$BACKUP_DIR" -name "taskdb_backup_*.sql.gz" -mtime +7 -delete

echo "Backup completed: taskdb_backup_${TIMESTAMP}.sql.gz"
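
The retention rule in the script can also be expressed in Python, which is convenient when cleanup runs from an application task instead of cron. The sketch below is one possible approach using only the standard library. Unlike `find -mtime`, which looks at file modification time, it reads the timestamp embedded in the file name; `backup_time` and `expired_backups` are hypothetical helper names.

```python
import re
from datetime import datetime, timedelta
from typing import Iterable, List, Optional

# Matches names produced by the script, e.g. taskdb_backup_20250101_120000.sql.gz
BACKUP_NAME = re.compile(r"^taskdb_backup_(\d{8}_\d{6})\.sql\.gz$")


def backup_time(filename: str) -> Optional[datetime]:
    """Return the timestamp encoded in a backup file name, or None."""
    match = BACKUP_NAME.match(filename)
    if match is None:
        return None
    try:
        return datetime.strptime(match.group(1), "%Y%m%d_%H%M%S")
    except ValueError:
        return None  # digits present but not a valid date


def expired_backups(
    filenames: Iterable[str], now: datetime, keep_days: int = 7
) -> List[str]:
    """List backups whose embedded timestamp is older than keep_days."""
    cutoff = now - timedelta(days=keep_days)
    expired = []
    for name in filenames:
        stamp = backup_time(name)
        if stamp is not None and stamp < cutoff:
            expired.append(name)
    return expired


if __name__ == "__main__":
    names = [
        "taskdb_backup_20250101_120000.sql.gz",
        "taskdb_backup_20250108_120000.sql.gz",
        "notes.txt",
    ]
    print(expired_backups(names, now=datetime(2025, 1, 10)))
```

Files that do not match the naming scheme are ignored, so unrelated files in the backup directory are never selected for deletion.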


Troubleshooting

Common Issues & Solutions

Connection Errors:

# Check PostgreSQL status
docker ps

# Check logs
docker logs task-postgres

# Test connection
poetry run python -c "
import asyncio
from sqlalchemy import text
from app.database import engine
async def test():
    async with engine.begin() as conn:
        # SQLAlchemy 2.0 requires text() around raw SQL strings
        result = await conn.execute(text('SELECT 1'))
        print('Connection successful:', result.scalar())
asyncio.run(test())
"

Migration Issues:

# Check current migration status
poetry run alembic current

# View migration history
poetry run alembic history

# Rollback migration
poetry run alembic downgrade -1

# Reset database (development only)
poetry run alembic downgrade base
poetry run alembic upgrade head

Performance Issues:

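-- Check active connections
SELECT count(*) FROM pg_stat_activity WHERE datname = 'taskdb';

-- Check slow queries (requires the pg_stat_statements extension: add it to
-- shared_preload_libraries, restart, then run CREATE EXTENSION pg_stat_statements)
SELECT query, mean_exec_time, calls 
FROM pg_stat_statements 
ORDER BY mean_exec_time DESC 
LIMIT 5;

-- Create indexes for better performance. The models already declare index=True
-- on completed and priority, so check \d tasks first; owner_id is the column
-- most likely to be missing an index.
CREATE INDEX IF NOT EXISTS idx_tasks_owner_id ON tasks(owner_id);
CREATE INDEX IF NOT EXISTS idx_tasks_completed ON tasks(completed);
CREATE INDEX IF NOT EXISTS idx_tasks_priority ON tasks(priority);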

Environment Issues:

# Load environment variables
export DATABASE_URL="postgresql+asyncpg://taskuser:taskpass@localhost:5432/taskdb"
export DATABASE_URL_SYNC="postgresql://taskuser:taskpass@localhost:5432/taskdb"

# Test environment loading
poetry run python -c "from app.config import settings; print(settings.database_url)"


What You've Accomplished

Congratulations! You've successfully integrated PostgreSQL with FastAPI featuring:

  • Async PostgreSQL integration with SQLAlchemy 2.0 and asyncpg driver
  • Database modeling with relationships, constraints, and proper schema design
  • Migration management using Alembic for version control
  • Repository pattern for clean separation of database logic
  • Advanced querying with filtering, searching, and pagination
  • Production configuration with connection pooling and optimization
  • Error handling and transaction management
  • Database testing and debugging capabilities

Next Steps

Immediate Enhancements:

  1. Add authentication - Secure your endpoints with JWT tokens
  2. Implement caching - Use Redis for better performance
  3. Add full-text search - PostgreSQL FTS or Elasticsearch
  4. Create database indexes - Optimize query performance
  5. Add database monitoring - Track performance metrics

Advanced Features:

  1. Database replication - Primary-replica setup for high availability
  2. Connection pooling optimization - PgBouncer or built-in pooling
  3. Database sharding - Horizontal scaling strategies
  4. Backup automation - Scheduled backups and point-in-time recovery
  5. Query optimization - Analyze and optimize slow queries

Production Deployment:

  • Managed PostgreSQL: AWS RDS, Google Cloud SQL, or Railway PostgreSQL
  • Container orchestration: Docker Compose or Kubernetes
  • Environment management: Separate dev/staging/production databases
  • Monitoring: Database performance and health monitoring

Pro Tips for Production PostgreSQL

Performance Optimization:

  1. Use connection pooling - Reduce connection overhead
  2. Create proper indexes - Speed up common queries
  3. Monitor query performance - Identify and fix slow queries
  4. Use EXPLAIN ANALYZE - Understand query execution plans
  5. Configure PostgreSQL settings - Tune for your workload

Security Best Practices:

  1. Use environment variables - Never hardcode credentials
  2. Create limited-privilege users - Principle of least privilege
  3. Enable SSL connections - Encrypt data in transit
  4. Regular security updates - Keep PostgreSQL updated
  5. Audit database access - Log and monitor connections

Ready to build production-ready database applications? You now have the foundation to create scalable, performant applications with PostgreSQL and FastAPI. Your apps can handle real-world data persistence with confidence!