Python Reader-Writer Locks — Deep Dive

Implementing a writer-preference RWLock

A writer-preference lock prevents writer starvation by blocking new readers once a writer is waiting:

import threading

class WriterPreferenceRWLock:
    def __init__(self):
        self._readers = 0
        self._writers_waiting = 0
        self._writer_active = False
        self._lock = threading.Lock()
        self._readers_ok = threading.Condition(self._lock)
        self._writers_ok = threading.Condition(self._lock)

    def acquire_read(self):
        with self._lock:
            while self._writer_active or self._writers_waiting > 0:
                self._readers_ok.wait()
            self._readers += 1

    def release_read(self):
        with self._lock:
            self._readers -= 1
            if self._readers == 0:
                self._writers_ok.notify()

    def acquire_write(self):
        with self._lock:
            self._writers_waiting += 1
            while self._writer_active or self._readers > 0:
                self._writers_ok.wait()
            self._writers_waiting -= 1
            self._writer_active = True

    def release_write(self):
        with self._lock:
            self._writer_active = False
            self._readers_ok.notify_all()
            self._writers_ok.notify()

The key: acquire_read checks self._writers_waiting > 0. If a writer is waiting, new readers queue behind it, preventing starvation.

Context manager wrappers

Production code benefits from context managers to ensure locks are always released:

from contextlib import contextmanager

class RWLock:
    def __init__(self):
        self._impl = WriterPreferenceRWLock()

    @contextmanager
    def read_lock(self):
        self._impl.acquire_read()
        try:
            yield
        finally:
            self._impl.release_read()

    @contextmanager
    def write_lock(self):
        self._impl.acquire_write()
        try:
            yield
        finally:
            self._impl.release_write()


# Usage
cache = {}
rw = RWLock()

def read_config(key):
    with rw.read_lock():
        return cache.get(key)

def update_config(key, value):
    with rw.write_lock():
        cache[key] = value

Re-entrant reader-writer lock

A thread that already holds a read lock trying to acquire another read lock will deadlock with the basic implementation if a writer is waiting in between. Re-entrant RWLocks track per-thread read counts:

import threading
from collections import defaultdict

class ReentrantRWLock:
    def __init__(self):
        self._readers = 0
        self._reader_counts = defaultdict(int)
        self._writers_waiting = 0
        self._writer_active = False
        self._writer_thread = None
        self._writer_reentrant = 0
        self._lock = threading.Lock()
        self._readers_ok = threading.Condition(self._lock)
        self._writers_ok = threading.Condition(self._lock)

    def acquire_read(self):
        tid = threading.current_thread().ident
        with self._lock:
            # Allow re-entrant read if this thread already holds read
            if self._reader_counts[tid] > 0:
                self._reader_counts[tid] += 1
                return
            # Allow read if this thread is the active writer (downgrade)
            if self._writer_thread == tid:
                self._reader_counts[tid] += 1
                self._readers += 1
                return
            while self._writer_active or self._writers_waiting > 0:
                self._readers_ok.wait()
            self._readers += 1
            self._reader_counts[tid] += 1

    def release_read(self):
        tid = threading.current_thread().ident
        with self._lock:
            self._reader_counts[tid] -= 1
            if self._reader_counts[tid] == 0:
                del self._reader_counts[tid]
                self._readers -= 1
                if self._readers == 0:
                    self._writers_ok.notify()

    def acquire_write(self):
        tid = threading.current_thread().ident
        with self._lock:
            # Re-entrant write
            if self._writer_thread == tid:
                self._writer_reentrant += 1
                return
            self._writers_waiting += 1
            while self._writer_active or self._readers > 0:
                self._writers_ok.wait()
            self._writers_waiting -= 1
            self._writer_active = True
            self._writer_thread = tid

    def release_write(self):
        with self._lock:
            if self._writer_reentrant > 0:
                self._writer_reentrant -= 1
                return
            self._writer_active = False
            self._writer_thread = None
            self._readers_ok.notify_all()
            self._writers_ok.notify()

Async reader-writer lock

For asyncio-based applications:

import asyncio

