Python Correlation IDs — Deep Dive

A correlation ID system seems trivial — generate UUID, attach to logs. In production, the complexity hides in async propagation, cross-process boundaries, and integration with existing observability stacks. This guide covers battle-tested patterns for Python services.

ContextVar mechanics

Python 3.7 introduced contextvars.ContextVar, designed specifically for per-task or per-request state in async applications. Each asyncio.Task gets its own context copy, so correlation IDs set in one request don’t leak into another.

from contextvars import ContextVar, copy_context

correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")

# Each task inherits the context at creation time
async def handler(request):
    correlation_id.set("abc-123")
    # Any function called from here sees "abc-123"
    await process(request)

async def process(request):
    cid = correlation_id.get()  # "abc-123" — no explicit passing

Pitfall: ThreadPoolExecutor

ContextVar propagation works automatically for asyncio.Task but not for concurrent.futures.ThreadPoolExecutor. If you offload work to a thread pool:

import asyncio
from contextvars import copy_context

async def handler(request):
    correlation_id.set("abc-123")
    ctx = copy_context()
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(
        None, ctx.run, blocking_work
    )

copy_context() snapshots the current context, and ctx.run() executes the function within that snapshot. Without this, the thread sees the default empty value.

Production middleware implementations

FastAPI / Starlette

import uuid
from contextvars import ContextVar
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response

CID_HEADER = "X-Correlation-ID"
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")

class CorrelationMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next) -> Response:
        cid = request.headers.get(CID_HEADER) or str(uuid.uuid4())
        token = correlation_id.set(cid)
        try:
            response = await call_next(request)
            response.headers[CID_HEADER] = cid
            return response
        finally:
            correlation_id.reset(token)

The token = correlation_id.set() / correlation_id.reset(token) pattern ensures proper cleanup even if call_next raises.

Django

import uuid
from contextvars import ContextVar

correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")

class CorrelationMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        cid = request.META.get("HTTP_X_CORRELATION_ID") or str(uuid.uuid4())
        token = correlation_id.set(cid)
        try:
            response = self.get_response(request)
            response["X-Correlation-ID"] = cid
            return response
        finally:
            correlation_id.reset(token)

Django converts headers to META keys with HTTP_ prefix and underscores. The middleware pattern is synchronous but works with Django’s async views too because ContextVar is async-safe.

Flask

import uuid
from flask import Flask, g, request

app = Flask(__name__)

@app.before_request
def set_correlation_id():
    g.correlation_id = request.headers.get(
        "X-Correlation-ID", str(uuid.uuid4())
    )

@app.after_request
def add_correlation_header(response):
    response.headers["X-Correlation-ID"] = g.correlation_id
    return response

Flask uses g (application context) rather than ContextVar because its request model is thread-based. For Flask with gevent or async extensions, switch to ContextVar.

Structured logging integration

With structlog

import structlog
from contextvars import ContextVar

correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")

def add_correlation_id(logger, method_name, event_dict):
    cid = correlation_id.get("")
    if cid:
        event_dict["correlation_id"] = cid
    return event_dict

structlog.configure(
    processors=[
        add_correlation_id,
        structlog.processors.JSONRenderer()
    ]
)

With Loguru

from loguru import logger

def correlation_patcher(record):
    record["extra"]["correlation_id"] = correlation_id.get("")

logger = logger.patch(correlation_patcher)
logger.add(sys.stderr, format="{extra[correlation_id]} | {message}")

Cross-service propagation patterns

HTTP client wrapper

Rather than remembering to add headers in every HTTP call, create an instrumented client:

import httpx

class CorrelatedClient(httpx.AsyncClient):
    async def send(self, request, *args, **kwargs):
        cid = correlation_id.get("")
        if cid:
            request.headers.setdefault("X-Correlation-ID", cid)
        return await super().send(request, *args, **kwargs)

All HTTP calls through this client automatically propagate the correlation ID.

gRPC metadata

import grpc

