Database Transactions — Deep Dive

Transaction lifecycle in SQLAlchemy

Session transaction states

SQLAlchemy’s session tracks a transaction state machine:

ACTIVE → (flush) → ACTIVE → (commit) → COMMITTED
   ↓                   ↓
(error)            (rollback)
   ↓                   ↓
INVALID            ROLLED_BACK

Understanding these states prevents subtle bugs:

from sqlalchemy.orm import Session

session = Session(engine)

# Transaction begins implicitly on first operation
session.add(Order(customer_id=1))
# State: ACTIVE, changes in identity map but not flushed to DB

session.flush()
# State: ACTIVE, SQL INSERT sent but transaction still open
# Other sessions can't see the new order yet

session.commit()
# State: COMMITTED, transaction closed, changes visible to others

Autoflush behavior

SQLAlchemy flushes (sends pending SQL) automatically before queries to ensure you read your own writes:

session.add(Order(customer_id=1, total=50.0))
# No SQL executed yet

# This triggers autoflush before SELECT
orders = session.scalars(
    select(Order).where(Order.customer_id == 1)
).all()
# The new order appears in results

This is usually helpful, but can cause unexpected errors if a pending object has constraint violations. Disable it when needed:

with Session(engine, autoflush=False) as session:
    session.add(incomplete_object)  # won't flush until explicit commit/flush

Isolation level configuration

Per-engine isolation

from sqlalchemy import create_engine

engine = create_engine(
    "postgresql+psycopg2://user:pass@localhost/db",
    isolation_level="REPEATABLE READ",
)

Per-transaction isolation

For specific operations that need stricter isolation:

with engine.connect().execution_options(
    isolation_level="SERIALIZABLE"
) as conn:
    # This connection runs at SERIALIZABLE
    result = conn.execute(text("SELECT balance FROM accounts WHERE id = 1"))
    balance = result.scalar()
    conn.execute(
        text("UPDATE accounts SET balance = :new WHERE id = 1"),
        {"new": balance - 100},
    )
    conn.commit()

PostgreSQL SERIALIZABLE in practice

PostgreSQL’s SERIALIZABLE uses Serializable Snapshot Isolation (SSI), which is optimistic — it allows concurrent transactions to proceed and aborts one if a conflict is detected:

from sqlalchemy.exc import OperationalError

def transfer_funds(from_id, to_id, amount, max_retries=3):
    for attempt in range(max_retries):
        try:
            with Session(engine) as session:
                with session.begin():
                    session.connection(
                        execution_options={"isolation_level": "SERIALIZABLE"}
                    )
                    from_acct = session.get(Account, from_id)
                    to_acct = session.get(Account, to_id)

                    if from_acct.balance < amount:
                        raise InsufficientFunds()

                    from_acct.balance -= amount
                    to_acct.balance += amount
            return  # success
        except OperationalError as e:
            if "could not serialize access" in str(e):
                continue  # retry on serialization failure
            raise
    raise MaxRetriesExceeded()

The retry loop is essential — SERIALIZABLE transactions must be prepared for rollback and retry.

Explicit locking strategies

SELECT FOR UPDATE

Pessimistic locking prevents concurrent modifications by locking rows when they’re read:

from sqlalchemy import select

with Session(engine) as session:
    with session.begin():
        # Lock the inventory row — other transactions block until we commit
        stmt = (
            select(InventoryItem)
            .where(InventoryItem.product_id == 5)
            .with_for_update()
        )
        item = session.scalars(stmt).one()

        if item.quantity < 1:
            raise OutOfStock()

        item.quantity -= 1
        # Lock released on commit

Variants:

  • .with_for_update(nowait=True) — raises error instead of blocking
  • .with_for_update(skip_locked=True) — skips locked rows (useful for job queues)
  • .with_for_update(of=InventoryItem) — locks only specific table in a JOIN

Django locking

from django.db import transaction

with transaction.atomic():
    item = (
        InventoryItem.objects
        .select_for_update()
        .get(product_id=5)
    )
    if item.quantity < 1:
        raise OutOfStock()
    item.quantity -= 1
    item.save()

Advisory locks for application-level locking

PostgreSQL advisory locks provide application-defined locking without row contention:

from sqlalchemy import text

with Session(engine) as session:
    with session.begin():
        # Acquire advisory lock (blocks until available)
        session.execute(text("SELECT pg_advisory_xact_lock(:key)"), {"key": 12345})

        # Critical section — only one transaction holds this lock
        process_daily_report()
    # Lock released on commit

Savepoints and nested transactions

SQLAlchemy savepoints

