Optimistic Locking — Deep Dive
SQLAlchemy’s built-in version counter
SQLAlchemy provides native optimistic locking through the version_id_col mapper argument:
from decimal import Decimal

from sqlalchemy import Numeric, String, Text, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column
class Base(DeclarativeBase):
    """Declarative base class that the mapped SQLAlchemy models inherit from."""
class Product(Base):
    """Product row guarded by an integer optimistic-lock counter.

    ``version_id`` is registered as the mapper's ``version_id_col``, so the
    ORM adds ``AND version_id = <loaded value>`` to the UPDATE it emits for a
    row and bumps the counter in the same statement.
    """

    __tablename__ = "products"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(200))
    # Numeric columns hand back decimal.Decimal values by default, so the
    # annotation is Decimal rather than float (which would also lose cents).
    price: Mapped[Decimal] = mapped_column(Numeric(10, 2))
    quantity: Mapped[int] = mapped_column(default=0)
    # Optimistic-lock counter; managed by the mapper, not by application code.
    version_id: Mapped[int] = mapped_column(default=1)

    __mapper_args__ = {
        "version_id_col": version_id,
    }
With this configuration, every UPDATE (and DELETE) that the ORM issues for a Product row automatically includes the version check:
with Session(engine) as session:
    widget = session.get(Product, 1)
    # Suppose the row was loaded with widget.version_id == 3.
    widget.price = 29.99
    session.commit()
    # The commit executes:
    #   UPDATE products SET price=29.99, version_id=4
    #   WHERE id=1 AND version_id=3
If another transaction modified the product between the read and the commit, SQLAlchemy raises StaleDataError:
from sqlalchemy.orm.exc import StaleDataError
try:
    session.commit()
except StaleDataError:
    # Another transaction changed the row after it was read; discard the
    # pending changes before deciding what to do next.
    session.rollback()
    # handle conflict: retry or notify user
Custom version generators
You can use timestamps or UUIDs instead of integers:
import uuid
class Document(Base):
    """Document row whose optimistic-lock token is a random UUID string.

    The column default supplies the token for a new row, and
    ``version_id_generator`` returns a fresh UUID string each time a new
    version value is needed; the previous token is ignored.
    """

    __tablename__ = "documents"

    id: Mapped[int] = mapped_column(primary_key=True)
    content: Mapped[str] = mapped_column(Text)
    # 36 characters: the canonical hyphenated text form of a UUID.
    version_uuid: Mapped[str] = mapped_column(
        String(36),
        default=lambda: str(uuid.uuid4()),
    )

    __mapper_args__ = {
        "version_id_col": version_uuid,
        "version_id_generator": lambda _previous: str(uuid.uuid4()),
    }
UUIDs make it impossible for clients to predict the next version, which prevents a class of race-condition exploits.
Django implementation
Django doesn’t have built-in optimistic locking, but the pattern is straightforward:
Manual version checking
from django.db import models
class StaleObjectError(Exception):
    """Raised when a row was changed by another transaction after it was read."""


class Product(models.Model):
    """Product with a hand-rolled optimistic-lock ``version`` counter."""

    name = models.CharField(max_length=200)
    price = models.DecimalField(max_digits=10, decimal_places=2)
    quantity = models.IntegerField(default=0)
    version = models.IntegerField(default=1)

    def save_optimistic(self, **kwargs):
        """Write this instance only if its ``version`` is still current.

        Issues one conditional UPDATE filtered on both ``id`` and the
        ``version`` this instance was loaded with, bumping ``version`` in the
        same statement. ``**kwargs`` is accepted but not used.

        Raises:
            StaleObjectError: no row matched, i.e. the stored version differs
                from ``self.version`` (or the row no longer exists).
        """
        updated = Product.objects.filter(
            id=self.id,
            version=self.version,  # only update if version matches
        ).update(
            name=self.name,
            price=self.price,
            quantity=self.quantity,
            version=self.version + 1,
        )
        if updated == 0:
            raise StaleObjectError(
                f"Product {self.id} was modified by another transaction"
            )
        # Keep the in-memory counter in step with the row just written.
        self.version += 1
