Database Transactions — Deep Dive
Transaction lifecycle in SQLAlchemy
Session transaction states
SQLAlchemy’s session tracks a transaction state machine:
ACTIVE → (flush) → ACTIVE → (commit) → COMMITTED
   ↓                  ↓
(error)           (rollback)
   ↓                  ↓
INVALID          ROLLED_BACK
Understanding these states prevents subtle bugs:
from sqlalchemy.orm import Session
session = Session(engine)
# Transaction begins implicitly on first operation
session.add(Order(customer_id=1))
# State: ACTIVE, the Order is pending (tracked in session.new) but not yet flushed to the DB
session.flush()
# State: ACTIVE, SQL INSERT sent but transaction still open
# Other sessions can't see the new order yet
session.commit()
# State: COMMITTED, transaction closed, changes visible to others
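The visibility rule in the comments above (rows written but not committed stay hidden from other sessions) comes from the database rather than from SQLAlchemy. Here is a minimal sketch of it using the standard-library sqlite3 module as a stand-in for PostgreSQL. The orders table and the helper names are illustrative assumptions, not part of the original example:

```python
import os
import sqlite3
import tempfile


def count_orders(conn):
    """Count rows and close the cursor so no read lock lingers."""
    cur = conn.execute("SELECT COUNT(*) FROM orders")
    try:
        return cur.fetchone()[0]
    finally:
        cur.close()


def visible_rows_before_and_after_commit():
    """Return how many rows a second connection sees before and after commit."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        writer = sqlite3.connect(path)
        reader = sqlite3.connect(path)
        try:
            writer.execute(
                "CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INTEGER)"
            )
            writer.commit()

            # sqlite3 opens a transaction implicitly before the INSERT,
            # much like the session does on its first operation.
            writer.execute("INSERT INTO orders (customer_id) VALUES (1)")
            before = count_orders(reader)  # the write is not committed yet

            writer.commit()
            after = count_orders(reader)
            return before, after
        finally:
            writer.close()
            reader.close()
```

Calling visible_rows_before_and_after_commit() shows the second connection seeing the row only after the first one commits.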
Autoflush behavior
SQLAlchemy flushes (sends pending SQL) automatically before queries to ensure you read your own writes:
from sqlalchemy import select

session.add(Order(customer_id=1, total=50.0))
# No SQL executed yet

# This triggers autoflush before the SELECT
orders = session.scalars(
    select(Order).where(Order.customer_id == 1)
).all()
# The new order appears in the results
This is usually helpful, but can cause unexpected errors if a pending object has constraint violations. Disable it when needed:
with Session(engine, autoflush=False) as session:
    session.add(incomplete_object)  # won't flush until explicit commit/flush
Isolation level configuration
Per-engine isolation
from sqlalchemy import create_engine
engine = create_engine(
"postgresql+psycopg2://user:pass@localhost/db",
isolation_level="REPEATABLE READ",
)
Per-transaction isolation
For specific operations that need stricter isolation:
from sqlalchemy import text

with engine.connect().execution_options(
    isolation_level="SERIALIZABLE"
) as conn:
    # This connection runs at SERIALIZABLE
    result = conn.execute(text("SELECT balance FROM accounts WHERE id = 1"))
    balance = result.scalar()
    conn.execute(
        text("UPDATE accounts SET balance = :new WHERE id = 1"),
        {"new": balance - 100},
    )
    conn.commit()
PostgreSQL SERIALIZABLE in practice
PostgreSQL’s SERIALIZABLE uses Serializable Snapshot Isolation (SSI), which is optimistic — it allows concurrent transactions to proceed and aborts one if a conflict is detected:
from sqlalchemy.exc import OperationalError

# InsufficientFunds and MaxRetriesExceeded are application-defined exceptions
def transfer_funds(from_id, to_id, amount, max_retries=3):
    for attempt in range(max_retries):
        try:
            with Session(engine) as session:
                with session.begin():
                    # Must run before any other SQL in this transaction
                    session.connection(
                        execution_options={"isolation_level": "SERIALIZABLE"}
                    )
                    from_acct = session.get(Account, from_id)
                    to_acct = session.get(Account, to_id)
                    if from_acct.balance < amount:
                        raise InsufficientFunds()
                    from_acct.balance -= amount
                    to_acct.balance += amount
            return  # success
        except OperationalError as e:
            if "could not serialize access" in str(e):
                continue  # retry on serialization failure
            raise
    raise MaxRetriesExceeded()
The retry loop is essential — SERIALIZABLE transactions must be prepared for rollback and retry.
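Matching on the error message works, but it is brittle. PostgreSQL also reports a SQLSTATE code: 40001 for serialization failures and 40P01 for deadlocks. The sketch below is one possible way to factor the retry logic into a reusable helper. It assumes the psycopg2 driver, which exposes the code as pgcode, and SQLAlchemy's convention of keeping the driver error on .orig. The names run_with_retries and is_retryable are hypothetical:

```python
import random
import time

# SQLSTATE codes: serialization_failure and deadlock_detected
RETRYABLE_SQLSTATES = {"40001", "40P01"}


def is_retryable(exc):
    """True if the error (or the driver error it wraps) has a retryable SQLSTATE."""
    orig = getattr(exc, "orig", exc)  # SQLAlchemy stores the DBAPI error on .orig
    return getattr(orig, "pgcode", None) in RETRYABLE_SQLSTATES


def run_with_retries(work, max_retries=3, base_delay=0.05, sleep=time.sleep):
    """Call work() up to max_retries times, retrying only on retryable errors."""
    for attempt in range(max_retries):
        try:
            return work()
        except Exception as exc:
            if not is_retryable(exc) or attempt == max_retries - 1:
                raise
            # Exponential backoff with jitter so competing retries spread out
            sleep(base_delay * (2 ** attempt) * random.uniform(0.5, 1.5))
```

Here work would be a function that opens its own session and transaction, so each retry starts from a clean state. Other drivers name the SQLSTATE attribute differently, so is_retryable would need adjusting for them.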
Explicit locking strategies
SELECT FOR UPDATE
Pessimistic locking prevents concurrent modifications by locking rows when they’re read:
from sqlalchemy import select

with Session(engine) as session:
    with session.begin():
        # Lock the inventory row; other transactions block until we commit
        stmt = (
            select(InventoryItem)
            .where(InventoryItem.product_id == 5)
            .with_for_update()
        )
        item = session.scalars(stmt).one()
        if item.quantity < 1:
            raise OutOfStock()
        item.quantity -= 1
    # Lock released on commit
Variants:
- .with_for_update(nowait=True): raises an error instead of blocking
- .with_for_update(skip_locked=True): skips locked rows (useful for job queues)
- .with_for_update(of=InventoryItem): locks only the specified table in a JOIN
Django locking
from django.db import transaction

with transaction.atomic():
    item = (
        InventoryItem.objects
        .select_for_update()
        .get(product_id=5)
    )
    if item.quantity < 1:
        raise OutOfStock()
    item.quantity -= 1
    item.save()
Advisory locks for application-level locking
PostgreSQL advisory locks provide application-defined locking without row contention:
from sqlalchemy import text

with Session(engine) as session:
    with session.begin():
        # Acquire advisory lock (blocks until available)
        session.execute(text("SELECT pg_advisory_xact_lock(:key)"), {"key": 12345})
        # Critical section: only one transaction holds this lock
        process_daily_report()
    # Lock released on commit
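The key passed to pg_advisory_xact_lock is a 64-bit integer, so a hard-coded number like 12345 has to be coordinated by hand across the codebase. One possible approach is to derive the key from a descriptive name. The helper below is a sketch and advisory_lock_key is a hypothetical name:

```python
import hashlib


def advisory_lock_key(name: str) -> int:
    """Map a lock name to a stable signed 64-bit integer (PostgreSQL bigint range)."""
    digest = hashlib.sha256(name.encode("utf-8")).digest()
    # Take the first 8 bytes and interpret them as a signed integer
    return int.from_bytes(digest[:8], byteorder="big", signed=True)
```

The result of advisory_lock_key("daily_report") could then be bound as the :key parameter. Python's built-in hash() is not suitable here because string hashes are randomized per process, so two workers would compute different keys for the same name.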
Savepoints and nested transactions
SQLAlchemy savepoints
from sqlalchemy.exc import IntegrityError

with Session(engine) as session:
    with session.begin():
        order = Order(customer_id=1, total=0)
        session.add(order)
        session.flush()  # order gets an ID
        total = 0
        for item_data in cart_items:
            nested = session.begin_nested()  # SAVEPOINT
            try:
                item = create_order_item(session, order.id, item_data)
                nested.commit()  # flushes, then RELEASE SAVEPOINT
            except IntegrityError:
                nested.rollback()  # ROLLBACK TO SAVEPOINT
                continue  # skip invalid items, continue with the rest
            total += item.price  # count only items that survived the savepoint
        order.total = total
    # outer COMMIT
Django savepoints
from django.db import IntegrityError, transaction

with transaction.atomic():  # outer transaction
    order = Order.objects.create(customer_id=1)
    for item_data in cart_items:
        try:
            with transaction.atomic():  # savepoint
                OrderItem.objects.create(order=order, **item_data)
        except IntegrityError:
            pass  # savepoint rolled back, outer transaction continues
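Both ORMs translate these nested blocks into SAVEPOINT, RELEASE SAVEPOINT and ROLLBACK TO SAVEPOINT statements. To make that visible, here is a sketch that issues the statements by hand against SQLite through the standard-library sqlite3 module. The order_items schema and the function name are assumptions made for illustration:

```python
import sqlite3


def insert_valid_items(conn, order_id, items):
    """Insert (sku, price) pairs, skipping rows that violate a constraint.

    Returns the number of rows kept. Assumes conn was opened with
    isolation_level=None so that transaction control is fully explicit.
    """
    kept = 0
    conn.execute("BEGIN")
    try:
        for sku, price in items:
            conn.execute("SAVEPOINT item_sp")
            try:
                conn.execute(
                    "INSERT INTO order_items (order_id, sku, price) VALUES (?, ?, ?)",
                    (order_id, sku, price),
                )
            except sqlite3.IntegrityError:
                # Undo only this item's work; the outer transaction stays open
                conn.execute("ROLLBACK TO SAVEPOINT item_sp")
            else:
                kept += 1
            finally:
                # Remove the savepoint from the stack in both cases
                conn.execute("RELEASE SAVEPOINT item_sp")
        conn.execute("COMMIT")
    except BaseException:
        conn.execute("ROLLBACK")
        raise
    return kept
```

With a table that has a CHECK (price >= 0) and a UNIQUE (order_id, sku) constraint, a duplicate SKU or a negative price is skipped while the remaining items commit together.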
Deadlock detection and handling
Deadlocks occur when two transactions each hold a lock the other needs:
Transaction A: locks row 1, wants row 2
Transaction B: locks row 2, wants row 1
PostgreSQL checks for a deadlock once a lock wait exceeds deadlock_timeout (1 second by default) and aborts one of the transactions involved. Your code must handle this:
from sqlalchemy import select
from sqlalchemy.exc import OperationalError
import time

def safe_update(session, updates, max_retries=3):
    # updates is an iterable of (item_id, new_qty) pairs
    for attempt in range(max_retries):
        try:
            with session.begin():
                for item_id, new_qty in sorted(updates):  # consistent order!
                    stmt = (
                        select(InventoryItem)
                        .where(InventoryItem.id == item_id)
                        .with_for_update()
                    )
                    item = session.scalars(stmt).one()
                    item.quantity = new_qty
            return
        except OperationalError as e:
            if "deadlock detected" in str(e):
                time.sleep(0.1 * (2 ** attempt))  # exponential backoff
                continue
            raise
    raise DeadlockRetryExhausted()
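The sorted(updates) call is critical: when every transaction acquires its row locks in the same order, two transactions cannot each hold a row the other is waiting for, which prevents most deadlocks.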
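The lock-ordering rule is not specific to databases. As an in-process analogy (not database code), the sketch below uses threading.Lock objects in place of row locks. Two threads transfer in opposite directions, which is the classic deadlock setup, yet they cannot deadlock because both always lock the lower id first. The Account class and function names are illustrative assumptions:

```python
import threading


class Account:
    def __init__(self, account_id, balance):
        self.id = account_id
        self.balance = balance
        self.lock = threading.Lock()  # stands in for a row lock


def transfer(src, dst, amount):
    """Move amount from src to dst, always locking the lower id first."""
    first, second = sorted((src, dst), key=lambda acct: acct.id)
    with first.lock, second.lock:
        src.balance -= amount
        dst.balance += amount


def run_opposing_transfers(rounds=1000):
    """Run A-to-B and B-to-A transfers concurrently and return both balances."""
    a, b = Account(1, 1000), Account(2, 1000)

    def a_to_b():
        for _ in range(rounds):
            transfer(a, b, 1)

    def b_to_a():
        for _ in range(rounds):
            transfer(b, a, 1)

    threads = [threading.Thread(target=a_to_b), threading.Thread(target=b_to_a)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return a.balance, b.balance
```

If transfer locked src first and dst second instead, the two threads could each grab one lock and wait forever on the other, which is the same situation as the two-transaction diagram above.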
Long-running transactions
The problem
Transactions that run for minutes or hours cause:
- Lock contention (blocking other transactions)
- Connection pool exhaustion
- Increased risk of deadlocks
- MVCC bloat in PostgreSQL (dead tuples can’t be vacuumed)
Batch processing pattern
Instead of one massive transaction, break work into chunks:
def process_large_batch(items, batch_size=1000):
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        with Session(engine) as session:
            with session.begin():
                for item in batch:
                    process_item(session, item)
        # each batch is a separate transaction
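The loop above relies on len() and slicing, so it needs the whole input in memory as a list. When items arrive from a generator or a streamed query result, a small chunking helper does the same job. This is a sketch and chunked is a hypothetical helper name:

```python
from itertools import islice


def chunked(iterable, size):
    """Yield successive lists of at most `size` items from any iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch
```

Each list yielded by chunked(items, 1000) can then be processed in its own session and transaction, exactly as in the loop above.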
Idempotent processing for crash recovery
If the process crashes partway through, a restart needs to know which items were already processed so it can skip them:
def process_with_checkpoints(items, batch_size=1000):
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        batch_ids = [item.id for item in batch]
        with Session(engine) as session:
            with session.begin():
                # Skip already-processed items
                already_done = set(
                    session.scalars(
                        select(ProcessedItem.source_id).where(
                            ProcessedItem.source_id.in_(batch_ids)
                        )
                    ).all()
                )
                remaining = [
                    item for item in batch if item.id not in already_done
                ]
                for item in remaining:
                    process_item(session, item)
                    # Checkpoint commits atomically with the item's own changes
                    session.add(ProcessedItem(source_id=item.id))
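The pattern only works because the checkpoint row and the item's own writes commit in the same transaction. If a batch fails, both roll back and the restart repeats that batch in full. Here is a self-contained sketch of that behavior using sqlite3 from the standard library. The processed_items table mirrors ProcessedItem, and the handle callback is a hypothetical stand-in for process_item:

```python
import sqlite3


def process_with_checkpoints(conn, item_ids, handle, batch_size=2):
    """Process each id once, recording a checkpoint row in the same transaction.

    handle(conn, item_id) must do its writes through conn so that they
    roll back together with the checkpoint if the batch fails.
    Returns the ids processed during this call.
    """
    processed_now = []
    for start in range(0, len(item_ids), batch_size):
        batch = item_ids[start:start + batch_size]
        with conn:  # commits on success, rolls back if the block raises
            placeholders = ",".join("?" * len(batch))
            done = {
                row[0]
                for row in conn.execute(
                    "SELECT source_id FROM processed_items "
                    f"WHERE source_id IN ({placeholders})",
                    batch,
                )
            }
            for item_id in batch:
                if item_id in done:
                    continue
                handle(conn, item_id)
                conn.execute(
                    "INSERT INTO processed_items (source_id) VALUES (?)",
                    (item_id,),
                )
                processed_now.append(item_id)
    return processed_now
```

If handle raises on the fourth of five ids with a batch size of two, the first batch stays committed and the second rolls back entirely. A second run then picks up from the third id and no item's result is written twice.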
Two-phase commit (distributed transactions)
When a transaction spans multiple databases:
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

engine_orders = create_engine("postgresql://localhost/orders")
engine_inventory = create_engine("postgresql://localhost/inventory")

# Bind each mapped class to its engine and enable two-phase commit.
# PostgreSQL must have max_prepared_transactions > 0 for this to work.
session = Session(
    binds={Order: engine_orders, InventoryItem: engine_inventory},
    twophase=True,
)
with session.begin():
    session.add(Order(customer_id=1))
    item = session.get(InventoryItem, 5)
    item.quantity -= 1
# On exit, each database is prepared first, then both are committed;
# a failure during prepare rolls both back
In practice, two-phase commit is fragile and slow. Most modern systems prefer the saga pattern — a sequence of local transactions with compensating actions for rollback:
async def checkout_saga(order_data):
    order = await create_order(order_data)  # step 1
    try:
        await reserve_inventory(order)  # step 2
    except InventoryError:
        await cancel_order(order)  # compensate step 1
        raise
    try:
        await charge_payment(order)  # step 3
    except PaymentError:
        await release_inventory(order)  # compensate step 2
        await cancel_order(order)  # compensate step 1
        raise
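Written out by hand, the nested try blocks grow with every step. One way to generalize the pattern is to keep a stack of compensations and unwind it in reverse order on failure. The following is a minimal sketch under that assumption, and run_saga is a hypothetical helper rather than a library API:

```python
import asyncio


async def run_saga(steps):
    """Run (action, compensation) pairs in order; undo completed steps on failure.

    Each action and compensation is a zero-argument coroutine function.
    """
    completed = []
    for action, compensation in steps:
        try:
            await action()
        except Exception:
            # Compensate in reverse order: the most recent step is undone first
            for undo in reversed(completed):
                await undo()
            raise
        completed.append(compensation)
```

A production version would also need to decide what happens when a compensation itself fails, typically by logging it and retrying later, since there is no database rollback to fall back on.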
One thing to remember: Transactions are powerful but have real costs — lock duration, connection hold time, and deadlock risk all scale with transaction length and concurrency. Keep transactions short, acquire locks in consistent order, always handle serialization and deadlock errors with retries, and prefer saga patterns over distributed transactions.