Python Feature Flag Strategies — Deep Dive

Building a Flag Evaluation Engine

A production flag system needs three things: a flag definition, an evaluation context, and a resolver that matches them.

Flag Definitions

from dataclasses import dataclass, field
from enum import Enum
from datetime import date
from typing import Any

class FlagType(str, Enum):
    """Category a feature flag belongs to.

    Mixing in ``str`` makes every member compare equal to its string value,
    so ``FlagType("release")`` and ``FlagType.RELEASE == "release"`` both work.
    """
    RELEASE = "release"
    EXPERIMENT = "experiment"
    OPS = "ops"
    PERMISSION = "permission"

@dataclass
class RolloutRule:
    """A single rule in the evaluation chain.

    A rule names a context attribute, an operator, and a comparison value;
    when the rule matches, ``variant`` selects which variant is served.
    """
    attribute: str           # context attribute to read, e.g., "country", "plan", "user_id"
    operator: str            # "eq", "neq", "in", "not_in", "contains", "gt", "lt", "percentage"
    value: Any               # comparison value (for "percentage": the rollout percentage)
    variant: str = "on"      # which variant to serve if rule matches

@dataclass
class FlagDefinition:
    """Everything known about one feature flag: identity, rules, and variants."""
    key: str                         # unique flag identifier
    flag_type: FlagType              # category of the flag
    enabled: bool = True             # master switch for the flag
    default_variant: str = "off"     # name of the variant used when no rule decides
    # Rollout rules, in the order they are listed.
    rules: list[RolloutRule] = field(default_factory=list)
    # Maps variant name -> value; a fresh dict is created per instance.
    variants: dict[str, Any] = field(default_factory=lambda: {"on": True, "off": False})
    owner: str = ""                  # free-form owner label
    expires: date | None = None      # optional expiry date; None means no expiry set
    description: str = ""            # free-form description

Evaluation Context

Every flag evaluation receives context about the current request:

@dataclass
class EvalContext:
    """Per-request attributes that rollout rules are matched against.

    Well-known attributes are declared as fields; anything else goes into
    ``custom`` and is looked up by name through :meth:`get_attribute`.
    """
    user_id: str = ""
    email: str = ""
    country: str = ""
    plan: str = "free"
    device: str = ""
    custom: dict[str, Any] = field(default_factory=dict)

    def get_attribute(self, name: str) -> Any:
        """Return the value of attribute *name*, or ``None`` if it is unknown.

        Declared dataclass fields take precedence; every other name is looked
        up in ``custom``.
        """
        # Only declared fields count as built-in attributes. A plain
        # ``hasattr`` check would also match methods and dunder attributes
        # (e.g. "get_attribute", "__class__"), returning non-data objects and
        # shadowing custom attributes of the same name.
        if name in self.__dataclass_fields__:
            return getattr(self, name)
        return self.custom.get(name)

The Evaluation Engine

import hashlib

class FlagEvaluator:
    def __init__(self, flags: dict[str, FlagDefinition]):
        self.flags = flags

    def evaluate(self, flag_key: str, context: EvalContext) -> Any:
        flag = self.flags.get(flag_key)
        if not flag or not flag.enabled:
            return flag.variants[flag.default_variant] if flag else None

        for rule in flag.rules:
            if self._matches_rule(rule, context, flag_key):
                return flag.variants.get(rule.variant, flag.variants[flag.default_variant])

        return flag.variants[flag.default_variant]

    def _matches_rule(self, rule: RolloutRule, ctx: EvalContext, flag_key: str) -> bool:
        if rule.operator == "percentage":
            return self._in_percentage(ctx.user_id, flag_key, rule.value)

        attr_value = ctx.get_attribute(rule.attribute)
        if attr_value is None:
            return False

        match rule.operator:
            case "eq":
                return attr_value == rule.value
            case "neq":
                return attr_value != rule.value
            case "in":
                return attr_value in rule.value
            case "not_in":
                return attr_value not in rule.value
            case "contains":
                return rule.value in str(attr_value)
            case "gt":
                return attr_value > rule.value
            case "lt":
                return attr_value < rule.value
            case _:
                return False

    def _in_percentage(self, user_id: str, flag_key: str, percentage: float) -> bool:
        """Consistent hashing: same user + flag always gets same bucket."""
        hash_input = f"{flag_key}:{user_id}".encode()
        hash_value = int(hashlib.sha256(hash_input).hexdigest()[:8], 16)
        bucket = (hash_value % 10000) / 100  # 0.00 to 99.99
        return bucket < percentage

