FastAPI Database Patterns — Deep Dive
Async SQLAlchemy 2.0 setup
SQLAlchemy 2.0 supports asyncio natively through its sqlalchemy.ext.asyncio extension (first shipped in 1.4), using an async driver such as asyncpg (PostgreSQL) or aiosqlite (SQLite). The setup:
from collections.abc import AsyncGenerator

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase

DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/dbname"

engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,
    max_overflow=10,
    pool_timeout=30,
    pool_recycle=3600,
    pool_pre_ping=True,
    echo=False,
)

async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

class Base(DeclarativeBase):
    pass

# One session per request: commit on success, roll back on any error
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
Key configuration decisions:
- expire_on_commit=False: Stops SQLAlchemy from expiring loaded attributes on commit. Expired attributes are reloaded lazily on next access, and that implicit I/O fails in an async context.
- pool_pre_ping=True: Tests each connection on checkout, replacing stale connections left behind by database restarts.
- pool_recycle=3600: Recycles connections after 1 hour, preventing issues with database-side connection timeouts.
Connection pool tuning
The pool keeps a bounded set of database connections and shares them across all requests handled by one process:
pool_size=20 # Persistent connections kept open
max_overflow=10 # Additional connections created under load (closed when idle)
# Total max: pool_size + max_overflow = 30
Sizing formula: Each Uvicorn worker process has its own pool. With 4 workers and pool_size=20, the app holds up to 80 persistent connections, and overflow raises the peak to 120. PostgreSQL’s default max_connections=100 cannot accommodate that peak.
Required connections = workers × (pool_size + max_overflow)
                     = 4 × (20 + 10) = 120
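The sizing arithmetic above can be checked with a few lines of Python. This is a minimal sketch; the function name `required_connections` is illustrative and not part of any library.

```python
def required_connections(
    workers: int, pool_size: int, max_overflow: int
) -> tuple[int, int]:
    """Return (persistent, peak) connection counts across all workers."""
    persistent = workers * pool_size
    peak = workers * (pool_size + max_overflow)
    return persistent, peak


persistent, peak = required_connections(workers=4, pool_size=20, max_overflow=10)
print(persistent, peak)  # 80 120
```

Comparing the peak value against the database's max_connections before deploying is a cheap way to catch an undersized limit.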
Solutions:
- Reduce pool_size per worker
- Use PgBouncer as a connection pooler between your app and PostgreSQL
- Increase PostgreSQL’s max_connections (but each connection costs ~5-10MB RAM)
PgBouncer is a widely used choice in production. It pools connections across all application workers, letting you run hundreds of workers with a fraction of the database connections:
App Workers (4×20=80 connections) → PgBouncer (20 connections) → PostgreSQL
Repository pattern with async SQLAlchemy
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

# User is the ORM model (defined elsewhere); app is the FastAPI instance
app = FastAPI()

class UserRepository:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def get_by_id(self, user_id: int) -> User | None:
        result = await self.session.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()

    async def get_by_email(self, email: str) -> User | None:
        result = await self.session.execute(
            select(User).where(User.email == email)
        )
        return result.scalar_one_or_none()

    async def list_with_orders(
        self, skip: int = 0, limit: int = 20
    ) -> list[User]:
        result = await self.session.execute(
            select(User)
            .options(selectinload(User.orders))
            .order_by(User.id)  # stable ordering so pages do not overlap
            .offset(skip)
            .limit(limit)
        )
        return list(result.scalars().all())

    async def create(self, **kwargs) -> User:
        user = User(**kwargs)
        self.session.add(user)
        await self.session.flush()  # Get the ID without committing
        return user

    async def count(self) -> int:
        result = await self.session.execute(select(func.count(User.id)))
        return result.scalar_one()

# Dependency
async def get_user_repo(db: AsyncSession = Depends(get_db)) -> UserRepository:
    return UserRepository(db)

# Route
@app.get("/users/{user_id}")
async def get_user(user_id: int, repo: UserRepository = Depends(get_user_repo)):
    user = await repo.get_by_id(user_id)
    if not user:
        raise HTTPException(404)
    return user
The flush() in create writes to the database within the transaction (getting auto-generated IDs) without committing. The session’s commit happens in get_db’s cleanup.
Unit of Work pattern
For operations spanning multiple repositories:
class UnitOfWork:
    def __init__(self, session: AsyncSession):
        self.session = session
        self.users = UserRepository(session)
        self.orders = OrderRepository(session)
        self.products = ProductRepository(session)

    async def commit(self):
        await self.session.commit()

    async def rollback(self):
        await self.session.rollback()

async def get_uow(db: AsyncSession = Depends(get_db)) -> UnitOfWork:
    return UnitOfWork(db)

@app.post("/orders")
async def create_order(
    order_in: OrderCreate,
    uow: UnitOfWork = Depends(get_uow),
):
    user = await uow.users.get_by_id(order_in.user_id)
    product = await uow.products.get_by_id(order_in.product_id)
    # Guard against missing rows before touching their attributes
    if user is None or product is None:
        raise HTTPException(404, "User or product not found")
    if product.stock < order_in.quantity:
        raise HTTPException(400, "Insufficient stock")
    product.stock -= order_in.quantity
    order = await uow.orders.create(
        user_id=user.id,
        product_id=product.id,
        quantity=order_in.quantity,
    )
    # Commit happens in get_db cleanup, ensuring both
    # the stock reduction and order creation are atomic
    return order
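The all-or-nothing behavior that the closing comment relies on can be seen in isolation with the standard library's sqlite3 module standing in for PostgreSQL. This is a sketch under stated assumptions: the table layout and the `place_order` helper are hypothetical, and the conditional UPDATE is one possible way to avoid overselling, not the approach used in the route above.

```python
from __future__ import annotations

import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE products (id INTEGER PRIMARY KEY, stock INTEGER NOT NULL);
    CREATE TABLE orders (
        id INTEGER PRIMARY KEY,
        user_id INTEGER NOT NULL,
        product_id INTEGER NOT NULL,
        quantity INTEGER NOT NULL
    );
    INSERT INTO products (id, stock) VALUES (1, 5);
""")


def place_order(
    conn: sqlite3.Connection, user_id: int | None, product_id: int, quantity: int
) -> int:
    """Decrement stock and insert the order inside one transaction."""
    with conn:  # commits on success, rolls back if an exception escapes
        cur = conn.execute(
            "UPDATE products SET stock = stock - ? WHERE id = ? AND stock >= ?",
            (quantity, product_id, quantity),
        )
        if cur.rowcount != 1:
            raise ValueError("unknown product or insufficient stock")
        cur = conn.execute(
            "INSERT INTO orders (user_id, product_id, quantity) VALUES (?, ?, ?)",
            (user_id, product_id, quantity),
        )
        return cur.lastrowid


first_id = place_order(conn, user_id=7, product_id=1, quantity=2)
try:
    # user_id=None violates NOT NULL, so the INSERT fails after the UPDATE ran
    place_order(conn, user_id=None, product_id=1, quantity=2)
except sqlite3.IntegrityError:
    pass  # the stock decrement from this attempt was rolled back

stock = conn.execute("SELECT stock FROM products WHERE id = 1").fetchone()[0]
order_count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
print(stock, order_count)  # 3 1
```

The failed second call leaves stock at 3 because its UPDATE and INSERT shared a transaction, which is the same guarantee the single session commit gives the route.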
Multi-tenancy patterns
Schema-per-tenant (PostgreSQL schemas)
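import re

from fastapi import Depends, HTTPException, Request
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

# The schema name is interpolated into SQL, so accept only safe identifiers
TENANT_ID_RE = re.compile(r"[a-z0-9_]+")

def tenant_schema(tenant_id: str) -> str:
    if not TENANT_ID_RE.fullmatch(tenant_id):
        raise HTTPException(400, "Invalid tenant ID")
    return f"tenant_{tenant_id}"

def get_tenant_session(tenant_id: str):
    async def dependency(db: AsyncSession = Depends(get_db)):
        schema = tenant_schema(tenant_id)
        # SET LOCAL lasts only until the current transaction ends, so the
        # setting cannot leak to the next request that reuses this pooled
        # connection (a commit mid-request also ends that scope)
        await db.execute(text(f"SET LOCAL search_path TO {schema}, public"))
        yield db
    return dependency

# Dynamic dependency based on request header
async def get_tenant_db(
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    tenant_id = request.headers.get("X-Tenant-ID")
    if not tenant_id:
        raise HTTPException(400, "Missing tenant ID")
    schema = tenant_schema(tenant_id)
    await db.execute(text(f"SET LOCAL search_path TO {schema}, public"))
    return db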
Row-level tenancy (shared schema)
from sqlalchemy import ForeignKey, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Mapped, mapped_column

class TenantMixin:
    tenant_id: Mapped[int] = mapped_column(ForeignKey("tenants.id"), index=True)

class Item(Base, TenantMixin):
    __tablename__ = "items"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]

# Repository automatically filters by tenant
class TenantItemRepository:
    def __init__(self, session: AsyncSession, tenant_id: int):
        self.session = session
        self.tenant_id = tenant_id

    async def list_all(self) -> list[Item]:
        result = await self.session.execute(
            select(Item).where(Item.tenant_id == self.tenant_id)
        )
        return list(result.scalars().all())
Schema-per-tenant provides stronger isolation but complicates migrations (you run Alembic against every schema). Row-level tenancy is simpler but requires careful query discipline — a missing WHERE tenant_id = ? leaks data across tenants.
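The query discipline that row-level tenancy depends on is easier to keep when the tenant filter lives in exactly one place. The sketch below illustrates that idea with the standard library's sqlite3 module; the `TenantScopedItems` class and the table layout are hypothetical stand-ins, not part of the code above.

```python
import sqlite3


class TenantScopedItems:
    """Every query issued through this class is bound to one tenant."""

    def __init__(self, conn: sqlite3.Connection, tenant_id: int) -> None:
        self.conn = conn
        self.tenant_id = tenant_id

    def list_names(self) -> list[str]:
        # The tenant filter is applied here, never by the caller
        rows = self.conn.execute(
            "SELECT name FROM items WHERE tenant_id = ? ORDER BY id",
            (self.tenant_id,),
        )
        return [row[0] for row in rows]


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE items (
        id INTEGER PRIMARY KEY,
        tenant_id INTEGER NOT NULL,
        name TEXT NOT NULL
    );
    INSERT INTO items (tenant_id, name) VALUES (1, 'alpha'), (2, 'beta'), (1, 'gamma');
""")

print(TenantScopedItems(conn, 1).list_names())  # ['alpha', 'gamma']
print(TenantScopedItems(conn, 2).list_names())  # ['beta']
```

Routes that only ever receive a tenant-bound object have no opportunity to forget the filter.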
Read replica routing
write_engine = create_async_engine("postgresql+asyncpg://primary-host/db")
read_engine = create_async_engine("postgresql+asyncpg://replica-host/db")

# expire_on_commit=False for the same reason as in the basic setup
write_session_factory = async_sessionmaker(
    write_engine, class_=AsyncSession, expire_on_commit=False
)
read_session_factory = async_sessionmaker(
    read_engine, class_=AsyncSession, expire_on_commit=False
)

async def get_write_db() -> AsyncGenerator[AsyncSession, None]:
    async with write_session_factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

async def get_read_db() -> AsyncGenerator[AsyncSession, None]:
    async with read_session_factory() as session:
        yield session  # No commit needed for reads

@app.get("/items")
async def list_items(db: AsyncSession = Depends(get_read_db)):
    ...

@app.post("/items")
async def create_item(db: AsyncSession = Depends(get_write_db)):
    ...
Be aware of replication lag. After a write, an immediate read from the replica might not see the new data. For read-after-write consistency, read from the primary within a short window (e.g., 5 seconds) after writes.
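One way to implement the read-after-write window is to remember when each client last wrote and send its reads to the primary until the window has passed. The following is a minimal sketch; `ReadAfterWriteRouter`, its method names, and the string targets are all hypothetical, and the in-memory dictionary only works within a single process (several workers would need shared state such as a cookie or an external store).

```python
import time
from typing import Callable


class ReadAfterWriteRouter:
    """Chooses 'primary' or 'replica' for reads based on recent writes."""

    def __init__(
        self,
        window_seconds: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.window_seconds = window_seconds
        self._clock = clock  # injectable so the behavior can be tested
        self._last_write: dict[str, float] = {}

    def record_write(self, client_id: str) -> None:
        self._last_write[client_id] = self._clock()

    def target_for_read(self, client_id: str) -> str:
        last = self._last_write.get(client_id)
        if last is not None and self._clock() - last < self.window_seconds:
            return "primary"
        return "replica"


now = [100.0]  # fake clock value, advanced manually below
router = ReadAfterWriteRouter(window_seconds=5.0, clock=lambda: now[0])
router.record_write("user-1")

now[0] = 102.0
print(router.target_for_read("user-1"))  # primary

now[0] = 106.0
print(router.target_for_read("user-1"))  # replica
print(router.target_for_read("user-2"))  # replica
```

A dependency could call `target_for_read` to pick between the read and write session factories; the window length is a guess at replication lag and should be tuned against measured lag.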
Soft deletes
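from datetime import datetime, timezone

from sqlalchemy import DateTime, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Mapped, mapped_column

class SoftDeleteMixin:
    # timezone=True so the column accepts the timezone-aware value set below
    deleted_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True), default=None, index=True
    )

    @property
    def is_deleted(self) -> bool:
        return self.deleted_at is not None

class Item(Base, SoftDeleteMixin):
    __tablename__ = "items"
    ...

# Repository filters out soft-deleted records by default
class ItemRepository:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def get_by_id(self, item_id: int) -> Item | None:
        return await self.session.get(Item, item_id)

    async def list_active(self) -> list[Item]:
        result = await self.session.execute(
            select(Item).where(Item.deleted_at.is_(None))
        )
        return list(result.scalars().all())

    async def soft_delete(self, item_id: int) -> None:
        item = await self.get_by_id(item_id)
        if item is None:
            return  # nothing to delete
        item.deleted_at = datetime.now(timezone.utc)
        await self.session.flush()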
In PostgreSQL, add a partial index restricted to rows WHERE deleted_at IS NULL so that queries for active records stay efficient without indexing deleted rows.
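As a rough illustration of what such an index looks like, the sketch below creates one with the standard library's sqlite3 module, which accepts the same CREATE INDEX ... WHERE form as PostgreSQL. The index name `ix_items_active` and the choice of indexed column are assumptions made for the example. With SQLAlchemy on PostgreSQL, the same condition can be declared through the `postgresql_where` argument of `Index`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE items (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        deleted_at TEXT
    );
    -- Partial index: only rows that are still active are indexed
    CREATE INDEX ix_items_active ON items (name) WHERE deleted_at IS NULL;
    INSERT INTO items (name, deleted_at) VALUES
        ('kept', NULL),
        ('removed', '2024-01-01T00:00:00+00:00');
""")

active = [
    row[0]
    for row in conn.execute(
        "SELECT name FROM items WHERE deleted_at IS NULL ORDER BY id"
    )
]
print(active)  # ['kept']
```

Because soft-deleted rows never enter the index, it stays small even when most of the table has been deleted.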
Bulk operations
For inserting or updating thousands of records, SQLAlchemy’s ORM approach (creating individual objects) is slow. Use Core operations:
from sqlalchemy import insert

async def bulk_create_items(session: AsyncSession, items: list[dict]):
    await session.execute(
        insert(Item),
        items,  # List of dicts matching column names
    )
    await session.flush()
This insert path skips per-object ORM overhead (identity map bookkeeping, per-instance event hooks) and can be 10-50x faster for bulk operations, depending on the workload. For even higher throughput, use PostgreSQL’s COPY command via asyncpg directly.
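When the input is very large, it may help to send the rows in fixed-size batches so that no single statement carries the whole list. The helper below is a hypothetical sketch of that batching step only; the name `chunked` and the idea of calling the bulk insert once per batch are assumptions, not something the code above requires.

```python
from collections.abc import Iterator


def chunked(rows: list[dict], size: int) -> Iterator[list[dict]]:
    """Yield consecutive slices of rows, each at most `size` long."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


rows = [{"name": f"item-{i}"} for i in range(5)]
batches = list(chunked(rows, 2))
print([len(batch) for batch in batches])  # [2, 2, 1]
```

Each batch could then be passed to the bulk insert in turn, all within the same transaction.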
Query optimization patterns
Pagination with keyset (cursor) instead of OFFSET:
async def list_items_cursor(
    session: AsyncSession,
    after_id: int | None = None,
    limit: int = 20,
) -> list[Item]:
    query = select(Item).order_by(Item.id).limit(limit)
    # Compare against None so that a cursor value of 0 is not ignored
    if after_id is not None:
        query = query.where(Item.id > after_id)
    result = await session.execute(query)
    return list(result.scalars().all())
OFFSET-based pagination scans and discards rows (OFFSET 10000 reads 10,000 rows). Keyset pagination uses an index seek — consistently fast regardless of page depth.
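The way a client walks through pages with a keyset cursor can be sketched with the standard library's sqlite3 module. The `fetch_page` helper and the seven-row table are illustrative assumptions; the point is that each request passes the last id it saw instead of an offset.

```python
from __future__ import annotations

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
conn.executemany(
    "INSERT INTO items (id, name) VALUES (?, ?)",
    [(i, f"item-{i}") for i in range(1, 8)],
)


def fetch_page(
    conn: sqlite3.Connection, after_id: int | None, limit: int
) -> list[tuple[int, str]]:
    """Return up to `limit` rows whose id is greater than `after_id`."""
    if after_id is None:
        sql = "SELECT id, name FROM items ORDER BY id LIMIT ?"
        return conn.execute(sql, (limit,)).fetchall()
    sql = "SELECT id, name FROM items WHERE id > ? ORDER BY id LIMIT ?"
    return conn.execute(sql, (after_id, limit)).fetchall()


pages = []
cursor = None
while True:
    page = fetch_page(conn, cursor, 3)
    if not page:
        break
    pages.append([row[0] for row in page])
    cursor = page[-1][0]  # last id seen becomes the next cursor

print(pages)  # [[1, 2, 3], [4, 5, 6], [7]]
```

The cursor is simply the last id of the previous page, so the database can jump straight to that position in the primary key index.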
Batch loading with selectinload:
# N+1 problem: 1 query for orders, then 1 more per order when its items are accessed
# (with AsyncSession, that implicit lazy load raises an error instead of running)
orders = await session.execute(select(Order))
# Fixed: 2 queries total regardless of N
orders = await session.execute(
    select(Order).options(selectinload(Order.items))
)
selectinload executes a second SELECT ... WHERE id IN (...) query. joinedload uses a JOIN — better for one-to-one, worse for one-to-many (row duplication).
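The two-query shape of selectinload can be reproduced by hand to show why the query count stays constant. This sketch uses the standard library's sqlite3 module with hypothetical `orders` and `order_items` tables; it mimics the strategy and is not how SQLAlchemy is implemented internally.

```python
import sqlite3
from collections import defaultdict

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id INTEGER PRIMARY KEY);
    CREATE TABLE order_items (
        id INTEGER PRIMARY KEY,
        order_id INTEGER NOT NULL,
        sku TEXT NOT NULL
    );
    INSERT INTO orders (id) VALUES (1), (2), (3);
    INSERT INTO order_items (order_id, sku) VALUES (1, 'a'), (1, 'b'), (3, 'c');
""")

# Query 1: load the parent rows
order_ids = [row[0] for row in conn.execute("SELECT id FROM orders ORDER BY id")]

# Query 2: load every child for those parents with a single IN (...) query
placeholders = ", ".join("?" for _ in order_ids)
rows = conn.execute(
    f"SELECT order_id, sku FROM order_items "
    f"WHERE order_id IN ({placeholders}) ORDER BY id",
    order_ids,
)

# Group children by parent in memory
items_by_order = defaultdict(list)
for order_id, sku in rows:
    items_by_order[order_id].append(sku)

loaded = {order_id: items_by_order.get(order_id, []) for order_id in order_ids}
print(loaded)  # {1: ['a', 'b'], 2: [], 3: ['c']}
```

Adding more orders grows the IN list but not the number of round trips, which is the property that removes the N+1 pattern.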
The one thing to remember: Production FastAPI database architecture needs PgBouncer for connection pooling across workers, async SQLAlchemy 2.0 for high concurrency, the repository + unit of work pattern for maintainability, and keyset pagination + eager loading as the first performance optimizations when queries slow down.