Attribute-Based Access Control in Python — Deep Dive
Building a policy engine
A practical ABAC engine needs policies, combining algorithms, and a clean evaluation interface:
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
from typing import Callable
class Decision(Enum):
    """Outcome of evaluating a rule, a policy, or the whole engine."""

    # Access is explicitly granted.
    PERMIT = "permit"
    # Access is explicitly refused.
    DENY = "deny"
    # The policy's target did not match, or no rule produced a verdict.
    NOT_APPLICABLE = "not_applicable"
    # Evaluation could not complete (e.g. a rule raised an exception).
    INDETERMINATE = "indeterminate"
@dataclass
class AccessRequest:
    """Attribute bundle describing one authorization question.

    Policies read these four groups of attributes to decide whether the
    request is permitted.
    """

    # Attributes of whoever is asking (e.g. "id", "role", "department").
    subject: dict
    # Attributes of the thing being accessed (e.g. "type", "owner_id").
    resource: dict
    # Name of the operation being attempted, such as "read".
    action: str
    # Contextual attributes (e.g. "hour", "ip"); each instance gets its own dict.
    environment: dict = field(default_factory=dict)
@dataclass
class PolicyResult:
    """Verdict returned by a policy or by the engine."""

    # The decision that was reached.
    decision: Decision
    # Optional human-readable explanation; empty when none was recorded.
    reason: str = ""
    # Identifier of the policy that produced the verdict; empty for engine defaults.
    policy_id: str = ""
class Policy:
    """A target predicate plus a list of rules merged by a combining algorithm.

    Args:
        policy_id: Identifier copied onto every ``PolicyResult`` this policy
            produces.
        target: ``function(request) -> bool``; the rules run only when it
            returns a truthy value.
        rules: ``function(request) -> Decision`` callables, evaluated in order.
        combining: ``"deny_overrides"`` (default), ``"permit_overrides"`` or
            ``"first_applicable"``. Any other name makes every evaluation
            ``INDETERMINATE``.

    A rule that raises is recorded as ``INDETERMINATE``. Unlike a rule that is
    merely not applicable, an errored rule is never silently discarded: it can
    be overridden by an explicit ``DENY`` (or by ``PERMIT`` under
    ``permit_overrides``), but it can never be masked so that a ``PERMIT``
    slips through a policy where denial should win.
    """

    _ALGORITHMS = ("deny_overrides", "permit_overrides", "first_applicable")

    def __init__(self, policy_id: str, target: Callable, rules: list[Callable],
                 combining: str = "deny_overrides"):
        self.policy_id = policy_id
        self.target = target  # function(request) -> bool
        self.rules = rules  # list of function(request) -> Decision
        self.combining = combining

    def evaluate(self, request: AccessRequest) -> PolicyResult:
        """Evaluate *request* against this policy.

        Returns:
            ``NOT_APPLICABLE`` when the target does not match; otherwise the
            combined decision of all rules. When the decision is
            ``INDETERMINATE`` the ``reason`` names the rules that raised (or
            the unknown combining algorithm).
        """
        # Check if policy applies
        if not self.target(request):
            return PolicyResult(Decision.NOT_APPLICABLE, policy_id=self.policy_id)

        decisions = []
        failures = []
        for position, rule in enumerate(self.rules, start=1):
            try:
                decisions.append(rule(request))
            except Exception as exc:
                # Boundary handler: one broken rule must not crash the engine,
                # but the failure is kept so it can influence the outcome.
                failures.append(f"rule {position} raised {type(exc).__name__}")
                decisions.append(Decision.INDETERMINATE)

        final = self._combine(decisions)

        reason = ""
        if final is Decision.INDETERMINATE:
            if self.combining not in self._ALGORITHMS:
                reason = f"unknown combining algorithm {self.combining!r}"
            else:
                reason = "; ".join(failures)
        return PolicyResult(final, reason=reason, policy_id=self.policy_id)

    def _combine(self, decisions: list[Decision]) -> Decision:
        """Merge per-rule decisions according to ``self.combining``."""
        errored = Decision.INDETERMINATE in decisions

        if self.combining == "deny_overrides":
            if Decision.DENY in decisions:
                return Decision.DENY
            # An errored rule might have denied, so it outranks PERMIT here.
            if errored:
                return Decision.INDETERMINATE
            if Decision.PERMIT in decisions:
                return Decision.PERMIT
            return Decision.NOT_APPLICABLE

        if self.combining == "permit_overrides":
            if Decision.PERMIT in decisions:
                return Decision.PERMIT
            if Decision.DENY in decisions:
                return Decision.DENY
            if errored:
                return Decision.INDETERMINATE
            return Decision.NOT_APPLICABLE

        if self.combining == "first_applicable":
            errored_earlier = False
            for decision in decisions:
                if decision is Decision.DENY:
                    return Decision.DENY
                if decision is Decision.PERMIT:
                    # A rule that failed before this one might have been the
                    # first applicable rule, so the PERMIT cannot be trusted.
                    return Decision.INDETERMINATE if errored_earlier else Decision.PERMIT
                if decision is Decision.INDETERMINATE:
                    errored_earlier = True
            return Decision.INDETERMINATE if errored_earlier else Decision.NOT_APPLICABLE

        # Unknown combining algorithm.
        return Decision.INDETERMINATE
Defining policies with the engine
class PolicyEngine:
    """Evaluates a request against every registered policy (deny overrides).

    Args:
        default_decision: Decision returned when no policy applies, or when
            the applicable policies yield no recognised decision.
    """

    def __init__(self, default_decision: Decision = Decision.DENY):
        self.policies: list[Policy] = []
        self.default = default_decision

    def add_policy(self, policy: Policy):
        """Register *policy*; policies are evaluated in insertion order."""
        self.policies.append(policy)

    def evaluate(self, request: AccessRequest) -> PolicyResult:
        """Return the engine-wide decision for *request*.

        Precedence among applicable policies:

        1. the first ``DENY`` result wins;
        2. otherwise, if any policy was ``INDETERMINATE``, the request is
           denied (fail closed), because a policy that could not be evaluated
           might have denied it;
        3. otherwise the first ``PERMIT`` result wins.

        When no policy applies, the configured default decision is returned.
        """
        applicable = [
            outcome
            for outcome in (policy.evaluate(request) for policy in self.policies)
            if outcome.decision != Decision.NOT_APPLICABLE
        ]
        if not applicable:
            return PolicyResult(self.default, reason="No applicable policy")

        # Global combining: deny overrides
        for outcome in applicable:
            if outcome.decision == Decision.DENY:
                return outcome

        # Fail closed: an evaluation error must never be outvoted by a PERMIT
        # from another policy, nor fall through to a permissive default.
        for outcome in applicable:
            if outcome.decision == Decision.INDETERMINATE:
                explanation = outcome.reason or "Policy evaluation was indeterminate"
                return PolicyResult(
                    Decision.DENY,
                    reason=f"{explanation} (failing closed)",
                    policy_id=outcome.policy_id,
                )

        for outcome in applicable:
            if outcome.decision == Decision.PERMIT:
                return outcome

        return PolicyResult(self.default, reason="No conclusive decision")
# Define policies
engine = PolicyEngine()


def _attributes_match(left: dict, left_key: str, right: dict, right_key: str) -> bool:
    """Return True only when both attributes are present and equal.

    A missing attribute never matches, so two absent values (None == None)
    cannot be mistaken for agreement.
    """
    value = left.get(left_key)
    return value is not None and value == right.get(right_key)


def _within_hours(environment: dict, start: int = 7, end: int = 19) -> bool:
    """Return True when ``environment["hour"]`` is present and in [start, end].

    A request without an ``hour`` attribute is treated as outside the window
    rather than defaulting to a permitted hour.
    """
    hour = environment.get("hour")
    return hour is not None and start <= hour <= end


# Policy 1: Doctors can read patient records in their department
engine.add_policy(Policy(
    policy_id="doctor-patient-records",
    target=lambda r: (r.action == "read"
                      and r.resource.get("type") == "patient_record"
                      and r.subject.get("role") == "doctor"),
    rules=[
        # Same department as the record (a missing department is a DENY,
        # not a KeyError).
        lambda r: (Decision.PERMIT
                   if _attributes_match(r.subject, "department",
                                        r.resource, "department")
                   else Decision.DENY),
        # Only between 07:00 and 19:59 (hours 7..19 inclusive).
        lambda r: (Decision.PERMIT
                   if _within_hours(r.environment)
                   else Decision.DENY),
    ],
    combining="deny_overrides",
))

# Policy 2: Employees can read their own records
engine.add_policy(Policy(
    policy_id="own-records",
    target=lambda r: (r.action == "read"
                      and r.resource.get("type") == "employee_record"),
    rules=[
        lambda r: (Decision.PERMIT
                   if _attributes_match(r.subject, "id", r.resource, "owner_id")
                   else Decision.DENY),
    ],
))
# Evaluate: a cardiology doctor reads a cardiology record at 14:00
request = AccessRequest(
    action="read",
    subject=dict(id="doc-42", role="doctor", department="cardiology"),
    resource=dict(type="patient_record", department="cardiology", id="rec-99"),
    environment=dict(hour=14, ip="10.0.1.50"),
)
result = engine.evaluate(request)
print(result.decision)  # Decision.PERMIT
Open Policy Agent (OPA) integration
For complex deployments, externalize policies to OPA — a dedicated policy engine with its own language (Rego):
# Rego policy (stored in OPA)
# policy.rego
"""
package myapp.authz
default allow = false
allow {
input.action == "read"
input.resource.type == "document"
input.subject.clearance >= input.resource.classification_level
is_business_hours
}
is_business_hours {
hour := time.clock(time.now_ns())[0]
hour >= 8
hour < 18
}
"""
# Python client
import requests
class OPAClient:
    """Minimal HTTP client that asks an OPA server for ``myapp/authz/allow``.

    Args:
        opa_url: Base URL of the OPA server.
        timeout: Seconds to wait for OPA before giving up.
    """

    def __init__(self, opa_url: str = "http://localhost:8181", timeout: float = 1):
        self.url = opa_url
        self.timeout = timeout

    def evaluate(self, request: AccessRequest) -> bool:
        """Return True only when OPA explicitly answers ``{"result": true}``.

        Raises:
            requests.RequestException: on connection problems, timeouts,
                non-2xx responses, or a body that is not valid JSON.
        """
        payload = {
            "input": {
                "subject": request.subject,
                "resource": request.resource,
                "action": request.action,
                "environment": request.environment,
            }
        }
        response = requests.post(
            # Tolerate a trailing slash in the configured base URL.
            f"{self.url.rstrip('/')}/v1/data/myapp/authz/allow",
            json=payload,
            timeout=self.timeout,
        )
        # An error response is a failure of the policy service, not a verdict.
        response.raise_for_status()
        try:
            body = response.json()
        except ValueError as exc:
            # Keep a single exception family for callers that fail closed.
            raise requests.RequestException(
                "OPA returned a response that is not valid JSON"
            ) from exc
        # Anything other than a literal JSON `true` is a denial; a missing
        # "result" key means the rule was undefined.
        return isinstance(body, dict) and body.get("result") is True
opa = OPAClient()


# In your application
def check_access(request: AccessRequest) -> bool:
    """Ask OPA whether *request* is allowed, denying on any failure.

    Returns:
        True only when OPA answered with a literal ``True``; False when OPA
        denied, was unreachable, or sent an unusable reply.
    """
    try:
        allowed = opa.evaluate(request)
    except (requests.RequestException, ValueError):
        # Fail closed — deny access if OPA is unavailable or its reply could
        # not be decoded (ValueError covers JSON decoding errors).
        return False
    # Guard against truthy non-boolean payloads being treated as "allow".
    return allowed is True
OPA advantages: policies are version-controlled separately from code, can be updated without redeploying your app, and Rego is purpose-built for policy logic.
FastAPI ABAC middleware
from fastapi import FastAPI, Depends, HTTPException, Request
from datetime import datetime, timezone
app = FastAPI()
def abac_check(
    required_action: str,
    resource_type: str,
    resource_attrs: dict = None,
):
    """Create an ABAC dependency for FastAPI routes.

    This is a plain (non-async) factory: call it while declaring the route and
    hand the returned coroutine function to ``Depends``::

        user: User = Depends(abac_check("read", "document"))

    Args:
        required_action: Action name placed in the access request.
        resource_type: Value of the resource's ``"type"`` attribute.
        resource_attrs: Optional extra resource attributes merged into the
            request; ``None`` means no extra attributes.

    Returns:
        An async dependency that evaluates the request with ``engine``, raises
        ``HTTPException(403)`` unless the decision is ``PERMIT``, and returns
        the authenticated user otherwise.
    """
    async def checker(request: Request, user: User = Depends(get_current_user)):
        # Take one timestamp so "hour" and "day_of_week" describe the same instant.
        now = datetime.now(timezone.utc)
        # `request.client` can be None; fall back to an empty address.
        client = request.client
        access_request = AccessRequest(
            subject={
                "id": str(user.id),
                "role": user.role,
                "department": user.department,
                "clearance": user.clearance_level,
            },
            resource={
                "type": resource_type,
                **(resource_attrs or {}),
            },
            action=required_action,
            environment={
                "hour": now.hour,
                "ip": client.host if client else "",
                "day_of_week": now.weekday(),
            },
        )
        result = engine.evaluate(access_request)
        if result.decision != Decision.PERMIT:
            raise HTTPException(
                status_code=403,
                detail=f"Access denied: {result.reason}",
            )
        return user
    return checker


# Route-level resource attributes
@app.get("/documents/{doc_id}")
async def get_document(
    doc_id: int,
    # Pass the checker itself to Depends so FastAPI runs it and injects the user.
    user: User = Depends(abac_check("read", "document")),
):
    """Return a document after a coarse check and a resource-specific check."""
    doc = await fetch_document(doc_id)
    # For resource-specific attributes, do a second check with full context
    access_request = AccessRequest(
        subject={"id": str(user.id), "role": user.role,
                 "department": user.department,
                 "clearance": user.clearance_level},
        resource={"type": "document", "department": doc.department,
                  "classification_level": doc.classification,
                  "owner_id": str(doc.owner_id)},
        action="read",
        environment={"hour": datetime.now(timezone.utc).hour},
    )
    result = engine.evaluate(access_request)
    if result.decision != Decision.PERMIT:
        raise HTTPException(status_code=403, detail="Access denied")
    return doc
Django ABAC with middleware
# middleware.py
from django.http import HttpResponseForbidden
import json
class ABACMiddleware:
    """Django middleware that runs an ABAC check on protected URL prefixes.

    Authenticated requests whose path starts with a key of
    ``PROTECTED_PATTERNS`` are evaluated by ``engine``; anything other than
    ``PERMIT`` produces a 403 JSON response. All other requests are passed to
    the next handler unchanged.
    """

    PROTECTED_PATTERNS = {
        "/api/documents/": {"action": "read", "resource_type": "document"},
        "/api/patients/": {"action": "read", "resource_type": "patient_record"},
    }

    # HTTP method -> ABAC action. Built once instead of on every request.
    _METHOD_ACTIONS = {
        "GET": "read",
        "HEAD": "read",
        "OPTIONS": "read",
        "POST": "create",
        "PUT": "update",
        "PATCH": "update",
        "DELETE": "delete",
    }

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        # NOTE(review): unauthenticated requests skip the ABAC check entirely,
        # so the views behind these prefixes must enforce authentication
        # themselves — confirm that is intended.
        if not request.user.is_authenticated:
            return self.get_response(request)

        for pattern, config in self.PROTECTED_PATTERNS.items():
            if not request.path.startswith(pattern):
                continue
            access_request = AccessRequest(
                subject={
                    "id": str(request.user.id),
                    "role": getattr(request.user, "role", "user"),
                    "department": getattr(request.user, "department", ""),
                },
                resource={"type": config["resource_type"]},
                action=self._method_to_action(request.method),
                environment={
                    # NOTE(review): naive local time — confirm the policies'
                    # hour windows are meant to be in the server's time zone.
                    "hour": datetime.now().hour,
                    "ip": request.META.get("REMOTE_ADDR", ""),
                },
            )
            result = engine.evaluate(access_request)
            if result.decision != Decision.PERMIT:
                return HttpResponseForbidden(
                    json.dumps({"error": "Access denied", "reason": result.reason}),
                    content_type="application/json",
                )
        return self.get_response(request)

    def _method_to_action(self, method: str) -> str:
        """Translate an HTTP method into an ABAC action name.

        Known methods map to read/create/update/delete. An unrecognised
        method is returned lower-cased instead of being treated as "read",
        so it only passes if a policy explicitly covers that action.
        """
        verb = method.upper()
        return self._METHOD_ACTIONS.get(verb, verb.lower())
Performance: caching policy decisions
ABAC evaluation can be expensive when policies involve database lookups or external calls:
import hashlib
import json
import redis
redis_client = redis.Redis()


class CachedPolicyEngine:
    """Wraps a ``PolicyEngine`` with a Redis-backed decision cache.

    The cache is strictly an optimisation: if the request cannot be turned
    into a key, Redis is unavailable, or a stored entry is unreadable, the
    wrapped engine is consulted directly.

    Args:
        engine: Engine that computes decisions on a cache miss.
        cache_ttl: Lifetime of a cached decision, in seconds.
    """

    def __init__(self, engine: PolicyEngine, cache_ttl: int = 60):
        self.engine = engine
        self.ttl = cache_ttl

    def evaluate(self, request: AccessRequest) -> PolicyResult:
        """Return the cached decision for *request*, computing it on a miss."""
        cache_key = self._cache_key(request)
        if cache_key is None:
            return self.engine.evaluate(request)

        cached = self._load(cache_key)
        if cached is not None:
            return cached

        result = self.engine.evaluate(request)
        self._store(cache_key, result)
        return result

    @staticmethod
    def _cache_key(request):
        """Build the Redis key for *request*, or None if it cannot be serialised."""
        try:
            # Create cache key from request attributes
            key_data = json.dumps({
                "s": request.subject,
                "r": request.resource,
                "a": request.action,
                "e": {k: v for k, v in request.environment.items()
                      if k not in ("timestamp",)},  # exclude volatile attributes
            }, sort_keys=True)
        except (TypeError, ValueError):
            # Attribute values that JSON cannot encode: skip caching.
            return None
        # Use the full digest: a truncated hash makes it easier for two
        # different requests to share a key and thus a cached decision.
        return f"abac:{hashlib.sha256(key_data.encode()).hexdigest()}"

    @staticmethod
    def _load(cache_key):
        """Return the cached ``PolicyResult`` for *cache_key*, or None on a miss."""
        try:
            raw = redis_client.get(cache_key)
        except redis.RedisError:
            return None
        if not raw:
            return None
        try:
            data = json.loads(raw)
            return PolicyResult(
                Decision(data["decision"]),
                data.get("reason", ""),
                data.get("policy_id", ""),
            )
        except (ValueError, KeyError, TypeError):
            # Corrupt or foreign entry: treat as a miss and recompute.
            return None

    def _store(self, cache_key, result):
        """Cache *result* for ``self.ttl`` seconds (best effort)."""
        # Evaluation errors may be transient; do not pin them for a full TTL.
        if result.decision is Decision.INDETERMINATE:
            return
        entry = json.dumps({
            "decision": result.decision.value,
            "reason": result.reason,
            "policy_id": result.policy_id,
        })
        try:
            redis_client.setex(cache_key, self.ttl, entry)
        except redis.RedisError:
            # A cache write failure must not turn a computed decision into an error.
            pass
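Be careful with caching: volatile environment attributes (time, threat level) can make cache keys unique for every request, so exclude or bucket them. Also keep the TTL short: a cached PERMIT stays in effect until it expires, even if the policy or the subject's attributes change in the meantime.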
Testing ABAC policies
import pytest
from datetime import datetime
@pytest.fixture
def engine():
    """Provide a fresh ``PolicyEngine`` for each test."""
    policy_engine = PolicyEngine()
    # ... add policies
    return policy_engine
@pytest.mark.parametrize("hour,expected", [
    # Inclusive boundaries of the 7..19 window and their neighbours.
    (6, Decision.DENY),
    (7, Decision.PERMIT),
    (9, Decision.PERMIT),
    (14, Decision.PERMIT),
    (19, Decision.PERMIT),
    (20, Decision.DENY),
    (22, Decision.DENY),
    (3, Decision.DENY),
])
def test_business_hours_constraint(engine, hour, expected):
    """A same-department doctor is permitted only during hours 7 through 19."""
    request = AccessRequest(
        subject={"role": "doctor", "department": "cardiology"},
        resource={"type": "patient_record", "department": "cardiology"},
        action="read",
        environment={"hour": hour},
    )
    assert engine.evaluate(request).decision == expected
def test_cross_department_denied(engine):
    """A doctor from another department is denied, even during permitted hours."""
    neurologist = {"role": "doctor", "department": "neurology"}
    cardiology_record = {"type": "patient_record", "department": "cardiology"}
    request = AccessRequest(
        subject=neurologist,
        resource=cardiology_record,
        action="read",
        environment={"hour": 10},
    )
    outcome = engine.evaluate(request)
    assert outcome.decision == Decision.DENY
def test_own_record_always_permitted(engine):
    """An employee may read their own record regardless of the hour."""
    employee = {"id": "emp-42", "role": "employee"}
    own_record = {"type": "employee_record", "owner_id": "emp-42"}
    request = AccessRequest(
        subject=employee,
        resource=own_record,
        action="read",
        environment={"hour": 23},  # even outside business hours
    )
    outcome = engine.evaluate(request)
    assert outcome.decision == Decision.PERMIT
The one thing to remember: Production ABAC in Python requires a policy engine with combining algorithms, separation of policy decisions from enforcement, caching for performance, and thorough testing of attribute combinations — consider OPA for complex policy sets that need to evolve independently from application code.
See Also
- Python API Key Management Why apps use special passwords called API keys, and how to keep them safe — explained with a library card analogy
- Python Audit Logging Learn Audit Logging with a clear mental model so your Python code is easier to trust and maintain.
- Python Bandit Security Scanning Why Bandit Security Scanning helps Python teams catch painful mistakes early without slowing daily development.
- Python Clickjacking Prevention How invisible website layers trick you into clicking the wrong thing, and how Python apps stop it
- Python Content Security Policy How websites create a guest list for scripts and styles to block hackers from sneaking in malicious code