Flask-SQLAlchemy Patterns — Deep Dive

Custom base model

Flask-SQLAlchemy 3.x lets you define a custom base class with shared columns and methods:

from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from datetime import datetime, timezone

class Base(DeclarativeBase):
    pass

class TimestampMixin:
    created_at: Mapped[datetime] = mapped_column(
        default=lambda: datetime.now(timezone.utc)
    )
    updated_at: Mapped[datetime] = mapped_column(
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc)
    )

db = SQLAlchemy(model_class=Base)

class User(TimestampMixin, db.Model):
    __tablename__ = 'users'
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(unique=True, index=True)

The mixin approach adds created_at and updated_at to every model without repetition. The mapped_column with Mapped type hints is SQLAlchemy 2.0’s declarative style — it replaces the older Column() syntax and provides better IDE support and type checking.

Session lifecycle under the hood

Flask-SQLAlchemy’s db.session is a scoped session. Here’s the full lifecycle:

Request arrives
  → Flask pushes app context
  → Flask-SQLAlchemy creates session (bound to context)
  → View function runs, using db.session
  → View returns response
  → Flask tears down app context
  → Flask-SQLAlchemy calls session.remove()
    → If uncommitted changes: they're discarded (not committed)
    → Connection returned to pool

The session.remove() during teardown is critical. It:

  1. Rolls back any uncommitted transaction
  2. Closes the underlying connection
  3. Returns the connection to the pool
  4. Resets the scoped session so the next request gets a fresh one

If you forget to commit, changes silently disappear. This catches beginners who expect auto-commit behavior.

Connection pooling configuration

SQLAlchemy maintains a connection pool per engine. Flask-SQLAlchemy exposes pool settings through config:

app.config.update(
    SQLALCHEMY_ENGINE_OPTIONS={
        'pool_size': 10,          # Permanent connections
        'max_overflow': 20,       # Temporary burst connections
        'pool_timeout': 30,       # Wait time for available connection
        'pool_recycle': 1800,     # Recycle connections after 30 min
        'pool_pre_ping': True,    # Verify connection before use
    }
)

pool_pre_ping is essential for production. Without it, stale connections (killed by database timeout or network issues) cause OperationalError on the first query after idle periods. The pre-ping issues a lightweight SELECT 1 before handing out a connection.

pool_recycle prevents the database from killing long-lived connections. MySQL’s default wait_timeout is 8 hours; setting pool_recycle to less than that ensures SQLAlchemy refreshes connections before the database drops them.

Bulk operations

ORM-style inserts are slow for large datasets because each object goes through the full identity map:

# Slow: 1000 individual INSERTs
for data in dataset:
    db.session.add(User(**data))
db.session.commit()

# Fast: bulk INSERT with Core
db.session.execute(
    User.__table__.insert(),
    [{'email': d['email'], 'name': d['name']} for d in dataset]
)
db.session.commit()

# Middle ground: ORM bulk save (skips some ORM overhead)
db.session.bulk_save_objects([User(**d) for d in dataset])
db.session.commit()

Performance comparison for 10,000 rows (typical PostgreSQL):

  • Individual adds: ~8 seconds
  • bulk_save_objects: ~1.5 seconds
  • Core insert(): ~0.3 seconds
  • Core insert() with executemany_mode='values_plus_batch': ~0.1 seconds

The tradeoff: Core inserts skip ORM features (defaults, events, relationships). Use ORM for single objects and small batches; Core for bulk data loading.

Polymorphic models

Single table inheritance maps multiple Python classes to one database table:

class Notification(db.Model):
    __tablename__ = 'notifications'
    id: Mapped[int] = mapped_column(primary_key=True)
    type: Mapped[str] = mapped_column(String(50))
    message: Mapped[str]
    
    __mapper_args__ = {
        'polymorphic_on': type,
        'polymorphic_identity': 'notification'
    }

class EmailNotification(Notification):
    email_address: Mapped[str | None] = mapped_column(String(255))
    
    __mapper_args__ = {
        'polymorphic_identity': 'email'
    }

class SMSNotification(Notification):
    phone_number: Mapped[str | None] = mapped_column(String(20))
    
    __mapper_args__ = {
        'polymorphic_identity': 'sms'
    }

Querying Notification.query.all() returns a mix of EmailNotification and SMSNotification objects, automatically instantiated based on the type column. This avoids JOIN overhead of joined table inheritance while keeping a clean Python API.

Tradeoff: Subclass-specific columns must be nullable (since other subclasses don’t use them), wasting space and losing database-level NOT NULL constraints.

