Python Feature Flag Strategies — Deep Dive
Building a Flag Evaluation Engine
A production flag system needs three things: a flag definition, an evaluation context, and a resolver that matches them.
Flag Definitions
from dataclasses import dataclass, field
from enum import Enum
from datetime import date
from typing import Any
class FlagType(str, Enum):
    """Category a feature flag belongs to.

    The ``str`` mixin makes every member compare equal to its raw string
    value, so a value read from JSON can be converted with ``FlagType(value)``.
    """

    RELEASE = "release"
    EXPERIMENT = "experiment"
    OPS = "ops"
    PERMISSION = "permission"
@dataclass
class RolloutRule:
    """A single rule in the evaluation chain."""

    attribute: str  # e.g., "country", "plan", "user_id"
    operator: str  # "eq", "in", "gt", "contains", "percentage"
    value: Any  # comparison value
    variant: str = "on"  # which variant to serve if rule matches
@dataclass
class FlagDefinition:
    """Static description of one feature flag.

    ``default_variant`` and every ``RolloutRule.variant`` are names that are
    looked up in ``variants`` to obtain the value actually served, so
    ``default_variant`` must be a key of ``variants``.
    """

    key: str
    flag_type: FlagType
    enabled: bool = True
    default_variant: str = "off"
    # Mutable defaults use factories so instances never share a list/dict.
    rules: list[RolloutRule] = field(default_factory=list)
    variants: dict[str, Any] = field(default_factory=lambda: {"on": True, "off": False})
    owner: str = ""
    expires: date | None = None
    description: str = ""
Evaluation Context
Every flag evaluation receives context about the current request:
@dataclass
class EvalContext:
    """Per-request attributes that rollout rules are matched against."""

    user_id: str = ""
    email: str = ""
    country: str = ""
    plan: str = "free"
    device: str = ""
    custom: dict[str, Any] = field(default_factory=dict)

    def get_attribute(self, name: str) -> Any:
        """Return the value of attribute ``name``, or ``None`` if unknown.

        Declared fields take precedence; any other name is looked up in
        ``custom``.
        """
        # Only declared dataclass fields count as built-in attributes.
        # ``hasattr`` would also match methods and dunders (for example
        # "get_attribute" or "__class__"), leaking non-data objects into rule
        # comparisons and hiding custom attributes with the same name.
        if name in self.__dataclass_fields__:
            return getattr(self, name)
        return self.custom.get(name)
The Evaluation Engine
import hashlib
class FlagEvaluator:
def __init__(self, flags: dict[str, FlagDefinition]):
self.flags = flags
def evaluate(self, flag_key: str, context: EvalContext) -> Any:
flag = self.flags.get(flag_key)
if not flag or not flag.enabled:
return flag.variants[flag.default_variant] if flag else None
for rule in flag.rules:
if self._matches_rule(rule, context, flag_key):
return flag.variants.get(rule.variant, flag.variants[flag.default_variant])
return flag.variants[flag.default_variant]
def _matches_rule(self, rule: RolloutRule, ctx: EvalContext, flag_key: str) -> bool:
if rule.operator == "percentage":
return self._in_percentage(ctx.user_id, flag_key, rule.value)
attr_value = ctx.get_attribute(rule.attribute)
if attr_value is None:
return False
match rule.operator:
case "eq":
return attr_value == rule.value
case "neq":
return attr_value != rule.value
case "in":
return attr_value in rule.value
case "not_in":
return attr_value not in rule.value
case "contains":
return rule.value in str(attr_value)
case "gt":
return attr_value > rule.value
case "lt":
return attr_value < rule.value
case _:
return False
def _in_percentage(self, user_id: str, flag_key: str, percentage: float) -> bool:
"""Consistent hashing: same user + flag always gets same bucket."""
hash_input = f"{flag_key}:{user_id}".encode()
hash_value = int(hashlib.sha256(hash_input).hexdigest()[:8], 16)
bucket = (hash_value % 10000) / 100 # 0.00 to 99.99
return bucket < percentage
The consistent hashing in _in_percentage is critical. Using user_id + flag_key as the hash input ensures:
- The same user always sees the same variant for a given flag
- Different flags have independent rollout populations (changing flag A’s percentage doesn’t affect who sees flag B)
Flag Storage Backends
In-Memory with File Sync
Good for small teams and simple setups:
import json
import time
from pathlib import Path
class FileFlagStore:
    """Load flag definitions from a JSON file, re-parsing only when it changes.

    The file is expected to hold one JSON object mapping each flag key to an
    object with a required ``"type"`` and optional ``"enabled"``,
    ``"default_variant"``, ``"rules"``, ``"variants"``, ``"owner"``,
    ``"expires"`` (ISO date) and ``"description"`` entries. Each rule object
    is passed to ``RolloutRule`` as keyword arguments.
    """

    def __init__(self, path: str, poll_interval: float = 5.0):
        self.path = Path(path)
        self._flags: dict[str, FlagDefinition] = {}
        self._last_modified: float = 0
        # Stored for callers/pollers; ``load`` itself does not consult it.
        self._poll_interval = poll_interval

    def load(self) -> dict[str, FlagDefinition]:
        """Return the current flags, re-reading the file if its mtime advanced.

        Raises:
            OSError: if the file cannot be stat'ed or opened.
            json.JSONDecodeError: if the file is not valid JSON.
            KeyError, ValueError, TypeError: if an entry is malformed.
        """
        mtime = self.path.stat().st_mtime
        if mtime <= self._last_modified:
            return self._flags

        # Explicit encoding: the platform default is not UTF-8 everywhere.
        with open(self.path, encoding="utf-8") as f:
            raw = json.load(f)

        self._flags = {
            key: FlagDefinition(
                key=key,
                flag_type=FlagType(data["type"]),
                enabled=data.get("enabled", True),
                # Previously dropped, which made a file-defined flag unable
                # to choose its own default variant or carry a description.
                default_variant=data.get("default_variant", "off"),
                rules=[RolloutRule(**r) for r in data.get("rules", [])],
                variants=data.get("variants", {"on": True, "off": False}),
                owner=data.get("owner", ""),
                expires=date.fromisoformat(data["expires"]) if data.get("expires") else None,
                description=data.get("description", ""),
            )
            for key, data in raw.items()
        }
        # Only record the mtime after a successful parse so a bad file is
        # retried on the next call.
        self._last_modified = mtime
        return self._flags
