etcd Distributed Config with Python — Deep Dive
etcd’s consistency model
etcd uses the Raft consensus algorithm, which requires a quorum (majority) of nodes to agree before a write is committed. In a 3-node cluster, 2 nodes must acknowledge a write. In a 5-node cluster, 3 must agree.
This provides linearizable reads by default — every read reflects the most recent committed write across the entire cluster. This is stronger than many distributed data stores, which often offer only eventual consistency. The trade-off is latency: writes (and linearizable reads) must round-trip to a quorum of nodes.
Key characteristics:
- Write latency: typically 2-10ms in a local cluster, higher across regions
- Read latency: sub-millisecond for serializable reads, slightly higher for linearizable
- Storage limit: default 2GB (configurable to 8GB), designed for config data not bulk storage
- Key limit: no hard limit, but performance degrades beyond a few hundred thousand keys
- Watch throughput: can handle thousands of concurrent watchers efficiently
Dynamic configuration manager
A production-grade configuration manager for Python services:
import etcd3
import json
import threading
import logging
from typing import Any, Callable
from dataclasses import dataclass, field
log = logging.getLogger(__name__)
@dataclass
class ConfigValue:
    """A single raw config entry read from etcd, with typed accessors.

    Attributes are the raw stored bytes, the etcd ``mod_revision`` of the
    write that produced them, and the key relative to the config prefix.
    """

    raw: bytes
    revision: int
    key: str

    @property
    def string(self) -> str:
        """The value decoded as UTF-8 text."""
        return self.raw.decode("utf-8")

    @property
    def json(self) -> Any:
        """The value parsed as JSON (raises ``ValueError`` if malformed)."""
        return json.loads(self.raw)

    @property
    def integer(self) -> int:
        """The value parsed as an int (``int`` ignores surrounding whitespace)."""
        return int(self.raw)

    @property
    def boolean(self) -> bool:
        """True for ``true``/``1``/``yes`` (case-insensitive), else False.

        Surrounding whitespace is ignored, matching ``integer``, so a value
        written with a trailing newline still parses as expected.
        """
        return self.raw.decode("utf-8").strip().lower() in ("true", "1", "yes")
class DynamicConfig:
    """Thread-safe dynamic configuration backed by etcd.

    All keys under ``prefix`` are loaded once at construction and then kept
    current by an etcd prefix watch. Reads are served from an in-memory cache
    guarded by a lock; keys passed to the accessors are relative to ``prefix``.

    NOTE(review): the watch starts after the initial load, so a change
    committed between the two calls is not seen until that key changes again.
    Starting the watch at the load's store revision would close that gap.
    """

    def __init__(
        self,
        prefix: str,
        host: str = "localhost",
        port: int = 2379,
    ):
        self.prefix = prefix.rstrip("/") + "/"
        self.client = etcd3.client(host=host, port=port)
        self._cache: dict[str, ConfigValue] = {}
        self._callbacks: dict[str, list[Callable]] = {}
        self._lock = threading.Lock()
        self._watch_id = None
        self._load_initial()
        self._start_watching()

    def _load_initial(self) -> None:
        """Populate the cache with every value currently stored under the prefix."""
        loaded: dict[str, ConfigValue] = {}
        for value, metadata in self.client.get_prefix(self.prefix):
            short_key = metadata.key.decode()[len(self.prefix):]
            loaded[short_key] = ConfigValue(
                raw=value,
                revision=metadata.mod_revision,
                key=short_key,
            )
        with self._lock:
            self._cache.update(loaded)
        log.info("Loaded %d config values from %s", len(loaded), self.prefix)

    def _start_watching(self) -> None:
        """Start watching the prefix; notifications are handled by _on_change."""
        self._watch_id = self.client.add_watch_prefix_callback(
            self.prefix,
            self._on_change,
        )

    def _on_change(self, response) -> None:
        """Dispatch a watch notification to _apply_event for each event it carries.

        Recent python-etcd3 releases call watch callbacks with a
        ``WatchResponse`` (its ``events`` list holds the changes), or with an
        exception when the watch fails; older releases passed single events.
        All three shapes are handled here.
        """
        if isinstance(response, Exception):
            log.error("Config watch error on %s: %s", self.prefix, response)
            return
        events = getattr(response, "events", None)
        if events is None:
            events = [response]
        for event in events:
            self._apply_event(event)

    def _apply_event(self, event) -> None:
        """Update the cache for one put/delete event, then notify callbacks."""
        short_key = event.key.decode()[len(self.prefix):]
        with self._lock:
            if isinstance(event, etcd3.events.DeleteEvent):
                self._cache.pop(short_key, None)
                current = None
            else:
                current = ConfigValue(
                    raw=event.value,
                    revision=event.mod_revision,
                    key=short_key,
                )
                self._cache[short_key] = current
            # Snapshot under the lock so concurrent registrations are safe.
            callbacks = list(self._callbacks.get(short_key, ()))
        if current is None:
            log.info("Config deleted: %s", short_key)
        else:
            log.info("Config updated: %s", short_key)
        # Callbacks run WITHOUT the lock: threading.Lock is not re-entrant, so
        # a callback that reads config (e.g. get_int) would otherwise deadlock.
        for callback in callbacks:
            try:
                callback(short_key, current)
            except Exception:
                log.exception("Callback error for %s", short_key)

    def get(self, key: str, default: Any = None) -> ConfigValue | None:
        """Return the cached ConfigValue for ``key`` (relative to prefix), or ``default``."""
        with self._lock:
            return self._cache.get(key, default)

    def get_string(self, key: str, default: str = "") -> str:
        """Return ``key`` decoded as UTF-8 text, or ``default`` if absent."""
        val = self.get(key)
        return val.string if val is not None else default

    def get_int(self, key: str, default: int = 0) -> int:
        """Return ``key`` parsed as an int, or ``default`` if absent.

        Raises ValueError if the stored value is not an integer.
        """
        val = self.get(key)
        return val.integer if val is not None else default

    def get_bool(self, key: str, default: bool = False) -> bool:
        """Return ``key`` interpreted as a boolean, or ``default`` if absent."""
        val = self.get(key)
        return val.boolean if val is not None else default

    def get_json(self, key: str, default: Any = None) -> Any:
        """Return ``key`` parsed as JSON, or ``default`` if absent."""
        val = self.get(key)
        return val.json if val is not None else default

    def on_change(self, key: str, callback: Callable) -> None:
        """Register ``callback(key, value)`` for changes to ``key``.

        ``value`` is the new ConfigValue, or None when the key was deleted.
        Callbacks run on the thread that delivers watch notifications.
        """
        with self._lock:
            self._callbacks.setdefault(key, []).append(callback)

    def set(self, key: str, value: str | bytes) -> None:
        """Write a config value (``str`` is UTF-8 encoded); the watch updates the cache."""
        if isinstance(value, str):
            value = value.encode()
        self.client.put(self.prefix + key, value)

    def close(self) -> None:
        """Cancel the watch (if any) and close the etcd client."""
        if self._watch_id is not None:
            self.client.cancel_watch(self._watch_id)
            self._watch_id = None
        self.client.close()
