Python Distributed Locks — Deep Dive

Redis Lock Internals

The redis-py lock uses three Redis operations:

Acquire

SET lock_key <unique_token> NX EX <timeout>

NX ensures only one client sets the key. EX sets the timeout. The unique token (usually a UUID) ensures only the holder can release the lock.

Release (Lua Script)

if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("DEL", KEYS[1])
else
    return 0
end

The Lua script makes the get-and-delete atomic. Without it, there’s a race condition:

  1. Process A checks the value (matches)
  2. Lock expires
  3. Process B acquires the lock
  4. Process A deletes Process B’s lock

Extend (Lua Script)

if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("PEXPIRE", KEYS[1], ARGV[2])
else
    return 0
end

Extension resets the TTL only if you still hold the lock. This is critical for operations that run longer than expected.

Lock Extension Pattern

For variable-duration operations, extend the lock in a background thread:

import redis
import threading
import time

class AutoExtendingLock:
    def __init__(self, redis_client, name, timeout=30,
                 extend_interval=None):
        self.r = redis_client
        self.lock = redis_client.lock(name, timeout=timeout)
        self.timeout = timeout
        self.extend_interval = extend_interval or timeout / 3
        self._extending = False
        self._extend_thread = None

    def __enter__(self):
        self.lock.acquire()
        self._extending = True
        self._extend_thread = threading.Thread(
            target=self._extend_loop, daemon=True
        )
        self._extend_thread.start()
        return self

    def __exit__(self, *args):
        self._extending = False
        if self._extend_thread:
            self._extend_thread.join(timeout=5)
        try:
            self.lock.release()
        except redis.exceptions.LockNotOwnedError:
            pass  # Lock already expired

    def _extend_loop(self):
        while self._extending:
            time.sleep(self.extend_interval)
            if self._extending:
                try:
                    self.lock.extend(self.timeout)
                except redis.exceptions.LockNotOwnedError:
                    break  # Lost the lock

# Usage
r = redis.Redis()
with AutoExtendingLock(r, 'long-running-job', timeout=30):
    # Lock auto-extends every 10 seconds
    do_long_operation()

Async Version

import asyncio

class AsyncAutoExtendingLock:
    def __init__(self, redis_client, name, timeout=30):
        self.r = redis_client
        self.lock = redis_client.lock(name, timeout=timeout)
        self.timeout = timeout
        self._extend_task = None

    async def __aenter__(self):
        await self.lock.acquire()
        self._extend_task = asyncio.create_task(
            self._extend_loop()
        )
        return self

    async def __aexit__(self, *args):
        if self._extend_task:
            self._extend_task.cancel()
            try:
                await self._extend_task
            except asyncio.CancelledError:
                pass
        try:
            await self.lock.release()
        except Exception:
            pass

    async def _extend_loop(self):
        while True:
            await asyncio.sleep(self.timeout / 3)
            try:
                await self.lock.extend(self.timeout)
            except Exception:
                break

The Redlock Algorithm

Single-instance Redis locks fail if Redis goes down. Redlock uses N independent Redis instances (typically 5) and requires a majority quorum:

import redis
import uuid
import time

class Redlock:
    def __init__(self, redis_instances, retry_count=3,
                 retry_delay=0.2):
        self.instances = redis_instances
        self.retry_count = retry_count
        self.retry_delay = retry_delay
        self.quorum = len(redis_instances) // 2 + 1

    def acquire(self, resource, ttl_ms):
        token = str(uuid.uuid4())

        for _ in range(self.retry_count):
            acquired = 0
            start = time.monotonic()

            for instance in self.instances:
                try:
                    if instance.set(
                        resource, token, nx=True,
                        px=ttl_ms
                    ):
                        acquired += 1
                except redis.ConnectionError:
                    continue

            elapsed_ms = (time.monotonic() - start) * 1000
            remaining_ttl = ttl_ms - elapsed_ms

            if acquired >= self.quorum and remaining_ttl > 0:
                return {
                    'resource': resource,
                    'token': token,
                    'valid_until': time.monotonic() +
                                   remaining_ttl / 1000,
                }

            # Failed — release any locks acquired
            self._release_all(resource, token)
            time.sleep(self.retry_delay)

        return None  # Could not acquire

    def release(self, lock_info):
        self._release_all(
            lock_info['resource'], lock_info['token']
        )

    def _release_all(self, resource, token):
        release_script = """
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        end
        return 0
        """
        for instance in self.instances:
            try:
                instance.eval(release_script, 1,
                             resource, token)
            except redis.ConnectionError:
                continue

Redlock Controversy