class AsyncRWLock:
    def __init__(self):
        self._readers = 0
        self._writer_active = False
        self._writers_waiting = 0
        self._lock = asyncio.Lock()
        self._reader_ready = asyncio.Condition(self._lock)
        self._writer_ready = asyncio.Condition(self._lock)

    async def acquire_read(self):
        async with self._lock:
            while self._writer_active or self._writers_waiting > 0:
                await self._reader_ready.wait()
            self._readers += 1

    async def release_read(self):
        async with self._lock:
            self._readers -= 1
            if self._readers == 0:
                self._writer_ready.notify()

    async def acquire_write(self):
        async with self._lock:
            self._writers_waiting += 1
            while self._writer_active or self._readers > 0:
                await self._writer_ready.wait()
            self._writers_waiting -= 1
            self._writer_active = True

    async def release_write(self):
        async with self._lock:
            self._writer_active = False
            self._reader_ready.notify_all()
            self._writer_ready.notify()

Real-world use case: cached configuration

import time
import threading
from typing import Any, Optional

class ConfigCache:
    """Thread-safe configuration cache using RWLock.
    Reads are frequent (every request), writes are rare (config reload).
    """

    def __init__(self):
        self._data: dict[str, Any] = {}
        self._rw = RWLock()
        self._loaded_at: Optional[float] = None

    def get(self, key: str, default: Any = None) -> Any:
        with self._rw.read_lock():
            return self._data.get(key, default)

    def get_many(self, keys: list[str]) -> dict[str, Any]:
        with self._rw.read_lock():
            return {k: self._data[k] for k in keys if k in self._data}

    def reload(self, new_config: dict[str, Any]):
        with self._rw.write_lock():
            self._data = new_config.copy()
            self._loaded_at = time.monotonic()

    def age_seconds(self) -> float:
        with self._rw.read_lock():
            if self._loaded_at is None:
                return float("inf")
            return time.monotonic() - self._loaded_at

With 100 threads reading configuration and a background thread reloading every 60 seconds, the read lock allows near-zero contention during normal operation. The write lock only blocks readers during the brief reload.

Performance benchmarks

Testing with 8 threads, 1M total operations at various read-write ratios:

Read:Write ratiothreading.Lock (ops/sec)RWLock (ops/sec)Speedup
100:0 (all reads)850K3.2M3.8×
95:5820K2.8M3.4×
80:20800K1.5M1.9×
50:50780K700K0.9× (slower!)
0:100 (all writes)850K600K0.7× (slower!)

The crossover point is around 70-80% reads. Below that, RWLock overhead exceeds the benefit. Above that, concurrent reads provide significant speedup.

Upgrade and downgrade

Some RWLock implementations support upgrade (read → write) and downgrade (write → read):

  • Upgrade is dangerous: if two readers try to upgrade simultaneously, they deadlock (each waits for the other to release their read lock). Only allow upgrade when you can guarantee a single upgrader.
  • Downgrade is safe: the writer releases exclusive access and acquires shared access atomically, allowing waiting readers to proceed.
# Downgrade pattern (safe)
def update_and_continue_reading(rw_lock, cache, key, value):
    with rw_lock.write_lock():
        cache[key] = value
    # Implicitly downgrades to read after write_lock exits
    with rw_lock.read_lock():
        return cache.copy()

Alternatives to RWLocks

AlternativeWhen to prefer
Copy-on-write (immutable snapshots)Very high read rate, writes can be slightly delayed
Stamped locks (Java-style)Optimistic reads that validate before using data
SeqlocksVery fast reads, rare writes, numeric data
Lock-free structuresWhen blocking is unacceptable
threading.LockLow contention or write-heavy workloads

The one thing to remember: reader-writer locks provide significant throughput gains when reads dominate (>80%). Build them with writer-preference to prevent starvation, use context managers for safety, and benchmark against a simple Lock before committing — the overhead is only worthwhile when reads are both frequent and expensive enough to matter.

pythonadvancedconcurrency

See Also

  • Python Actor Model Why treating each piece of your program like a person with their own mailbox makes concurrency way less scary.
  • Python Aiocache Caching aiocache remembers expensive answers so your async Python app doesn't waste time asking the same question twice.
  • Python Aiofiles Async Io aiofiles lets your async Python program read and write files without freezing — because normal file operations secretly block everything.
  • Python Aiohttp Understand Aiohttp through an everyday analogy so Python behavior feels intuitive, not random.
  • Python Anyio Portability AnyIO lets your async Python code work with any async library — write once, run on asyncio or Trio without changes.