class CorrelationInterceptor(grpc.UnaryUnaryClientInterceptor):
    def intercept_unary_unary(self, continuation, client_call_details, request):
        metadata = list(client_call_details.metadata or [])
        cid = correlation_id.get("")
        if cid:
            metadata.append(("x-correlation-id", cid))
        new_details = grpc.ClientCallDetails(
            client_call_details.method,
            client_call_details.timeout,
            metadata,
            client_call_details.credentials,
            client_call_details.wait_for_ready,
            client_call_details.compression
        )
        return continuation(new_details, request)

Celery task propagation

from celery import Celery, signals
import uuid

app = Celery("myapp")

@signals.before_task_publish.connect
def inject_cid(headers, **kwargs):
    headers["correlation_id"] = correlation_id.get(str(uuid.uuid4()))

@signals.task_prerun.connect
def extract_cid(task, **kwargs):
    cid = getattr(task.request, "correlation_id", "") or str(uuid.uuid4())
    correlation_id.set(cid)

@signals.task_postrun.connect
def clear_cid(**kwargs):
    correlation_id.set("")

Bridging to OpenTelemetry

If you’re adopting OpenTelemetry incrementally, use the trace ID as your correlation ID:

from opentelemetry import trace

def get_correlation_id() -> str:
    """Return correlation ID, preferring OTel trace ID if available."""
    span = trace.get_current_span()
    ctx = span.get_span_context()
    if ctx.is_valid:
        return format(ctx.trace_id, "032x")
    return correlation_id.get("")

This approach lets you transition from custom correlation IDs to full distributed tracing without changing downstream consumers that search by correlation ID.

ID format considerations

FormatSizeCollision riskExample
UUID436 chars~1 in 2^122f47ac10b-58cc-4372-a567-0e02b2c3d479
ULID26 chars~1 in 2^8001ARZ3NDEKTSV4RRFFQ69G5FAV
nanoid (21)21 chars~1 in 2^126V1StGXR8_Z5jdHi6B-myT
OTel trace ID32 hex~1 in 2^1280af7651916cd43dd8448eb211c80319c

ULIDs are sortable by time, which helps when scanning logs chronologically. Nanoids are URL-safe and shorter. For new systems, ULID or OTel trace IDs are the best choices.

Testing correlation propagation

import pytest
from unittest.mock import patch
from contextvars import copy_context

def test_correlation_propagates_to_downstream():
    """Verify CID reaches the payment service call."""
    correlation_id.set("test-cid-123")

    with patch("httpx.AsyncClient.post") as mock_post:
        mock_post.return_value = httpx.Response(200, json={"ok": True})
        asyncio.run(call_payment_service("order-1", 99.99))

        headers = mock_post.call_args.kwargs.get("headers", {})
        assert headers["X-Correlation-ID"] == "test-cid-123"

Test each propagation boundary separately: HTTP client, gRPC interceptor, Celery signal, and message broker publisher.

Operational patterns

Log search workflow

# Find all logs for a failed request
grep "abc-123-def" /var/log/api/*.log /var/log/payments/*.log

# With structured logs in Elasticsearch
GET /logs-*/_search
{ "query": { "match": { "correlation_id": "abc-123-def" } } }

Correlation ID in error responses

Include the ID in API error responses so users can report it:

{
  "error": "Payment declined",
  "correlation_id": "abc-123-def",
  "message": "Please include this ID when contacting support"
}

This turns a “something went wrong” support ticket into a 30-second log lookup.

One thing to remember: The real cost of correlation IDs isn’t implementation — it’s discipline. Every service boundary (HTTP, gRPC, message queue, background job) must propagate the ID. Miss one link and the chain breaks. Automate propagation through client wrappers and framework middleware so individual developers never have to think about it.

pythonobservabilitydistributed-systemsarchitecture

See Also

  • Python Alerting Patterns Alerting is a smoke detector for your code — it wakes you up when something is burning, not when someone is cooking.
  • Python Grafana Dashboards Python Grafana turns boring numbers from your Python app into colorful, real-time dashboards — like a car's dashboard but for your code.
  • Python Log Aggregation Elk ELK collects scattered log files from all your services into one searchable place — like gathering every sticky note in the office into a single filing cabinet.
  • Python Logging Best Practices Treat logs like a flight recorder so you can understand failures after they happen, not just during development.
  • Python Logging Handlers Think of logging handlers as mailboxes that decide where your app's messages end up — screen, file, or faraway server.