Optimistic Locking — Deep Dive

SQLAlchemy’s built-in version counter

SQLAlchemy provides native optimistic locking through the version_id_col mapper argument:

from decimal import Decimal

from sqlalchemy import Numeric, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Product(Base):
    __tablename__ = "products"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(200))
    # Numeric columns return Decimal, not float
    price: Mapped[Decimal] = mapped_column(Numeric(10, 2))
    quantity: Mapped[int] = mapped_column(default=0)
    # Managed by SQLAlchemy: set on INSERT, incremented on every UPDATE
    version_id: Mapped[int] = mapped_column(nullable=False)

    __mapper_args__ = {
        "version_id_col": version_id,
    }

With this configuration, every UPDATE (and DELETE) that the ORM flushes for a Product automatically includes the version check:

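from decimal import Decimal

from sqlalchemy.orm import Session

# `engine` is assumed to be an Engine created elsewhere with create_engine()
with Session(engine) as session:
    product = session.get(Product, 1)
    # product.version_id == 3

    product.price = Decimal("29.99")
    session.commit()
    # Executes: UPDATE products SET price=29.99, version_id=4
    #           WHERE id=1 AND version_id=3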
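
The generated statement is effectively a compare-and-swap at the SQL level. As a rough sketch of the same mechanism without the ORM, the following uses the standard-library sqlite3 module; the table layout and the `update_price` helper are hypothetical and exist only for illustration:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE products ("
    "id INTEGER PRIMARY KEY, price REAL, version_id INTEGER NOT NULL)"
)
conn.execute("INSERT INTO products VALUES (1, 19.99, 3)")
conn.commit()


def update_price(conn, product_id, new_price, expected_version):
    """Return True if the row was updated, False if the version was stale."""
    cur = conn.execute(
        "UPDATE products SET price = ?, version_id = version_id + 1 "
        "WHERE id = ? AND version_id = ?",
        (new_price, product_id, expected_version),
    )
    conn.commit()
    # rowcount == 0 means another writer already bumped the version
    return cur.rowcount == 1


# Two writers that both read version 3
first = update_price(conn, 1, 29.99, expected_version=3)   # version matches
second = update_price(conn, 1, 24.99, expected_version=3)  # version is now stale
```

The second call matches zero rows, which is the condition SQLAlchemy turns into StaleDataError.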

If another transaction modified the product between the read and the commit, SQLAlchemy raises StaleDataError:

from sqlalchemy.orm.exc import StaleDataError

try:
    session.commit()
except StaleDataError:
    session.rollback()
    # handle conflict: retry or notify user

Custom version generators

You can use timestamps or UUIDs instead of integers:

import uuid

from sqlalchemy import String, Text

class Document(Base):
    __tablename__ = "documents"

    id: Mapped[int] = mapped_column(primary_key=True)
    content: Mapped[str] = mapped_column(Text)
    version_uuid: Mapped[str] = mapped_column(
        String(36), default=lambda: str(uuid.uuid4())
    )

    __mapper_args__ = {
        "version_id_col": version_uuid,
        "version_id_generator": lambda version: str(uuid.uuid4()),
    }

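Because each version is a random UUID, a client cannot guess the next token and submit a write against a version it never read. Unlike integers or timestamps, though, UUID versions carry no ordering: they tell you that a row changed, not how many times.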
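
The callable passed as version_id_generator takes the current version and returns the next one. The sketch below shows three plain-Python generators of that shape. The function names are hypothetical, and it assumes the callable receives None for a brand-new row and that timestamp versions are timezone-aware datetimes:

```python
import uuid
from datetime import datetime, timedelta, timezone


def next_int_version(version):
    # Counting scheme: treat a missing version (new row) as 0
    return (version or 0) + 1


def next_uuid_version(version):
    # The previous value is ignored; any fresh random token works
    return str(uuid.uuid4())


def next_timestamp_version(version):
    now = datetime.now(timezone.utc)
    if version is not None and now <= version:
        # Clock did not advance (or stepped backwards): force a strictly
        # newer value so two quick writes never share a version
        now = version + timedelta(microseconds=1)
    return now
```

The timestamp variant guards against the main weakness of time-based versions: two writes within the clock's resolution would otherwise produce the same version and hide a conflict.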

Django implementation

Django doesn’t have built-in optimistic locking, but the pattern is straightforward:

Manual version checking

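from django.db import models


class StaleObjectError(Exception):
    """Raised when the row changed after this instance was loaded."""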

class Product(models.Model):
    name = models.CharField(max_length=200)
    price = models.DecimalField(max_digits=10, decimal_places=2)
    quantity = models.IntegerField(default=0)
    version = models.IntegerField(default=1)

    def save_optimistic(self, **kwargs):
        updated = Product.objects.filter(
            id=self.id,
            version=self.version,  # only update if version matches
        ).update(
            name=self.name,
            price=self.price,
            quantity=self.quantity,
            version=self.version + 1,
        )
        if updated == 0:
            raise StaleObjectError(
                f"Product {self.id} was modified by another transaction"
            )
        self.version += 1

Mixin for reusability

class OptimisticLockMixin(models.Model):
    version = models.IntegerField(default=1)

    class Meta:
        abstract = True

    class StaleObjectError(Exception):
        pass

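    def save(self, *args, **kwargs):
        if self._state.adding:
            # New object: normal INSERT (also covers manually assigned primary keys)
            return super().save(*args, **kwargs)

        # Existing object: UPDATE only if the stored version still matches.
        # Note: QuerySet.update() bypasses pre_save/post_save signals and
        # does not refresh auto_now fields.
        cls = self.__class__
        values = {
            # attname yields the raw column value (e.g. the id of a ForeignKey)
            field.name: getattr(self, field.attname)
            for field in self._meta.concrete_fields
            if not field.primary_key and field.name != "version"
        }
        values["version"] = self.version + 1
        updated = cls.objects.filter(
            pk=self.pk,
            version=self.version,
        ).update(**values)

        if updated == 0:
            raise self.StaleObjectError(
                f"{cls.__name__} {self.pk} modified concurrently"
            )
        self.version += 1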

class Product(OptimisticLockMixin):
    name = models.CharField(max_length=200)
    price = models.DecimalField(max_digits=10, decimal_places=2)

django-concurrency library

For a battle-tested solution:

from concurrency.fields import IntegerVersionField

class Product(models.Model):
    name = models.CharField(max_length=200)
    version = IntegerVersionField()

This automatically adds version checking to all save() calls and raises RecordModifiedError on conflicts.

REST API integration with ETags

HTTP ETags provide optimistic locking for APIs without exposing internal version numbers:

Generating ETags

import hashlib
from fastapi import FastAPI, Response, Header, HTTPException

app = FastAPI()

def compute_etag(product):
    content = f"{product.id}:{product.version_id}"
    return hashlib.md5(content.encode()).hexdigest()

@app.get("/products/{product_id}")
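def get_product(product_id: int, response: Response):
    # `db` is assumed to be a SQLAlchemy Session provided elsewhere
    product = db.get(Product, product_id)
    if product is None:
        raise HTTPException(status_code=404, detail="Product not found")
    etag = compute_etag(product)
    response.headers["ETag"] = f'"{etag}"'
    return {"id": product.id, "name": product.name, "price": product.price}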

Enforcing conditional updates

@app.put("/products/{product_id}")
def update_product(
    product_id: int,
    data: ProductUpdate,
    if_match: str = Header(...),  # require If-Match header
):
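    product = db.get(Product, product_id)
    if product is None:
        raise HTTPException(status_code=404, detail="Product not found")
    current_etag = f'"{compute_etag(product)}"'

    if if_match != current_etag:
        raise HTTPException(
            status_code=412,  # Precondition Failed
            detail="Product was modified since last read"
        )

    product.name = data.name
    product.price = data.price
    try:
        db.commit()
    except StaleDataError:  # from sqlalchemy.orm.exc
        # Another writer committed between our read and our UPDATE
        db.rollback()
        raise HTTPException(status_code=409, detail="Concurrent modification")

    # Return the ETag in the same quoted form the GET handler uses
    return {"id": product.id, "etag": f'"{compute_etag(product)}"'}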

Clients must include If-Match with the ETag they received from GET. If the resource changed, they get a 412 and must re-fetch before retrying.
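
The handler above compares the header to a single quoted ETag. If-Match can also carry `*` or a comma-separated list of tags, and weak tags (`W/"..."`) do not satisfy it. A minimal sketch of a more tolerant check follows; the helper name `if_match_satisfied` is hypothetical, and splitting on commas assumes the ETags contain no commas, which holds for the hex digests used here:

```python
def if_match_satisfied(header_value: str, current_etag: str) -> bool:
    """Strong comparison of an If-Match header against the current ETag.

    current_etag is the quoted form, e.g. '"abc123"'.
    """
    value = header_value.strip()
    if value == "*":
        # "*" matches any existing representation of the resource
        return True
    candidates = [tag.strip() for tag in value.split(",")]
    # Weak validators (W/"...") never match under strong comparison
    return any(
        tag == current_etag
        for tag in candidates
        if not tag.startswith("W/")
    )
```

A handler could call this in place of the `if_match != current_etag` test and still return 412 when it yields False.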

Retry patterns

Automatic retry for commutative operations

import time

from sqlalchemy.orm import Session
from sqlalchemy.orm.exc import StaleDataError

def increment_view_count(product_id: int, max_retries: int = 5):
    for attempt in range(max_retries):
        try:
            with Session(engine) as session:
                with session.begin():
                    product = session.get(Product, product_id)
                    product.view_count += 1
            return  # success
        except StaleDataError:
            if attempt == max_retries - 1:
                raise
            time.sleep(0.01 * (2 ** attempt))
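
The same loop can be factored into a reusable helper. The sketch below is one possible variation rather than the approach used above: it adds random jitter to the backoff so that competing writers are less likely to retry in lockstep. `ConflictError` is a hypothetical stand-in for StaleDataError so the example runs without a database, and `retry_on_conflict` is likewise an illustrative name:

```python
import random
import time


class ConflictError(Exception):
    """Stand-in for StaleDataError so the sketch runs without a database."""


def retry_on_conflict(operation, max_retries=5, base_delay=0.01, sleep=time.sleep):
    """Call operation() until it succeeds or max_retries attempts are used."""
    for attempt in range(max_retries):
        try:
            return operation()
        except ConflictError:
            if attempt == max_retries - 1:
                raise
            # Full jitter: sleep a random amount up to the exponential cap
            sleep(random.uniform(0, base_delay * (2 ** attempt)))


calls = {"count": 0}


def flaky_increment():
    # Simulates two version conflicts followed by a successful write
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConflictError("simulated version conflict")
    return "ok"


result = retry_on_conflict(flaky_increment, sleep=lambda _: None)
```

Passing `sleep` as a parameter keeps the helper easy to test, since a test can skip the real delays.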

For simple increments, a database-level atomic operation avoids the issue entirely:

from sqlalchemy import update
from sqlalchemy.orm import Session

stmt = (
    update(Product)
    .where(Product.id == product_id)
    .values(view_count=Product.view_count + 1)
)
with Session(engine) as session:
    session.execute(stmt)
    session.commit()

This is a single atomic UPDATE: the database reads and writes the counter in one statement, so there is no read-modify-write window in application code and no version conflict is possible.
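
To make the difference concrete, here is a small simulation with the standard-library sqlite3 module. It interleaves two hypothetical clients on a single connection, which is a simplification of real concurrency, and the table is invented for the example:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE products (id INTEGER PRIMARY KEY, view_count INTEGER NOT NULL)"
)
conn.execute("INSERT INTO products VALUES (1, 0), (2, 0)")

# Read-modify-write: both "clients" read 0 before either one writes
seen_by_a = conn.execute("SELECT view_count FROM products WHERE id = 1").fetchone()[0]
seen_by_b = conn.execute("SELECT view_count FROM products WHERE id = 1").fetchone()[0]
conn.execute("UPDATE products SET view_count = ? WHERE id = 1", (seen_by_a + 1,))
conn.execute("UPDATE products SET view_count = ? WHERE id = 1", (seen_by_b + 1,))

# Atomic increment: the database computes the new value itself
conn.execute("UPDATE products SET view_count = view_count + 1 WHERE id = 2")
conn.execute("UPDATE products SET view_count = view_count + 1 WHERE id = 2")
conn.commit()

lost = conn.execute("SELECT view_count FROM products WHERE id = 1").fetchone()[0]
atomic = conn.execute("SELECT view_count FROM products WHERE id = 2").fetchone()[0]
```

Row 1 ends at 1 because the second write overwrote the first, which is the lost update that optimistic locking would have rejected. Row 2 ends at 2.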

User-facing conflict resolution

For complex updates (editing a document, updating a profile), automatic retry doesn’t work — you need human judgment:

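from fastapi.responses import JSONResponse

@app.put("/articles/{article_id}")
def update_article(article_id: int, data: ArticleUpdate):
    with Session(engine) as session:
        article = session.get(Article, article_id)
        if article is None:
            raise HTTPException(status_code=404, detail="Article not found")

        if data.expected_version != article.version_id:
            # Return both versions so the UI can show a diff
            return JSONResponse(
                status_code=409,
                content={
                    "error": "conflict",
                    "your_version": data.expected_version,
                    "current_version": article.version_id,
                    "current_data": {
                        "title": article.title,
                        "body": article.body,
                    },
                },
            )

        article.title = data.title
        article.body = data.body
        # Article is assumed to map version_id_col, so a write that slips in
        # after the check above still surfaces as StaleDataError here
        session.commit()
        return {"id": article.id, "version": article.version_id}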
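
On the client side, the 409 payload can feed a diff view. The following is a minimal sketch using the standard-library difflib module. The helper names are hypothetical, and `conflicting_fields` assumes the client kept the copy it originally loaded (`base`) alongside its own edits (`mine`) and the server's current data (`theirs`):

```python
import difflib


def conflicting_fields(base: dict, mine: dict, theirs: dict) -> list[str]:
    """Fields that both sides changed, relative to base, to different values."""
    return sorted(
        key
        for key in base
        if mine[key] != base[key]
        and theirs[key] != base[key]
        and mine[key] != theirs[key]
    )


def body_diff(current: str, submitted: str) -> str:
    """Unified diff between the server's text and the user's submission."""
    lines = difflib.unified_diff(
        current.splitlines(),
        submitted.splitlines(),
        fromfile="current",
        tofile="yours",
        lineterm="",
    )
    return "\n".join(lines)


base = {"title": "Intro", "body": "a\nb"}
mine = {"title": "Intro v2", "body": "a\nb"}
theirs = {"title": "Introduction", "body": "a\nb\nc"}

needs_review = conflicting_fields(base, mine, theirs)
diff_text = body_diff("a\nb", "a\nB")
```

Fields changed by only one side (here, `body`) could be merged automatically, leaving just the true conflicts for the user to resolve.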

Combining optimistic and pessimistic locking

Some systems use optimistic locking for most operations and switch to pessimistic locking for high-contention paths:

from sqlalchemy import select
from sqlalchemy.orm import Session

def update_inventory(product_id: int, quantity_change: int):
    with Session(engine) as session:
        with session.begin():
            # Pessimistic lock for inventory (high contention)
            product = session.scalars(
                select(Product)
                .where(Product.id == product_id)
                .with_for_update()
            ).one()

            product.quantity += quantity_change
            # The row lock serializes writers, so the version check in the
            # generated UPDATE always passes; the version is still incremented

def update_product_info(product_id: int, name: str, description: str):
    # Optimistic lock for metadata (low contention)
    with Session(engine) as session:
        with session.begin():
            product = session.get(Product, product_id)
            product.name = name
            product.description = description
            # version_id_col handles conflict detection

Testing optimistic locking

import threading

from sqlalchemy.orm import Session
from sqlalchemy.orm.exc import StaleDataError

def test_optimistic_lock_detects_conflict():
    # Assumes `engine` points at a database that allows concurrent connections
    # Setup: create a product
    with Session(engine) as session:
        session.add(Product(id=1, name="Widget", price=10.0))
        session.commit()

    conflict_detected = threading.Event()
    both_have_read = threading.Barrier(2)

    def concurrent_update():
        with Session(engine) as session:
            product = session.get(Product, 1)
            # Wait until both threads hold the same version before writing,
            # so the outcome does not depend on sleep timing
            both_have_read.wait(timeout=5)
            product.price = 15.0
            try:
                session.commit()
            except StaleDataError:
                conflict_detected.set()
                session.rollback()

    # Start two threads updating the same product
    t1 = threading.Thread(target=concurrent_update)
    t2 = threading.Thread(target=concurrent_update)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

    assert conflict_detected.is_set(), "Conflict should be detected"

One thing to remember: Optimistic locking shines when conflicts are rare. Use SQLAlchemy’s version_id_col for automatic version tracking, expose versions via ETags in REST APIs, implement retry logic for automated operations, and present conflict diffs to users for complex data. Fall back to pessimistic locking only for high-contention resources like inventory counters.
