Python Log Aggregation with ELK — Deep Dive

Running ELK at production scale for Python services requires careful attention to index design, field mapping, log processing pipelines, and cost management. This guide covers the engineering decisions that separate a useful log aggregation system from an expensive disk-eating monster.

Architecture for Python services

Small scale (< 100GB/day)

Python Apps → stdout (JSON)
           → Filebeat → Elasticsearch (3-node cluster)
           → Kibana

Direct Filebeat-to-Elasticsearch works well for smaller deployments. No Logstash needed.

Medium scale (100GB-1TB/day)

Python Apps → stdout (JSON)
           → Filebeat → Kafka → Logstash → Elasticsearch (dedicated hot/warm/cold nodes)
           → Kibana

Kafka decouples ingestion from processing. If Elasticsearch is slow or down, logs buffer in Kafka instead of being lost.

Large scale (> 1TB/day)

Python Apps → stdout (JSON)
           → Filebeat → Kafka → Logstash (fleet)
           → Elasticsearch (cross-cluster, with dedicated ingest nodes)
           → Kibana (behind load balancer)

At this scale, cost optimization becomes the primary engineering challenge.

Structured logging schema design

Define a consistent schema across services

LOG_SCHEMA = {
    # Required fields
    "timestamp": "ISO 8601",
    "level": "DEBUG|INFO|WARNING|ERROR|CRITICAL",
    "service": "service name",
    "message": "human-readable description",

    # Request context (when applicable)
    "correlation_id": "UUID",
    "request_id": "UUID",
    "method": "HTTP method",
    "path": "/api/endpoint",
    "status_code": 200,
    "duration_ms": 45.2,

    # User context
    "user_id": "anonymized ID",
    "tenant_id": "organization",

    # Error context (when applicable)
    "error_type": "ValueError",
    "error_message": "description",
    "stack_trace": "full traceback"
}
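A schema only stays consistent if something checks it. The sketch below validates one JSON log line against the required fields and level names above. It accepts lowercase levels because structlog's add_log_level emits them. The function name schema_problems is hypothetical, and running it in CI or a test suite is an assumption about your setup.

```python
import json

# Required fields and level names from LOG_SCHEMA above
REQUIRED_FIELDS = ("timestamp", "level", "service", "message")
VALID_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}


def schema_problems(line: str) -> list:
    """Return the schema violations in one JSON log line (empty list if none)."""
    try:
        event = json.loads(line)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc.msg}"]
    if not isinstance(event, dict):
        return ["not a JSON object"]

    problems = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in event]
    level = event.get("level")
    # Case-insensitive: structlog emits "info", the pipeline uppercases it later
    if level is not None and str(level).upper() not in VALID_LEVELS:
        problems.append(f"unknown level: {level!r}")
    duration = event.get("duration_ms")
    # bool is a subclass of int, so exclude it explicitly
    if duration is not None and (isinstance(duration, bool) or not isinstance(duration, (int, float))):
        problems.append("duration_ms must be a number")
    return problems


good = '{"timestamp": "2024-05-01T12:00:00Z", "level": "info", "service": "order-api", "message": "ok"}'
print(schema_problems(good))  # → []
print(schema_problems('{"level": "verbose", "duration_ms": "45.2"}'))
```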

Python implementation with structlog

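import socket

import structlog


def add_service_info(service_name):
    # Resolve the hostname once instead of on every log call
    hostname = socket.gethostname()

    def processor(logger, method_name, event_dict):
        event_dict["service"] = service_name
        event_dict["hostname"] = hostname
        return event_dict

    return processor


def setup_logging(service_name: str):
    structlog.configure(
        processors=[
            # Pulls in per-request context, e.g. bind_contextvars(correlation_id=...)
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso", key="timestamp"),
            add_service_info(service_name),
            structlog.processors.ExceptionRenderer(),
            # structlog stores the message under "event"; the schema calls it "message"
            structlog.processors.EventRenamer("message"),
            structlog.processors.JSONRenderer(),
        ]
    )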
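structlog processors are plain callables with the signature (logger, method_name, event_dict), so the schema's conventions can be enforced before a line leaves the process. Here is a minimal sketch. It assumes you want uppercase levels as in LOG_SCHEMA. It also assumes you want the schema's stack_trace name instead of the "exception" key that ExceptionRenderer writes. Finally, it drops the same password/token/secret keys that the ingest pipeline below removes, this time at the source. normalize_event and SENSITIVE_KEYS are hypothetical names. The processor would sit after ExceptionRenderer() and before JSONRenderer().

```python
# Keys that must never be shipped (same list the ingest pipeline removes)
SENSITIVE_KEYS = {"password", "token", "secret"}


def normalize_event(logger, method_name, event_dict):
    """structlog-style processor: align an event with LOG_SCHEMA conventions."""
    # add_log_level writes lowercase names; the schema uses DEBUG|INFO|...
    if "level" in event_dict:
        event_dict["level"] = str(event_dict["level"]).upper()
    # ExceptionRenderer writes the formatted traceback to "exception"
    if "exception" in event_dict:
        event_dict["stack_trace"] = event_dict.pop("exception")
    # dict.keys() & set builds a new set, so deleting while looping is safe
    for key in event_dict.keys() & SENSITIVE_KEYS:
        del event_dict[key]
    return event_dict
```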

Elasticsearch index templates

Explicit field mappings

Without explicit mappings, Elasticsearch guesses field types. This leads to problems:

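  • "duration_ms": "45.2" (a string in one log) vs "duration_ms": 45.2 (a number in another): whichever arrives first in a new index fixes the field's type, so different daily indices can map the same field differently, which shows up in Kibana as a mapping conflict.
  • correlation_id gets mapped as analyzed text (plus a .keyword sub-field), spending disk and indexing work on full-text analysis of a value you only ever match exactly.
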
PUT _index_template/python-logs
{
  "index_patterns": ["python-logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 2,
      "number_of_replicas": 1,
      "index.lifecycle.name": "python-logs-policy",
      "index.lifecycle.rollover_alias": "python-logs"
    },
    "mappings": {
      "properties": {
        "timestamp": {"type": "date"},
        "level": {"type": "keyword"},
        "service": {"type": "keyword"},
        "message": {"type": "text"},
        "correlation_id": {"type": "keyword"},
        "request_id": {"type": "keyword"},
        "method": {"type": "keyword"},
        "path": {"type": "keyword"},
        "status_code": {"type": "short"},
        "duration_ms": {"type": "float"},
        "user_id": {"type": "keyword"},
        "tenant_id": {"type": "keyword"},
        "error_type": {"type": "keyword"},
        "error_message": {"type": "text"},
        "stack_trace": {
          "type": "text",
          "index": false
        }
      },
      "dynamic_templates": [
        {
          "strings_as_keywords": {
            "match_mapping_type": "string",
            "mapping": {"type": "keyword", "ignore_above": 256}
          }
        }
      ]
    }
  }
}

Key decisions:

  • keyword for fields you filter/aggregate by (level, service, path) — exact match only, much less storage.
  • text for fields you full-text search (message, error_message).
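  • index: false for stack traces: the trace is still kept in _source and returned with the document, but it is not indexed or searchable, which saves significant disk.
  • dynamic_templates map unexpected string fields to keyword (values longer than 256 characters are not indexed) to prevent accidental text analysis.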
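Explicit mappings decide the type, but Elasticsearch still rejects a document whose value cannot be coerced to it (for example "duration_ms": "n/a" against a float field). One option is to coerce on the application side before shipping. The sketch below assumes FIELD_TYPES mirrors the numeric fields of the template above. FIELD_TYPES and coerce_types are hypothetical names.

```python
# Numeric fields from the index template; everything else is sent as-is
FIELD_TYPES = {"status_code": int, "duration_ms": float}


def coerce_types(doc: dict) -> dict:
    """Return a copy of doc with numeric fields converted to their mapped types."""
    out = dict(doc)
    for field, cast in FIELD_TYPES.items():
        if field not in out:
            continue
        try:
            out[field] = cast(out[field])
        except (TypeError, ValueError):
            # Keep the unconvertible value under a separate name instead of
            # letting the whole document fail to index
            out[f"{field}_raw"] = str(out.pop(field))
    return out


print(coerce_types({"duration_ms": "45.2", "status_code": "200"}))
```

Unconvertible values survive as <field>_raw strings. The strings_as_keywords dynamic template maps these to keyword, so the log line is still indexed and searchable.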

Logstash processing pipeline

Python-specific enrichment

# logstash/pipeline/python-logs.conf
input {
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["python-logs"]
    group_id => "logstash-python"
    codec => json
  }
}

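filter {
  # Normalize level names (structlog's add_log_level emits lowercase)
  mutate {
    uppercase => ["level"]
  }

  # Drop verbose debug logs in production before spending work on enrichment
  if [level] == "DEBUG" and [service] != "critical-service" {
    drop {}
  }

  # Use the application's timestamp as @timestamp (otherwise it is the ingest time)
  date {
    match => ["timestamp", "ISO8601"]
  }

  # Tracebacks arrive as one JSON string with embedded newlines;
  # replace them with literal \n sequences so each trace is a single line
  if [stack_trace] {
    mutate {
      gsub => ["stack_trace", "\n", "\\n"]
    }
  }

  # Extract endpoint pattern from path
  # /users/12345/orders -> /users/:id/orders
  # Match whole segments only, so "/123abc" or a UUID's leading digits are left alone
  if [path] {
    ruby {
      code => '
        path = event.get("path")
        normalized = path
          .gsub(/\/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}(?=\/|$)/i, "/:uuid")
          .gsub(/\/\d+(?=\/|$)/, "/:id")
        event.set("path_pattern", normalized)
      '
    }
  }

  # Geo-IP enrichment for client IPs
  if [client_ip] {
    geoip {
      source => "client_ip"
      target => "geo"
    }
  }

  # Mask email local parts, keeping the first three characters
  if [email] {
    mutate {
      gsub => ["email", "(?<=.{3}).+(?=@)", "***"]
    }
  }
}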

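output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    # Write through the rollover alias used by python-logs-policy; writing to
    # date-stamped index names would bypass ILM rollover
    ilm_enabled => true
    ilm_rollover_alias => "python-logs"
    ilm_pattern => "000001"
    ilm_policy => "python-logs-policy"
    # The python-logs index template is installed separately, so don't let
    # Logstash install its own
    manage_template => false
  }
}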
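The same path normalization can also run in the application. This gives the small-scale Filebeat-only setup a path_pattern field without Logstash. The Python sketch below matches whole path segments only (full 8-4-4-4-12 UUIDs and purely numeric IDs). path_pattern is a hypothetical helper name.

```python
import re

# A whole path segment that is an 8-4-4-4-12 hex UUID (any case)
UUID_SEGMENT = re.compile(
    r"/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}(?=/|$)",
    re.IGNORECASE,
)
# A whole path segment made only of digits
NUMERIC_SEGMENT = re.compile(r"/\d+(?=/|$)")


def path_pattern(path: str) -> str:
    """Collapse ID segments so requests to the same endpoint aggregate together."""
    path = UUID_SEGMENT.sub("/:uuid", path)
    return NUMERIC_SEGMENT.sub("/:id", path)


print(path_pattern("/users/12345/orders"))  # → /users/:id/orders
```

The lookahead (?=/|$) anchors each rule to a complete segment. As a result, "/v1" and "/123abc" are left untouched, and a UUID that starts with digits is never half-replaced.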

Ingest pipelines (Logstash alternative)

For simpler transformations, Elasticsearch’s built-in ingest pipelines avoid the Logstash deployment:

PUT _ingest/pipeline/python-log-pipeline
{
  "processors": [
    {
      "date": {
        "field": "timestamp",
        "formats": ["ISO8601"],
        "target_field": "@timestamp"
      }
    },
    {
      "uppercase": {
        "field": "level",
        "ignore_missing": true
      }
    },
    {
      "remove": {
        "field": ["password", "token", "secret"],
        "ignore_missing": true
      }
    }
  ]
}

Reference in Filebeat config: output.elasticsearch.pipeline: "python-log-pipeline".

Index Lifecycle Management (ILM)

PUT _ilm/policy/python-logs-policy
{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_primary_shard_size": "50gb",
            "max_age": "1d"
          },
          "set_priority": {"priority": 100}
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": {"number_of_shards": 1},
          "forcemerge": {"max_num_segments": 1},
          "set_priority": {"priority": 50},
          "allocate": {
            "require": {"data": "warm"}
          }
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          "set_priority": {"priority": 0},
          "allocate": {
            "require": {"data": "cold"}
          }
        }
      },
      "delete": {
        "min_age": "90d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

Cost impact of ILM

Phase            Storage cost (relative)   Query speed
Hot (NVMe SSD)   1.0x                      Fast
Warm (SSD)       0.5x                      Medium (force-merged)
Cold (HDD)       0.15x                     Slow
Delete           0x                        N/A

For a 500GB/day pipeline with 90-day retention, the policy above (about 7 days hot, 23 warm, 60 cold) cuts monthly storage costs by roughly 60-70% compared to keeping everything on hot nodes.
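Here is a back-of-envelope check of that figure. It assumes steady state, the relative costs from the table, phase boundaries at 7, 30 and 90 days, and that every GB is kept for the full 90 days either way. Replicas, force-merge compression gains and compute costs are ignored. The names COST, DAYS and ilm_savings are illustrative.

```python
# Relative storage cost per GB-day, from the table above
COST = {"hot": 1.0, "warm": 0.5, "cold": 0.15}

# Days an index spends in each phase (min_age boundaries 7d, 30d, 90d)
DAYS = {"hot": 7, "warm": 30 - 7, "cold": 90 - 30}


def ilm_savings(daily_gb: float = 500.0) -> float:
    """Fraction of storage cost saved versus keeping all 90 days on hot nodes."""
    retention_days = sum(DAYS.values())
    all_hot = daily_gb * retention_days * COST["hot"]
    tiered = daily_gb * sum(DAYS[phase] * COST[phase] for phase in DAYS)
    return 1 - tiered / all_hot


print(f"{ilm_savings():.0%}")  # → 69%
```

The daily volume cancels out: the tiered layout costs 7 × 1.0 + 23 × 0.5 + 60 × 0.15 = 27.5 hot-equivalent days against 90. The saving is therefore the same whether you ingest 50GB or 5TB a day.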

Sending logs directly from Python

For serverless or short-lived Python scripts where Filebeat isn’t practical:

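from datetime import datetime, timezone
import logging
import queue
import threading

from elasticsearch import Elasticsearch, helpers

# Attributes every LogRecord has; anything else was passed via extra=
_STANDARD_ATTRS = set(vars(logging.makeLogRecord({}))) | {"message", "asctime"}


class ElasticsearchHandler(logging.Handler):
    def __init__(self, hosts, index="python-logs", batch_size=100):
        super().__init__()
        self.es = Elasticsearch(hosts)
        # Write to the ILM rollover alias (assumes the alias and its first index
        # already exist); a date-stamped index name would bypass rollover
        self.index = index
        self.batch_size = batch_size
        self.buffer = queue.Queue(maxsize=1000)
        self._exc_formatter = logging.Formatter()
        self._start_flush_thread()

    def emit(self, record):
        # Skip the client's own log records to avoid a feedback loop
        if record.name.startswith(("elasticsearch", "elastic_transport")):
            return
        try:
            doc = {
                # When the record was created, not when it was shipped
                "@timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
                "level": record.levelname,
                "message": record.getMessage(),
                "logger": record.name,
                "module": record.module,
                "function": record.funcName,
                "line": record.lineno,
            }
            # exc_info can be (None, None, None) when logged outside an except block
            if record.exc_info and record.exc_info[0] is not None:
                doc["error_type"] = record.exc_info[0].__name__
                doc["error_message"] = str(record.exc_info[1])
                doc["stack_trace"] = self._exc_formatter.formatException(record.exc_info)

            # Add fields passed via extra=
            for key, value in vars(record).items():
                if key not in _STANDARD_ATTRS:
                    doc[key] = value
        except Exception:
            self.handleError(record)
            return

        try:
            self.buffer.put_nowait(doc)
        except queue.Full:
            pass  # drop rather than block

    def _start_flush_thread(self):
        def flush_loop():
            while True:
                docs = []
                while len(docs) < self.batch_size:
                    try:
                        docs.append(self.buffer.get(timeout=5))
                    except queue.Empty:
                        break
                if docs:
                    self._bulk_index(docs)

        thread = threading.Thread(target=flush_loop, daemon=True)
        thread.start()

    def flush(self):
        # logging.shutdown() calls flush() at exit; the daemon thread is killed
        # without finishing, so ship whatever is still queued
        docs = []
        while True:
            try:
                docs.append(self.buffer.get_nowait())
            except queue.Empty:
                break
        if docs:
            self._bulk_index(docs)

    def _bulk_index(self, docs):
        actions = [{"_index": self.index, "_source": doc} for doc in docs]
        try:
            helpers.bulk(self.es, actions, raise_on_error=False)
        except Exception:
            pass  # log shipping should never crash the app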

Kibana saved searches and alerts

Saved search for Python exceptions

{
  "query": {
    "bool": {
      "must": [
        {"term": {"level": "ERROR"}},
        {"exists": {"field": "error_type"}}
      ],
      "must_not": [
        {"term": {"error_type": "NotFound"}}
      ]
    }
  },
  "sort": [{"@timestamp": "desc"}]
}

Kibana alerting rule

Create a Kibana alert that fires when Python error count exceeds a threshold:

  1. Rule type: Elasticsearch query
  2. Index: python-logs-*
  3. Query: level: "ERROR" AND service: "order-api"
  4. Threshold: Count > 50 in last 5 minutes
  5. Action: Send to Slack webhook
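To check the rule outside Kibana, the KQL condition and time window translate roughly into the query DSL built below. On these keyword fields, term filters are the exact-match equivalent, and the window becomes a range on @timestamp. This assumes @timestamp holds the event time. Treat the code as a sketch: error_count_query and should_alert are hypothetical names. The query could be passed to the elasticsearch-py 8.x client as es.count(index="python-logs-*", query=...).

```python
def error_count_query(service: str, window: str = "5m") -> dict:
    """Query DSL roughly equivalent to: level: "ERROR" AND service: <service>, last <window>."""
    return {
        "bool": {
            # filter context: exact matches, no scoring needed for a count
            "filter": [
                {"term": {"level": "ERROR"}},
                {"term": {"service": service}},
                {"range": {"@timestamp": {"gte": f"now-{window}"}}},
            ]
        }
    }


def should_alert(error_count: int, threshold: int = 50) -> bool:
    """Step 4 of the rule: fire only when the count exceeds the threshold."""
    return error_count > threshold


query = error_count_query("order-api")
```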

Performance tuning

Bulk indexing settings

PUT python-logs-*/_settings
{
  "index.refresh_interval": "30s",
  "index.translog.durability": "async",
  "index.translog.sync_interval": "30s"
}

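Increasing refresh_interval from the default 1s to 30s cuts segment-creation I/O significantly. Logs are not real-time, so a 30-second delay before new entries become searchable is usually acceptable. There are two caveats. First, translog.durability: async acknowledges writes before they are fsynced, so a node crash can lose up to sync_interval (30s here) of acknowledged logs. Second, _settings only changes indices that already exist; to give every rolled-over index these values, put them in the index template's settings block.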

Shard sizing

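Target 20-50GB per shard. With fixed daily indices at 500GB/day of primary data:

  • 2 primary shards per daily index = 250GB each (too large)
  • 10 primary shards = 50GB each (at the top of the target range)

Over-sharding (too many small shards) wastes heap and cluster-state overhead. Under-sharding produces oversized shards that slow searches, recovery and rebalancing. With the rollover settings above (max_primary_shard_size: 50gb), an index rolls over before any primary grows much past 50GB. The shard count then mostly sets write parallelism rather than final shard size.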
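The shard-count arithmetic generalizes to a small helper. This sketch assumes a fixed daily index (no size-based rollover) and a 50GB target per primary shard. primary_shards_for is a hypothetical name.

```python
import math


def primary_shards_for(daily_gb: float, target_shard_gb: float = 50.0) -> int:
    """Smallest primary-shard count keeping each shard of a daily index <= target size."""
    if daily_gb <= 0:
        return 1
    return math.ceil(daily_gb / target_shard_gb)


for volume in (500, 120):
    shards = primary_shards_for(volume)
    print(volume, shards, round(volume / shards, 1))
# → 500 10 50.0
# → 120 3 40.0
```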

One thing to remember: ELK’s value for Python services comes from three investments: structured logging on the application side, explicit field mappings on the Elasticsearch side, and ILM policies that control storage costs. Skip any one of these and the system becomes either useless or unsustainably expensive.

