Flask-SQLAlchemy Patterns — Deep Dive
Custom base model
Flask-SQLAlchemy 3.1+ lets you pass your own declarative base class, and a mixin supplies shared columns and methods:
from datetime import datetime, timezone

from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class TimestampMixin:
    created_at: Mapped[datetime] = mapped_column(
        default=lambda: datetime.now(timezone.utc)
    )
    updated_at: Mapped[datetime] = mapped_column(
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc)
    )

db = SQLAlchemy(model_class=Base)

class User(TimestampMixin, db.Model):
    __tablename__ = 'users'
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(unique=True, index=True)
The mixin adds created_at and updated_at to every model that inherits from it, without repetition. Mapped type hints combined with mapped_column are SQLAlchemy 2.0’s declarative style; they replace the older Column() syntax and give better IDE support and type checking.
Session lifecycle under the hood
Flask-SQLAlchemy’s db.session is a scoped session. Here’s the full lifecycle:
Request arrives
  → Flask pushes app context
  → Flask-SQLAlchemy creates the session on first use (bound to the context)
  → View function runs, using db.session
  → View returns response
  → Flask tears down app context
  → Flask-SQLAlchemy calls session.remove()
      → Uncommitted changes are discarded (not committed)
      → Connection returned to pool
The session.remove() during teardown is critical. It:
- Rolls back any uncommitted transaction
- Closes the session and releases its connection back to the pool (the pooled connection itself stays open for reuse)
- Resets the scoped session so the next request gets a fresh one
If you forget to commit, changes silently disappear. This catches beginners who expect auto-commit behavior.
Connection pooling configuration
SQLAlchemy maintains a connection pool per engine. Flask-SQLAlchemy exposes pool settings through config:
app.config.update(
    SQLALCHEMY_ENGINE_OPTIONS={
        'pool_size': 10,        # Permanent connections
        'max_overflow': 20,     # Temporary burst connections
        'pool_timeout': 30,     # Wait time for available connection
        'pool_recycle': 1800,   # Recycle connections after 30 min
        'pool_pre_ping': True,  # Verify connection before use
    }
)
pool_pre_ping is essential for production. Without it, stale connections (killed by database timeout or network issues) cause OperationalError on the first query after idle periods. The pre-ping issues a lightweight SELECT 1 before handing out a connection.
pool_recycle replaces any connection older than the given number of seconds when it is next checked out, so the database never gets the chance to kill it for being too old. MySQL’s default wait_timeout is 8 hours; setting pool_recycle to less than that ensures SQLAlchemy refreshes connections before the database drops them.
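A related sanity check is whether the pool settings fit under the database's connection limit. Each engine can open at most pool_size + max_overflow connections, and each worker process has its own engine. The sketch below is plain arithmetic; the worker count and the database limit are hypothetical deployment figures, not values from the config above.

```python
def max_connections_per_process(pool_size: int, max_overflow: int) -> int:
    """Upper bound on connections a single engine can hold open at once."""
    return pool_size + max_overflow


def total_connections(workers: int, pool_size: int, max_overflow: int) -> int:
    """Upper bound across all worker processes (one engine per process)."""
    return workers * max_connections_per_process(pool_size, max_overflow)


# Pool settings from the config above; 4 workers is an assumed deployment size.
per_process = max_connections_per_process(pool_size=10, max_overflow=20)
total = total_connections(workers=4, pool_size=10, max_overflow=20)

# Assumed database-side limit (PostgreSQL's default max_connections is 100).
db_limit = 100
fits = total <= db_limit
```

With these assumed figures the worst case is 120 connections against a limit of 100, so a traffic burst could exhaust the database before the pool's own timeout ever applies.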
Bulk operations
ORM-style inserts are slow for large datasets because each object goes through the full unit of work (identity map, change tracking, events):
# Slow: every object is tracked individually by the session
for data in dataset:
    db.session.add(User(**data))
db.session.commit()

# Fast: bulk INSERT with Core
db.session.execute(
    User.__table__.insert(),
    [{'email': d['email'], 'name': d['name']} for d in dataset]
)
db.session.commit()

# Middle ground: ORM bulk save (skips some ORM overhead; legacy API in SQLAlchemy 2.0)
db.session.bulk_save_objects([User(**d) for d in dataset])
db.session.commit()
Performance comparison for 10,000 rows (typical PostgreSQL):
- Individual adds: ~8 seconds
- bulk_save_objects: ~1.5 seconds
- Core insert(): ~0.3 seconds
- Core insert() with executemany_mode='values_plus_batch': ~0.1 seconds
The tradeoff: Core inserts skip ORM-level features (mapper events, relationship cascades, identity-map tracking), although column-level defaults still apply. Use ORM for single objects and small batches; Core for bulk data loading.
Polymorphic models
Single table inheritance maps multiple Python classes to one database table:
from sqlalchemy import String

class Notification(db.Model):
    __tablename__ = 'notifications'
    id: Mapped[int] = mapped_column(primary_key=True)
    type: Mapped[str] = mapped_column(String(50))
    message: Mapped[str]
    __mapper_args__ = {
        'polymorphic_on': type,
        'polymorphic_identity': 'notification'
    }

class EmailNotification(Notification):
    email_address: Mapped[str | None] = mapped_column(String(255))
    __mapper_args__ = {
        'polymorphic_identity': 'email'
    }

class SMSNotification(Notification):
    phone_number: Mapped[str | None] = mapped_column(String(20))
    __mapper_args__ = {
        'polymorphic_identity': 'sms'
    }
Querying Notification.query.all() returns a mix of EmailNotification and SMSNotification objects, automatically instantiated based on the type column. This avoids JOIN overhead of joined table inheritance while keeping a clean Python API.
Tradeoff: Subclass-specific columns must be nullable (since other subclasses don’t use them), wasting space and losing database-level NOT NULL constraints.
Soft delete pattern
Instead of deleting rows, mark them as deleted:
class SoftDeleteMixin:
    deleted_at: Mapped[datetime | None] = mapped_column(default=None)

    @classmethod
    def query_active(cls):
        return cls.query.filter(cls.deleted_at.is_(None))

    def soft_delete(self):
        self.deleted_at = datetime.now(timezone.utc)
For automatic filtering, use SQLAlchemy events:
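from sqlalchemy import event
from sqlalchemy.orm import with_loader_criteria

@event.listens_for(db.session, 'do_orm_execute')
def filter_soft_deleted(execute_state):
    # 'include_deleted' is an illustrative option name, not a built-in one
    if (execute_state.is_select
            and not execute_state.execution_options.get('include_deleted', False)):
        execute_state.statement = execute_state.statement.options(
            with_loader_criteria(SoftDeleteMixin,
                                 lambda cls: cls.deleted_at.is_(None),
                                 include_aliases=True)
        )
This transparently excludes soft-deleted records from every ORM SELECT. To opt out for a single query, set the option explicitly, for example stmt.execution_options(include_deleted=True).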
Optimistic locking
Prevent concurrent updates from overwriting each other:
class Product(db.Model):
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]
    stock: Mapped[int]
    version_id: Mapped[int] = mapped_column(default=1)
    __mapper_args__ = {
        'version_id_col': version_id
    }
SQLAlchemy automatically includes WHERE version_id = <expected> in UPDATE statements. If another process changed the row, the UPDATE affects zero rows and SQLAlchemy raises StaleDataError. Your code catches this and retries or reports a conflict.
Repository pattern with Flask-SQLAlchemy
Abstracting database access behind repositories keeps view functions clean and makes testing easier:
from sqlalchemy import select

class UserRepository:
    def __init__(self, session):
        self.session = session

    def get_by_id(self, user_id: int) -> User | None:
        return self.session.get(User, user_id)

    def get_by_email(self, email: str) -> User | None:
        return self.session.execute(
            select(User).filter_by(email=email)
        ).scalar_one_or_none()

    def list_active(self, page: int = 1, per_page: int = 20):
        stmt = (
            select(User)
            .filter_by(is_active=True)
            .order_by(User.created_at.desc())
            .offset((page - 1) * per_page)
            .limit(per_page)
        )
        return self.session.execute(stmt).scalars().all()

    def save(self, user: User) -> User:
        self.session.add(user)
        self.session.flush()  # Get ID without committing
        return user
Views inject the repository (or create it with db.session), and tests can substitute a fake repository that returns predetermined objects.
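A fake repository for tests can be as small as a dictionary. The sketch below needs no database; FakeUser, FakeUserRepository, and register_user are hypothetical names, and the fake mirrors only the repository methods the code under test actually calls.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class FakeUser:
    """Plain stand-in for the User model; no database required."""
    email: str
    id: Optional[int] = None


class FakeUserRepository:
    """In-memory substitute exposing the same methods as UserRepository."""

    def __init__(self):
        self._users: Dict[int, FakeUser] = {}
        self._next_id = 1

    def get_by_id(self, user_id: int) -> Optional[FakeUser]:
        return self._users.get(user_id)

    def get_by_email(self, email: str) -> Optional[FakeUser]:
        return next(
            (u for u in self._users.values() if u.email == email), None
        )

    def save(self, user: FakeUser) -> FakeUser:
        if user.id is None:  # mimic flush() assigning a primary key
            user.id = self._next_id
            self._next_id += 1
        self._users[user.id] = user
        return user


def register_user(repo, email: str):
    """Hypothetical service function: reject duplicates, otherwise save."""
    if repo.get_by_email(email) is not None:
        raise ValueError(f"{email} is already registered")
    return repo.save(FakeUser(email=email))


repo = FakeUserRepository()
first = register_user(repo, "ada@example.com")
try:
    register_user(repo, "ada@example.com")
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True
```

Because register_user only depends on the repository's methods, the same function works unchanged with the real UserRepository and db.session.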
Migration strategies with Alembic
Flask-Migrate wraps Alembic for database migrations. Production-safe patterns:
Non-locking column additions
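from alembic import op
import sqlalchemy as sa

def upgrade():
    # PostgreSQL 11+: ADD COLUMN with a constant default skips the table
    # rewrite, so the exclusive lock is held only momentarily
    op.add_column('users', sa.Column('role', sa.String(50),
                                     server_default='member'))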
Data migrations separate from schema
from alembic import op
import sqlalchemy as sa

def upgrade():
    # Schema change
    op.add_column('users', sa.Column('full_name', sa.String(255)))
    # Data migration in a separate step
    connection = op.get_bind()
    connection.execute(
        sa.text("UPDATE users SET full_name = first_name || ' ' || last_name")
    )
Safe column removal (three-phase)
- Deploy code that stops reading the column: remove all references in application code
- Run migration to drop the column: op.drop_column('users', 'legacy_field')
- Deploy cleanup: remove the migration’s reverse operation if you don’t need rollback
Dropping a column while code still reads it causes 500 errors during deployment. The three-phase approach eliminates that window.
Query debugging
Enable query recording to catch performance issues during development:
from flask_sqlalchemy.record_queries import get_recorded_queries

app.config['SQLALCHEMY_RECORD_QUERIES'] = True

@app.after_request
def log_slow_queries(response):
    for query in get_recorded_queries():
        if query.duration >= 0.5:  # 500ms threshold
            app.logger.warning(
                f'Slow query ({query.duration:.2f}s): {query.statement}'
            )
    return response
For production, use SQLAlchemy’s event system to emit metrics:
from sqlalchemy import event
import time
# db.engine needs an active application context, so register these
# listeners inside `with app.app_context():` (or an app factory)
@event.listens_for(db.engine, 'before_cursor_execute')
def before_cursor(conn, cursor, statement, parameters, context, executemany):
    conn.info['query_start'] = time.monotonic()

@event.listens_for(db.engine, 'after_cursor_execute')
def after_cursor(conn, cursor, statement, parameters, context, executemany):
    elapsed = time.monotonic() - conn.info['query_start']
    # `metrics` stands in for whatever metrics client your app uses
    metrics.histogram('db.query.duration', elapsed)
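The timing hooks can be tried without Flask or a metrics backend. In the sketch below a plain list named durations stands in for the metrics client, and the engine is an in-memory SQLite engine created just for the demonstration. It also keeps start times on a stack, a small variation that tolerates statements triggering nested executions.

```python
import time

from sqlalchemy import create_engine, event, text

engine = create_engine("sqlite://")
durations = []  # stand-in for a metrics client: (statement, seconds) pairs


@event.listens_for(engine, "before_cursor_execute")
def before_cursor(conn, cursor, statement, parameters, context, executemany):
    # Push the start time; a stack copes with nested executions.
    conn.info.setdefault("query_start", []).append(time.monotonic())


@event.listens_for(engine, "after_cursor_execute")
def after_cursor(conn, cursor, statement, parameters, context, executemany):
    elapsed = time.monotonic() - conn.info["query_start"].pop()
    durations.append((statement, elapsed))


with engine.connect() as conn:
    conn.execute(text("SELECT 1"))
    conn.execute(text("SELECT 2"))

recorded_statements = [statement for statement, _ in durations]
```

Swapping the list append for a histogram call is the only change needed to feed a real metrics system.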
One thing to remember: Flask-SQLAlchemy’s session is request-scoped and cleaned up automatically. Every performance and correctness pattern follows from this: eager-load to minimize queries within the request window, commit explicitly because nothing commits for you, and rely on connection pooling so connections are reused across requests.