Usage:
config = DynamicConfig(prefix="/config/myapp/", host="etcd.internal")

# Read values; the second argument is returned when the key is absent.
db_url = config.get_string("database_url", "postgresql://localhost/dev")
rate_limit = config.get_int("rate_limit", 100)
maintenance = config.get_bool("maintenance_mode", False)


# React to changes. DynamicConfig passes value=None when the key is deleted,
# so fall back to the same default used above instead of crashing.
def on_rate_limit_change(key, value):
    new_limit = value.integer if value is not None else 100
    print(f"Rate limit changed to {new_limit}")
    update_rate_limiter(new_limit)  # application-defined hook


config.on_change("rate_limit", on_rate_limit_change)
Leader election
When only one instance of a service should perform a task (scheduled jobs, data aggregation), etcd enables safe leader election:
import json
import logging
import threading
import time
import uuid
from typing import Callable

import etcd3

log = logging.getLogger(__name__)
class LeaderElection:
    """Simple leader election using etcd leases.

    The instance whose transaction atomically creates ``election_key`` (bound
    to a lease of ``ttl`` seconds) becomes leader. It stays leader while its
    lease refreshes succeed and the key still holds its ``instance_id``.
    """

    def __init__(
        self,
        client: etcd3.Etcd3Client,
        election_key: str,
        ttl: int = 10,
    ):
        self.client = client
        self.election_key = election_key
        self.ttl = ttl
        self.instance_id = str(uuid.uuid4())[:8]
        self.is_leader = False
        self.lease = None  # lease backing the key while leader, else None
        self._running = False

    def run(self, on_elected: Callable, on_revoked: Callable) -> None:
        """Run the election loop until resign() is called.

        Blocks the calling thread. ``on_elected`` / ``on_revoked`` are called
        synchronously from this loop, so they should return quickly: the lease
        is not refreshed while they run.
        """
        self._running = True
        # Refresh ~3 times per TTL. Float division plus a floor keeps ttl < 3
        # from degenerating into sleep(0) and busy-looping against etcd.
        interval = max(self.ttl / 3, 0.1)
        while self._running:
            try:
                if not self.is_leader:
                    self._try_become_leader(on_elected)
                elif not self._maintain_leadership():
                    log.warning("Instance %s lost leadership", self.instance_id)
                    self._step_down(on_revoked)
            except Exception as e:
                log.exception("Election error: %s", e)
                if self.is_leader:
                    self._step_down(on_revoked)
            time.sleep(interval)

    def _try_become_leader(self, on_elected: Callable) -> None:
        """Attempt one atomic create of the election key; on success call on_elected."""
        lease = self.client.lease(self.ttl)
        try:
            # Atomic compare-and-swap: only succeed if the key doesn't exist.
            success, _ = self.client.transaction(
                compare=[
                    self.client.transactions.create(self.election_key) == 0
                ],
                success=[
                    self.client.transactions.put(
                        self.election_key,
                        self.instance_id.encode(),
                        lease,
                    )
                ],
                failure=[],
            )
        except Exception:
            self._revoke_quietly(lease)
            raise
        if not success:
            # Someone else leads; don't leave an unused lease behind each poll.
            self._revoke_quietly(lease)
            return
        self.lease = lease
        self.is_leader = True
        log.info("Instance %s elected as leader", self.instance_id)
        on_elected()

    def _maintain_leadership(self) -> bool:
        """Refresh the lease and return whether the key still holds our id.

        If the lease already expired, etcd deleted the key (and another
        instance may have re-created it), so the ownership check fails.
        """
        if self.lease is None:
            return False
        self.lease.refresh()
        value, _ = self.client.get(self.election_key)
        return value == self.instance_id.encode()

    def _step_down(self, on_revoked: Callable) -> None:
        """Clear leader state, release the lease, and notify the caller."""
        self.is_leader = False
        lease, self.lease = self.lease, None
        if lease is not None:
            # Revoking deletes the key if we still hold it, so another instance
            # can take over without waiting for the TTL to lapse.
            self._revoke_quietly(lease)
        on_revoked()

    @staticmethod
    def _revoke_quietly(lease) -> None:
        """Best-effort lease revoke; the lease expires on its own if this fails."""
        try:
            lease.revoke()
        except Exception:
            log.debug("Failed to revoke lease", exc_info=True)

    def resign(self) -> None:
        """Voluntarily give up leadership and stop the run loop.

        Does not invoke ``on_revoked``; revoke errors propagate to the caller.
        """
        self._running = False
        lease, self.lease = self.lease, None
        if lease is not None:
            lease.revoke()
        self.is_leader = False
        log.info("Instance %s resigned leadership", self.instance_id)