Mixin for reusability
class OptimisticLockMixin(models.Model):
    """Abstract model that makes ``save()`` an optimistic-lock-checked write.

    Existing rows are written with one conditional UPDATE filtered on the
    primary key and the ``version`` the instance was loaded with.
    NOTE(review): that UPDATE goes through ``QuerySet.update()``, so it does
    not pass through ``Model.save()`` for existing rows; confirm nothing in
    the concrete models depends on save-time hooks.
    """

    version = models.IntegerField(default=1)

    class Meta:
        abstract = True

    class StaleObjectError(Exception):
        """Raised when the row was modified since this instance was loaded."""

    def save(self, *args, **kwargs):
        """Insert a new object, or update an existing one if its version matches.

        Raises:
            StaleObjectError: the conditional UPDATE matched no row.
        """
        # ``_state.adding`` is Django's flag for "not yet stored"; unlike a
        # ``not self.pk`` test it also covers new objects that were created
        # with an explicit primary key.
        if self._state.adding:
            # New object — normal save
            return super().save(*args, **kwargs)

        # Existing object — optimistic lock check
        cls = self.__class__
        values = {
            # ``attname`` is the raw column attribute (``author_id`` for a
            # foreign key), so reading it never loads a related object.
            field.attname: getattr(self, field.attname)
            for field in self._meta.fields
            # Skip the key whatever it is named; version is set just below.
            if not field.primary_key and field.name != "version"
        }
        values["version"] = self.version + 1

        updated = cls.objects.filter(
            pk=self.pk,
            version=self.version,
        ).update(**values)
        if updated == 0:
            raise self.StaleObjectError(
                f"{cls.__name__} {self.pk} modified concurrently"
            )
        # Keep the in-memory counter in step with the row just written.
        self.version += 1
class Product(OptimisticLockMixin):
    """Concrete model that inherits ``version`` and the checked ``save()``."""

    name = models.CharField(max_length=200)
    price = models.DecimalField(max_digits=10, decimal_places=2)
django-concurrency library
For a battle-tested solution:
from concurrency.fields import IntegerVersionField
class Product(models.Model):
    """Product whose version column is managed by django-concurrency."""

    name = models.CharField(max_length=200)
    # Version column supplied by the django-concurrency package.
    version = IntegerVersionField()
This automatically adds version checking to all save() calls and raises RecordModifiedError on conflicts.
REST API integration with ETags
HTTP ETags provide optimistic locking for APIs without exposing internal version numbers:
Generating ETags
import hashlib
from fastapi import FastAPI, Response, Header, HTTPException
# FastAPI application object; the @app.get / @app.put routes register on it.
app = FastAPI()
def compute_etag(product):
    """Return the hex MD5 digest of ``"<id>:<version_id>"`` for *product*.

    The result is the bare 32-character hash, without the surrounding double
    quotes that an HTTP ETag header value carries.
    """
    fingerprint = "{}:{}".format(product.id, product.version_id)
    digest = hashlib.md5(fingerprint.encode())
    return digest.hexdigest()
@app.get("/products/{product_id}")
def get_product(product_id: int, response: Response):
    """Return a product and publish its current version as an ETag header.

    Raises:
        HTTPException: 404 when no product has the given id.
    """
    product = db.get(Product, product_id)
    # NOTE(review): assumes db.get() returns None for an unknown id, as a
    # SQLAlchemy Session does; `db` is not defined in this snippet.
    if product is None:
        raise HTTPException(status_code=404, detail="Product not found")
    etag = compute_etag(product)
    # ETag header values are quoted strings, hence the literal double quotes.
    response.headers["ETag"] = f'"{etag}"'
    return {"id": product.id, "name": product.name, "price": product.price}
Enforcing conditional updates
@app.put("/products/{product_id}")
def update_product(
    product_id: int,
    data: ProductUpdate,
    if_match: str = Header(...),  # require If-Match header
):
    """Update a product only if the caller's If-Match ETag is still current.

    Raises:
        HTTPException: 404 when the product does not exist; 412 when
            ``if_match`` differs from the current ETag; 409 when the commit
            itself hits a concurrent modification.
    """
    product = db.get(Product, product_id)
    # NOTE(review): assumes db.get() returns None for an unknown id, as a
    # SQLAlchemy Session does; `db` is not defined in this snippet.
    if product is None:
        raise HTTPException(status_code=404, detail="Product not found")

    current_etag = f'"{compute_etag(product)}"'
    if if_match != current_etag:
        raise HTTPException(
            status_code=412,  # Precondition Failed
            detail="Product was modified since last read"
        )

    product.name = data.name
    product.price = data.price
    try:
        db.commit()
    except StaleDataError as exc:
        # The row changed between the ETag check and the UPDATE. Roll back so
        # the session is not left holding the failed transaction.
        db.rollback()
        raise HTTPException(
            status_code=409, detail="Concurrent modification"
        ) from exc
    # The body carries the bare hash; the value compared against If-Match
    # above is that hash wrapped in double quotes.
    return {"id": product.id, "etag": compute_etag(product)}
Clients must include If-Match with the ETag they received from GET. If the resource changed, they get a 412 and must re-fetch before retrying.
Retry patterns
Automatic retry for commutative operations
from sqlalchemy.orm.exc import StaleDataError
import time
def increment_view_count(product_id: int, max_retries: int = 5):
    """Add one to a product's view count, retrying on version conflicts.

    Each attempt runs in its own session and transaction. A StaleDataError
    triggers an exponentially growing sleep (10 ms, 20 ms, 40 ms, ...) before
    the next attempt; the error from the last allowed attempt is re-raised.
    """
    final_attempt = max_retries - 1
    for attempt in range(max_retries):
        try:
            with Session(engine) as session, session.begin():
                product = session.get(Product, product_id)
                product.view_count += 1
        except StaleDataError:
            if attempt == final_attempt:
                raise
            time.sleep(0.01 * (2 ** attempt))
        else:
            return  # success