The deterministic hashing in _in_percentage is critical (note that this is stable bucketing, not "consistent hashing" in the distributed-systems sense). Using flag_key + user_id as the hash input ensures:

  • The same user always sees the same variant for a given flag
  • Different flags have independent rollout populations (changing flag A’s percentage doesn’t affect who sees flag B)

Flag Storage Backends

In-Memory with File Sync

Good for small teams and simple setups:

import json
import time
from pathlib import Path

class FileFlagStore:
    """Load flag definitions from a JSON file, re-parsing only when it changes.

    The file maps each flag key to an object with a required ``type`` entry
    and optional ``enabled``, ``default_variant``, ``rules``, ``variants``,
    ``owner``, ``expires`` (ISO date string) and ``description`` entries.
    """

    def __init__(self, path: str, poll_interval: float = 5.0):
        self.path = Path(path)
        self._flags: dict[str, FlagDefinition] = {}
        self._last_modified: float = 0
        # Stored only; load() does not consult it.
        # NOTE(review): presumably meant for a caller-side polling loop — confirm.
        self._poll_interval = poll_interval

    def load(self) -> dict[str, FlagDefinition]:
        """Return the current flags, re-reading the file if its mtime advanced.

        Raises:
            OSError: if the file cannot be stat'ed or opened.
            json.JSONDecodeError: if the file is not valid JSON.
            KeyError: if a flag entry has no ``type``.
            ValueError: if ``type`` is not a known FlagType or ``expires`` is
                not an ISO date.
        """
        mtime = self.path.stat().st_mtime
        if mtime <= self._last_modified:
            return self._flags

        # Read as UTF-8 explicitly instead of relying on the locale default.
        with open(self.path, encoding="utf-8") as f:
            raw = json.load(f)

        self._flags = {
            key: FlagDefinition(
                key=key,
                flag_type=FlagType(data["type"]),
                enabled=data.get("enabled", True),
                # Previously dropped: without these a file could never
                # override the default variant or carry a description.
                default_variant=data.get("default_variant", "off"),
                rules=[RolloutRule(**r) for r in data.get("rules", [])],
                variants=data.get("variants", {"on": True, "off": False}),
                owner=data.get("owner", ""),
                expires=date.fromisoformat(data["expires"]) if data.get("expires") else None,
                description=data.get("description", ""),
            )
            for key, data in raw.items()
        }
        # Only remember the mtime after a successful parse, so a broken file
        # is retried on the next call.
        self._last_modified = mtime
        return self._flags

Redis-Backed for Distributed Systems

import redis
import json

class RedisFlagStore:
    """Read flag definitions from Redis, with a short-lived in-process cache.

    Each flag is stored as a JSON string under ``<prefix><flag_key>``.
    ``get_flags`` awaits the client's calls.
    NOTE(review): that presumably requires an asyncio Redis client rather than
    the synchronous ``redis.Redis`` named in the annotation — confirm.
    """

    def __init__(self, redis_client: redis.Redis, prefix: str = "flags:"):
        self.redis = redis_client
        self.prefix = prefix
        self._local_cache: dict[str, FlagDefinition] = {}
        self._cache_ttl: float = 5.0
        # time.monotonic() has an undefined reference point, so 0 could look
        # "fresh"; -inf guarantees the first call always refreshes.
        self._last_refresh: float = float("-inf")

    async def get_flags(self) -> dict[str, FlagDefinition]:
        """Return all flags, refreshing from Redis at most once per cache TTL."""
        now = time.monotonic()
        if now - self._last_refresh < self._cache_ttl:
            return self._local_cache

        # NOTE(review): KEYS scans the whole keyspace; consider SCAN for large
        # databases.
        keys = await self.redis.keys(f"{self.prefix}*")

        flags: dict[str, FlagDefinition] = {}
        if keys:
            pipe = self.redis.pipeline()
            for key in keys:
                pipe.get(key)
            values = await pipe.execute()

            for key, value in zip(keys, values):
                if value:
                    # Keys are bytes by default but str when the client was
                    # created with decode_responses=True.
                    name = key.decode() if isinstance(key, bytes) else key
                    flag_key = name.removeprefix(self.prefix)
                    flags[flag_key] = self._parse_flag(flag_key, json.loads(value))

        # An empty result is a valid state (all flags deleted): cache it too,
        # instead of serving stale flags and re-querying Redis on every call.
        self._local_cache = flags
        self._last_refresh = now
        return flags

    def _parse_flag(self, flag_key: str, data: dict[str, Any]) -> FlagDefinition:
        """Build a FlagDefinition from one decoded JSON flag object.

        ``type`` is required; ``enabled``, ``default_variant``, ``rules``,
        ``variants``, ``owner``, ``expires`` (ISO date string) and
        ``description`` are optional.
        """
        return FlagDefinition(
            key=flag_key,
            flag_type=FlagType(data["type"]),
            enabled=data.get("enabled", True),
            default_variant=data.get("default_variant", "off"),
            rules=[RolloutRule(**r) for r in data.get("rules", [])],
            variants=data.get("variants", {"on": True, "off": False}),
            owner=data.get("owner", ""),
            expires=date.fromisoformat(data["expires"]) if data.get("expires") else None,
            description=data.get("description", ""),
        )

