Python API Monitoring and Observability — Deep Dive

Technical foundation

Observability in production Python APIs is an engineering discipline, not a checkbox. It requires deliberate instrumentation, careful metric design, correlated signals across pillars, and alert policies that match business impact rather than technical thresholds.

OpenTelemetry setup for FastAPI

OpenTelemetry provides a unified SDK for metrics, traces, and logs. Start with auto-instrumentation and add manual spans where needed:

from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

def setup_telemetry(app):
    # Tracing
    tracer_provider = TracerProvider()
    tracer_provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint="http://otel-collector:4317"))
    )
    trace.set_tracer_provider(tracer_provider)
    
    # Metrics
    metric_reader = PeriodicExportingMetricReader(
        OTLPMetricExporter(endpoint="http://otel-collector:4317"),
        export_interval_millis=15000,
    )
    metrics.set_meter_provider(MeterProvider(metric_readers=[metric_reader]))
    
    # Auto-instrument
    FastAPIInstrumentor.instrument_app(app)
    HTTPXClientInstrumentor().instrument()
    SQLAlchemyInstrumentor().instrument(engine=db_engine)

This automatically creates spans for every FastAPI request, outgoing HTTP call, and database query — with zero manual instrumentation.

Custom span instrumentation

Auto-instrumentation covers HTTP and database calls but misses business logic. Add manual spans for critical operations:

tracer = trace.get_tracer("order-service")

async def process_order(order_id: int, user_id: int):
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)
        span.set_attribute("user.id", user_id)
        
        with tracer.start_as_current_span("validate_inventory"):
            available = await check_inventory(order_id)
            span.set_attribute("inventory.available", available)
            if not available:
                span.set_status(trace.StatusCode.ERROR, "Out of stock")
                raise OutOfStockError(order_id)
        
        with tracer.start_as_current_span("charge_payment") as payment_span:
            result = await payment_service.charge(user_id, order.total)
            payment_span.set_attribute("payment.provider", result.provider)
            payment_span.set_attribute("payment.transaction_id", result.transaction_id)
        
        with tracer.start_as_current_span("send_confirmation"):
            await email_service.send_order_confirmation(user_id, order_id)

Each span shows up as a nested block in your trace viewer (Jaeger, Grafana Tempo), making bottlenecks visible at a glance.

RED metrics with Prometheus

The RED method (Rate, Errors, Duration) is the gold standard for API metrics:

from prometheus_client import Counter, Histogram, Gauge, generate_latest
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
import time