For simple increments, a database-level atomic operation avoids the issue entirely:
from sqlalchemy import update
# The new value is computed by the database from the column itself, so the
# row is never read into Python first.
stmt = update(Product).where(Product.id == product_id).values(
    view_count=Product.view_count + 1
)
session.execute(stmt)
This is a single atomic UPDATE — no read-modify-write cycle, no version conflict possible.
User-facing conflict resolution
For complex updates (editing a document, updating a profile), automatic retry doesn’t work — you need human judgment:
@app.put("/articles/{article_id}")
def update_article(article_id: int, data: ArticleUpdate):
    """Apply an article edit, or report a conflict for the user to resolve.

    When ``data.expected_version`` differs from the stored version, a 409
    response carrying the current title and body is returned so the client
    can show a diff.

    Raises:
        HTTPException: 404 when the article does not exist; 409 when the
            commit itself detects a concurrent modification.
    """
    with Session(engine) as session:
        article = session.get(Article, article_id)
        if article is None:
            raise HTTPException(status_code=404, detail="Article not found")

        if data.expected_version != article.version_id:
            # Return both versions so the UI can show a diff
            return JSONResponse(
                status_code=409,
                content={
                    "error": "conflict",
                    "your_version": data.expected_version,
                    "current_version": article.version_id,
                    "current_data": {
                        "title": article.title,
                        "body": article.body,
                    },
                },
            )

        article.title = data.title
        article.body = data.body
        try:
            session.commit()
        except StaleDataError:
            # Another writer got in between the check above and the UPDATE.
            # NOTE(review): only raised if Article maps version_id as its
            # version_id_col; the model is not shown here.
            session.rollback()
            raise HTTPException(
                status_code=409, detail="Concurrent modification"
            ) from None
Combining optimistic and pessimistic locking
Some systems use optimistic locking for most operations and switch to pessimistic locking for high-contention paths:
def update_inventory(product_id: int, quantity_change: int):
    """Adjust a product's quantity while holding a row lock on it.

    The row is selected with ``with_for_update()`` inside one transaction, so
    this path relies on the lock rather than on a version check.
    """
    # Pessimistic lock for inventory (high contention)
    locked_row_query = (
        select(Product).where(Product.id == product_id).with_for_update()
    )
    with Session(engine) as session, session.begin():
        product = session.scalars(locked_row_query).one()
        # No version check needed — we hold the lock
        product.quantity += quantity_change
def update_product_info(product_id: int, name: str, description: str):
    """Overwrite a product's name and description in one transaction.

    Optimistic lock for metadata (low contention): no row lock is taken;
    version_id_col handles conflict detection.
    """
    with Session(engine) as session, session.begin():
        product = session.get(Product, product_id)
        product.name, product.description = name, description
Testing optimistic locking
import threading
def test_optimistic_lock_detects_conflict():
    """Two sessions load the same product version; one commit must fail.

    A barrier holds both workers until each has read the row, so both always
    start from the same version without relying on sleep timing.
    """
    # Setup: create a product
    with Session(engine) as session:
        session.add(Product(id=1, name="Widget", price=10.0))
        session.commit()

    conflict_detected = threading.Event()
    both_loaded = threading.Barrier(2)

    def concurrent_update():
        with Session(engine) as session:
            product = session.get(Product, 1)
            # Wait until the other worker has also read the row. The timeout
            # stops the test hanging if that worker died before reaching here.
            both_loaded.wait(timeout=5)
            product.price = 15.0
            try:
                session.commit()
            except StaleDataError:
                conflict_detected.set()
                session.rollback()

    # Start two threads updating the same product
    workers = [threading.Thread(target=concurrent_update) for _ in range(2)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    assert conflict_detected.is_set(), "Conflict should be detected"
One thing to remember: Optimistic locking shines when conflicts are rare. Use SQLAlchemy’s version_id_col for automatic version tracking, expose versions via ETags in REST APIs, implement retry logic for automated operations, and present conflict diffs to users for complex data. Fall back to pessimistic locking only for high-contention resources like inventory counters.
See Also
- Python Aioredis — Understand Aioredis through a practical analogy so your Python decisions become faster and clearer.
- Python Alembic — Understand Alembic through a practical analogy so your Python decisions become faster and clearer.
- Python Asyncpg Database — asyncpg is the fastest way for Python to talk to PostgreSQL without making your program sit around waiting.
- Python Asyncpg — Understand Asyncpg through a practical analogy so your Python decisions become faster and clearer.
- Python Cassandra Python — Understand Cassandra Python through a practical analogy so your Python decisions become faster and clearer.