Distributed locking
etcd’s transactions enable distributed locks for coordinating Python services:
class DistributedLock:
    """Distributed lock using etcd leases and transactions.

    Holding the lock means having created ``/locks/<lock_name>`` bound to a
    lease of ``ttl`` seconds. The lease is NOT refreshed while the lock is
    held, so work that outlives ``ttl`` silently loses the lock; size ``ttl``
    for the longest critical section.
    """

    def __init__(
        self,
        client: etcd3.Etcd3Client,
        lock_name: str,
        ttl: int = 30,
    ):
        self.client = client
        self.lock_key = f"/locks/{lock_name}"
        self.ttl = ttl
        self.lease = None  # set only while the lock is held
        self.lock_id = str(uuid.uuid4())[:8]

    def acquire(self, timeout: float = 10.0, poll_interval: float = 0.5) -> bool:
        """Try to acquire the lock within ``timeout`` seconds.

        At least one attempt is always made, so ``timeout=0`` is a single
        non-blocking try. Attempts are spaced ``poll_interval`` seconds apart
        (never sleeping past the deadline). On failure or error the new lease
        is revoked and ``self.lease`` is left untouched.

        Returns True if the lock was acquired, else False.
        """
        deadline = time.monotonic() + timeout
        lease = self.client.lease(self.ttl)
        acquired = False
        try:
            while True:
                acquired, _ = self.client.transaction(
                    compare=[
                        self.client.transactions.create(self.lock_key) == 0
                    ],
                    success=[
                        self.client.transactions.put(
                            self.lock_key,
                            self.lock_id.encode(),
                            lease,
                        )
                    ],
                    failure=[],
                )
                if acquired:
                    break
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                time.sleep(min(poll_interval, remaining))
                # Keep the pending lease alive when timeout exceeds ttl;
                # otherwise a late successful put would reference a dead lease.
                lease.refresh()
        finally:
            if not acquired:
                lease.revoke()
        if acquired:
            self.lease = lease
        return bool(acquired)

    def release(self) -> None:
        """Release the lock by revoking its lease; a no-op if not held."""
        lease, self.lease = self.lease, None
        if lease is not None:
            lease.revoke()

    def __enter__(self):
        if not self.acquire():
            raise TimeoutError(f"Could not acquire lock {self.lock_key}")
        return self

    def __exit__(self, *args):
        self.release()