with Session(engine) as session:
    with session.begin():
        order = Order(customer_id=1, total=0)
        session.add(order)
        session.flush()  # order gets an ID

        total = 0
        for item_data in cart_items:
            nested = session.begin_nested()  # SAVEPOINT
            try:
                item = create_order_item(session, order.id, item_data)
                total += item.price
                nested.commit()  # RELEASE SAVEPOINT
            except IntegrityError:
                nested.rollback()  # ROLLBACK TO SAVEPOINT
                # skip invalid items, continue with rest

        order.total = total
    # outer COMMIT

Django savepoints

from django.db import transaction

with transaction.atomic():  # outer transaction
    order = Order.objects.create(customer_id=1)

    for item_data in cart_items:
        try:
            with transaction.atomic():  # savepoint
                OrderItem.objects.create(order=order, **item_data)
        except IntegrityError:
            pass  # savepoint rolled back, outer transaction continues

Deadlock detection and handling

Deadlocks occur when two transactions each hold a lock the other needs:

Transaction A: locks row 1, wants row 2
Transaction B: locks row 2, wants row 1

PostgreSQL detects deadlocks within ~1 second and aborts one transaction. Your code must handle this:

from sqlalchemy.exc import OperationalError
import time

def safe_update(session, updates, max_retries=3):
    for attempt in range(max_retries):
        try:
            with session.begin():
                for item_id, new_qty in sorted(updates):  # consistent order!
                    stmt = (
                        select(InventoryItem)
                        .where(InventoryItem.id == item_id)
                        .with_for_update()
                    )
                    item = session.scalars(stmt).one()
                    item.quantity = new_qty
            return
        except OperationalError as e:
            if "deadlock detected" in str(e):
                time.sleep(0.1 * (2 ** attempt))  # exponential backoff
                continue
            raise
    raise DeadlockRetryExhausted()

The sorted(updates) is critical — acquiring locks in a consistent order prevents most deadlocks.

Long-running transactions

The problem

Transactions that run for minutes or hours cause:

  • Lock contention (blocking other transactions)
  • Connection pool exhaustion
  • Increased risk of deadlocks
  • MVCC bloat in PostgreSQL (dead tuples can’t be vacuumed)

Batch processing pattern

Instead of one massive transaction, break work into chunks:

def process_large_batch(items, batch_size=1000):
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        with Session(engine) as session:
            with session.begin():
                for item in batch:
                    process_item(session, item)
        # each batch is a separate transaction

Idempotent processing for crash recovery

If the process crashes mid-way through batches, you need to know which batches completed:

def process_with_checkpoints(items, batch_size=1000):
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        batch_ids = [item.id for item in batch]

        with Session(engine) as session:
            with session.begin():
                # Skip already-processed items
                already_done = session.scalars(
                    select(ProcessedItem.source_id).where(
                        ProcessedItem.source_id.in_(batch_ids)
                    )
                ).all()
                remaining = [
                    item for item in batch if item.id not in set(already_done)
                ]

                for item in remaining:
                    result = process_item(session, item)
                    session.add(ProcessedItem(source_id=item.id))

Two-phase commit (distributed transactions)

When a transaction spans multiple databases:

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

engine_orders = create_engine("postgresql://localhost/orders")
engine_inventory = create_engine("postgresql://localhost/inventory")

# SQLAlchemy supports two-phase via session binding
session = Session()
session.configure(binds={Order: engine_orders, InventoryItem: engine_inventory})

with session.begin_twophase():
    session.add(Order(customer_id=1))
    item = session.get(InventoryItem, 5)
    item.quantity -= 1
# both databases commit or both rollback

In practice, two-phase commit is fragile and slow. Most modern systems prefer the saga pattern — a sequence of local transactions with compensating actions for rollback:

async def checkout_saga(order_data):
    order = await create_order(order_data)       # step 1
    try:
        await reserve_inventory(order)            # step 2
    except InventoryError:
        await cancel_order(order)                 # compensate step 1
        raise

    try:
        await charge_payment(order)               # step 3
    except PaymentError:
        await release_inventory(order)            # compensate step 2
        await cancel_order(order)                 # compensate step 1
        raise

One thing to remember: Transactions are powerful but have real costs — lock duration, connection hold time, and deadlock risk all scale with transaction length and concurrency. Keep transactions short, acquire locks in consistent order, always handle serialization and deadlock errors with retries, and prefer saga patterns over distributed transactions.

pythondatabasesbackend

See Also

  • Python Aioredis Understand Aioredis through a practical analogy so your Python decisions become faster and clearer.
  • Python Alembic Understand Alembic through a practical analogy so your Python decisions become faster and clearer.
  • Python Asyncpg Database asyncpg is the fastest way for Python to talk to PostgreSQL without making your program sit around waiting.
  • Python Asyncpg Understand Asyncpg through a practical analogy so your Python decisions become faster and clearer.
  • Python Cassandra Python Understand Cassandra Python through a practical analogy so your Python decisions become faster and clearer.