The local cache avoids hitting Redis on every evaluation. A 5-second TTL means flag changes propagate within 5 seconds — fast enough for most use cases.

Integration with LaunchDarkly

For teams that want a managed service:

import ldclient
from ldclient.config import Config
from ldclient import Context

# Configure the LaunchDarkly SDK once at import time and keep the shared client.
# NOTE(review): "sdk-key-from-dashboard" is a placeholder — presumably the real
# key should be loaded from configuration or a secret store, not hard-coded.
ldclient.set_config(Config("sdk-key-from-dashboard"))
client = ldclient.get()

def is_feature_on(flag_key: str, user_id: str, **attributes) -> bool:
    """Evaluate *flag_key* for a "user" context keyed by *user_id*.

    Every extra keyword argument is set as an attribute on the context.
    The client is asked for the variation with a default of ``False``.
    """
    builder = Context.builder(user_id).kind("user")
    for attr_name, attr_value in attributes.items():
        builder.set(attr_name, attr_value)

    ld_context = builder.build()
    return client.variation(flag_key, ld_context, default=False)

# Usage:
# NOTE(review): illustrative fragment — the `return` implies it lives inside a
# function (e.g. a request handler); `user` and `new_checkout_flow` are
# supplied by the surrounding application code.
if is_feature_on("new-checkout", user.id, plan=user.plan, country=user.country):
    return new_checkout_flow()

Integration with Unleash (Self-Hosted)

from UnleashClient import UnleashClient

# Create and initialize the shared Unleash client at import time.
# NOTE(review): the URL and the "token" Authorization value look like
# placeholders — presumably real values come from configuration; confirm.
client = UnleashClient(
    url="https://unleash.internal/api",
    app_name="my-python-app",
    custom_headers={"Authorization": "token"},
)
client.initialize_client()

def check_flag(flag_key: str, context: dict) -> bool:
    """Ask the Unleash client whether *flag_key* is enabled for *context*.

    ``False`` is passed as the default value.
    """
    enabled = client.is_enabled(flag_key, context=context, default_value=False)
    return enabled

Flag Lifecycle Management

Stale flags are technical debt. Automate their detection:

from datetime import date, timedelta

class FlagLifecycleManager:
    """Find flags that are past their expiry date and report on them.

    ``store`` must offer ``.values()`` yielding flag definitions (for example
    a ``dict`` mapping flag key to definition).
    """

    def __init__(self, store):
        self.store = store

    def find_expired(self) -> list[FlagDefinition]:
        """Return every flag whose ``expires`` date lies before today."""
        today = date.today()
        expired = []
        for flag in self.store.values():
            if flag.expires and flag.expires < today:
                expired.append(flag)
        return expired

    def find_stale(self, max_age_days: int = 90) -> list[FlagDefinition]:
        """Return release/experiment flags that expired over *max_age_days* ago.

        Staleness is judged by the ``expires`` date only; flags without an
        expiry date are never reported.
        """
        cutoff = date.today() - timedelta(days=max_age_days)
        stale = []
        for flag in self.store.values():
            if flag.flag_type not in (FlagType.RELEASE, FlagType.EXPERIMENT):
                continue
            if flag.expires and flag.expires < cutoff:
                stale.append(flag)
        return stale

    def generate_cleanup_report(self) -> str:
        """Render the expired and stale flags as a Markdown report."""
        expired = self.find_expired()
        stale = self.find_stale()
        report = ["# Flag Cleanup Report", ""]

        if expired:
            report.append(f"## Expired ({len(expired)})")
            report.extend(
                f"- `{flag.key}` (owner: {flag.owner}, expired: {flag.expires})"
                for flag in expired
            )

        if stale:
            report.append(f"\n## Stale ({len(stale)})")
            report.extend(
                f"- `{flag.key}` (owner: {flag.owner}, type: {flag.flag_type.value})"
                for flag in stale
            )

        return "\n".join(report)