Martin Kleppmann’s critique of Redlock argues that it provides neither safety nor liveness guarantees under real-world conditions (clock skew, process pauses, network delays). The counter-argument from Redis’s Salvatore Sanfilippo is that Redlock is “good enough” for practical purposes.

The pragmatic stance: use Redlock if you need better availability than single-instance Redis locks but don’t need the guarantees of a proper consensus system. For true safety, use etcd or ZooKeeper.

Fencing Tokens Implementation

import redis

class FencedLock:
    def __init__(self, redis_client, name, timeout=30):
        self.r = redis_client
        self.name = name
        self.timeout = timeout
        self.fence_key = f"{name}:fence"

    def acquire(self):
        # Atomically increment fence counter and set lock
        script = """
        local fence = redis.call('INCR', KEYS[2])
        local acquired = redis.call(
            'SET', KEYS[1], fence, 'NX', 'EX', ARGV[1]
        )
        if acquired then
            return fence
        else
            redis.call('DECR', KEYS[2])
            return 0
        end
        """
        token = self.r.eval(
            script, 2, self.name, self.fence_key,
            self.timeout
        )
        return int(token) if token else None

    def release(self, token):
        script = """
        if redis.call('GET', KEYS[1]) == ARGV[1] then
            return redis.call('DEL', KEYS[1])
        end
        return 0
        """
        self.r.eval(script, 1, self.name, str(token))

# Protected resource checks fence token
class FencedResource:
    def __init__(self):
        self.last_fence = 0

    def write(self, data, fence_token):
        if fence_token <= self.last_fence:
            raise StaleTokenError(
                f"Token {fence_token} <= "
                f"last seen {self.last_fence}"
            )
        self.last_fence = fence_token
        # Perform the write
        do_write(data)

PostgreSQL Advisory Locks — Advanced

Session-Level vs Transaction-Level

# Session-level: held until explicitly released or
# connection closes
session.execute(text("SELECT pg_advisory_lock(12345)"))
# ... work ...
session.execute(text("SELECT pg_advisory_unlock(12345)"))

# Transaction-level: automatically released at commit/rollback
session.execute(text("SELECT pg_advisory_xact_lock(12345)"))
# ... work within transaction ...
session.commit()  # Lock released automatically

Try-Lock (Non-Blocking)

result = session.execute(text(
    "SELECT pg_try_advisory_lock(12345)"
)).scalar()

if result:
    # Acquired the lock
    try:
        process()
    finally:
        session.execute(text(
            "SELECT pg_advisory_unlock(12345)"
        ))
else:
    # Lock is held by someone else
    pass

Using Two-Part Keys

PostgreSQL advisory locks accept two int4 values, allowing you to namespace locks:

# Lock by (resource_type, resource_id)
RESOURCE_ORDER = 1
RESOURCE_USER = 2

def lock_order(session, order_id):
    session.execute(text(
        f"SELECT pg_advisory_lock({RESOURCE_ORDER}, "
        f"{order_id})"
    ))

etcd Locks for Strong Consistency

When you need linearizable locks (true mutual exclusion under all conditions):

import etcd3

client = etcd3.client()

# etcd lease-based lock
lock = client.lock('my-resource', ttl=30)

if lock.acquire(timeout=10):
    try:
        # Guaranteed mutual exclusion via Raft consensus
        process_resource()
    finally:
        lock.release()

etcd locks are built on the Raft consensus protocol, providing stronger guarantees than Redis but with higher latency (~10-50ms for acquire vs ~1ms for Redis).

Performance Comparison

Lock TypeAcquire LatencyThroughputSafety
Redis single-instance~1ms~100K/sBest-effort
Redlock (5 instances)~5-20ms~10K/sImproved
PostgreSQL advisory~2-5ms~50K/sSession-bound
etcd~10-50ms~5K/sLinearizable
ZooKeeper~10-50ms~5K/sLinearizable

Production Checklist

  1. Always set TTL — no TTL = potential permanent deadlock
  2. Use unique tokens — prevents accidental release of someone else’s lock
  3. Handle lock-not-owned errors — your lock might expire before you release it
  4. Monitor lock contention — track acquire wait times and failure rates
  5. Log lock operations — who acquired, how long held, timeouts
  6. Test failure scenarios — what happens when a lock holder crashes?
  7. Have a manual override — ability to force-release stuck locks in production

One thing to remember: The fundamental tradeoff in distributed locking is between performance and safety. Redis locks are fast but can fail under network partitions. Consensus-based locks (etcd, ZooKeeper) are safe but slow. Pick based on what happens if two processes enter the critical section simultaneously — if it’s “slightly wrong data,” use Redis. If it’s “lost money,” use consensus.

pythondistributed-systemsconcurrency

See Also