Redis-Backed for Distributed Systems
import redis
import json
class RedisFlagStore:
    """Read flag definitions from Redis with a short-lived in-process cache.

    Each flag is stored as a JSON string under ``<prefix><flag_key>``.
    NOTE(review): ``get_flags`` awaits client calls, so ``redis_client`` is
    presumably an asyncio Redis client despite the annotation — confirm.
    """

    def __init__(self, redis_client: redis.Redis, prefix: str = "flags:"):
        self.redis = redis_client
        self.prefix = prefix
        self._local_cache: dict[str, FlagDefinition] = {}
        self._cache_ttl: float = 5.0
        # -inf guarantees the first call refreshes even if the monotonic
        # clock currently reads less than the TTL.
        self._last_refresh: float = float("-inf")

    async def get_flags(self) -> dict[str, FlagDefinition]:
        """Return all flags, refreshing from Redis at most once per TTL."""
        now = time.monotonic()
        if now - self._last_refresh < self._cache_ttl:
            return self._local_cache

        # SCAN iterates incrementally; KEYS walks the whole keyspace in one
        # blocking command. SCAN may repeat keys, so de-duplicate in order.
        pattern = f"{self.prefix}*"
        keys = list(dict.fromkeys([key async for key in self.redis.scan_iter(match=pattern)]))

        values = []
        if keys:
            pipe = self.redis.pipeline()
            for key in keys:
                pipe.get(key)
            values = await pipe.execute()

        flags = {}
        for key, value in zip(keys, values):
            if value:
                # Keys are bytes unless the client uses decode_responses=True.
                name = key.decode() if isinstance(key, bytes) else key
                flag_key = name.removeprefix(self.prefix)
                data = json.loads(value)
                flags[flag_key] = self._parse_flag(flag_key, data)

        # An empty result is authoritative too: deleted flags disappear and
        # the TTL still applies, so Redis is not hit on every evaluation.
        self._local_cache = flags
        self._last_refresh = now
        return flags

    def _parse_flag(self, flag_key: str, data: dict) -> FlagDefinition:
        """Build a ``FlagDefinition`` from one decoded JSON object.

        Uses the same schema as the file-backed store: required ``"type"``,
        optional ``"enabled"``, ``"default_variant"``, ``"rules"``,
        ``"variants"``, ``"owner"``, ``"expires"`` and ``"description"``.
        """
        return FlagDefinition(
            key=flag_key,
            flag_type=FlagType(data["type"]),
            enabled=data.get("enabled", True),
            default_variant=data.get("default_variant", "off"),
            rules=[RolloutRule(**r) for r in data.get("rules", [])],
            variants=data.get("variants", {"on": True, "off": False}),
            owner=data.get("owner", ""),
            expires=date.fromisoformat(data["expires"]) if data.get("expires") else None,
            description=data.get("description", ""),
        )
