Structured Logging with structlog in Python — Deep Dive

Structured logging in production goes beyond replacing print() with JSON. The processor pipeline must handle exception serialization, sensitive data redaction, trace correlation, and performance at scale. structlog’s design makes all of this composable, but getting the details right requires understanding the execution model and common pitfalls.

Production configuration

A production-grade structlog setup typically looks like this:

import logging
import structlog

shared_processors = [
    structlog.contextvars.merge_contextvars,
    structlog.stdlib.filter_by_level,
    structlog.stdlib.add_logger_name,
    structlog.processors.add_log_level,
    structlog.processors.TimeStamper(fmt="iso", utc=True),
    structlog.processors.StackInfoRenderer(),
    structlog.processors.UnicodeDecoder(),
]

structlog.configure(
    processors=shared_processors + [
        structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

formatter = structlog.stdlib.ProcessorFormatter(
    processors=[
        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
        structlog.processors.JSONRenderer(),
    ]
)

handler = logging.StreamHandler()
handler.setFormatter(formatter)

root_logger = logging.getLogger()
root_logger.addHandler(handler)
root_logger.setLevel(logging.INFO)

This configuration routes both structlog and standard library loggers through the same processor chain, producing consistent JSON output.

Why ProcessorFormatter?

The two-stage approach (structlog processors + stdlib ProcessorFormatter) exists because third-party libraries use logging.getLogger(). ProcessorFormatter intercepts their log records and applies structlog processors, unifying output format without modifying library code.

Custom processors

Sensitive data redaction

Prevent credentials and PII from appearing in logs:

SENSITIVE_KEYS = {"password", "token", "secret", "authorization", "credit_card", "ssn"}

def redact_sensitive(logger, method_name, event_dict):
    for key in list(event_dict.keys()):
        if any(s in key.lower() for s in SENSITIVE_KEYS):
            event_dict[key] = "[REDACTED]"
    return event_dict

Place this processor early in the chain so downstream processors (including the renderer) never see raw values.

Request context enrichment

Add HTTP request metadata without passing it through every function:

def add_request_context(logger, method_name, event_dict):
    # Works with Flask, FastAPI, or any framework using contextvars
    from structlog.contextvars import get_contextvars
    ctx = get_contextvars()
    
    # Add correlation fields
    for key in ("request_id", "user_id", "tenant_id", "trace_id"):
        if key in ctx and key not in event_dict:
            event_dict[key] = ctx[key]
    
    return event_dict

Exception formatting

structlog’s default exception rendering includes the full traceback as a string. For JSON logs consumed by Elasticsearch, structure the exception:

import traceback

def format_exception(logger, method_name, event_dict):
    exc_info = event_dict.pop("exc_info", None)
    if exc_info:
        if isinstance(exc_info, BaseException):
            exc_info = (type(exc_info), exc_info, exc_info.__traceback__)
        elif exc_info is True:
            exc_info = sys.exc_info()
        
        if exc_info and exc_info[0] is not None:
            event_dict["exception"] = {
                "type": exc_info[0].__name__,
                "message": str(exc_info[1]),
                "traceback": traceback.format_exception(*exc_info),
            }
    return event_dict

This produces:

{
    "event": "payment_failed",
    "exception": {
        "type": "TimeoutError",
        "message": "Connection to payment gateway timed out",
        "traceback": ["Traceback (most recent call last):\n", "..."]
    }
}

Trace ID correlation

Integrate with OpenTelemetry to add trace and span IDs to every log entry:

from opentelemetry import trace

def add_trace_context(logger, method_name, event_dict):
    span = trace.get_current_span()
    if span and span.is_recording():
        ctx = span.get_span_context()
        event_dict["trace_id"] = format(ctx.trace_id, "032x")
        event_dict["span_id"] = format(ctx.span_id, "016x")
    return event_dict

With trace IDs in logs, Grafana can jump from a log line directly to the associated trace in Tempo or Jaeger.

Async and multiprocessing considerations

asyncio

structlog’s contextvars integration works natively with asyncio. Each task gets its own context:

async def handle_request(request):
    clear_contextvars()
    bind_contextvars(request_id=request.id)
    
    # All async calls within this handler share the bound context
    await validate(request)
    await process(request)
    # Logs from validate() and process() include request_id

Context does not leak between concurrent requests because contextvars are task-scoped in asyncio.

Multiprocessing

Each process gets its own structlog configuration. In Gunicorn with pre-fork workers, configure structlog in the post_fork hook or at module level (since each worker imports the module independently).

Avoid file-based log handlers in multi-process setups — use stdout and let the container runtime or systemd handle log collection.

Performance

structlog adds overhead per log call. Benchmarks on Python 3.12:

  • structlog with JSON rendering: ~15-25 μs per log call
  • Standard logging with basic formatter: ~5-8 μs per log call
  • The difference: ~10-17 μs per call

For a service logging 100 times per request at 1000 RPS, that is ~1-2ms total additional overhead per second. Negligible for most services.

Optimization techniques

  1. cache_logger_on_first_use=True: Avoids rebuilding the processor chain on every get_logger() call.

  2. Filter early: Place filter_by_level at the start of the chain so debug messages skip all processors when debug logging is disabled.

  3. Lazy values: For expensive computations, use callables:

log.info("query_result", row_count=lambda: len(expensive_query()))
# The lambda is only called if the log level passes filtering
  1. Avoid logging in tight loops: If you must, batch events or use sampling:
import random

if random.random() < 0.01:  # Log 1% of iterations
    log.debug("batch_item_processed", item_id=item.id)

Log aggregation patterns

Elasticsearch / OpenSearch

Ship JSON logs via Filebeat or Fluentd. The JSON structure maps directly to Elasticsearch fields without parsing:

# Filebeat config
- type: container
  paths: ["/var/log/containers/*.log"]
  json:
    keys_under_root: true
    add_error_key: true

Grafana Loki

Loki indexes labels, not full text. Structure your logs so that high-cardinality fields are in the JSON body (searched at query time) and low-cardinality fields are Loki labels:

# Loki labels: service, level, environment
# JSON body: request_id, user_id, trace_id, event details

CloudWatch / Datadog

Both natively parse JSON logs. structlog’s JSON output works without additional configuration.

Testing structured logs

Capture and assert on structured log output:

import structlog
from structlog.testing import capture_logs

def test_order_logging():
    with capture_logs() as cap:
        process_order(order_id=42)
    
    assert any(
        log["event"] == "order_processed" and log["order_id"] == 42
        for log in cap
    )

def test_error_includes_context():
    with capture_logs() as cap:
        with pytest.raises(ValueError):
            validate_order(invalid_data)
    
    error_logs = [log for log in cap if log["log_level"] == "error"]
    assert error_logs[0]["event"] == "validation_failed"
    assert "field" in error_logs[0]

capture_logs replaces the processor chain with an in-memory list, making assertions clean and deterministic.

Migration strategy

For existing codebases with thousands of logging.getLogger() calls:

  1. Phase 1: Configure structlog’s ProcessorFormatter to intercept stdlib logging. All existing logs become JSON with no code changes.
  2. Phase 2: Add context binding in middleware (request IDs, user IDs). All logs gain context automatically.
  3. Phase 3: Gradually replace logging.getLogger() with structlog.get_logger() in new code and during refactors.
  4. Phase 4: Add custom processors (redaction, trace correlation) to the pipeline.

This incremental approach means you get value from day one without a big-bang rewrite.

One thing to remember: A production structlog pipeline needs sensitive data redaction, trace ID correlation, proper exception formatting, and stdlib integration — the processor chain is where you encode your organization’s logging standards as executable code.

pythonstructlogstructured-logging

See Also

  • Python Adaptive Learning Systems How Python builds learning apps that adjust to each student like a personal tutor who knows exactly what you need next.
  • Python Airflow Learn Airflow as a timetable manager that makes sure data tasks run in the right order every day.
  • Python Altair Learn Altair through the idea of drawing charts by describing rules, not by hand-placing every visual element.
  • Python Automated Grading How Python grades homework and exams automatically, from simple answer keys to understanding written essays.
  • Python Batch Vs Stream Processing Batch processing is like doing laundry once a week; stream processing is like a self-cleaning shirt that cleans itself constantly.