etcd Distributed Config with Python — Deep Dive

etcd’s consistency model

etcd uses the Raft consensus algorithm, which requires a quorum (majority) of nodes to agree before a write is committed. In a 3-node cluster, 2 nodes must acknowledge a write. In a 5-node cluster, 3 must agree.
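
The majority rule reduces to simple arithmetic. A cluster of n members needs n // 2 + 1 acknowledgements, and it keeps committing writes as long as no more than the remaining members are down. A short sketch follows; quorum and tolerated_failures are illustrative helpers, not etcd APIs:

```python
def quorum(members: int) -> int:
    """Votes Raft needs to commit a write: a strict majority of members."""
    if members < 1:
        raise ValueError("a cluster needs at least one member")
    return members // 2 + 1


def tolerated_failures(members: int) -> int:
    """How many members can be down while writes still commit."""
    return members - quorum(members)


for n in (1, 3, 4, 5, 7):
    print(f"{n} members: quorum {quorum(n)}, tolerates {tolerated_failures(n)} failure(s)")
```

For 1, 3, 4, 5 and 7 members this prints quorums of 1, 2, 3, 3 and 4. The 4-member case shows why even sizes are avoided: the fourth member raises the quorum but does not let the cluster survive any more failures than three members do.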

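This provides linearizable reads by default: every read reflects the most recent write committed anywhere in the cluster. Many distributed data stores default to eventual consistency instead, so this is a stronger guarantee than they offer. The trade-off is latency. A write must reach a quorum before it is acknowledged, and a linearizable read must confirm with the quorum that the answering member is up to date. Clients that can tolerate slightly stale data can request serializable reads, which the connected member answers from its local state.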

Key characteristics:

  • Write latency: typically 2-10ms in a local cluster, higher across regions
  • Read latency: sub-millisecond for serializable reads, slightly higher for linearizable
  • Storage limit: 2GB by default (set with --quota-backend-bytes), with 8GB as the suggested maximum; etcd is designed for configuration data, not bulk storage
  • Key limit: no hard limit, but performance degrades beyond a few hundred thousand keys
  • Watch throughput: can handle thousands of concurrent watchers efficiently
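
The two read modes in the list above correspond directly to python-etcd3's get(). Reads are linearizable unless you pass serializable=True. With serializable=True, the member you are connected to answers from its local state, which is faster but possibly stale. The following is a minimal sketch that assumes a client shaped like the one etcd3.client() returns. read_setting and the KVClient protocol are hypothetical names, introduced here so the helper can be exercised without a running cluster:

```python
from typing import Any, Optional, Protocol


class KVClient(Protocol):
    """Hypothetical: the subset of etcd3.Etcd3Client this helper relies on."""

    def get(self, key: str, **kwargs: Any) -> tuple: ...


def read_setting(client: KVClient, key: str, allow_stale: bool = False) -> Optional[bytes]:
    """Read one key from etcd.

    allow_stale=False: linearizable read (python-etcd3's default); reflects
    every write committed before the call.
    allow_stale=True: serializable read served by the connected member alone;
    cheaper, but it may lag behind the leader.
    """
    value, _metadata = client.get(key, serializable=allow_stale)
    return value  # None if the key does not exist
```

Serializable reads suit hot paths that poll settings which rarely change. Keep the default for anything that must observe a write the caller has just made.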

Dynamic configuration manager

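A configuration manager for Python services. It loads every key under a prefix once, serves reads from an in-memory cache, and keeps that cache current through a prefix watch: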

import json
import logging
import threading
from dataclasses import dataclass
from typing import Any, Callable

import etcd3
from etcd3.events import DeleteEvent

log = logging.getLogger(__name__)

@dataclass
class ConfigValue:
    raw: bytes
    revision: int
    key: str

    @property
    def string(self) -> str:
        return self.raw.decode("utf-8")

    @property
    def json(self) -> Any:
        # Inside a method body, "json" is still the module, not this property
        return json.loads(self.raw)

    @property
    def integer(self) -> int:
        return int(self.raw)

    @property
    def boolean(self) -> bool:
        return self.raw.decode().strip().lower() in ("true", "1", "yes")


class DynamicConfig:
    """Thread-safe dynamic configuration backed by etcd."""

    def __init__(
        self,
        prefix: str,
        host: str = "localhost",
        port: int = 2379,
    ):
        self.prefix = prefix.rstrip("/") + "/"
        self.client = etcd3.client(host=host, port=port)
        self._cache: dict[str, ConfigValue] = {}
        self._callbacks: dict[str, list[Callable]] = {}
        self._lock = threading.Lock()
        self._watch_id = None
        next_revision = self._load_initial()
        self._start_watching(next_revision)

    def _load_initial(self) -> int:
        """Load all current values under the prefix; return the revision to watch from."""
        response = self.client.get_prefix_response(self.prefix)
        with self._lock:
            for kv in response.kvs:
                short_key = kv.key.decode()[len(self.prefix):]
                self._cache[short_key] = ConfigValue(
                    raw=kv.value,
                    revision=kv.mod_revision,
                    key=short_key,
                )
        log.info(f"Loaded {len(self._cache)} config values from {self.prefix}")
        # Watching from snapshot revision + 1 means a change made between the
        # initial read and the watch registration is not missed.
        return response.header.revision + 1

    def _start_watching(self, start_revision: int) -> None:
        """Start watching for changes made after the initial snapshot."""
        self._watch_id = self.client.add_watch_prefix_callback(
            self.prefix,
            self._on_change,
            start_revision=start_revision,
        )

    def _on_change(self, response) -> None:
        """Handle a watch notification (runs on python-etcd3's watcher thread)."""
        if isinstance(response, Exception):
            # e.g. the start revision was compacted or the stream broke
            log.error(f"Watch error for {self.prefix}: {response}")
            return
        # python-etcd3 0.12 passes a WatchResponse whose .events holds the
        # changes; older releases passed a single event per call.
        for event in getattr(response, "events", [response]):
            self._apply_event(event)

    def _apply_event(self, event) -> None:
        short_key = event.key.decode()[len(self.prefix):]

        with self._lock:
            if isinstance(event, DeleteEvent):
                self._cache.pop(short_key, None)
                log.info(f"Config deleted: {short_key}")
            else:
                self._cache[short_key] = ConfigValue(
                    raw=event.value,
                    revision=event.mod_revision,
                    key=short_key,
                )
                log.info(f"Config updated: {short_key}")
            callbacks = list(self._callbacks.get(short_key, []))
            current = self._cache.get(short_key)

        # Fire callbacks outside the lock: threading.Lock is not reentrant, so
        # a callback that calls config.get() would otherwise deadlock.
        for callback in callbacks:
            try:
                callback(short_key, current)  # current is None after a delete
            except Exception:
                log.exception(f"Callback error for {short_key}")

    def get(self, key: str, default: Any = None) -> ConfigValue | None:
        """Get a config value by key (relative to prefix)."""
        with self._lock:
            return self._cache.get(key, default)

    def get_string(self, key: str, default: str = "") -> str:
        val = self.get(key)
        return val.string if val is not None else default

    def get_int(self, key: str, default: int = 0) -> int:
        val = self.get(key)
        return val.integer if val is not None else default

    def get_bool(self, key: str, default: bool = False) -> bool:
        val = self.get(key)
        return val.boolean if val is not None else default

    def get_json(self, key: str, default: Any = None) -> Any:
        val = self.get(key)
        return val.json if val is not None else default

    def on_change(self, key: str, callback: Callable) -> None:
        """Register callback(key, value) for a key; value is None after a delete."""
        with self._lock:
            self._callbacks.setdefault(key, []).append(callback)

    def set(self, key: str, value: str | bytes) -> None:
        """Write a config value."""
        if isinstance(value, str):
            value = value.encode()
        self.client.put(self.prefix + key, value)

    def close(self) -> None:
        if self._watch_id is not None:
            self.client.cancel_watch(self._watch_id)
        self.client.close()

Usage:

config = DynamicConfig(prefix="/config/myapp/", host="etcd.internal")

# Read values
db_url = config.get_string("database_url", "postgresql://localhost/dev")
rate_limit = config.get_int("rate_limit", 100)
maintenance = config.get_bool("maintenance_mode", False)

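# React to changes
def on_rate_limit_change(key, value):
    if value is None:  # the key was deleted; keep the current limit
        return
    print(f"Rate limit changed to {value.integer}")
    update_rate_limiter(value.integer)  # your application's own function

config.on_change("rate_limit", on_rate_limit_change)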
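
A callback that converts the raw value itself will raise on a typo such as 1OO, and it receives None when the key is deleted. One way to harden this is a small wrapper that parses first and applies only values that validate. The sketch below is one such wrapper. validated_callback and positive_int are hypothetical names, and the only assumption about the incoming value is that it exposes .raw bytes, as ConfigValue does:

```python
import logging
from typing import Any, Callable, Optional, TypeVar

T = TypeVar("T")
log = logging.getLogger(__name__)


def validated_callback(
    parse: Callable[[bytes], T],
    apply: Callable[[T], None],
    on_delete: Optional[Callable[[], None]] = None,
) -> Callable[[str, Any], None]:
    """Wrap an on_change callback so only parseable values are applied."""

    def callback(key: str, value: Any) -> None:
        if value is None:  # the key was deleted in etcd
            if on_delete is not None:
                on_delete()
            return
        try:
            parsed = parse(value.raw)
        except (ValueError, TypeError) as exc:
            # Keep the previous setting rather than applying a bad value
            log.warning("Ignoring invalid value for %s: %r (%s)", key, value.raw, exc)
            return
        apply(parsed)

    return callback


def positive_int(raw: bytes) -> int:
    """Parser for settings such as rate limits that must be > 0."""
    value = int(raw)  # int() accepts ASCII digits as bytes
    if value <= 0:
        raise ValueError("must be positive")
    return value

# Usage with DynamicConfig (update_rate_limiter is your application's own function):
#   config.on_change("rate_limit", validated_callback(positive_int, update_rate_limiter))
```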

Leader election

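When only one instance of a service should perform a task (scheduled jobs, data aggregation), etcd leases and transactions provide the building blocks for leader election. The first instance to create a lease-bound key becomes leader, and the key disappears if that instance stops refreshing its lease: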

import logging
import time
import uuid
from typing import Callable

import etcd3

log = logging.getLogger(__name__)

class LeaderElection:
    """Simple leader election using etcd leases."""

    def __init__(
        self,
        client: etcd3.Etcd3Client,
        election_key: str,
        ttl: int = 10,
    ):
        self.client = client
        self.election_key = election_key
        self.ttl = ttl
        self.instance_id = str(uuid.uuid4())[:8]
        self.is_leader = False
        self.lease = None
        self._running = False

    def run(self, on_elected: Callable[[], None], on_revoked: Callable[[], None]) -> None:
        """Run the election loop; blocks until resign() is called."""
        self._running = True
        while self._running:
            try:
                if not self.is_leader:
                    self._try_become_leader(on_elected)
                else:
                    self._maintain_leadership(on_revoked)
            except Exception as e:
                log.error(f"Election error: {e}")
                if self.is_leader:
                    self.is_leader = False
                    on_revoked()
            # Check in three times per TTL; true division avoids a zero-second
            # busy loop when ttl < 3.
            time.sleep(self.ttl / 3)

    def _try_become_leader(self, on_elected: Callable[[], None]) -> None:
        lease = self.client.lease(self.ttl)

        # Atomic compare-and-swap: only succeed if the key doesn't exist
        success, _ = self.client.transaction(
            compare=[
                self.client.transactions.create(self.election_key) == 0
            ],
            success=[
                self.client.transactions.put(
                    self.election_key,
                    self.instance_id.encode(),
                    lease,
                )
            ],
            failure=[],
        )

        if success:
            self.lease = lease
            self.is_leader = True
            log.info(f"Instance {self.instance_id} elected as leader")
            on_elected()
        else:
            # Another instance holds the key; don't leave a lease behind each round
            lease.revoke()

    def _maintain_leadership(self, on_revoked: Callable[[], None]) -> None:
        self.lease.refresh()
        # If the lease lapsed (e.g. after a long pause), the key is gone or now
        # belongs to another instance, so confirm ownership explicitly.
        value, _ = self.client.get(self.election_key)
        if value != self.instance_id.encode():
            self.is_leader = False
            self.lease = None
            log.warning(f"Instance {self.instance_id} lost leadership")
            on_revoked()

    def resign(self) -> None:
        """Voluntarily give up leadership."""
        self._running = False
        if self.lease:
            self.lease.revoke()
            self.lease = None
        self.is_leader = False
        log.info(f"Instance {self.instance_id} resigned leadership")
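
A lease-based leader only learns that it has lost its lease when it next talks to etcd. If the leader is partitioned from the cluster, its refresh calls fail while the lease keeps counting down on the server, and another instance can be elected as soon as the lease expires. A common safeguard, which is not part of the class above, has the leader record when its last successful refresh was sent. The leader then stops leader-only work once that timestamp is older than the TTL minus a safety margin. The sketch below uses an injectable clock so it can be tested. LeaseDeadline and its parameters are hypothetical names:

```python
import time
from typing import Callable, Optional


class LeaseDeadline:
    """Tracks whether a lease-based leader may still safely act."""

    def __init__(
        self,
        ttl: float,
        safety_margin: float = 1.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        if safety_margin >= ttl:
            raise ValueError("safety_margin must be smaller than ttl")
        self.ttl = ttl
        self.safety_margin = safety_margin
        self._clock = clock
        self._last_sent: Optional[float] = None

    def now(self) -> float:
        return self._clock()

    def record_refresh(self, sent_at: float) -> None:
        """Call after a refresh that kept the lease alive, passing when it was SENT.

        The server extends the lease no earlier than sent_at, so measuring
        from the send time errs on the safe side.
        """
        self._last_sent = sent_at

    def can_act(self) -> bool:
        """True while the lease is very likely still ours."""
        if self._last_sent is None:
            return False
        elapsed = self._clock() - self._last_sent
        return elapsed < self.ttl - self.safety_margin
```

In the refresh step, take sent_at = deadline.now() before calling lease.refresh() and pass it to record_refresh() afterwards. Before each unit of leader-only work, check can_act() and stop if it returns False.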

Distributed locking

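etcd’s transactions enable distributed locks for coordinating Python services. The lock key is created only if it does not already exist, and it is bound to a lease, so a crashed holder's lock is released automatically when the lease expires: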

import time
import uuid

import etcd3

class DistributedLock:
    """Distributed lock using etcd leases and transactions.

    The lease is not refreshed while the lock is held, so the protected work
    must finish within ttl seconds or the lock silently expires.
    """

    def __init__(
        self,
        client: etcd3.Etcd3Client,
        lock_name: str,
        ttl: int = 30,
    ):
        self.client = client
        self.lock_key = f"/locks/{lock_name}"
        self.ttl = ttl
        self.lease = None
        self.lock_id = str(uuid.uuid4())[:8]

    def acquire(self, timeout: float = 10.0) -> bool:
        """Try to acquire the lock within timeout seconds."""
        deadline = time.monotonic() + timeout
        self.lease = self.client.lease(self.ttl)

        while time.monotonic() < deadline:
            success, _ = self.client.transaction(
                compare=[
                    self.client.transactions.create(self.lock_key) == 0
                ],
                success=[
                    self.client.transactions.put(
                        self.lock_key,
                        self.lock_id.encode(),
                        self.lease,
                    )
                ],
                failure=[],
            )
            if success:
                # The lease has been counting down while we waited; reset it
                # so the holder gets the full TTL.
                self.lease.refresh()
                return True
            time.sleep(0.5)

        # Timed out: drop the unused lease so release() has nothing to revoke
        self.lease.revoke()
        self.lease = None
        return False

    def release(self) -> None:
        """Release the lock; revoking the lease deletes the lock key."""
        if self.lease:
            self.lease.revoke()
            self.lease = None

    def __enter__(self):
        if not self.acquire():
            raise TimeoutError(f"Could not acquire lock {self.lock_key}")
        return self

    def __exit__(self, *args):
        self.release()

Usage:

client = etcd3.client(host="etcd.internal")

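# ttl must exceed the migration's runtime: DistributedLock does not refresh its lease
with DistributedLock(client, "migration-lock", ttl=300):
    run_database_migration()  # your own migration entry point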
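
python-etcd3 also ships its own lock, returned by client.lock(name, ttl=...). That lock provides acquire(timeout=...), release(), and refresh(), and it relies on leases and transactions in the same way. The minimal sketch below runs a job under that lock and reports whether the lock was obtained. run_exclusively is a hypothetical helper. The client is passed in, so the function itself does not need a live cluster:

```python
from typing import Any, Callable


def run_exclusively(
    client: Any,
    name: str,
    job: Callable[[], None],
    ttl: int = 60,
    timeout: float = 10.0,
) -> bool:
    """Run job while holding the etcd lock `name`; return False if it stayed busy.

    Assumes `client` behaves like etcd3.client(): client.lock(name, ttl=...)
    returns an object with acquire(timeout=...) -> bool and release().
    """
    lock = client.lock(name, ttl=ttl)
    if not lock.acquire(timeout=timeout):
        return False
    try:
        job()
    finally:
        # Release even if the job raised, so other instances are not blocked
        # until the lease expires.
        lock.release()
    return True
```

As with DistributedLock, nothing refreshes the lease while job runs unless you call the lock's refresh() yourself, so ttl should exceed the job's expected runtime.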

Service discovery pattern

import json
import time
from typing import Callable

import etcd3

class ServiceRegistry:
    """Register and discover services via etcd."""

    def __init__(self, client: etcd3.Etcd3Client, namespace: str = "/services"):
        self.client = client
        self.namespace = namespace.rstrip("/")

    def register(
        self, service_name: str, host: str, port: int, ttl: int = 15,
    ) -> etcd3.Lease:
        """Register an instance under a lease; the key vanishes when the lease expires."""
        lease = self.client.lease(ttl)
        key = f"{self.namespace}/{service_name}/{host}:{port}"
        value = json.dumps({
            "host": host,
            "port": port,
            "registered_at": time.time(),
        })
        self.client.put(key, value.encode(), lease)
        return lease  # Caller must refresh this lease periodically

    def discover(self, service_name: str) -> list[dict]:
        """Find all instances whose registration lease is still alive."""
        prefix = f"{self.namespace}/{service_name}/"
        instances = []
        for value, _metadata in self.client.get_prefix(prefix):
            try:
                instances.append(json.loads(value))
            except ValueError:  # malformed JSON or non-UTF-8 bytes
                continue
        return instances

    def watch_service(
        self, service_name: str, callback: Callable[[list[dict]], None],
    ) -> int:
        """Call callback with the full instance list whenever it changes."""
        prefix = f"{self.namespace}/{service_name}/"

        def on_event(_response) -> None:
            # Re-read the whole list instead of applying individual events:
            # simpler, and the callback always sees a complete snapshot.
            callback(self.discover(service_name))

        return self.client.add_watch_prefix_callback(prefix, on_event)
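
register() leaves lease renewal to the caller. A minimal way to handle it is a daemon thread that calls lease.refresh() at a fraction of the TTL until it is told to stop. The sketch below assumes only an object with a refresh() method, such as the lease that register() returns. LeaseKeeper is a hypothetical class:

```python
import logging
import threading
from typing import Any

log = logging.getLogger(__name__)


class LeaseKeeper:
    """Refreshes a lease in the background until stop() is called."""

    def __init__(self, lease: Any, ttl: float, fraction: float = 1 / 3):
        self.lease = lease
        self.interval = ttl * fraction  # refresh well before the TTL runs out
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> "LeaseKeeper":
        self._thread.start()
        return self

    def _run(self) -> None:
        # Event.wait returns True as soon as stop() is called, ending the loop
        while not self._stop.wait(self.interval):
            try:
                self.lease.refresh()
            except Exception:
                # Keep trying: one failed refresh is survivable as long as a
                # later one lands before the TTL expires.
                log.exception("Lease refresh failed")

    def stop(self) -> None:
        self._stop.set()
        if self._thread.is_alive():
            self._thread.join()

# Usage with ServiceRegistry (sketch):
#   lease = registry.register("api", "10.0.0.5", 8080, ttl=15)
#   keeper = LeaseKeeper(lease, ttl=15).start()
#   ...
#   keeper.stop()
#   lease.revoke()  # deregister immediately instead of waiting for expiry
```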

Performance and operational considerations

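  • Cluster sizing: 3 nodes for most use cases, 5 for higher availability (a 3-node cluster tolerates one failed member, a 5-node cluster two). More nodes increase write latency without proportional benefit.
  • Compaction: etcd keeps every revision of every key until it is compacted. Enable auto-compaction (for example --auto-compaction-retention=1, which in the default periodic mode keeps one hour of history) to prevent storage bloat.
  • Defragmentation: compaction frees space inside the database file but does not shrink the file; periodic defragmentation (etcdctl defrag) reclaims the disk space. A member cannot serve reads or writes while it is being defragmented, so schedule it during low-traffic windows and defragment one member at a time.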
  • Snapshot backups: etcdctl snapshot save creates point-in-time backups. Automate this with a cron job or Kubernetes CronJob.
  • Watch scalability: etcd handles thousands of concurrent watchers, but each watch consumes server memory. Use prefix watches instead of individual key watches when possible.
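
Metric | Typical value
--- | ---
Write latency (3-node, local) | 2-10ms
Read latency (serializable) | < 1ms
Concurrent watches | 10,000+
Recommended data size | < 2GB (the default storage quota)
Max request size | 1.5MiB by default (--max-request-bytes), which caps any single value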
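
The 1.5MB ceiling comes from etcd's default --max-request-bytes (1.5MiB). It applies to the whole request, so a value just under it can still be rejected once the key and protobuf framing are added. A cheap client-side guard catches oversize payloads before they reach the cluster. The sketch below assumes the server default has not been changed. ensure_fits and the 64KiB headroom are illustrative choices, not etcd settings:

```python
DEFAULT_MAX_REQUEST_BYTES = int(1.5 * 1024 * 1024)  # etcd's default --max-request-bytes
HEADROOM_BYTES = 64 * 1024  # illustrative allowance for framing overhead


def ensure_fits(
    key: str,
    value: bytes,
    limit: int = DEFAULT_MAX_REQUEST_BYTES,
    headroom: int = HEADROOM_BYTES,
) -> None:
    """Raise ValueError if a put of key/value would likely exceed the request limit."""
    size = len(key.encode("utf-8")) + len(value)
    budget = limit - headroom
    if size > budget:
        raise ValueError(
            f"{key}: {size} bytes exceeds the {budget}-byte budget; "
            "store large blobs elsewhere and keep a reference in etcd"
        )
```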

etcd vs alternatives for Python

Feature | etcd | Consul | ZooKeeper | Redis
--- | --- | --- | --- | ---
Consistency | Strong (Raft) | Strong (Raft) | Strong (ZAB) | Eventual
Watch support | Native gRPC | Long-poll HTTP | Native | Pub/sub
Python client | etcd3 | python-consul | kazoo | redis-py
K8s integration | Built-in | Add-on | Not standard | Not standard
Service mesh | No | Yes (Connect) | No | No
Complexity | Low | Medium | High | Low

The one thing to remember: etcd combines strong consistency, real-time watches, leases, and transactions. Together these make it a solid building block for coordinating Python distributed systems, covering dynamic configuration, leader election, and service discovery, as long as the data it holds stays small.


See Also