REQUEST_COUNT = Counter(
    "http_requests_total", "Total requests", ["method", "path", "status"]
)
REQUEST_DURATION = Histogram(
    "http_request_duration_seconds", "Request duration",
    ["method", "path"],
    buckets=[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
)
REQUESTS_IN_PROGRESS = Gauge(
    "http_requests_in_progress", "Requests currently being processed",
    ["method", "path"],
)

class MetricsMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        method = request.method
        path = self._normalize_path(request.url.path)
        
        REQUESTS_IN_PROGRESS.labels(method=method, path=path).inc()
        start = time.perf_counter()
        
        try:
            response = await call_next(request)
            REQUEST_COUNT.labels(method=method, path=path, status=response.status_code).inc()
            return response
        except Exception as e:
            REQUEST_COUNT.labels(method=method, path=path, status=500).inc()
            raise
        finally:
            duration = time.perf_counter() - start
            REQUEST_DURATION.labels(method=method, path=path).observe(duration)
            REQUESTS_IN_PROGRESS.labels(method=method, path=path).dec()
    
    def _normalize_path(self, path: str) -> str:
        # Prevent high-cardinality labels by normalizing IDs
        import re
        return re.sub(r"/\d+", "/{id}", path)

@app.get("/metrics")
async def metrics_endpoint():
    return Response(content=generate_latest(), media_type="text/plain")

The _normalize_path method is critical. Without it, paths like /users/1, /users/2, etc. create unbounded label cardinality that crashes Prometheus.

Structured logging with correlation

Connect logs to traces by injecting trace IDs:

import structlog
from opentelemetry import trace

def add_trace_context(logger, method_name, event_dict):
    span = trace.get_current_span()
    if span.is_recording():
        ctx = span.get_span_context()
        event_dict["trace_id"] = format(ctx.trace_id, "032x")
        event_dict["span_id"] = format(ctx.span_id, "016x")
    return event_dict

structlog.configure(
    processors=[
        structlog.stdlib.add_log_level,
        add_trace_context,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
)

logger = structlog.get_logger()

# Usage in handlers
async def create_order(request: CreateOrderRequest):
    logger.info("order_creation_started", user_id=request.user_id, items=len(request.items))
    order = await process_order(request)
    logger.info("order_created", order_id=order.id, total_cents=order.total_cents)
    return order

Now every log entry includes trace_id and span_id. In Grafana, click a log entry to jump directly to its trace, or click a slow span to see related logs.

Custom business metrics

Beyond RED, track metrics that reflect business health:

from prometheus_client import Counter, Histogram, Gauge

orders_created = Counter("orders_created_total", "Orders created", ["payment_method"])
order_value = Histogram(
    "order_value_cents", "Order value distribution",
    buckets=[1000, 5000, 10000, 50000, 100000, 500000],
)
active_users = Gauge("active_users_current", "Currently active users")
payment_failures = Counter("payment_failures_total", "Failed payments", ["reason"])

async def create_order(order: Order):
    orders_created.labels(payment_method=order.payment_method).inc()
    order_value.observe(order.total_cents)
    try:
        await charge_payment(order)
    except PaymentError as e:
        payment_failures.labels(reason=e.reason).inc()
        raise

These metrics power business dashboards: revenue per hour, popular payment methods, failure trends.

SLO-based alerting

Instead of alerting on arbitrary thresholds, define Service Level Objectives (SLOs):

  • Availability SLO: 99.9% of requests return non-5xx responses (measured over 30 days)
  • Latency SLO: 95% of requests complete within 500ms (measured over 30 days)

Calculate error budget burn rate to alert only when the SLO is at risk:

# Prometheus alerting rule
groups:
  - name: slo-alerts
    rules:
      - alert: HighErrorBurnRate
        expr: |
          (
            sum(rate(http_requests_total{status=~"5.."}[5m]))
            /
            sum(rate(http_requests_total[5m]))
          ) > 14.4 * (1 - 0.999)
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Error budget burning 14.4x faster than sustainable"

A 14.4x burn rate means the monthly error budget will be exhausted in 2 hours. This approach eliminates alerts for brief, harmless spikes while catching sustained degradation.

Health check endpoints

Provide machine-readable health information:

@app.get("/health/live")
async def liveness():
    return {"status": "alive"}

@app.get("/health/ready")
async def readiness():
    checks = {}
    
    try:
        await db.execute(text("SELECT 1"))
        checks["database"] = "ok"
    except Exception:
        checks["database"] = "failed"
    
    try:
        await redis.ping()
        checks["redis"] = "ok"
    except Exception:
        checks["redis"] = "failed"
    
    all_ok = all(v == "ok" for v in checks.values())
    return JSONResponse(
        status_code=200 if all_ok else 503,
        content={"status": "ready" if all_ok else "degraded", "checks": checks},
    )

Kubernetes uses liveness probes to restart stuck containers and readiness probes to stop routing traffic to unhealthy instances.

Sampling strategies for high-traffic APIs

At thousands of requests per second, tracing every request is expensive. Use sampling:

from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased

# Sample 10% of requests, but always trace if parent was sampled
sampler = ParentBased(root=TraceIdRatioBased(0.1))
tracer_provider = TracerProvider(sampler=sampler)

For errors, override sampling to capture 100%:

from opentelemetry.sdk.trace.sampling import ALWAYS_ON

class ErrorAwareSampler:
    def should_sample(self, parent_context, trace_id, name, kind, attributes, links):
        # Always sample errors
        if attributes and attributes.get("http.status_code", 200) >= 500:
            return ALWAYS_ON.should_sample(parent_context, trace_id, name, kind, attributes, links)
        # Sample 10% of normal traffic
        return TraceIdRatioBased(0.1).should_sample(parent_context, trace_id, name, kind, attributes, links)

Dashboards that tell stories

Organize Grafana dashboards in layers:

  1. Overview: The four golden signals for the entire API. This is the first place anyone looks during an incident.
  2. Per-endpoint: Drill down into specific endpoints showing rate, errors, latency, and top error types.
  3. Dependencies: Database query latency, Redis hit rates, external API response times.
  4. Business: Order rates, user signups, payment success rates.

Each dashboard should answer “is this thing healthy?” within 5 seconds of looking at it.

The one thing to remember: Wire OpenTelemetry auto-instrumentation for baseline visibility, add manual spans for business operations, connect logs to traces via trace IDs, set SLO-based alerts that reflect user impact, and sample intelligently to control costs at scale.

pythonapimonitoringobservabilityopentelemetryprometheus

See Also