Python Distributed Locks — Deep Dive
Redis Lock Internals
The redis-py lock uses three Redis operations:
Acquire
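SET lock_key <unique_token> NX PX <timeout_ms>
NX ensures only one client sets the key. PX sets the expiry in milliseconds (redis-py converts the timeout you pass in seconds), so a crashed holder cannot keep the lock forever. The unique token (usually a UUID) ensures only the holder can release the lock.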
Release (Lua Script)
if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("DEL", KEYS[1])
else
    return 0
end
The Lua script makes the get-and-delete atomic. Without it, there’s a race condition:
- Process A checks the value (matches)
- Lock expires
- Process B acquires the lock
- Process A deletes Process B’s lock
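The interleaving above can be replayed without a server. The following is a minimal sketch that uses an in-memory stand-in for one Redis key; `FakeStore` and its `compare_and_delete` method are hypothetical names for illustration, not part of redis-py, and "expiry" is simulated by removing the key by hand.

```python
class FakeStore:
    """In-memory stand-in for a single Redis instance; NOT real Redis."""

    def __init__(self):
        self.data = {}

    def get(self, key):
        return self.data.get(key)

    def delete(self, key):
        return 1 if self.data.pop(key, None) is not None else 0

    def compare_and_delete(self, key, token):
        # Models the Lua script: the check and the delete are one step,
        # so nothing can happen between them.
        if self.data.get(key) == token:
            del self.data[key]
            return 1
        return 0


store = FakeStore()
store.data["lock_key"] = "token-A"

# Unsafe release: GET and DEL are two separate round trips.
matches = store.get("lock_key") == "token-A"  # 1. A checks the value
store.data.pop("lock_key")                    # 2. A's lock expires
store.data["lock_key"] = "token-B"            # 3. B acquires the lock
if matches:
    store.delete("lock_key")                  # 4. A deletes B's lock
unsafe_result = store.get("lock_key")         # B's lock is gone

# Safe release: same situation, but the token is checked at delete time.
store.data["lock_key"] = "token-B"
deleted = store.compare_and_delete("lock_key", "token-A")
safe_result = store.get("lock_key")           # B's lock survives
```

In the unsafe path Process A acts on a check that is already stale by the time it deletes; in the safe path the stale token simply fails to match.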
Extend (Lua Script)
if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("PEXPIRE", KEYS[1], ARGV[2])
else
    return 0
end
The script resets the TTL only if the stored token still matches yours, so a client that has already lost the lock cannot extend someone else's. This is critical for operations that run longer than expected.
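One detail worth modelling: the simplified script sets the TTL to the value passed in, whereas redis-py's `Lock.extend(additional_time, replace_ttl=False)` by default adds `additional_time` to whatever TTL remains. The sketch below is plain arithmetic, not a call into redis-py; `ttl_after_extend` and `simulate` are hypothetical helpers that show how the two modes diverge for a 30-second lock extended every 10 seconds.

```python
def ttl_after_extend(remaining_ms, additional_ms, replace_ttl=False):
    """Model the TTL after extending a lock that is still owned."""
    if replace_ttl:
        return additional_ms               # TTL is reset
    return remaining_ms + additional_ms    # TTL accumulates


def simulate(replace_ttl, rounds=3, timeout_ms=30_000, interval_ms=10_000):
    ttl = timeout_ms
    history = []
    for _ in range(rounds):
        ttl -= interval_ms                 # time passes between extensions
        ttl = ttl_after_extend(ttl, timeout_ms, replace_ttl)
        history.append(ttl)
    return history


additive = simulate(replace_ttl=False)   # [50000, 70000, 90000]
replacing = simulate(replace_ttl=True)   # [30000, 30000, 30000]
```

With the additive default the TTL keeps growing, so a holder that crashes late in a long job leaves the lock stuck for much longer than the nominal timeout. Passing `replace_ttl=True` keeps the worst case at one timeout.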
Lock Extension Pattern
For variable-duration operations, extend the lock in a background thread:
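import threading

import redis


class AutoExtendingLock:
    """Hold a redis-py lock and refresh its TTL from a background thread."""

    def __init__(self, redis_client, name, timeout=30,
                 extend_interval=None):
        self.r = redis_client
        # thread_local=False: the token must be visible to the extender
        # thread; with the default (True) extend() would see no token there.
        self.lock = redis_client.lock(
            name, timeout=timeout, thread_local=False
        )
        self.timeout = timeout
        self.extend_interval = extend_interval or timeout / 3
        # An Event lets __exit__ wake the thread at once instead of
        # waiting for a time.sleep() to finish.
        self._stop = threading.Event()
        self._extend_thread = None

    def __enter__(self):
        self.lock.acquire()
        self._stop.clear()
        self._extend_thread = threading.Thread(
            target=self._extend_loop, daemon=True
        )
        self._extend_thread.start()
        return self

    def __exit__(self, *args):
        self._stop.set()
        if self._extend_thread:
            self._extend_thread.join(timeout=5)
        try:
            self.lock.release()
        except redis.exceptions.LockNotOwnedError:
            pass  # Lock already expired

    def _extend_loop(self):
        # wait() returns False on timeout and True once stop is set
        while not self._stop.wait(self.extend_interval):
            try:
                # replace_ttl=True resets the TTL to `timeout`; the default
                # adds to the remaining TTL, which would grow without bound.
                self.lock.extend(self.timeout, replace_ttl=True)
            except redis.exceptions.LockNotOwnedError:
                break  # Lost the lock


# Usage (do_long_operation is a placeholder for your own work)
r = redis.Redis()
with AutoExtendingLock(r, 'long-running-job', timeout=30):
    # Lock auto-extends every 10 seconds
    do_long_operation()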
Async Version
import asyncio

from redis.exceptions import LockError


class AsyncAutoExtendingLock:
    """Async variant; redis_client is expected to be a redis.asyncio.Redis."""

    def __init__(self, redis_client, name, timeout=30):
        self.r = redis_client
        self.lock = redis_client.lock(name, timeout=timeout)
        self.timeout = timeout
        self._extend_task = None

    async def __aenter__(self):
        await self.lock.acquire()
        self._extend_task = asyncio.create_task(
            self._extend_loop()
        )
        return self

    async def __aexit__(self, *args):
        if self._extend_task:
            self._extend_task.cancel()
            try:
                await self._extend_task
            except asyncio.CancelledError:
                pass
        try:
            await self.lock.release()
        except LockError:
            pass  # Lock already expired or lost

    async def _extend_loop(self):
        while True:
            await asyncio.sleep(self.timeout / 3)
            try:
                # Reset the TTL rather than adding to what remains
                await self.lock.extend(self.timeout, replace_ttl=True)
            except LockError:
                break  # Lost the lock
The Redlock Algorithm
Single-instance Redis locks fail if Redis goes down. Redlock uses N independent Redis instances (typically 5) and requires a majority quorum:
import time
import uuid

import redis


class Redlock:
    def __init__(self, redis_instances, retry_count=3,
                 retry_delay=0.2):
        self.instances = redis_instances
        self.retry_count = retry_count
        self.retry_delay = retry_delay
        # Majority of instances, e.g. 3 of 5
        self.quorum = len(redis_instances) // 2 + 1

    def acquire(self, resource, ttl_ms):
        token = str(uuid.uuid4())
        for _ in range(self.retry_count):
            acquired = 0
            start = time.monotonic()
            for instance in self.instances:
                try:
                    if instance.set(
                        resource, token, nx=True,
                        px=ttl_ms
                    ):
                        acquired += 1
                except redis.ConnectionError:
                    continue
            # Time spent acquiring counts against the lock's lifetime
            elapsed_ms = (time.monotonic() - start) * 1000
            remaining_ttl = ttl_ms - elapsed_ms
            if acquired >= self.quorum and remaining_ttl > 0:
                return {
                    'resource': resource,
                    'token': token,
                    'valid_until': time.monotonic() +
                                   remaining_ttl / 1000,
                }
            # Failed: release any locks acquired
            self._release_all(resource, token)
            time.sleep(self.retry_delay)
        return None  # Could not acquire

    def release(self, lock_info):
        self._release_all(
            lock_info['resource'], lock_info['token']
        )

    def _release_all(self, resource, token):
        # Compare-and-delete, run on every instance (even ones we may
        # not have locked) so no partial lock is left behind
        release_script = """
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        end
        return 0
        """
        for instance in self.instances:
            try:
                instance.eval(release_script, 1,
                              resource, token)
            except redis.ConnectionError:
                continue
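The `acquire` method above subtracts only the elapsed acquisition time from the TTL. The Redlock description also subtracts a small allowance for clock drift between instances. The helper below is a sketch of that validity calculation; `validity_ms` is a hypothetical name, and the 1% drift factor plus 2 ms constant follow the values commonly seen in reference implementations, so treat them as assumptions to tune rather than fixed parts of the algorithm.

```python
CLOCK_DRIFT_FACTOR = 0.01  # assumed: 1% of the TTL, as in common reference clients


def validity_ms(ttl_ms, elapsed_ms, drift_factor=CLOCK_DRIFT_FACTOR):
    """Return how long the lock can be trusted, in milliseconds.

    A result <= 0 means the lock should be treated as not acquired,
    even if a quorum of instances accepted the SET.
    """
    # Drift allowance: a fraction of the TTL plus 2 ms for expiry precision
    drift_ms = ttl_ms * drift_factor + 2
    return ttl_ms - elapsed_ms - drift_ms


fast = validity_ms(ttl_ms=10_000, elapsed_ms=50)   # about 9848 ms usable
slow = validity_ms(ttl_ms=100, elapsed_ms=99)      # negative: give up
```

A short TTL combined with slow instances can leave no usable validity window at all, which is why the quorum check and the remaining-time check must both pass.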
Redlock Controversy
Martin Kleppmann’s critique of Redlock argues that its safety rests on timing assumptions (bounded clock drift, bounded process pauses, bounded network delays) that real systems routinely violate, and that it provides no fencing tokens to protect the resource when those assumptions fail. The counter-argument from Redis’s creator, Salvatore Sanfilippo, is that these assumptions hold well enough for practical purposes.
The pragmatic stance: use Redlock if you need better availability than single-instance Redis locks but don’t need the guarantees of a proper consensus system. For true safety, use etcd or ZooKeeper.
Fencing Tokens Implementation
import redis


class StaleTokenError(Exception):
    """Raised when a write carries a fence token older than the last seen."""


class FencedLock:
    def __init__(self, redis_client, name, timeout=30):
        self.r = redis_client
        self.name = name
        self.timeout = timeout
        self.fence_key = f"{name}:fence"

    def acquire(self):
        # Atomically increment fence counter and set lock.
        # The counter is rolled back if the lock is already held.
        script = """
        local fence = redis.call('INCR', KEYS[2])
        local acquired = redis.call(
            'SET', KEYS[1], fence, 'NX', 'EX', ARGV[1]
        )
        if acquired then
            return fence
        else
            redis.call('DECR', KEYS[2])
            return 0
        end
        """
        token = self.r.eval(
            script, 2, self.name, self.fence_key,
            self.timeout
        )
        return int(token) if token else None

    def release(self, token):
        # Delete the lock only if it still holds our fence token
        script = """
        if redis.call('GET', KEYS[1]) == ARGV[1] then
            return redis.call('DEL', KEYS[1])
        end
        return 0
        """
        self.r.eval(script, 1, self.name, str(token))


# Protected resource checks fence token
class FencedResource:
    def __init__(self):
        self.last_fence = 0

    def write(self, data, fence_token):
        if fence_token <= self.last_fence:
            raise StaleTokenError(
                f"Token {fence_token} <= "
                f"last seen {self.last_fence}"
            )
        self.last_fence = fence_token
        # Perform the write (do_write is a placeholder for the real
        # storage call)
        do_write(data)
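To see why the resource-side check matters, consider a client that pauses (for example during a long GC) while holding the lock, lets it expire, and wakes up after another client has already written. The following is a self-contained sketch of that scenario; `FencedStore` is a hypothetical in-memory resource and the token values 33 and 34 are made up for illustration.

```python
class StaleTokenError(Exception):
    """Raised when a write carries a fence token older than the last seen."""


class FencedStore:
    """In-memory resource that rejects writes with stale fence tokens."""

    def __init__(self):
        self.last_fence = 0
        self.value = None

    def write(self, value, fence_token):
        if fence_token <= self.last_fence:
            raise StaleTokenError(
                f"Token {fence_token} <= last seen {self.last_fence}"
            )
        self.last_fence = fence_token
        self.value = value


store = FencedStore()

# Client A acquired token 33, then stalled; its lock expired.
# Client B acquired token 34 and writes first.
store.write("from client B", fence_token=34)

# Client A wakes up, still believing it holds the lock.
try:
    store.write("from client A", fence_token=33)
    rejected = False
except StaleTokenError:
    rejected = True
```

The lock service alone cannot stop client A here, because A never talks to it again before writing. Only the resource, by remembering the highest token it has seen, can refuse the late write.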
PostgreSQL Advisory Locks — Advanced
Session-Level vs Transaction-Level
from sqlalchemy import text

# `session` is a SQLAlchemy Session bound to PostgreSQL.

# Session-level: held until explicitly released or
# connection closes. Lock and unlock must run on the same
# database connection.
session.execute(text("SELECT pg_advisory_lock(12345)"))
# ... work ...
session.execute(text("SELECT pg_advisory_unlock(12345)"))

# Transaction-level: automatically released at commit/rollback
session.execute(text("SELECT pg_advisory_xact_lock(12345)"))
# ... work within transaction ...
session.commit()  # Lock released automatically
Try-Lock (Non-Blocking)
result = session.execute(text(
    "SELECT pg_try_advisory_lock(12345)"
)).scalar()
if result:
    # Acquired the lock
    try:
        process()
    finally:
        session.execute(text(
            "SELECT pg_advisory_unlock(12345)"
        ))
else:
    # Lock is held by someone else
    pass
Using Two-Part Keys
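PostgreSQL advisory locks accept either a single bigint key or two int4 values; the two-part form lets you namespace locks:
# Lock by (resource_type, resource_id)
RESOURCE_ORDER = 1
RESOURCE_USER = 2


def lock_order(session, order_id):
    # Bind parameters instead of formatting values into the SQL string
    session.execute(
        text("SELECT pg_advisory_lock(:resource_type, :resource_id)"),
        {"resource_type": RESOURCE_ORDER, "resource_id": order_id},
    )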
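Advisory lock keys are integers, but resources are often identified by strings. One way to bridge the gap is to hash the name into the signed 32-bit range that an int4 argument accepts. The helper below is a sketch under that assumption; `advisory_key` is a hypothetical name, and CRC32 is chosen only because it is stable across processes (Python's built-in `hash()` is randomized per process for strings, so it is unsuitable here).

```python
import zlib


def advisory_key(name: str) -> int:
    """Map a string to a stable signed 32-bit integer for use as an int4 key."""
    unsigned = zlib.crc32(name.encode("utf-8"))  # 0 .. 2**32 - 1
    # Shift the upper half into the negative range so the value fits int4
    return unsigned - 2**32 if unsigned >= 2**31 else unsigned


key_a = advisory_key("invoice:2024-0001")
key_b = advisory_key("invoice:2024-0001")
key_c = advisory_key("invoice:2024-0002")
```

Distinct names can collide in a 32-bit space. A collision only makes two unrelated resources share a lock, which costs concurrency but not correctness. Pairing the hash with a resource-type constant as the first key narrows the collision domain further.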
etcd Locks for Strong Consistency
When you need locks backed by a linearizable, consensus-replicated store:
import etcd3

client = etcd3.client()

# etcd lease-based lock
lock = client.lock('my-resource', ttl=30)
if lock.acquire(timeout=10):
    try:
        # Guaranteed mutual exclusion via Raft consensus
        process_resource()
    finally:
        lock.release()
etcd locks are built on the Raft consensus protocol, providing stronger guarantees than Redis but with higher latency (~10-50ms for acquire vs ~1ms for Redis).
Performance Comparison
| Lock Type | Acquire Latency | Throughput | Safety |
|---|---|---|---|
| Redis single-instance | ~1ms | ~100K/s | Best-effort |
| Redlock (5 instances) | ~5-20ms | ~10K/s | Improved |
| PostgreSQL advisory | ~2-5ms | ~50K/s | Session-bound |
| etcd | ~10-50ms | ~5K/s | Linearizable |
| ZooKeeper | ~10-50ms | ~5K/s | Linearizable |
Production Checklist
- Always set TTL — no TTL = potential permanent deadlock
- Use unique tokens — prevents accidental release of someone else’s lock
- Handle lock-not-owned errors — your lock might expire before you release it
- Monitor lock contention — track acquire wait times and failure rates
- Log lock operations — who acquired, how long held, timeouts
- Test failure scenarios — what happens when a lock holder crashes?
- Have a manual override — ability to force-release stuck locks in production
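For the monitoring item, a thin wrapper around acquisition is often enough to get wait times and failure counts. The sketch below uses `threading.Lock` as a stand-in so it runs without any server; `LockStats` and `timed_lock` are hypothetical names. With redis-py you would pass the wait limit as `blocking_timeout` rather than `timeout`, and in production you would export the counters to your metrics system instead of keeping them in memory.

```python
import threading
import time
from contextlib import contextmanager


class LockStats:
    """Minimal in-memory counters for lock contention."""

    def __init__(self):
        self.acquired = 0
        self.failed = 0
        self.total_wait_s = 0.0


@contextmanager
def timed_lock(lock, stats, timeout):
    """Acquire `lock`, recording wait time and outcome; yields True/False."""
    start = time.monotonic()
    got_it = lock.acquire(timeout=timeout)
    stats.total_wait_s += time.monotonic() - start
    if not got_it:
        stats.failed += 1
        yield False
        return
    stats.acquired += 1
    try:
        yield True
    finally:
        lock.release()


stats = LockStats()
lock = threading.Lock()

with timed_lock(lock, stats, timeout=0.1) as outer_ok:
    # The lock is already held, so this attempt times out
    with timed_lock(lock, stats, timeout=0.05) as inner_ok:
        pass
```

Yielding a boolean instead of raising keeps the caller in control of what a failed acquire means, which mirrors the try-lock style used elsewhere in this article.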
One thing to remember: The fundamental tradeoff in distributed locking is between performance and safety. Redis locks are fast but can fail under network partitions. Consensus-based locks (etcd, ZooKeeper) are safe but slow. Pick based on what happens if two processes enter the critical section simultaneously — if it’s “slightly wrong data,” use Redis. If it’s “lost money,” use consensus.