The local cache avoids hitting Redis on every evaluation. A 5-second TTL means flag changes propagate within 5 seconds — fast enough for most use cases.
Integration with LaunchDarkly
For teams that want a managed service:
import ldclient
from ldclient.config import Config
from ldclient import Context
# Configure the LaunchDarkly SDK and obtain the client used by is_feature_on.
# NOTE(review): "sdk-key-from-dashboard" looks like a placeholder — presumably
# the real key should come from configuration or a secret store; confirm.
ldclient.set_config(Config("sdk-key-from-dashboard"))
client = ldclient.get()
def is_feature_on(flag_key: str, user_id: str, **attributes) -> bool:
    """Evaluate ``flag_key`` for a user through the LaunchDarkly client.

    Args:
        flag_key: Key of the flag to evaluate.
        user_id: Key of the "user" context being evaluated.
        **attributes: Extra context attributes (e.g. ``plan``, ``country``)
            made available to targeting rules.

    Returns:
        The variation served by the client, with ``False`` as the default.
    """
    # This is a context *builder*; the Context itself is created by build().
    builder = Context.builder(user_id).kind("user")
    for name, value in attributes.items():
        builder.set(name, value)
    return client.variation(flag_key, builder.build(), default=False)
# Usage:
if is_feature_on("new-checkout", user.id, plan=user.plan, country=user.country):
return new_checkout_flow()
Integration with Unleash (Self-Hosted)
from UnleashClient import UnleashClient
# Create and start the Unleash client used by check_flag.
# NOTE(review): the URL and the "token" Authorization value look like
# placeholders — presumably real values come from configuration; confirm.
client = UnleashClient(
url="https://unleash.internal/api",
app_name="my-python-app",
custom_headers={"Authorization": "token"},
)
client.initialize_client()
def check_flag(flag_key: str, context: dict) -> bool:
    """Return whether ``flag_key`` is enabled for ``context`` in Unleash.

    NOTE(review): newer UnleashClient releases may no longer accept
    ``default_value`` (a fallback function is used instead) — confirm
    against the pinned client version.
    """
    return client.is_enabled(
        flag_key,
        context=context,
        default_value=False,
    )
Flag Lifecycle Management
Stale flags are technical debt. Automate their detection:
from datetime import date, timedelta
class FlagLifecycleManager:
    """Find flags that are past their expiry date and report on them.

    ``store`` may be a mapping of flag key to flag definition, or an object
    exposing a synchronous ``load()`` method that returns such a mapping.
    """

    def __init__(self, store):
        self.store = store

    def _flags(self) -> list[FlagDefinition]:
        """Return the current flag definitions from the mapping or store."""
        source = self.store
        load = getattr(source, "load", None)
        if callable(load):
            # Store objects expose load() rather than dict methods.
            source = load()
        return list(source.values())

    def find_expired(self) -> list[FlagDefinition]:
        """Return flags whose ``expires`` date is strictly before today."""
        today = date.today()
        return [
            flag for flag in self._flags()
            if flag.expires and flag.expires < today
        ]

    def find_stale(self, max_age_days: int = 90) -> list[FlagDefinition]:
        """Return release/experiment flags that expired over ``max_age_days`` ago.

        Staleness is derived from ``expires`` only, so flags without an
        expiry date are never reported here.
        """
        cutoff = date.today() - timedelta(days=max_age_days)
        return [
            flag for flag in self._flags()
            if flag.flag_type in (FlagType.RELEASE, FlagType.EXPERIMENT)
            and flag.expires
            and flag.expires < cutoff
        ]

    def generate_cleanup_report(self) -> str:
        """Return a Markdown report listing expired and stale flags.

        With the default thresholds every stale flag is also expired, so it
        appears in both sections.
        """
        expired = self.find_expired()
        stale = self.find_stale()
        lines = ["# Flag Cleanup Report", ""]
        if expired:
            lines.append(f"## Expired ({len(expired)})")
            lines.extend(
                f"- `{f.key}` (owner: {f.owner}, expired: {f.expires})"
                for f in expired
            )
        if stale:
            lines.append(f"\n## Stale ({len(stale)})")
            lines.extend(
                f"- `{f.key}` (owner: {f.owner}, type: {f.flag_type.value})"
                for f in stale
            )
        return "\n".join(lines)
