Python API Monitoring and Observability — Deep Dive
Technical foundation
Observability in production Python APIs is an engineering discipline, not a checkbox. It requires deliberate instrumentation, careful metric design, correlated signals across pillars, and alert policies that match business impact rather than technical thresholds.
OpenTelemetry setup for FastAPI
OpenTelemetry provides a unified SDK for metrics, traces, and logs. Start with auto-instrumentation and add manual spans where needed:
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
def setup_telemetry(app, db_engine):
    """Configure tracing, metrics, and auto-instrumentation for a FastAPI app."""
    # Tracing: batch finished spans and export them to the collector over OTLP/gRPC
    tracer_provider = TracerProvider()
    tracer_provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint="http://otel-collector:4317"))
    )
    trace.set_tracer_provider(tracer_provider)
    # Metrics: push to the collector every 15 seconds
    metric_reader = PeriodicExportingMetricReader(
        OTLPMetricExporter(endpoint="http://otel-collector:4317"),
        export_interval_millis=15000,
    )
    metrics.set_meter_provider(MeterProvider(metric_readers=[metric_reader]))
    # Auto-instrument the app, outgoing HTTPX calls, and the SQLAlchemy engine
    FastAPIInstrumentor.instrument_app(app)
    HTTPXClientInstrumentor().instrument()
    SQLAlchemyInstrumentor().instrument(engine=db_engine)
This creates spans for every FastAPI request, outgoing HTTPX call, and SQLAlchemy query without any manual instrumentation. Here db_engine is the application's SQLAlchemy engine.
Custom span instrumentation
Auto-instrumentation covers HTTP and database calls but misses business logic. Add manual spans for critical operations:
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
tracer = trace.get_tracer("order-service")
# check_inventory, load_order, payment_service, email_service, and OutOfStockError
# are application code assumed to exist elsewhere; load_order is a hypothetical helper.
async def process_order(order_id: int, user_id: int):
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)
        span.set_attribute("user.id", user_id)
        with tracer.start_as_current_span("validate_inventory"):
            available = await check_inventory(order_id)
            span.set_attribute("inventory.available", available)
            if not available:
                span.set_status(Status(StatusCode.ERROR, "Out of stock"))
                raise OutOfStockError(order_id)
        with tracer.start_as_current_span("charge_payment") as payment_span:
            order = await load_order(order_id)
            result = await payment_service.charge(user_id, order.total)
            payment_span.set_attribute("payment.provider", result.provider)
            payment_span.set_attribute("payment.transaction_id", result.transaction_id)
        with tracer.start_as_current_span("send_confirmation"):
            await email_service.send_order_confirmation(user_id, order_id)
Each span shows up as a nested block in your trace viewer (Jaeger, Grafana Tempo), making bottlenecks visible at a glance.
RED metrics with Prometheus
The RED method (Rate, Errors, Duration) is a widely used baseline for API metrics: how many requests arrive, how many fail, and how long they take.
import re
import time
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Gauge, Histogram, generate_latest
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
REQUEST_COUNT = Counter(
    "http_requests_total", "Total requests", ["method", "path", "status"]
)
REQUEST_DURATION = Histogram(
    "http_request_duration_seconds", "Request duration",
    ["method", "path"],
    buckets=[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
)
REQUESTS_IN_PROGRESS = Gauge(
    "http_requests_in_progress", "Requests currently being processed",
    ["method", "path"],
)
class MetricsMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        method = request.method
        path = self._normalize_path(request.url.path)
        REQUESTS_IN_PROGRESS.labels(method=method, path=path).inc()
        start = time.perf_counter()
        try:
            response = await call_next(request)
            REQUEST_COUNT.labels(method=method, path=path, status=response.status_code).inc()
            return response
        except Exception:
            # Unhandled exceptions surface to the client as 500s
            REQUEST_COUNT.labels(method=method, path=path, status=500).inc()
            raise
        finally:
            duration = time.perf_counter() - start
            REQUEST_DURATION.labels(method=method, path=path).observe(duration)
            REQUESTS_IN_PROGRESS.labels(method=method, path=path).dec()
    def _normalize_path(self, path: str) -> str:
        # Prevent high-cardinality labels by normalizing numeric IDs
        return re.sub(r"/\d+", "/{id}", path)
# `app` is the FastAPI application, assumed to be created elsewhere
app.add_middleware(MetricsMiddleware)
@app.get("/metrics")
async def metrics_endpoint():
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
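The _normalize_path method is critical. Without it, paths like /users/1 and /users/2 each become a separate time series, and the resulting unbounded label cardinality can exhaust Prometheus memory. Note that the regex only collapses numeric segments; UUIDs or slugs in paths need their own patterns.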
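The normalization step can be extended beyond numeric IDs. The following is a minimal sketch, not the middleware's actual method: the function name normalize_path, the {uuid} placeholder, and both patterns are assumptions made for illustration.

```python
import re

# Hypothetical patterns: canonical 8-4-4-4-12 UUIDs and purely numeric segments.
# The lookahead (?=/|$) makes each pattern match whole path segments only.
_UUID = re.compile(r"/[0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}(?=/|$)")
_NUMERIC = re.compile(r"/\d+(?=/|$)")


def normalize_path(path: str) -> str:
    """Collapse ID-like path segments into placeholders to bound label cardinality."""
    path = _UUID.sub("/{uuid}", path)
    return _NUMERIC.sub("/{id}", path)
```

Matching whole segments means a path such as /files/2024report is left alone instead of being partially rewritten, and version prefixes like /v2 are untouched.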
Structured logging with correlation
Connect logs to traces by injecting trace IDs:
import structlog
from opentelemetry import trace
def add_trace_context(logger, method_name, event_dict):
    span = trace.get_current_span()
    if span.is_recording():
        ctx = span.get_span_context()
        event_dict["trace_id"] = format(ctx.trace_id, "032x")
        event_dict["span_id"] = format(ctx.span_id, "016x")
    return event_dict
structlog.configure(
    processors=[
        structlog.stdlib.add_log_level,
        add_trace_context,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
)
logger = structlog.get_logger()
# Usage in handlers
async def create_order(request: CreateOrderRequest):
    logger.info("order_creation_started", user_id=request.user_id, items=len(request.items))
    order = await process_order(request)
    logger.info("order_created", order_id=order.id, total_cents=order.total_cents)
    return order
Every log entry emitted inside a recording span now includes trace_id and span_id. With Grafana configured to link the log and trace data sources on trace_id, you can jump from a log entry to its trace, or from a slow span to its related logs.
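The fixed-width hex formatting matters because trace backends look up IDs as 32-character and 16-character strings. A small standard-library sketch of the same formatting, using made-up IDs and a hypothetical helper name format_trace_fields:

```python
import json


def format_trace_fields(trace_id: int, span_id: int) -> dict:
    """Render integer IDs as zero-padded lowercase hex, as add_trace_context does."""
    return {
        "trace_id": format(trace_id, "032x"),
        "span_id": format(span_id, "016x"),
    }


# Hypothetical IDs chosen for illustration only.
fields = format_trace_fields(0x4BF92F3577B34DA6A3CE929D0E0E4736, 0x00F067AA0BA902B7)
line = json.dumps({"event": "order_created", "order_id": 42, **fields})
```

The span ID in this example starts with zero bits, so without the "016x" width the leading zeros would be dropped and the string would no longer match the ID stored by the trace backend.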
Custom business metrics
Beyond RED, track metrics that reflect business health:
from prometheus_client import Counter, Histogram, Gauge
orders_created = Counter("orders_created_total", "Orders created", ["payment_method"])
order_value = Histogram(
    "order_value_cents", "Order value distribution",
    buckets=[1000, 5000, 10000, 50000, 100000, 500000],
)
active_users = Gauge("active_users_current", "Currently active users")
payment_failures = Counter("payment_failures_total", "Failed payments", ["reason"])
# Order, PaymentError, and charge_payment are application code assumed to exist elsewhere
async def create_order(order: Order):
    try:
        await charge_payment(order)
    except PaymentError as e:
        # Keep `reason` to a small fixed set of values to bound label cardinality
        payment_failures.labels(reason=e.reason).inc()
        raise
    # Record the order and its value only after payment succeeds,
    # so revenue figures are not inflated by failed payments
    orders_created.labels(payment_method=order.payment_method).inc()
    order_value.observe(order.total_cents)
These metrics power business dashboards: revenue per hour, popular payment methods, failure trends.
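Bucket boundaries decide what the order_value histogram can resolve. As a rough illustration of how observations land in Prometheus-style cumulative buckets (each bucket counts values less than or equal to its upper bound, plus a final +Inf bucket), here is a standard-library sketch; cumulative_counts is a hypothetical helper, not part of prometheus_client.

```python
from bisect import bisect_left

BUCKETS = [1000, 5000, 10000, 50000, 100000, 500000]  # cents, as in the article


def cumulative_counts(values, bounds=BUCKETS):
    """Return cumulative counts keyed by upper bound, ending with +Inf."""
    per_bucket = [0] * (len(bounds) + 1)  # last slot is the +Inf bucket
    for v in values:
        # bisect_left finds the first bound that is >= v
        per_bucket[bisect_left(bounds, v)] += 1
    result, running = {}, 0
    for bound, n in zip(list(bounds) + [float("inf")], per_bucket):
        running += n
        result[bound] = running
    return result


demo = cumulative_counts([999, 1000, 1001, 75000, 600000])
```

Orders above the largest bound are only visible in the +Inf bucket, so if large orders matter to the business, the top boundary should sit above the values you expect to see.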
SLO-based alerting
Instead of alerting on arbitrary thresholds, define Service Level Objectives (SLOs):
- Availability SLO: 99.9% of requests return non-5xx responses (measured over 30 days)
- Latency SLO: 95% of requests complete within 500ms (measured over 30 days)
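Each SLO implies an error budget: the amount of failure the target tolerates over the window. A minimal sketch of that arithmetic, with hypothetical function names and an assumed volume of 10 million requests per 30 days:

```python
def error_budget(slo_target: float, total_requests: int) -> int:
    """Number of requests allowed to violate the SLO within the window."""
    return round((1 - slo_target) * total_requests)


def allowed_downtime_minutes(slo_target: float, window_days: int = 30) -> float:
    """Full-outage minutes the SLO tolerates over the window."""
    return (1 - slo_target) * window_days * 24 * 60


# Assumed traffic volume, for illustration only.
availability_budget = error_budget(0.999, 10_000_000)  # failed requests allowed
latency_budget = error_budget(0.95, 10_000_000)        # slow requests allowed
downtime = allowed_downtime_minutes(0.999)             # roughly 43 minutes
```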
Calculate error budget burn rate to alert only when the SLO is at risk:
# Prometheus alerting rule
groups:
  - name: slo-alerts
    rules:
      - alert: HighErrorBurnRate
        expr: |
          (
            sum(rate(http_requests_total{status=~"5.."}[5m]))
            /
            sum(rate(http_requests_total[5m]))
          ) > 14.4 * (1 - 0.999)
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Error budget burning 14.4x faster than sustainable"
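A 14.4x burn rate consumes 2% of a 30-day error budget every hour, so the entire budget would be gone in about 50 hours (720 / 14.4). Alerting on burn rate, with a for clause that requires the condition to persist, suppresses alerts for brief, harmless spikes while catching sustained degradation.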
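The burn-rate arithmetic is easy to check by hand. A short sketch, assuming a 30-day window and using hypothetical helper names:

```python
WINDOW_HOURS = 30 * 24  # a 30-day SLO window is 720 hours


def burn_rate(error_ratio: float, slo_target: float) -> float:
    """How many times faster than sustainable the error budget is being spent."""
    return error_ratio / (1 - slo_target)


def hours_to_exhaustion(rate: float, window_hours: float = WINDOW_HOURS) -> float:
    """Hours until the whole budget is gone if the burn rate holds."""
    return window_hours / rate


def budget_fraction_consumed(rate: float, elapsed_hours: float,
                             window_hours: float = WINDOW_HOURS) -> float:
    """Share of the total budget spent after elapsed_hours at this burn rate."""
    return rate * elapsed_hours / window_hours


rate = burn_rate(error_ratio=0.0144, slo_target=0.999)  # about 14.4
remaining_hours = hours_to_exhaustion(14.4)             # about 50 hours
spent_in_one_hour = budget_fraction_consumed(14.4, 1)   # about 0.02, i.e. 2%
```

The alert threshold 14.4 * (1 - 0.999) in the rule is the same relationship rearranged: it is the error ratio (1.44%) at which the burn rate reaches 14.4.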
Health check endpoints
Provide machine-readable health information:
from fastapi.responses import JSONResponse
from sqlalchemy import text
# `app`, `db` (an async SQLAlchemy session or connection), and `redis` (an async client)
# are assumed to be defined elsewhere in the application
@app.get("/health/live")
async def liveness():
    return {"status": "alive"}
@app.get("/health/ready")
async def readiness():
    checks = {}
    try:
        await db.execute(text("SELECT 1"))
        checks["database"] = "ok"
    except Exception:
        checks["database"] = "failed"
    try:
        await redis.ping()
        checks["redis"] = "ok"
    except Exception:
        checks["redis"] = "failed"
    all_ok = all(v == "ok" for v in checks.values())
    return JSONResponse(
        status_code=200 if all_ok else 503,
        content={"status": "ready" if all_ok else "degraded", "checks": checks},
    )
Kubernetes uses liveness probes to restart stuck containers and readiness probes to stop routing traffic to unhealthy instances.
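A readiness probe that hangs on a slow dependency is as unhelpful as one that fails, so it can be worth bounding each check with a timeout and running them concurrently. The following is a sketch using only asyncio; run_checks and the three stand-in check functions are hypothetical, not part of FastAPI or any client library.

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def run_checks(
    checks: Dict[str, Callable[[], Awaitable[None]]],
    timeout_s: float = 1.0,
) -> Dict[str, str]:
    """Run dependency checks concurrently; a slow or raising check is 'failed'."""

    async def run_one(check: Callable[[], Awaitable[None]]) -> str:
        try:
            await asyncio.wait_for(check(), timeout=timeout_s)
            return "ok"
        except Exception:  # includes the timeout raised by wait_for
            return "failed"

    results = await asyncio.gather(*(run_one(c) for c in checks.values()))
    return dict(zip(checks.keys(), results))


# Hypothetical stand-ins for real database, Redis, and queue pings.
async def fast_ok() -> None:
    await asyncio.sleep(0)


async def hangs() -> None:
    await asyncio.sleep(10)


async def raises() -> None:
    raise ConnectionError("refused")


demo = asyncio.run(
    run_checks({"database": fast_ok, "redis": hangs, "queue": raises}, timeout_s=0.05)
)
```

Inside a request handler the call would be awaited directly rather than wrapped in asyncio.run, and the resulting dictionary can feed the same all_ok logic as the readiness endpoint.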
Sampling strategies for high-traffic APIs
At thousands of requests per second, tracing every request is expensive. Use sampling:
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased
# Sample 10% of requests, but always trace if parent was sampled
sampler = ParentBased(root=TraceIdRatioBased(0.1))
tracer_provider = TracerProvider(sampler=sampler)
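Ratio-based sampling is deterministic in the trace ID, which is why every service that sees the same trace can reach the same decision. The sketch below illustrates the idea with the standard library only; it mirrors the general approach of comparing part of the trace ID against a bound and is not the SDK's exact implementation. The names ratio_sample and LOW_64_MASK are assumptions.

```python
import random

LOW_64_MASK = (1 << 64) - 1


def ratio_sample(trace_id: int, ratio: float) -> bool:
    """Keep the trace if the low 64 bits of its ID fall below ratio * 2**64."""
    bound = round(ratio * (1 << 64))
    return (trace_id & LOW_64_MASK) < bound


# Usage: the kept fraction of random 128-bit IDs approaches the ratio.
rng = random.Random(0)  # fixed seed so the run is repeatable
ids = [rng.getrandbits(128) for _ in range(100_000)]
kept_fraction = sum(ratio_sample(t, 0.1) for t in ids) / len(ids)
```

Because the decision depends only on the ID, no coordination between services is needed to keep traces complete.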
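Capturing every error is harder than it looks. Head sampling decides when a span starts, before the response status exists, so a sampler can only react to attributes supplied at span creation. The sampler below keeps any span that already carries a 5xx http.status_code at creation and samples the rest at 10%; to reliably keep every failed request, apply tail-based sampling in the OpenTelemetry Collector instead:
from opentelemetry.sdk.trace.sampling import ALWAYS_ON, Sampler, TraceIdRatioBased
class ErrorAwareSampler(Sampler):
    def __init__(self, ratio: float = 0.1):
        self._default = TraceIdRatioBased(ratio)
    def should_sample(self, parent_context, trace_id, name, kind=None,
                      attributes=None, links=None, trace_state=None):
        # Only attributes passed at span creation are visible here
        status = (attributes or {}).get("http.status_code", 200)
        # Always sample errors; sample 10% of normal traffic
        delegate = ALWAYS_ON if status >= 500 else self._default
        return delegate.should_sample(
            parent_context, trace_id, name, kind, attributes, links, trace_state
        )
    def get_description(self) -> str:
        return "ErrorAwareSampler"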
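Since a response status is only known once a request finishes, keeping every failed request is usually a decision made after the trace completes (tail-based sampling). The rule itself is simple; here is a standard-library sketch of it, where FinishedSpan is a hypothetical minimal record rather than an SDK type and keep_trace is an illustrative name.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class FinishedSpan:
    """Hypothetical minimal record of a completed span (not an SDK type)."""
    trace_id: int
    http_status: int


def keep_trace(spans: List[FinishedSpan], ratio: float = 0.1) -> bool:
    """Tail decision: keep every trace containing a 5xx, else a fixed fraction."""
    if not spans:
        return False
    if any(s.http_status >= 500 for s in spans):
        return True
    # Deterministic fallback based on the low 64 bits of the trace ID
    bound = round(ratio * (1 << 64))
    return (spans[0].trace_id & ((1 << 64) - 1)) < bound
```

In practice this logic would run in a component that buffers whole traces, such as a collector, rather than inside the API process.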
Dashboards that tell stories
Organize Grafana dashboards in layers:
- Overview: The four golden signals for the entire API. This is the first place anyone looks during an incident.
- Per-endpoint: Drill down into specific endpoints showing rate, errors, latency, and top error types.
- Dependencies: Database query latency, Redis hit rates, external API response times.
- Business: Order rates, user signups, payment success rates.
Each dashboard should answer “is this thing healthy?” within 5 seconds of looking at it.
The one thing to remember: Wire OpenTelemetry auto-instrumentation for baseline visibility, add manual spans for business operations, connect logs to traces via trace IDs, set SLO-based alerts that reflect user impact, and sample intelligently to control costs at scale.