Usage:
client = etcd3.client(host="etcd.internal")
try:
    # ttl must exceed the migration's worst-case runtime: DistributedLock does
    # not refresh its lease while held, so the lock would lapse after 300 s.
    with DistributedLock(client, "migration-lock", ttl=300):
        run_database_migration()
finally:
    client.close()
Service discovery pattern
class ServiceRegistry:
    """Register and discover service instances via etcd.

    Each instance is stored at ``<namespace>/<service_name>/<host>:<port>``
    as a JSON object, bound to a lease so the entry disappears when its owner
    stops refreshing the lease.
    """

    def __init__(self, client: etcd3.Etcd3Client, namespace: str = "/services"):
        self.client = client
        self.namespace = namespace

    def _service_prefix(self, service_name: str) -> str:
        """Key prefix under which all instances of ``service_name`` live."""
        return f"{self.namespace}/{service_name}/"

    def register(
        self, service_name: str, host: str, port: int, ttl: int = 15,
    ) -> etcd3.Lease:
        """Register a service instance under a new lease of ``ttl`` seconds.

        Returns the lease; the caller must refresh it periodically or the
        registration expires.
        """
        lease = self.client.lease(ttl)
        key = self._service_prefix(service_name) + f"{host}:{port}"
        payload = json.dumps({
            "host": host,
            "port": port,
            "registered_at": time.time(),
        })
        self.client.put(key, payload.encode(), lease)
        return lease

    def discover(self, service_name: str) -> list[dict]:
        """Return the JSON objects of all currently registered instances.

        Entries that are not UTF-8, not valid JSON, or not JSON objects are
        skipped, so one corrupt key cannot break discovery for the service.
        """
        instances = []
        for raw, _meta in self.client.get_prefix(self._service_prefix(service_name)):
            try:
                record = json.loads(raw.decode("utf-8"))
            except ValueError:  # covers UnicodeDecodeError and JSONDecodeError
                continue
            if isinstance(record, dict):
                instances.append(record)
        return instances

    def watch_service(
        self, service_name: str, callback: Callable[[list[dict]], None],
    ) -> int:
        """Call ``callback(instances)`` with a fresh discover() on every change.

        Returns the watch id (usable with the client's ``cancel_watch``).
        """
        def on_event(_response):
            # The notification payload is ignored; re-reading the prefix gives
            # the complete current instance list.
            callback(self.discover(service_name))

        return self.client.add_watch_prefix_callback(
            self._service_prefix(service_name), on_event,
        )
Performance and operational considerations
- Cluster sizing: 3 nodes for most use cases, 5 for higher availability. More nodes increase write latency without proportional benefit.
- Compaction: etcd keeps all revisions by default. Enable auto-compaction (`--auto-compaction-retention=1`) to prevent storage bloat.
- Defragmentation: periodic defrag reclaims disk space after compaction. Schedule it during low-traffic windows.
- Snapshot backups: `etcdctl snapshot save` creates point-in-time backups. Automate this with a cron job or Kubernetes CronJob.
- Watch scalability: etcd handles thousands of concurrent watchers, but each watch consumes server memory. Use prefix watches instead of individual key watches when possible.
| Metric | Typical value |
|---|---|
| Write latency (3-node, local) | 2-10ms |
| Read latency (serializable) | < 1ms |
| Concurrent watches | 10,000+ |
| Recommended data size | < 2GB |
| Request size limit (bounds value size) | 1.5MB by default (`--max-request-bytes`) |
etcd vs alternatives for Python
| Feature | etcd | Consul | ZooKeeper | Redis |
|---|---|---|---|---|
| Consistency | Strong (Raft) | Strong (Raft) | Strong (ZAB) | Eventual |
| Watch support | Native gRPC | Long-poll HTTP | Native | Pub/sub |
| Python client | etcd3 | python-consul | kazoo | redis-py |
| K8s integration | Built-in | Add-on | Not standard | Not standard |
| Service mesh | No | Yes (Connect) | No | No |
| Complexity | Low | Medium | High | Low |
The one thing to remember: etcd’s combination of strong consistency, real-time watches, leases, and transactions makes it a strong building block for Python distributed systems — from dynamic configuration through leader election and distributed locking to service discovery.
See Also
- Python Ansible Automation How Python powers Ansible to automatically set up and manage hundreds of servers without logging into each one
- Python Docker Compose Orchestration How Python developers use Docker Compose to run multiple services together like a conductor leading an orchestra
- Python Helm Charts Python Why Python developers use Helm charts to package and deploy their apps to Kubernetes clusters
- Python Nomad Job Scheduling How Python developers use HashiCorp Nomad to run their programs across many computers without managing each one
- Python Pulumi Infrastructure How Python developers use Pulumi to build cloud infrastructure using the same language they already know