Run this as a weekly CI job that posts to Slack or creates Jira tickets.
FastAPI Middleware for Flag Context
Inject flag evaluation into every request:
from fastapi import Request, Depends
from starlette.middleware.base import BaseHTTPMiddleware
class FeatureFlagMiddleware(BaseHTTPMiddleware):
    """Attach a per-request ``FlagAccessor`` to ``request.state.flags``."""

    def __init__(self, app, evaluator: FlagEvaluator):
        super().__init__(app)
        self.evaluator = evaluator

    async def dispatch(self, request: Request, call_next):
        """Build the evaluation context for this request, then continue.

        ``request.state.user`` is read if present; without it the request is
        evaluated with an empty ``user_id``/``email`` and the "free" plan.
        """
        # NOTE(review): presumably an earlier middleware sets
        # request.state.user — confirm the ordering.
        user = getattr(request.state, "user", None)
        context = EvalContext(
            user_id=user.id if user else "",
            email=user.email if user else "",
            # NOTE(review): "cf-ipcountry" is presumably injected by the
            # edge proxy — confirm it is present in every environment.
            country=request.headers.get("cf-ipcountry", ""),
            plan=user.plan if user else "free",
        )
        request.state.flags = FlagAccessor(self.evaluator, context)
        return await call_next(request)
class FlagAccessor:
    """Evaluate flags for one fixed context, memoizing each result."""

    def __init__(self, evaluator: FlagEvaluator, context: EvalContext):
        self._evaluator = evaluator
        self._context = context
        self._evaluated: dict[str, Any] = {}

    def is_on(self, flag_key: str) -> bool:
        """Return the truthiness of the flag's variant value."""
        return bool(self.get_variant(flag_key))

    def get_variant(self, flag_key: str) -> Any:
        """Return the flag's variant value, evaluating it at most once."""
        # Membership (not truthiness) is tested so falsy or None results are
        # cached as well.
        if flag_key not in self._evaluated:
            self._evaluated[flag_key] = self._evaluator.evaluate(flag_key, self._context)
        return self._evaluated[flag_key]
# In route handlers:
@app.get("/checkout")
async def checkout(request: Request):
    """Serve the new checkout when the "new-checkout" flag is on."""
    # request.state.flags is read here, so something must have set it before
    # the handler runs.
    if request.state.flags.is_on("new-checkout"):
        return new_checkout()
    return old_checkout()
The FlagAccessor caches evaluations within a single request, so checking the same flag twice doesn’t re-evaluate rules.
Testing with Feature Flags
import pytest
from unittest.mock import patch
@pytest.fixture
def flags_on():
    """Enable all flags in tests."""

    def always_true(flag_key, context):
        return True

    # Patching the class attribute affects every FlagEvaluator instance for
    # as long as the fixture is active.
    with patch.object(FlagEvaluator, 'evaluate', side_effect=always_true):
        yield
@pytest.fixture
def flags_off():
    """Disable all flags in tests."""

    def always_false(flag_key, context):
        return False

    # Patching the class attribute affects every FlagEvaluator instance for
    # as long as the fixture is active.
    with patch.object(FlagEvaluator, 'evaluate', side_effect=always_false):
        yield
def test_new_checkout_enabled(client, flags_on):
    """With every flag forced on, /checkout serves the new flow."""
    response = client.get("/checkout")
    assert "new-checkout" in response.text
def test_old_checkout_fallback(client, flags_off):
    """With every flag forced off, /checkout falls back to the classic flow."""
    response = client.get("/checkout")
    assert "classic-checkout" in response.text
Always test both flag states. If you only test with the flag on, you’ll break the off path without knowing it.
One thing to remember: A feature flag system is an evaluation engine (rules + context + consistent hashing), a storage backend (file, Redis, or SaaS), and a lifecycle manager (expire, clean up, report). Skipping the lifecycle part guarantees flag debt.
See Also
- Python A/B Testing Framework How tech companies test two versions of something to see which one wins — explained with a lemonade stand experiment.
- Python Configuration Hierarchy How your Python app decides which settings to use — explained like layers of clothing on a cold day.
- Python Graceful Shutdown Why your Python app needs to say goodbye properly before it stops — explained with a restaurant closing analogy.
- Python Health Check Patterns Why your Python app needs regular check-ups — explained like a doctor's visit for software.
- Python Readiness Liveness Probes The two questions every cloud platform asks your Python app — explained with a school attendance analogy.