Run this as a weekly CI job that posts to Slack or creates Jira tickets.

FastAPI Middleware for Flag Context

Inject flag evaluation into every request:

from fastapi import Request, Depends
from starlette.middleware.base import BaseHTTPMiddleware

class FeatureFlagMiddleware(BaseHTTPMiddleware):
    """Attach a per-request ``FlagAccessor`` to ``request.state.flags``."""

    def __init__(self, app, evaluator: FlagEvaluator):
        super().__init__(app)
        self.evaluator = evaluator

    async def dispatch(self, request: Request, call_next):
        """Build the evaluation context for this request, then continue.

        User details come from ``request.state.user`` when it is set and
        truthy; otherwise anonymous defaults are used. The country is read
        from the ``cf-ipcountry`` request header.
        """
        user = getattr(request.state, "user", None)
        if user:
            user_id, email = user.id, user.email
        else:
            user_id, email = "", ""
        country = request.headers.get("cf-ipcountry", "")
        if user:
            plan = user.plan
        else:
            plan = "free"

        context = EvalContext(user_id=user_id, email=email, country=country, plan=plan)
        request.state.flags = FlagAccessor(self.evaluator, context)
        response = await call_next(request)
        return response

class FlagAccessor:
    """Memoizing facade over a flag evaluator bound to one evaluation context.

    Each flag key is evaluated at most once per accessor; later lookups are
    answered from the stored result (including ``None`` results).
    """

    def __init__(self, evaluator: FlagEvaluator, context: EvalContext):
        self._evaluator = evaluator
        self._context = context
        self._evaluated: dict[str, Any] = {}

    def is_on(self, flag_key: str) -> bool:
        """Return the truthiness of the flag's variant value."""
        return bool(self.get_variant(flag_key))

    def get_variant(self, flag_key: str) -> Any:
        """Return the flag's variant value, evaluating it on first access."""
        results = self._evaluated
        if flag_key in results:
            return results[flag_key]
        value = self._evaluator.evaluate(flag_key, self._context)
        results[flag_key] = value
        return value

# In route handlers:
@app.get("/checkout")
async def checkout(request: Request):
    """Serve the new checkout when the "new-checkout" flag is on, else the old one."""
    flags = request.state.flags
    handler = new_checkout if flags.is_on("new-checkout") else old_checkout
    return handler()

The FlagAccessor caches evaluations within a single request, so checking the same flag twice doesn’t re-evaluate rules.

Testing with Feature Flags

import pytest
from unittest.mock import patch

@pytest.fixture
def flags_on():
    """Make FlagEvaluator.evaluate return True for the duration of a test."""
    stub = patch.object(
        FlagEvaluator,
        'evaluate',
        side_effect=lambda flag_key, context: True,
    )
    with stub:
        yield

@pytest.fixture
def flags_off():
    """Make FlagEvaluator.evaluate return False for the duration of a test."""
    stub = patch.object(
        FlagEvaluator,
        'evaluate',
        side_effect=lambda flag_key, context: False,
    )
    with stub:
        yield

def test_new_checkout_enabled(client, flags_on):
    """With every flag forced on, /checkout responds with the new flow."""
    body = client.get("/checkout").text
    assert "new-checkout" in body

def test_old_checkout_fallback(client, flags_off):
    """With every flag forced off, /checkout responds with the classic flow."""
    body = client.get("/checkout").text
    assert "classic-checkout" in body

Always test both flag states. If you only test with the flag on, you’ll break the off path without knowing it.

One thing to remember: A feature flag system is an evaluation engine (rules + context + consistent hashing), a storage backend (file, Redis, or SaaS), and a lifecycle manager (expire, clean up, report). Skipping the lifecycle part guarantees flag debt.

Tags: python, feature-flags, production

See Also