FastAPI Database Patterns — Deep Dive

Async SQLAlchemy 2.0 setup

SQLAlchemy 2.0 supports asyncio natively (the extension first shipped in 1.4) through async drivers such as asyncpg (PostgreSQL) or aiosqlite (SQLite). The setup:

from collections.abc import AsyncGenerator

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase

DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/dbname"

engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,
    max_overflow=10,
    pool_timeout=30,
    pool_recycle=3600,
    pool_pre_ping=True,
    echo=False,
)

async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

class Base(DeclarativeBase):
    pass

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

Key configuration decisions:

  • expire_on_commit=False: Keeps loaded attributes usable after commit. With the default (True), SQLAlchemy expires every attribute on commit, and the next access triggers an implicit reload, which fails in an async context
  • pool_pre_ping=True: Tests connections before use, handling stale connections from database restarts
  • pool_recycle=3600: Recycles connections after 1 hour, preventing issues with database-side connection timeouts

Connection pool tuning

The pool keeps a bounded set of database connections that all requests handled by one worker process share:

pool_size=20       # Persistent connections kept open
max_overflow=10    # Additional connections created under load (closed when idle)
# Total max: pool_size + max_overflow = 30

Sizing formula: Each Uvicorn worker process has its own pool. With 4 workers and pool_size=20, the app holds up to 80 persistent connections, and overflow can push the peak to 120. PostgreSQL’s default max_connections=100 cannot accommodate that peak.

Required connections = workers × (pool_size + max_overflow)
                     = 4 × (20 + 10) = 120
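
The same arithmetic can be sketched as two small helpers, which makes it easy to test a deployment plan before changing any settings. The function names and the `reserved` headroom for superuser and maintenance connections are assumptions made for illustration, not part of SQLAlchemy or PostgreSQL.

```python
def required_connections(workers: int, pool_size: int, max_overflow: int) -> int:
    """Worst-case number of connections the app can open against PostgreSQL."""
    return workers * (pool_size + max_overflow)


def max_pool_size(
    max_connections: int,
    workers: int,
    max_overflow: int,
    reserved: int = 10,  # assumed headroom for admin and maintenance sessions
) -> int:
    """Largest per-worker pool_size that keeps the peak within the server limit."""
    budget_per_worker = (max_connections - reserved) // workers
    return max(budget_per_worker - max_overflow, 0)


peak = required_connections(workers=4, pool_size=20, max_overflow=10)
safe_pool_size = max_pool_size(max_connections=100, workers=4, max_overflow=10)
print(peak, safe_pool_size)
```

With the numbers from the text, the peak is 120 connections, and a pool_size of 12 per worker would keep four workers within a limit of 100 under these assumptions.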

Solutions:

  • Reduce pool_size per worker
  • Use PgBouncer as a connection pooler between your app and PostgreSQL
  • Increase PostgreSQL’s max_connections (but each connection costs ~5-10MB RAM)

PgBouncer is a common production choice. It pools connections across all application workers, letting you run hundreds of workers with a fraction of the database connections:

App Workers (4×20=80 connections) → PgBouncer (20 connections) → PostgreSQL

Repository pattern with async SQLAlchemy

from fastapi import Depends, HTTPException
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

class UserRepository:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def get_by_id(self, user_id: int) -> User | None:
        result = await self.session.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()

    async def get_by_email(self, email: str) -> User | None:
        result = await self.session.execute(
            select(User).where(User.email == email)
        )
        return result.scalar_one_or_none()

    async def list_with_orders(
        self, skip: int = 0, limit: int = 20
    ) -> list[User]:
        result = await self.session.execute(
            select(User)
            .options(selectinload(User.orders))
            .offset(skip)
            .limit(limit)
        )
        return list(result.scalars().all())

    async def create(self, **kwargs) -> User:
        user = User(**kwargs)
        self.session.add(user)
        await self.session.flush()  # Get the ID without committing
        return user

    async def count(self) -> int:
        result = await self.session.execute(select(func.count(User.id)))
        return result.scalar_one()

# Dependency
async def get_user_repo(db: AsyncSession = Depends(get_db)) -> UserRepository:
    return UserRepository(db)

# Route
@app.get("/users/{user_id}")
async def get_user(user_id: int, repo: UserRepository = Depends(get_user_repo)):
    user = await repo.get_by_id(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

The flush() in create writes to the database within the transaction (getting auto-generated IDs) without committing. The session’s commit happens in get_db’s cleanup.

Unit of Work pattern

For operations spanning multiple repositories:

class UnitOfWork:
    def __init__(self, session: AsyncSession):
        self.session = session
        self.users = UserRepository(session)
        self.orders = OrderRepository(session)
        self.products = ProductRepository(session)

    async def commit(self):
        await self.session.commit()

    async def rollback(self):
        await self.session.rollback()

async def get_uow(db: AsyncSession = Depends(get_db)) -> UnitOfWork:
    return UnitOfWork(db)

@app.post("/orders")
async def create_order(
    order_in: OrderCreate,
    uow: UnitOfWork = Depends(get_uow),
):
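    user = await uow.users.get_by_id(order_in.user_id)
    product = await uow.products.get_by_id(order_in.product_id)
    # get_by_id returns None for unknown IDs, so check before using the objects
    if user is None or product is None:
        raise HTTPException(404, "User or product not found")

    if product.stock < order_in.quantity:
        raise HTTPException(400, "Insufficient stock")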

    product.stock -= order_in.quantity
    order = await uow.orders.create(
        user_id=user.id,
        product_id=product.id,
        quantity=order_in.quantity,
    )
    # Commit happens in get_db cleanup, ensuring both
    # the stock reduction and order creation are atomic
    return order

Multi-tenancy patterns

Schema-per-tenant (PostgreSQL schemas)

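import re

from fastapi import Depends, HTTPException, Request
from sqlalchemy import text

# Schema names cannot be sent as bind parameters, so the tenant ID is validated
# against a strict allow-list pattern before it is interpolated into SQL.
# The pattern and the tenant_schema helper are illustrative.
TENANT_ID_RE = re.compile(r"[a-z0-9_]{1,50}")

def tenant_schema(tenant_id: str) -> str:
    if not TENANT_ID_RE.fullmatch(tenant_id):
        raise HTTPException(400, "Invalid tenant ID")
    return f"tenant_{tenant_id}"

def get_tenant_session(tenant_id: str):
    async def dependency(db: AsyncSession = Depends(get_db)):
        schema = tenant_schema(tenant_id)
        # SET LOCAL lasts only until the current transaction ends, so the
        # setting cannot leak to another request through the connection pool
        await db.execute(text(f"SET LOCAL search_path TO {schema}, public"))
        yield db
    return dependency

# Dynamic dependency based on request header
async def get_tenant_db(
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    tenant_id = request.headers.get("X-Tenant-ID")
    if not tenant_id:
        raise HTTPException(400, "Missing tenant ID")
    schema = tenant_schema(tenant_id)
    await db.execute(text(f"SET LOCAL search_path TO {schema}, public"))
    return db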

Row-level tenancy (shared schema)

from sqlalchemy import ForeignKey, select
from sqlalchemy.orm import Mapped, mapped_column

class TenantMixin:
    tenant_id: Mapped[int] = mapped_column(ForeignKey("tenants.id"), index=True)

class Item(Base, TenantMixin):
    __tablename__ = "items"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]

# Repository automatically filters by tenant
class TenantItemRepository:
    def __init__(self, session: AsyncSession, tenant_id: int):
        self.session = session
        self.tenant_id = tenant_id

    async def list_all(self) -> list[Item]:
        result = await self.session.execute(
            select(Item).where(Item.tenant_id == self.tenant_id)
        )
        return list(result.scalars().all())

Schema-per-tenant provides stronger isolation but complicates migrations (you run Alembic against every schema). Row-level tenancy is simpler but requires careful query discipline — a missing WHERE tenant_id = ? leaks data across tenants.

Read replica routing

write_engine = create_async_engine("postgresql+asyncpg://primary-host/db")
read_engine = create_async_engine("postgresql+asyncpg://replica-host/db")

write_session_factory = async_sessionmaker(write_engine, class_=AsyncSession)
read_session_factory = async_sessionmaker(read_engine, class_=AsyncSession)

async def get_write_db() -> AsyncGenerator[AsyncSession, None]:
    async with write_session_factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

async def get_read_db() -> AsyncGenerator[AsyncSession, None]:
    async with read_session_factory() as session:
        yield session  # No commit needed for reads

@app.get("/items")
async def list_items(db: AsyncSession = Depends(get_read_db)):
    ...

@app.post("/items")
async def create_item(db: AsyncSession = Depends(get_write_db)):
    ...

Be aware of replication lag. After a write, an immediate read from the replica might not see the new data. For read-after-write consistency, read from the primary within a short window (e.g., 5 seconds) after writes.
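
One way to implement that window is to remember when each user last wrote and route their reads accordingly. The sketch below is a hedged illustration in plain Python: the ReplicaRouter class, its method names, and the per-user key are all hypothetical, and the in-process dictionary only works within a single worker (a cookie or a shared store such as Redis would be needed across workers).

```python
import time
from typing import Callable

PRIMARY_WINDOW_SECONDS = 5.0  # assumed window, matching the example in the text


class ReplicaRouter:
    """Tracks recent writers and decides which database their reads should hit."""

    def __init__(
        self,
        window: float = PRIMARY_WINDOW_SECONDS,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._window = window
        self._clock = clock
        self._last_write: dict[str, float] = {}

    def record_write(self, user_key: str) -> None:
        """Call after a successful write on behalf of this user."""
        self._last_write[user_key] = self._clock()

    def target_for_read(self, user_key: str) -> str:
        """Return "primary" inside the window after a write, else "replica"."""
        last = self._last_write.get(user_key)
        if last is not None and self._clock() - last < self._window:
            return "primary"
        return "replica"


router = ReplicaRouter()
router.record_write("user-42")
print(router.target_for_read("user-42"))  # the writer is pinned to the primary
print(router.target_for_read("user-7"))   # other users keep reading the replica
```

A dependency could then pick write_session_factory or read_session_factory based on the returned target. The clock is injectable so the window can be tested without sleeping.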

Soft deletes

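from datetime import datetime, timezone
from sqlalchemy import DateTime, select
from sqlalchemy.orm import Mapped, mapped_column

class SoftDeleteMixin:
    # timezone=True maps to TIMESTAMP WITH TIME ZONE, which matches the
    # timezone-aware value written in soft_delete
    deleted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), default=None, index=True)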

    @property
    def is_deleted(self) -> bool:
        return self.deleted_at is not None

class Item(Base, SoftDeleteMixin):
    __tablename__ = "items"
    ...

# Repository filters out soft-deleted records by default
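class ItemRepository:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def list_active(self) -> list[Item]:
        result = await self.session.execute(
            select(Item).where(Item.deleted_at.is_(None))
        )
        return list(result.scalars().all())

    async def soft_delete(self, item_id: int) -> None:
        item = await self.session.get(Item, item_id)
        if item is None or item.is_deleted:
            return  # unknown or already deleted: nothing to do
        item.deleted_at = datetime.now(timezone.utc)
        await self.session.flush()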

Add a partial index with the condition WHERE deleted_at IS NULL in PostgreSQL so that queries restricted to active records only touch index entries for live rows.

Bulk operations

For inserting or updating thousands of records, creating and adding individual ORM objects is slow. Pass a list of parameter dictionaries to a single insert() statement instead:

from sqlalchemy import insert

async def bulk_create_items(session: AsyncSession, items: list[dict]):
    await session.execute(
        insert(Item),
        items,  # List of dicts matching column names
    )
    await session.flush()

A bulk insert of this kind skips per-object ORM overhead (identity map bookkeeping, event hooks) and can be 10-50x faster for large batches, depending on the driver, row size, and network latency. For even higher throughput, use PostgreSQL’s COPY command via asyncpg directly.
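
A rough sketch of the COPY route, assuming an items table with name and tenant_id columns (both assumptions for illustration). The connection is passed in and is expected to be an asyncpg.Connection, for example one obtained from asyncpg.connect(); copy_records_to_table is asyncpg's method for streaming Python tuples through COPY.

```python
from typing import Any, Sequence

COLUMNS = ("name", "tenant_id")  # assumed column names for the items table


def rows_from_dicts(
    items: Sequence[dict[str, Any]], columns: Sequence[str]
) -> list[tuple]:
    """Convert dicts into tuples ordered to match the COPY column list."""
    return [tuple(item[col] for col in columns) for item in items]


async def copy_items(conn, items: Sequence[dict[str, Any]]) -> None:
    """Bulk-load rows with COPY.

    `conn` is expected to be an asyncpg.Connection. COPY runs on that
    connection, outside any SQLAlchemy session or unit of work.
    """
    await conn.copy_records_to_table(
        "items",
        records=rows_from_dicts(items, COLUMNS),
        columns=list(COLUMNS),
    )
```

Since COPY bypasses the session, rows loaded this way are not part of the transaction that get_db commits, so it fits best in import jobs rather than request handlers.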

Query optimization patterns

Pagination with keyset (cursor) instead of OFFSET:

async def list_items_cursor(
    session: AsyncSession,
    after_id: int | None = None,
    limit: int = 20,
) -> list[Item]:
    query = select(Item).order_by(Item.id).limit(limit)
    if after_id is not None:  # an explicit check keeps after_id=0 working
        query = query.where(Item.id > after_id)
    result = await session.execute(query)
    return list(result.scalars().all())

OFFSET-based pagination scans and discards rows (OFFSET 10000 reads 10,000 rows). Keyset pagination uses an index seek — consistently fast regardless of page depth.

Batch loading with selectinload:

# N+1 problem: 1 query for orders + N lazy loads for order items
# (with AsyncSession, an implicit lazy load raises an error instead of running)
orders = await session.execute(select(Order))

# Fixed: 2 queries total regardless of N
orders = await session.execute(
    select(Order).options(selectinload(Order.items))
)

selectinload executes a second SELECT ... WHERE id IN (...) query. joinedload uses a JOIN — better for one-to-one, worse for one-to-many (row duplication).

The one thing to remember: Production FastAPI database architecture needs PgBouncer for connection pooling across workers, async SQLAlchemy 2.0 for high concurrency, the repository + unit of work pattern for maintainability, and keyset pagination + eager loading as the first performance optimizations when queries slow down.