Soft delete pattern

Instead of deleting rows, mark them as deleted:

class SoftDeleteMixin:
    deleted_at: Mapped[datetime | None] = mapped_column(default=None)
    
    @classmethod
    def query_active(cls):
        return cls.query.filter(cls.deleted_at.is_(None))
    
    def soft_delete(self):
        self.deleted_at = datetime.now(timezone.utc)

For automatic filtering, use SQLAlchemy events:

from sqlalchemy import event

@event.listens_for(db.session, 'do_orm_execute')
def filter_soft_deleted(execute_state):
    if execute_state.is_select:
        execute_state.statement = execute_state.statement.options(
            with_loader_criteria(SoftDeleteMixin, 
                lambda cls: cls.deleted_at.is_(None))
        )

This transparently excludes soft-deleted records from all queries unless explicitly overridden.

Optimistic locking

Prevent concurrent updates from overwriting each other:

class Product(db.Model):
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]
    stock: Mapped[int]
    version_id: Mapped[int] = mapped_column(default=1)
    
    __mapper_args__ = {
        'version_id_col': version_id
    }

SQLAlchemy automatically includes WHERE version_id = <expected> in UPDATE statements. If another process changed the row, the UPDATE affects zero rows and SQLAlchemy raises StaleDataError. Your code catches this and retries or reports a conflict.

Repository pattern with Flask-SQLAlchemy

Abstracting database access behind repositories keeps view functions clean and makes testing easier:

class UserRepository:
    def __init__(self, session):
        self.session = session
    
    def get_by_id(self, user_id: int) -> User | None:
        return self.session.get(User, user_id)
    
    def get_by_email(self, email: str) -> User | None:
        return self.session.execute(
            select(User).filter_by(email=email)
        ).scalar_one_or_none()
    
    def list_active(self, page: int = 1, per_page: int = 20):
        stmt = (
            select(User)
            .filter_by(is_active=True)
            .order_by(User.created_at.desc())
            .offset((page - 1) * per_page)
            .limit(per_page)
        )
        return self.session.execute(stmt).scalars().all()
    
    def save(self, user: User) -> User:
        self.session.add(user)
        self.session.flush()  # Get ID without committing
        return user

Views inject the repository (or create it with db.session), and tests can substitute a fake repository that returns predetermined objects.

Migration strategies with Alembic

Flask-Migrate wraps Alembic for database migrations. Production-safe patterns:

Non-locking column additions

def upgrade():
    # PostgreSQL: ADD COLUMN with default doesn't lock the table (PG 11+)
    op.add_column('users', sa.Column('role', sa.String(50), 
                                      server_default='member'))

Data migrations separate from schema

def upgrade():
    # Schema change
    op.add_column('users', sa.Column('full_name', sa.String(255)))
    
    # Data migration in a separate step
    connection = op.get_bind()
    connection.execute(
        text("UPDATE users SET full_name = first_name || ' ' || last_name")
    )

Safe column removal (three-phase)

  1. Deploy code that stops reading the column — Remove all references in application code
  2. Run migration to drop the columnop.drop_column('users', 'legacy_field')
  3. Deploy cleanup — Remove the migration’s reverse operation if you don’t need rollback

Dropping a column while code still reads it causes 500 errors during deployment. The three-phase approach eliminates that window.

Query debugging

Enable query recording to catch performance issues during development:

app.config['SQLALCHEMY_RECORD_QUERIES'] = True

@app.after_request
def log_slow_queries(response):
    for query in get_recorded_queries():
        if query.duration >= 0.5:  # 500ms threshold
            app.logger.warning(
                f'Slow query ({query.duration:.2f}s): {query.statement}'
            )
    return response

For production, use SQLAlchemy’s event system to emit metrics:

from sqlalchemy import event
import time

@event.listens_for(db.engine, 'before_cursor_execute')
def before_cursor(conn, cursor, statement, parameters, context, executemany):
    conn.info['query_start'] = time.monotonic()

@event.listens_for(db.engine, 'after_cursor_execute')
def after_cursor(conn, cursor, statement, parameters, context, executemany):
    elapsed = time.monotonic() - conn.info['query_start']
    metrics.histogram('db.query.duration', elapsed)

One thing to remember: Flask-SQLAlchemy’s session is request-scoped and auto-cleaned. Every performance and correctness pattern flows from this: eager load to minimize queries within the request window, commit explicitly because auto-commit doesn’t exist, and use connection pooling to survive between requests.

pythonflaskdatabaseorm

See Also