Attribute-Based Access Control in Python — Deep Dive

Building a policy engine

A practical ABAC engine needs policies, combining algorithms, and a clean evaluation interface:

from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
from typing import Callable

class Decision(Enum):
    """Possible outcomes of evaluating a rule, a policy, or the whole engine."""

    PERMIT = "permit"                   # access is granted
    DENY = "deny"                       # access is refused
    NOT_APPLICABLE = "not_applicable"   # the policy's target did not match the request
    INDETERMINATE = "indeterminate"     # a rule raised, or the combining algorithm is unknown

@dataclass
class AccessRequest:
    """The attributes an access decision is based on.

    Policies receive one instance and read whichever attributes they need.
    """

    subject: dict       # attributes of the caller, e.g. "id", "role", "department"
    resource: dict      # attributes of the object being accessed, e.g. "type", "department"
    action: str         # operation being attempted, e.g. "read"
    environment: dict = field(default_factory=dict)  # request context, e.g. "hour", "ip"; empty by default

@dataclass
class PolicyResult:
    """The outcome of an evaluation together with optional explanatory metadata."""

    decision: Decision      # the outcome itself
    reason: str = ""        # human-readable explanation; empty when none was recorded
    policy_id: str = ""     # id of the policy that produced the decision; empty for engine-level defaults

class Policy:
    """A single ABAC policy: a target predicate plus rules merged by a combining algorithm.

    Args:
        policy_id: Identifier copied into every ``PolicyResult`` this policy returns.
        target: ``function(request) -> bool``; the policy only applies when it is truthy.
        rules: List of ``function(request) -> Decision``.
        combining: One of ``"deny_overrides"``, ``"permit_overrides"`` or
            ``"first_applicable"``. Any other value makes every applicable
            evaluation INDETERMINATE.
    """

    def __init__(self, policy_id: str, target: Callable, rules: list[Callable],
                 combining: str = "deny_overrides"):
        self.policy_id = policy_id
        self.target = target       # function(request) -> bool
        self.rules = rules         # list of function(request) -> Decision
        self.combining = combining

    def evaluate(self, request: AccessRequest) -> PolicyResult:
        """Evaluate this policy against *request*.

        Returns:
            NOT_APPLICABLE when the target does not match; otherwise the combined
            rule decision. When the combined decision is INDETERMINATE because
            rules raised, ``reason`` names the failing rules.

        Exceptions raised by rules are caught and recorded as INDETERMINATE;
        exceptions raised by the target propagate to the caller.
        """
        # Check if policy applies
        if not self.target(request):
            return PolicyResult(Decision.NOT_APPLICABLE, policy_id=self.policy_id)

        decisions = []
        failures = []
        for index, rule in enumerate(self.rules):
            try:
                decisions.append(rule(request))
            except Exception as exc:
                # A crashing rule (e.g. a missing attribute) gives no usable answer.
                # Only the exception type is recorded: ``reason`` may be shown to clients.
                decisions.append(Decision.INDETERMINATE)
                failures.append(f"rule {index} raised {type(exc).__name__}")

        final = self._combine(decisions)
        reason = "; ".join(failures) if final == Decision.INDETERMINATE else ""
        return PolicyResult(final, reason=reason, policy_id=self.policy_id)

    def _combine(self, decisions: list[Decision]) -> Decision:
        """Merge per-rule decisions according to ``self.combining``.

        An INDETERMINATE rule is never silently dropped: it blocks any PERMIT it
        could have overridden, while an explicit DENY still stands. The result is
        therefore never more permissive than ignoring the failed rule would be.
        """
        failed = Decision.INDETERMINATE in decisions

        if self.combining == "deny_overrides":
            if Decision.DENY in decisions:
                return Decision.DENY
            # The failed rule might have denied, so a PERMIT cannot be trusted.
            if failed:
                return Decision.INDETERMINATE
            if Decision.PERMIT in decisions:
                return Decision.PERMIT
            return Decision.NOT_APPLICABLE

        elif self.combining == "permit_overrides":
            # A PERMIT wins regardless of what the failed rule would have said.
            if Decision.PERMIT in decisions:
                return Decision.PERMIT
            if Decision.DENY in decisions:
                return Decision.DENY
            return Decision.INDETERMINATE if failed else Decision.NOT_APPLICABLE

        elif self.combining == "first_applicable":
            earlier_failure = False
            for d in decisions:
                if d == Decision.DENY:
                    return d
                if d == Decision.PERMIT:
                    # An earlier rule that failed could have been the first
                    # applicable one, so a later PERMIT is not conclusive.
                    return Decision.INDETERMINATE if earlier_failure else d
                if d == Decision.INDETERMINATE:
                    earlier_failure = True
            return Decision.INDETERMINATE if earlier_failure else Decision.NOT_APPLICABLE

        # Unknown combining algorithm: refuse to guess.
        return Decision.INDETERMINATE

Defining policies with the engine

class PolicyEngine:
    """Holds a list of policies and merges their results with deny-overrides.

    ``default_decision`` is returned whenever no policy yields PERMIT or DENY.
    """

    def __init__(self, default_decision: Decision = Decision.DENY):
        self.policies: list[Policy] = []
        self.default = default_decision

    def add_policy(self, policy: Policy):
        """Register *policy*; policies are evaluated in insertion order."""
        self.policies.append(policy)

    def evaluate(self, request: AccessRequest) -> PolicyResult:
        """Evaluate every policy and return the winning result.

        The first DENY wins, otherwise the first PERMIT; if neither exists the
        configured default is returned with an explanatory reason.
        """
        outcomes = []
        for policy in self.policies:
            outcome = policy.evaluate(request)
            if outcome.decision != Decision.NOT_APPLICABLE:
                outcomes.append(outcome)

        if len(outcomes) == 0:
            return PolicyResult(self.default, reason="No applicable policy")

        # Global combining: deny overrides
        for wanted in (Decision.DENY, Decision.PERMIT):
            winner = next((o for o in outcomes if o.decision == wanted), None)
            if winner is not None:
                return winner

        return PolicyResult(self.default, reason="No conclusive decision")

# Define policies
engine = PolicyEngine()


def _doctor_reads_patient_record(req):
    """Target: a doctor performing a read on a patient record."""
    if req.action != "read":
        return False
    if req.resource.get("type") != "patient_record":
        return False
    return req.subject.get("role") == "doctor"


def _same_department(req):
    """Rule: permit only when subject and resource share a department."""
    if req.subject["department"] == req.resource["department"]:
        return Decision.PERMIT
    return Decision.DENY


def _within_working_hours(req):
    """Rule: permit only when the request hour is between 7 and 19 inclusive."""
    # NOTE(review): a missing "hour" attribute falls back to 12 and therefore permits.
    hour = req.environment.get("hour", 12)
    if 7 <= hour <= 19:
        return Decision.PERMIT
    return Decision.DENY


def _reads_employee_record(req):
    """Target: any read on an employee record."""
    if req.action != "read":
        return False
    return req.resource.get("type") == "employee_record"


def _owns_record(req):
    """Rule: permit only when the subject is the record's owner."""
    if req.subject["id"] == req.resource["owner_id"]:
        return Decision.PERMIT
    return Decision.DENY


# Policy 1: Doctors can read patient records in their department
engine.add_policy(Policy(
    policy_id="doctor-patient-records",
    target=_doctor_reads_patient_record,
    rules=[_same_department, _within_working_hours],
    combining="deny_overrides",
))

# Policy 2: Employees can read their own records
engine.add_policy(Policy(
    policy_id="own-records",
    target=_reads_employee_record,
    rules=[_owns_record],
))

# Evaluate: a cardiology doctor reads a cardiology patient record at 14:00
request = AccessRequest(
    {"id": "doc-42", "role": "doctor", "department": "cardiology"},
    {"type": "patient_record", "department": "cardiology", "id": "rec-99"},
    "read",
    {"hour": 14, "ip": "10.0.1.50"},
)
result = engine.evaluate(request)
print(result.decision)  # Decision.PERMIT

Open Policy Agent (OPA) integration

For complex deployments, externalize policies to OPA — a dedicated policy engine with its own language (Rego):

# Rego policy (stored in OPA). The bare string below only embeds the contents of
# the policy file for reference; Python does not execute it.
# The `allow` rule is what the Python client queries at /v1/data/myapp/authz/allow.
# NOTE(review): rule bodies are written without the `if` keyword, which newer OPA
# releases expect — confirm against the deployed OPA version.
# NOTE(review): is_business_hours presumably evaluates the clock in UTC — verify
# against the OPA time built-ins documentation.
# policy.rego
"""
package myapp.authz

default allow = false

allow {
    input.action == "read"
    input.resource.type == "document"
    input.subject.clearance >= input.resource.classification_level
    is_business_hours
}

is_business_hours {
    hour := time.clock(time.now_ns())[0]
    hour >= 8
    hour < 18
}
"""

# Python client
import requests

class OPAClient:
    """Minimal client that asks an OPA server whether a request is allowed.

    Args:
        opa_url: Base URL of the OPA server (a trailing slash is tolerated).
    """

    def __init__(self, opa_url: str = "http://localhost:8181"):
        self.url = opa_url

    def evaluate(self, request: AccessRequest) -> bool:
        """Query the ``myapp/authz/allow`` rule for *request*.

        Returns:
            True only when OPA answers with ``{"result": true}``. A missing
            result or any non-boolean value counts as "not allowed".

        Raises:
            requests.RequestException: on connection errors, timeouts, or a
                4xx/5xx response from OPA.
            ValueError: when the response body is not valid JSON.
        """
        payload = {
            "input": {
                "subject": request.subject,
                "resource": request.resource,
                "action": request.action,
                "environment": request.environment,
            }
        }

        response = requests.post(
            f"{self.url.rstrip('/')}/v1/data/myapp/authz/allow",
            json=payload,
            timeout=1,
        )
        # Surface server-side failures instead of mistaking an error body for a decision.
        response.raise_for_status()
        result = response.json()
        # Strict identity check: a truthy non-boolean must never be read as "allowed".
        return isinstance(result, dict) and result.get("result") is True

opa = OPAClient()

# In your application
def check_access(request: AccessRequest) -> bool:
    """Return whether OPA allows *request*, denying on any OPA failure.

    Returns:
        The value of ``opa.evaluate(request)``, or False when the call fails.
    """
    try:
        return opa.evaluate(request)
    except (requests.RequestException, ValueError):
        # Fail closed — deny access if OPA is unavailable or its reply is
        # unusable (an invalid JSON body surfaces as ValueError).
        return False

OPA advantages: policies are version-controlled separately from code, can be updated without redeploying your app, and Rego is purpose-built for policy logic.

FastAPI ABAC dependency

from fastapi import FastAPI, Depends, HTTPException, Request
from datetime import datetime, timezone

app = FastAPI()

def abac_check(
    required_action: str,
    resource_type: str,
    resource_attrs: dict = None,
):
    """Create an ABAC dependency for FastAPI routes.

    This is a plain (non-async) factory: call it once at route-definition time
    and hand the returned coroutine function to ``Depends``. If the factory
    were ``async``, calling it would produce a coroutine object rather than the
    checker, and the check would never run.

    Args:
        required_action: Action to authorize, e.g. ``"read"``.
        resource_type: Value stored under the resource's ``"type"`` attribute.
        resource_attrs: Optional extra resource attributes merged into the request.

    Returns:
        An async dependency that returns the current user when access is
        permitted and raises ``HTTPException(403)`` otherwise.
    """
    async def checker(request: Request, user: User = Depends(get_current_user)):
        # Take one timestamp so "hour" and "day_of_week" cannot straddle a boundary.
        now = datetime.now(timezone.utc)
        access_request = AccessRequest(
            subject={
                "id": str(user.id),
                "role": user.role,
                "department": user.department,
                "clearance": user.clearance_level,
            },
            resource={
                "type": resource_type,
                **(resource_attrs or {}),
            },
            action=required_action,
            environment={
                "hour": now.hour,
                # request.client can be None, so guard before reading .host.
                "ip": request.client.host if request.client else "",
                "day_of_week": now.weekday(),
            },
        )

        result = engine.evaluate(access_request)
        if result.decision != Decision.PERMIT:
            raise HTTPException(
                status_code=403,
                detail=f"Access denied: {result.reason}",
            )
        return user
    return checker

# Route-level resource attributes
@app.get("/documents/{doc_id}")
async def get_document(
    doc_id: int,
    # Pass the checker itself to Depends so FastAPI awaits it on every request.
    user: User = Depends(abac_check("read", "document")),
):
    """Return a document after a coarse type-level check and a per-document check.

    Raises:
        HTTPException: 403 when either check does not yield PERMIT.
    """
    doc = await fetch_document(doc_id)
    # For resource-specific attributes, do a second check with full context
    access_request = AccessRequest(
        # "role" is included so role-targeted policies see the same subject as
        # in the dependency check above.
        subject={"id": str(user.id), "role": user.role,
                 "department": user.department,
                 "clearance": user.clearance_level},
        resource={"type": "document", "department": doc.department,
                  "classification_level": doc.classification,
                  "owner_id": str(doc.owner_id)},
        action="read",
        environment={"hour": datetime.now(timezone.utc).hour},
    )
    result = engine.evaluate(access_request)
    if result.decision != Decision.PERMIT:
        raise HTTPException(status_code=403, detail="Access denied")
    return doc

Django ABAC with middleware

# middleware.py
from django.http import HttpResponseForbidden
import json

class ABACMiddleware:
    """Django middleware that runs an ABAC check for requests under protected path prefixes.

    Authenticated requests whose path starts with a key of ``PROTECTED_PATTERNS``
    are evaluated by the policy engine; any decision other than PERMIT yields a
    JSON 403 response. All other requests are passed to the next handler.
    """

    PROTECTED_PATTERNS = {
        "/api/documents/": {"action": "read", "resource_type": "document"},
        "/api/patients/": {"action": "read", "resource_type": "patient_record"},
    }

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        user = request.user
        # NOTE(review): unauthenticated requests skip the ABAC check entirely and
        # reach the view — confirm that views enforce authentication themselves.
        if user.is_authenticated:
            for prefix, settings in self.PROTECTED_PATTERNS.items():
                if not request.path.startswith(prefix):
                    continue

                subject = {
                    "id": str(user.id),
                    "role": getattr(user, "role", "user"),
                    "department": getattr(user, "department", ""),
                }
                resource = {"type": settings["resource_type"]}
                # The action comes from the HTTP method, not from the pattern config.
                action = self._method_to_action(request.method)
                # NOTE(review): datetime.now() is naive local time — confirm this is
                # the clock the hour-based policies expect.
                environment = {
                    "hour": datetime.now().hour,
                    "ip": request.META.get("REMOTE_ADDR", ""),
                }

                verdict = engine.evaluate(AccessRequest(
                    subject=subject,
                    resource=resource,
                    action=action,
                    environment=environment,
                ))
                if verdict.decision == Decision.PERMIT:
                    continue

                body = json.dumps({"error": "Access denied", "reason": verdict.reason})
                return HttpResponseForbidden(body, content_type="application/json")

        return self.get_response(request)

    def _method_to_action(self, method: str) -> str:
        """Translate an HTTP method into an ABAC action; unknown methods map to "read"."""
        actions = dict(GET="read", POST="create", PUT="update",
                       PATCH="update", DELETE="delete")
        try:
            return actions[method]
        except KeyError:
            return "read"

Performance: caching policy decisions

ABAC evaluation can be expensive when policies involve database lookups or external calls:

import hashlib
import json
import redis

redis_client = redis.Redis()

class CachedPolicyEngine:
    """Wrap a ``PolicyEngine`` with a Redis-backed cache of decisions.

    Args:
        engine: The engine consulted on a cache miss.
        cache_ttl: Lifetime of a cached decision in seconds.
    """

    def __init__(self, engine: PolicyEngine, cache_ttl: int = 60):
        self.engine = engine
        self.ttl = cache_ttl

    def evaluate(self, request: AccessRequest) -> PolicyResult:
        """Return the cached decision for *request*, evaluating and caching on a miss.

        The decision, reason and policy id are all stored, so a cache hit
        reproduces the same ``PolicyResult`` fields as the original evaluation.
        """
        cache_key = self._cache_key(request)

        cached = redis_client.get(cache_key)
        if cached:
            data = json.loads(cached)
            return PolicyResult(
                Decision(data["decision"]),
                data.get("reason", ""),
                # Entries written before policy_id was stored simply lack the field.
                data.get("policy_id", ""),
            )

        result = self.engine.evaluate(request)
        redis_client.setex(
            cache_key,
            self.ttl,
            json.dumps({
                "decision": result.decision.value,
                "reason": result.reason,
                "policy_id": result.policy_id,
            }),
        )
        return result

    @staticmethod
    def _cache_key(request: AccessRequest) -> str:
        """Build a deterministic cache key from the request attributes."""
        key_data = json.dumps({
            "s": request.subject,
            "r": request.resource,
            "a": request.action,
            "e": {k: v for k, v in request.environment.items()
                  if k not in ("timestamp",)},  # exclude volatile attributes
        }, sort_keys=True)
        # Use the full digest: a truncated key makes collisions more likely, and
        # a collision here would return another request's decision.
        return f"abac:{hashlib.sha256(key_data.encode()).hexdigest()}"

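Be careful with caching: volatile environment attributes (time, threat level) can make cache keys unique for every request. Exclude or bucket volatile attributes — but never exclude an attribute that a policy actually reads, or a decision cached under one value will be served for another. Remember too that a cached PERMIT stays valid until its TTL expires, so keep the TTL short enough that revoked roles or changed attributes take effect promptly.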

Testing ABAC policies

import pytest
from datetime import datetime

@pytest.fixture
def engine():
    """Provide the PolicyEngine instance used by the tests in this module."""
    e = PolicyEngine()
    # ... add policies
    # NOTE(review): as written no policies are registered, so every evaluation
    # returns the engine default; register the policies the tests rely on here.
    return e

@pytest.mark.parametrize("hour,expected", [
    (9, Decision.PERMIT),
    (14, Decision.PERMIT),
    (22, Decision.DENY),
    (3, Decision.DENY),
])
def test_business_hours_constraint(engine, hour, expected):
    """A same-department doctor is permitted or denied depending on the request hour."""
    cardiology_doctor = {"role": "doctor", "department": "cardiology"}
    cardiology_record = {"type": "patient_record", "department": "cardiology"}
    outcome = engine.evaluate(
        AccessRequest(cardiology_doctor, cardiology_record, "read", {"hour": hour})
    )
    assert outcome.decision == expected

def test_cross_department_denied(engine):
    """A doctor from another department is denied, even at an allowed hour."""
    neurology_doctor = {"role": "doctor", "department": "neurology"}
    cardiology_record = {"type": "patient_record", "department": "cardiology"}
    outcome = engine.evaluate(
        AccessRequest(neurology_doctor, cardiology_record, "read", {"hour": 10})
    )
    assert outcome.decision == Decision.DENY

def test_own_record_always_permitted(engine):
    """An employee may read their own record, even outside business hours."""
    employee = {"id": "emp-42", "role": "employee"}
    own_record = {"type": "employee_record", "owner_id": "emp-42"}
    late_evening = {"hour": 23}  # even outside business hours
    outcome = engine.evaluate(
        AccessRequest(employee, own_record, "read", late_evening)
    )
    assert outcome.decision == Decision.PERMIT

The one thing to remember: Production ABAC in Python requires a policy engine with combining algorithms, separation of policy decisions from enforcement, caching for performance, and thorough testing of attribute combinations — consider OPA for complex policy sets that need to evolve independently from application code.

Tags: python, security, authorization, web

See Also