Python Log Aggregation with ELK — Deep Dive
Running ELK at production scale for Python services requires careful attention to index design, field mapping, log processing pipelines, and cost management. This guide covers the engineering decisions that separate a useful log aggregation system from an expensive disk-eating monster.
Architecture for Python services
Small scale (< 100GB/day)
Python Apps → stdout (JSON)
→ Filebeat → Elasticsearch (3-node cluster)
→ Kibana
Direct Filebeat-to-Elasticsearch works well for smaller deployments. No Logstash needed.
Medium scale (100GB-1TB/day)
Python Apps → stdout (JSON)
→ Filebeat → Kafka → Logstash → Elasticsearch (dedicated hot/warm/cold nodes)
→ Kibana
Kafka decouples ingestion from processing. If Elasticsearch is slow or down, logs buffer in Kafka instead of being lost.
Large scale (> 1TB/day)
Python Apps → stdout (JSON)
→ Filebeat → Kafka → Logstash (fleet)
→ Elasticsearch (cross-cluster, with dedicated ingest nodes)
→ Kibana (behind load balancer)
At this scale, cost optimization becomes the primary engineering challenge.
Structured logging schema design
Define a consistent schema across services
LOG_SCHEMA = {
# Required fields
"timestamp": "ISO 8601",
"level": "DEBUG|INFO|WARNING|ERROR|CRITICAL",
"service": "service name",
"message": "human-readable description",
# Request context (when applicable)
"correlation_id": "UUID",
"request_id": "UUID",
"method": "HTTP method",
"path": "/api/endpoint",
"status_code": 200,
"duration_ms": 45.2,
# User context
"user_id": "anonymized ID",
"tenant_id": "organization",
# Error context (when applicable)
"error_type": "ValueError",
"error_message": "description",
"stack_trace": "full traceback"
}
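A schema is only useful if services actually follow it. The sketch below shows one way a test suite could check emitted events against the required fields listed above. The names `REQUIRED_FIELDS`, `VALID_LEVELS`, and `validate_event` are hypothetical helpers introduced for illustration, not part of any library.

```python
REQUIRED_FIELDS = {"timestamp", "level", "service", "message"}
VALID_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}


def validate_event(event: dict) -> list:
    """Return a list of schema problems; an empty list means the event passes."""
    problems = [
        f"missing required field: {name}"
        for name in sorted(REQUIRED_FIELDS - set(event))
    ]
    level = event.get("level")
    if level is not None and level not in VALID_LEVELS:
        problems.append(f"invalid level: {level!r}")
    duration = event.get("duration_ms")
    # bool is a subclass of int, so reject it explicitly.
    if duration is not None and (
        isinstance(duration, bool) or not isinstance(duration, (int, float))
    ):
        problems.append("duration_ms must be a number")
    return problems


good = {
    "timestamp": "2024-01-01T00:00:00Z",
    "level": "INFO",
    "service": "order-api",
    "message": "ok",
}
print(validate_event(good))  # prints []
```

Running a check like this in CI is cheaper than discovering a malformed field after it has already been indexed.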
Python implementation with structlog
import socket

import structlog


def setup_logging(service_name: str):
    """Configure structlog to emit one JSON object per log line."""
    structlog.configure(
        processors=[
            # Merge request-scoped values bound via structlog.contextvars.
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            add_service_info(service_name),
            # Render exc_info into a string before JSON serialization.
            structlog.processors.ExceptionRenderer(),
            structlog.processors.JSONRenderer(),
        ]
    )


def add_service_info(service_name):
    """Return a processor that stamps every event with service and host."""
    def processor(logger, method_name, event_dict):
        event_dict["service"] = service_name
        event_dict["hostname"] = socket.gethostname()
        return event_dict
    return processor
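structlog stores the log text under the key `event`, while the schema above calls that field `message`. One way to reconcile the two is a small custom processor with the same three-argument signature as `add_service_info`. The function below is a hypothetical sketch; it would need to sit before `JSONRenderer` in the processor list.

```python
def rename_event_to_message(logger, method_name, event_dict):
    """structlog-style processor: move the `event` key to `message`."""
    if "event" in event_dict:
        event_dict["message"] = event_dict.pop("event")
    return event_dict


# Processors are plain callables, so they can be exercised without structlog.
result = rename_event_to_message(None, "info", {"event": "order created", "level": "info"})
print(result)
```

Because processors are ordinary functions, each one can be unit-tested in isolation like this before it is wired into `structlog.configure`.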
Elasticsearch index templates
Explicit field mappings
Without explicit mappings, Elasticsearch guesses field types. This leads to problems:
- "duration_ms": "45.2" (string in one log) vs "duration_ms": 45.2 (number in another) causes mapping conflicts.
- correlation_id gets analyzed as full-text, wasting resources.
PUT _index_template/python-logs
{
"index_patterns": ["python-logs-*"],
"template": {
"settings": {
"number_of_shards": 2,
"number_of_replicas": 1,
"index.lifecycle.name": "python-logs-policy",
"index.lifecycle.rollover_alias": "python-logs"
},
"mappings": {
"properties": {
"timestamp": {"type": "date"},
"level": {"type": "keyword"},
"service": {"type": "keyword"},
"message": {"type": "text"},
"correlation_id": {"type": "keyword"},
"request_id": {"type": "keyword"},
"method": {"type": "keyword"},
"path": {"type": "keyword"},
"status_code": {"type": "short"},
"duration_ms": {"type": "float"},
"user_id": {"type": "keyword"},
"tenant_id": {"type": "keyword"},
"error_type": {"type": "keyword"},
"error_message": {"type": "text"},
"stack_trace": {
"type": "text",
"index": false
}
},
"dynamic_templates": [
{
"strings_as_keywords": {
"match_mapping_type": "string",
"mapping": {"type": "keyword", "ignore_above": 256}
}
}
]
}
}
}
Key decisions:
- keyword for fields you filter/aggregate by (level, service, path) — exact match only, much less storage.
- text for fields you full-text search (message, error_message).
- index: false for stack traces, which stay stored but are not searchable; this saves significant disk.
- dynamic_templates catch unexpected fields as keywords to prevent accidental text analysis.
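The duration_ms conflict described at the start of this section can often be caught before documents reach Elasticsearch. The following is a minimal sketch that scans sample events for fields arriving with more than one Python type. `find_type_conflicts` is a hypothetical helper, and it compares Python types rather than Elasticsearch mapping types, so it will also flag int/float mixes that Elasticsearch may coerce without complaint.

```python
from collections import defaultdict


def find_type_conflicts(events):
    """Map each field seen with more than one type to its sorted type names."""
    seen = defaultdict(set)
    for event in events:
        for field, value in event.items():
            seen[field].add(type(value).__name__)
    return {field: sorted(types) for field, types in seen.items() if len(types) > 1}


sample = [
    {"duration_ms": "45.2", "status_code": 200},
    {"duration_ms": 45.2, "status_code": 200},
]
conflicts = find_type_conflicts(sample)
print(conflicts)  # prints {'duration_ms': ['float', 'str']}
```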
Logstash processing pipeline
Python-specific enrichment
# logstash/pipeline/python-logs.conf
input {
kafka {
bootstrap_servers => "kafka:9092"
topics => ["python-logs"]
group_id => "logstash-python"
codec => json
}
}
filter {
# Normalize level names
mutate {
uppercase => ["level"]
}
# Escape embedded newlines so a multi-line Python stack trace stays on one line
if [stack_trace] {
mutate {
gsub => ["stack_trace", "\n", "\\n"]
}
}
# Extract endpoint pattern from path
# /users/12345/orders -> /users/:id/orders
if [path] {
ruby {
code => '
path = event.get("path")
normalized = path.gsub(/\/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}(?=\/|$)/i, "/:uuid")
.gsub(/\/\d+(?=\/|$)/, "/:id")
event.set("path_pattern", normalized)
'
}
}
# Geo-IP enrichment for client IPs
if [client_ip] {
geoip {
source => "client_ip"
target => "geo"
}
}
# Drop verbose debug logs in production
if [level] == "DEBUG" and [service] != "critical-service" {
drop {}
}
# Redact sensitive fields
if [email] {
mutate {
gsub => ["email", "(?<=.{3}).+(?=@)", "***"]
}
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "python-logs-%{+YYYY.MM.dd}"
template_name => "python-logs"
}
}
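Path normalization can also be done in the application before the log line is emitted, which keeps the `path_pattern` logic testable alongside the service code. The sketch below is an illustrative Python equivalent of the ruby filter's intent, not part of the pipeline above. The function name, the full-length UUID pattern, and the lookahead that requires a segment boundary are all assumptions.

```python
import re

UUID_SEGMENT = re.compile(
    r"/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}(?=/|$)",
    re.IGNORECASE,
)
# The lookahead keeps the numeric rule from matching the digit prefix of a
# UUID or a mixed segment such as /123abc.
NUMERIC_SEGMENT = re.compile(r"/\d+(?=/|$)")


def normalize_path(path: str) -> str:
    """Collapse UUID and numeric path segments into placeholders."""
    path = UUID_SEGMENT.sub("/:uuid", path)
    return NUMERIC_SEGMENT.sub("/:id", path)


print(normalize_path("/users/12345/orders"))  # prints /users/:id/orders
```

Aggregating on the normalized pattern instead of the raw path keeps the number of distinct keyword values small, which matters for both storage and dashboard readability.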
Ingest pipelines (Logstash alternative)
For simpler transformations, Elasticsearch’s built-in ingest pipelines avoid the Logstash deployment:
PUT _ingest/pipeline/python-log-pipeline
{
"processors": [
{
"date": {
"field": "timestamp",
"formats": ["ISO8601"],
"target_field": "@timestamp"
}
},
{
"uppercase": {
"field": "level"
}
},
{
"remove": {
"field": ["password", "token", "secret"],
"ignore_missing": true
}
}
]
}
Reference in Filebeat config: output.elasticsearch.pipeline: "python-log-pipeline".
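Before pointing Filebeat at the pipeline, it can be exercised with Elasticsearch's simulate endpoint (POST _ingest/pipeline/python-log-pipeline/_simulate). The sketch below only builds the request body. The sample event and the helper name `build_simulate_request` are made up for illustration, and sending the request is left to whichever client is already in use.

```python
import json


def build_simulate_request(sample_events):
    """Build the JSON body for the ingest pipeline _simulate endpoint."""
    return {"docs": [{"_source": event} for event in sample_events]}


body = build_simulate_request([
    {"timestamp": "2024-05-01T12:00:00Z", "level": "error", "password": "hunter2"},
])
print(json.dumps(body, indent=2))
```

In the simulated output, one would expect to see `level` uppercased, `password` removed, and `@timestamp` populated from `timestamp`.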
Index Lifecycle Management (ILM)
PUT _ilm/policy/python-logs-policy
{
"policy": {
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"rollover": {
"max_primary_shard_size": "50gb",
"max_age": "1d"
},
"set_priority": {"priority": 100}
}
},
"warm": {
"min_age": "7d",
"actions": {
"shrink": {"number_of_shards": 1},
"forcemerge": {"max_num_segments": 1},
"set_priority": {"priority": 50},
"allocate": {
"require": {"data": "warm"}
}
}
},
"cold": {
"min_age": "30d",
"actions": {
"set_priority": {"priority": 0},
"freeze": {},
"allocate": {
"require": {"data": "cold"}
}
}
},
"delete": {
"min_age": "90d",
"actions": {
"delete": {}
}
}
}
}
}
Cost impact of ILM
| Phase | Storage cost (relative) | Query speed |
|---|---|---|
| Hot (NVMe SSD) | 1.0x | Fast |
| Warm (SSD) | 0.5x | Medium (force-merged) |
| Cold (HDD) | 0.15x | Slow (frozen) |
| Delete | 0x | N/A |
For a 500GB/day pipeline, proper ILM reduces monthly storage costs by 60-70% compared to keeping everything on hot nodes.
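The savings figure can be sanity-checked from the phase boundaries in the ILM policy and the relative costs in the table. This is a back-of-the-envelope sketch under simplifying assumptions: constant daily volume (so the 500GB figure cancels out), identical replica counts in every phase, and no extra savings from shrink or force-merge.

```python
# (phase, start_day, end_day, relative_cost) taken from the policy and table above.
PHASES = [
    ("hot", 0, 7, 1.0),
    ("warm", 7, 30, 0.5),
    ("cold", 30, 90, 0.15),
]


def relative_storage_cost(phases=PHASES):
    """Cost of tiered retention relative to keeping every day on hot nodes."""
    tiered = sum((end - start) * cost for _, start, end, cost in phases)
    all_hot = max(end for _, _, end, _ in phases) * 1.0
    return tiered / all_hot


ratio = relative_storage_cost()
savings_percent = round((1 - ratio) * 100, 1)
print(savings_percent)  # prints 69.4
```

Seven hot days, 23 warm days, and 60 cold days cost the equivalent of 27.5 hot days instead of 90, which lands at the upper end of the 60-70% range.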
Sending logs directly from Python
For serverless or short-lived Python scripts where Filebeat isn’t practical:
import logging
import queue
import threading
from datetime import datetime, timezone

from elasticsearch import Elasticsearch, helpers

# Attribute names present on every LogRecord; anything else came from `extra=`.
_RESERVED_ATTRS = set(logging.LogRecord("", 0, "", 0, "", (), None).__dict__) | {"message", "asctime"}


class ElasticsearchHandler(logging.Handler):
    """Buffer log records in memory and bulk-index them from a background thread."""

    def __init__(self, hosts, index_prefix="python-logs"):
        super().__init__()
        self.es = Elasticsearch(hosts)
        self.index_prefix = index_prefix
        self.buffer = queue.Queue(maxsize=1000)
        self._start_flush_thread()

    def emit(self, record):
        doc = {
            "@timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "message": self.format(record),
            "logger": record.name,
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        if record.exc_info:
            # logging.Handler has no formatException(); it lives on Formatter.
            formatter = self.formatter or logging.Formatter()
            doc["exception"] = formatter.formatException(record.exc_info)
        # Add extra fields passed via `extra=`
        for key, value in record.__dict__.items():
            if key not in _RESERVED_ATTRS:
                doc[key] = value
        try:
            self.buffer.put_nowait(doc)
        except queue.Full:
            pass  # drop rather than block

    def _start_flush_thread(self):
        def flush_loop():
            while True:
                docs = []
                # Collect up to 100 docs, or stop early after 5 idle seconds.
                while len(docs) < 100:
                    try:
                        docs.append(self.buffer.get(timeout=5))
                    except queue.Empty:
                        break
                if docs:
                    self._bulk_index(docs)

        # Daemon thread: records still buffered at interpreter exit are lost.
        thread = threading.Thread(target=flush_loop, daemon=True)
        thread.start()

    def _bulk_index(self, docs):
        index = f"{self.index_prefix}-{datetime.now(timezone.utc):%Y.%m.%d}"
        actions = [{"_index": index, "_source": doc} for doc in docs]
        try:
            helpers.bulk(self.es, actions, raise_on_error=False)
        except Exception:
            pass  # log shipping should never crash the app
Kibana saved searches and alerts
Saved search for Python exceptions
{
"query": {
"bool": {
"must": [
{"term": {"level": "ERROR"}},
{"exists": {"field": "error_type"}}
],
"must_not": [
{"term": {"error_type": "NotFound"}}
]
}
},
"sort": [{"@timestamp": "desc"}]
}
Kibana alerting rule
Create a Kibana alert that fires when Python error count exceeds a threshold:
- Rule type: Elasticsearch query
- Index: python-logs-*
- Query: level: "ERROR" AND service: "order-api"
- Threshold: Count > 50 in last 5 minutes
- Action: Send to Slack webhook
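The threshold semantics ("more than 50 errors in the trailing 5 minutes") can be illustrated with a small sliding-window counter. This is only a sketch of the rule's logic for reasoning about thresholds. Kibana itself evaluates the query against Elasticsearch on a schedule rather than per event, and the class name and interface here are hypothetical.

```python
from collections import deque


class ErrorRateRule:
    """Fires when more than `threshold` errors fall inside the trailing window."""

    def __init__(self, threshold=50, window_seconds=300):
        self.threshold = threshold
        self.window_seconds = window_seconds
        self._timestamps = deque()

    def observe(self, ts: float) -> bool:
        """Record one error at epoch second `ts`; return True if the rule fires."""
        self._timestamps.append(ts)
        cutoff = ts - self.window_seconds
        # Evict errors that have aged out of the window.
        while self._timestamps and self._timestamps[0] <= cutoff:
            self._timestamps.popleft()
        return len(self._timestamps) > self.threshold


rule = ErrorRateRule()
fired = [rule.observe(second) for second in range(51)]
print(fired[49], fired[50])  # prints False True
```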
Performance tuning
Bulk indexing settings
PUT python-logs-*/_settings
{
"index.refresh_interval": "30s",
"index.translog.durability": "async",
"index.translog.sync_interval": "30s"
}
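Increasing refresh_interval from the 1s default to 30s reduces I/O significantly. Logs rarely need to be searchable in real time, so a 30-second delay is acceptable. The async translog setting trades durability for throughput: if a node crashes, writes acknowledged since the last sync (up to 30s here) can be lost.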
Shard sizing
Target 20-50GB per shard. With 500GB/day:
- 2 primary shards per daily index = 250GB each (too large)
- 10 primary shards = 50GB each (ideal)
Over-sharding (too many small shards) wastes memory. Under-sharding slows searches and rebalancing.
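The sizing rule reduces to a one-line calculation. The helper below is a hypothetical sketch that assumes one index per day and an even spread of documents across primaries; the 50GB default comes from the upper end of the target range above.

```python
import math


def primary_shards_needed(daily_gb: float, target_shard_gb: float = 50.0) -> int:
    """Smallest primary shard count that keeps each shard at or below the target."""
    return max(1, math.ceil(daily_gb / target_shard_gb))


print(primary_shards_needed(500))  # prints 10
```

Recomputing this whenever daily volume changes noticeably helps avoid drifting into either over-sharding or under-sharding.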
One thing to remember: ELK’s value for Python services comes from three investments: structured logging on the application side, explicit field mappings on the Elasticsearch side, and ILM policies that control storage costs. Skip any one of these and the system becomes either useless or unsustainably expensive.