Python Log Rotation Management — Deep Dive

The multi-process rotation problem

Python’s RotatingFileHandler is not safe for multi-process applications. When two processes write to the same log file and one triggers rotation, the rename operation can cause the other process to lose its file handle or write to the wrong file.
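
The failure mode can be reproduced in a single script on a POSIX system, where an open file handle follows its file through a rename. The sketch below is illustrative only: demo_stale_handle is a hypothetical helper, and two file objects stand in for two processes.

```python
import os
import tempfile


def demo_stale_handle() -> tuple[str, str]:
    """Simulate two writers sharing a log file while one of them rotates it.

    Assumes POSIX rename semantics; on Windows, renaming an open file fails.
    Returns (contents of the current log, contents of the rotated log).
    """
    with tempfile.TemporaryDirectory() as tmp:
        log_path = os.path.join(tmp, 'app.log')
        rotated_path = log_path + '.1'

        # "Process B" opens the log and keeps its handle open.
        writer_b = open(log_path, 'a')
        writer_b.write('B: before rotation\n')
        writer_b.flush()

        # "Process A" rotates: rename the file, then start a fresh one.
        os.rename(log_path, rotated_path)
        writer_a = open(log_path, 'a')
        writer_a.write('A: after rotation\n')
        writer_a.close()

        # B's handle still points at the renamed file.
        writer_b.write('B: after rotation\n')
        writer_b.close()

        with open(log_path) as f:
            current = f.read()
        with open(rotated_path) as f:
            rotated = f.read()
    return current, rotated
```

B's second line ends up in app.log.1 rather than app.log, which is the "write to the wrong file" case described above.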

Solutions for multi-process logging

Option 1: WatchedFileHandler + external logrotate

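Usually the simplest robust approach for multi-process Python apps on Unix-like systems (WatchedFileHandler is not appropriate on Windows). Each process uses WatchedFileHandler, which checks before every log write whether the file has been renamed or deleted and reopens it if so: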

import logging
from logging.handlers import WatchedFileHandler
import os

def setup_process_safe_logging(log_path: str) -> logging.Logger:
    logger = logging.getLogger(f'worker-{os.getpid()}')
    handler = WatchedFileHandler(log_path)
    handler.setFormatter(logging.Formatter(
        '%(asctime)s [PID %(process)d] %(levelname)s %(name)s: %(message)s'
    ))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger

When logrotate renames the file, WatchedFileHandler detects the inode change on the next write and reopens the file. A write that races with the rename goes to the renamed file rather than being dropped. Such lines are at risk only if logrotate compresses or deletes the rotated file immediately, which is why the logrotate configuration later in this article uses delaycompress.
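
To see the reopen behaviour in isolation, the following sketch renames the log file by hand, standing in for logrotate, and logs once before and once after. It assumes a POSIX system; demo_watched_reopen and the logger name are made up for illustration.

```python
import logging
import os
import tempfile
from logging.handlers import WatchedFileHandler


def demo_watched_reopen() -> tuple[str, str]:
    """Return (contents of the new log, contents of the renamed log)."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, 'app.log')
        logger = logging.getLogger('watched-demo')  # hypothetical logger name
        logger.propagate = False
        logger.setLevel(logging.INFO)
        handler = WatchedFileHandler(path)
        logger.addHandler(handler)
        try:
            logger.info('first')
            # Stand-in for logrotate: move the file out from under the handler.
            os.rename(path, path + '.1')
            # The handler notices the file is gone and reopens the original path.
            logger.info('second')
        finally:
            logger.removeHandler(handler)
            handler.close()

        with open(path) as f:
            current = f.read()
        with open(path + '.1') as f:
            rotated = f.read()
    return current, rotated
```

The first record stays in the renamed file and the second lands in a freshly created app.log, without any signal or restart.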

Option 2: QueueHandler with a dedicated logging process

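For applications that cannot tolerate any log loss, route every record through a queue to a single process that owns the log file and performs the rotation: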

import logging
import logging.handlers
import os
from multiprocessing import Queue, Process

def logging_listener(queue: Queue, log_path: str):
    """Dedicated process that handles all log writes and rotation."""
    handler = logging.handlers.RotatingFileHandler(
        log_path, maxBytes=50 * 1024 * 1024, backupCount=10
    )
    handler.setFormatter(logging.Formatter(
        '%(asctime)s %(levelname)s %(name)s: %(message)s'
    ))

    while True:
        record = queue.get()
        if record is None:
            break
        handler.emit(record)

def setup_queue_logging(queue: Queue) -> logging.Logger:
    """Set up a worker process to send logs through the queue."""
    logger = logging.getLogger(f'worker-{os.getpid()}')
    handler = logging.handlers.QueueHandler(queue)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger

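# Main process (the guard is required when multiprocessing uses the "spawn" start method)
if __name__ == '__main__':
    log_queue = Queue()
    listener = Process(target=logging_listener, args=(log_queue, '/var/log/myapp/app.log'))
    listener.start()

    # Worker processes call setup_queue_logging(log_queue)

    # On shutdown, send the None sentinel so the listener leaves its loop
    log_queue.put(None)
    listener.join()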
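
The standard library also provides logging.handlers.QueueListener, which runs the dequeue loop in a background thread and passes each record to the handlers given to it, so the hand-written while loop is not strictly required. A minimal single-process sketch, assuming a plain queue.Queue (a multiprocessing.Queue can be passed the same way); the function name, logger name, and size limits are hypothetical:

```python
import logging
import logging.handlers
import queue


def demo_queue_listener(log_path: str) -> str:
    """Log one record through a queue and return what reached the file."""
    log_queue: queue.Queue = queue.Queue()

    # Only the listener side touches the file, so only it ever rotates.
    file_handler = logging.handlers.RotatingFileHandler(
        log_path, maxBytes=1024 * 1024, backupCount=3
    )
    file_handler.setFormatter(logging.Formatter('%(levelname)s %(name)s: %(message)s'))
    listener = logging.handlers.QueueListener(log_queue, file_handler)
    listener.start()

    # The producer side only enqueues records.
    logger = logging.getLogger('queue-demo')
    logger.propagate = False
    logger.setLevel(logging.INFO)
    queue_handler = logging.handlers.QueueHandler(log_queue)
    logger.addHandler(queue_handler)
    try:
        logger.info('hello from the worker side')
    finally:
        # stop() lets the listener drain queued records, then joins its thread.
        listener.stop()
        logger.removeHandler(queue_handler)
        file_handler.close()

    with open(log_path) as f:
        return f.read()
```

Whether to use a listener thread or a dedicated process is a design choice: the thread is simpler to manage, while a separate process keeps file I/O out of the application's main process.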

Option 3: concurrent-log-handler

A third-party library that uses file locking for process-safe rotation:

from concurrent_log_handler import ConcurrentRotatingFileHandler

handler = ConcurrentRotatingFileHandler(
    'app.log',
    maxBytes=10 * 1024 * 1024,
    backupCount=5,
    use_gzip=True,  # Compress rotated files
)

The library relies on portalocker for cross-platform file locking. Taking the lock adds a small overhead to every log write, in exchange for rotation that is coordinated across processes.

Building a compressing rotation handler

Python’s built-in handlers do not compress rotated files. Here is a custom handler that gzips old logs:

import gzip
import os
import shutil
import logging
from logging.handlers import RotatingFileHandler

class CompressingRotatingFileHandler(RotatingFileHandler):
    """RotatingFileHandler that gzips rotated files."""

    def rotation_filename(self, default_name: str) -> str:
        return default_name + '.gz'

    def rotate(self, source: str, dest: str):
        if os.path.exists(source):
            with open(source, 'rb') as f_in:
                with gzip.open(dest, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
            os.remove(source)

# Usage
handler = CompressingRotatingFileHandler(
    'app.log',
    maxBytes=50 * 1024 * 1024,
    backupCount=10,
)

This produces files like app.log.1.gz, app.log.2.gz, etc. The compression happens synchronously during rotation, so there is a brief latency spike. For very large files, consider deferring compression to a background thread.
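
The same result is possible without subclassing: rotating handlers expose namer and rotator attributes that the default rotation_filename() and rotate() methods call when they are set. A sketch of that variant follows; the helper names and the small limits in the demo are chosen only for illustration.

```python
import gzip
import logging
import os
import shutil
import tempfile
from logging.handlers import RotatingFileHandler


def gzip_namer(default_name: str) -> str:
    # Called for every backup name, e.g. app.log.1 -> app.log.1.gz
    return default_name + '.gz'


def gzip_rotator(source: str, dest: str) -> None:
    # Called once per rollover with the live log and the first backup name.
    with open(source, 'rb') as f_in, gzip.open(dest, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)
    os.remove(source)


def make_gzip_handler(path: str, max_bytes: int, backup_count: int) -> RotatingFileHandler:
    handler = RotatingFileHandler(path, maxBytes=max_bytes, backupCount=backup_count)
    handler.namer = gzip_namer
    handler.rotator = gzip_rotator
    return handler


def demo_gzip_rotation() -> list[str]:
    """Write enough records to force rollovers; return the resulting file names."""
    with tempfile.TemporaryDirectory() as tmp:
        handler = make_gzip_handler(os.path.join(tmp, 'app.log'), 200, 2)
        logger = logging.getLogger('gzip-demo')  # hypothetical logger name
        logger.propagate = False
        logger.setLevel(logging.INFO)
        logger.addHandler(handler)
        try:
            for i in range(30):
                logger.info('message number %02d', i)
        finally:
            logger.removeHandler(handler)
            handler.close()
        return sorted(os.listdir(tmp))
```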

Async compression for high-throughput applications

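import gzip
import os
import shutil
import threading
from logging.handlers import RotatingFileHandler

class AsyncCompressingHandler(RotatingFileHandler):
    """Compress rotated files in a background thread."""

    def rotation_filename(self, default_name: str) -> str:
        # Backups are named app.log.1.gz, app.log.2.gz, ... so that
        # doRollover() shifts the compressed files on later rotations
        return default_name + '.gz'

    def rotate(self, source: str, dest: str):
        if not os.path.exists(source):
            return
        # Rename first so the handler can reopen a fresh log file right away
        temp_dest = dest + '.tmp'
        os.rename(source, temp_dest)

        # Compress in the background. Assumes each compression finishes
        # before the next rollover reuses the same temporary name.
        def compress():
            with open(temp_dest, 'rb') as f_in:
                with gzip.open(dest, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
            os.remove(temp_dest)

        # A daemon thread is abandoned at interpreter exit, which can leave a .tmp file behind
        threading.Thread(target=compress, daemon=True).start()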

Time-based rotation with size limits

A combined approach that rotates daily but also triggers rotation if a file grows too large mid-day:

import glob
import gzip
import os
import shutil
import threading
import time
from logging.handlers import BaseRotatingHandler

class SizeAndTimeRotatingHandler(BaseRotatingHandler):
    """Rotate by time (daily) and by size (whichever comes first)."""

    def __init__(self, filename, max_bytes=50*1024*1024, backup_count=30):
        super().__init__(filename, mode='a')
        self.max_bytes = max_bytes
        self.backup_count = backup_count
        self.rotation_date = self._current_date()

    def _current_date(self):
        return time.strftime('%Y-%m-%d', time.gmtime())

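    def _rotation_filename(self):
        # Name the backup after the day its records were written,
        # not the day on which the rollover happens to run
        date = self.rotation_date
        base = self.baseFilename
        # Find the next free sequence number. Check the uncompressed name too,
        # because background compression may not have finished yet.
        seq = 1
        while (os.path.exists(f"{base}.{date}.{seq}")
               or os.path.exists(f"{base}.{date}.{seq}.gz")):
            seq += 1
        return f"{base}.{date}.{seq}"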

    def shouldRollover(self, record):
        today = self._current_date()
        if today != self.rotation_date:
            return True
        if self.max_bytes > 0:
            self.stream.seek(0, 2)
            if self.stream.tell() + len(self.format(record)) >= self.max_bytes:
                return True
        return False

    def doRollover(self):
        self.stream.close()
        dest = self._rotation_filename()
        os.rename(self.baseFilename, dest)

        # Compress in background
        threading.Thread(
            target=self._compress, args=(dest,), daemon=True
        ).start()

        # Clean up old files
        self._cleanup()

        self.rotation_date = self._current_date()
        self.stream = self._open()

    def _compress(self, filepath):
        gz_path = filepath + '.gz'
        with open(filepath, 'rb') as f_in:
            with gzip.open(gz_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        os.remove(filepath)

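    def _cleanup(self):
        import glob
        pattern = self.baseFilename + '.*'
        # Order by modification time: sequence numbers such as .10 and .2
        # do not sort chronologically when compared as strings
        files = sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True)
        for old_file in files[self.backup_count:]:
            os.remove(old_file)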
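
The cleanup step boils down to "keep the newest N backups". As a standalone sketch, prune_backups (a hypothetical helper, not part of the logging module) orders candidates by modification time, which stays correct when filenames carry multi-digit sequence numbers:

```python
import glob
import os


def prune_backups(pattern: str, keep: int) -> list[str]:
    """Delete all but the `keep` most recently modified files matching pattern.

    Returns the paths that were removed, oldest last.
    """
    newest_first = sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True)
    to_remove = newest_first[max(keep, 0):]
    for path in to_remove:
        os.remove(path)
    return to_remove
```

Called as prune_backups('/var/log/myapp/app.log.*', keep=30), it would apply the same retention rule as backup_count above; the path here is only an example.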

Logrotate integration patterns

Signal-based reopening

Instead of copytruncate (which can lose data during the copy), use a signal handler:

import signal
import logging

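# Register every FileHandler that should be reopened after rotation,
# e.g. _file_handlers.append(handler) right after creating it.
_file_handlers: list[logging.FileHandler] = []

def reopen_logs(signum, frame):
    """Reopen all file handlers; triggered by logrotate's postrotate script."""
    for handler in _file_handlers:
        # Swap only the underlying stream, under the handler's own lock
        handler.acquire()
        try:
            if handler.stream is not None:
                handler.stream.close()
            handler.stream = handler._open()
        finally:
            handler.release()

# SIGUSR1 is available on Unix only
signal.signal(signal.SIGUSR1, reopen_logs)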

Logrotate config:

/var/log/myapp/*.log {
    daily
    rotate 14
    compress
    delaycompress
    missingok
    notifempty
    postrotate
        kill -USR1 $(cat /var/run/myapp.pid) 2>/dev/null || true
    endscript
}
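
The postrotate script reads the PID from /var/run/myapp.pid, so the application has to write that file at startup. A minimal sketch, assuming the process is allowed to write to the chosen location; write_pid_file is a hypothetical helper:

```python
import atexit
import os


def write_pid_file(path: str) -> None:
    """Record this process's PID so the postrotate script can signal it."""
    with open(path, 'w') as f:
        f.write(f'{os.getpid()}\n')

    def remove_pid_file() -> None:
        # Best-effort cleanup on normal interpreter exit.
        try:
            os.remove(path)
        except FileNotFoundError:
            pass

    atexit.register(remove_pid_file)
```

In a multi-process deployment, the PID written here should belong to whichever process installs the SIGUSR1 handler.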

Generating logrotate configs from Python

For applications that manage their own deployment:

def generate_logrotate_config(
    log_paths: list[str],
    rotate_count: int = 14,
    frequency: str = 'daily',
    compress: bool = True,
    pid_file: str = '/var/run/myapp.pid',
) -> str:
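    # Build the directive list first so that disabling compression
    # does not leave blank lines in the generated file
    directives = [frequency, f'rotate {rotate_count}']
    if compress:
        directives += ['compress', 'delaycompress']
    directives += [
        'missingok',
        'notifempty',
        'create 0640 appuser appuser',  # adjust owner and group to your service account
        'postrotate',
        f'    kill -USR1 $(cat {pid_file}) 2>/dev/null || true',
        'endscript',
    ]
    body = '\n'.join(f'    {line}' for line in directives)
    return f"{' '.join(log_paths)} {{\n{body}\n}}"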

Log lifecycle management

In production, logs go through a lifecycle: write → rotate → compress → archive → delete. The class below implements the compress, archive, and delete stages:

import gzip
import shutil
import time
from pathlib import Path

class LogLifecycleManager:
    def __init__(self, log_dir: str, archive_dir: str):
        self.log_dir = Path(log_dir)
        self.archive_dir = Path(archive_dir)
        self.archive_dir.mkdir(parents=True, exist_ok=True)

    def compress_old_logs(self, older_than_hours: int = 1):
        """Compress plain-text rotated logs."""
        threshold = time.time() - (older_than_hours * 3600)
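        for log_file in self.log_dir.glob('*.log.[0-9]*'):
            # The pattern also matches files that are already compressed (app.log.1.gz)
            if log_file.suffix == '.gz':
                continue
            if log_file.stat().st_mtime < threshold: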
                gz_path = log_file.with_suffix(log_file.suffix + '.gz')
                with open(log_file, 'rb') as f_in:
                    with gzip.open(gz_path, 'wb') as f_out:
                        shutil.copyfileobj(f_in, f_out)
                log_file.unlink()

    def archive_old_logs(self, older_than_days: int = 7):
        """Move compressed logs to archive storage."""
        threshold = time.time() - (older_than_days * 86400)
        for gz_file in self.log_dir.glob('*.gz'):
            if gz_file.stat().st_mtime < threshold:
                dest = self.archive_dir / gz_file.name
                shutil.move(str(gz_file), str(dest))

    def purge_archives(self, older_than_days: int = 90):
        """Delete archived logs beyond retention period."""
        threshold = time.time() - (older_than_days * 86400)
        for archive in self.archive_dir.glob('*.gz'):
            if archive.stat().st_mtime < threshold:
                archive.unlink()

    def run_lifecycle(self):
        """Execute the full log lifecycle pipeline."""
        self.compress_old_logs(older_than_hours=1)
        self.archive_old_logs(older_than_days=7)
        self.purge_archives(older_than_days=90)

Container logging considerations

In containerized environments, the best practice shifts dramatically:

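import json
import logging
import sys

class JsonFormatter(logging.Formatter):
    """Serialize records with json.dumps so quotes and newlines are escaped."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            'time': self.formatTime(record),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
        })

def setup_container_logging():
    """For containers: log to stdout, let the runtime handle rotation."""
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(JsonFormatter())
    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)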

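Docker’s json-file logging driver does not rotate by default: max-size is unlimited and max-file is 1, so a container log keeps growing until the container is removed. Enable rotation in daemon.json (the setting applies only to containers created after the daemon restarts):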

{
  "log-driver": "json-file",
  "log-opts": {
    "max-size": "50m",
    "max-file": "5"
  }
}

In this model, Python should not do any rotation at all. The container runtime owns the log lifecycle.

One thing to remember: The right log rotation strategy depends on your deployment model — multi-process apps need process-safe handlers or external rotation, containers should log to stdout, and traditional server deployments benefit from WatchedFileHandler paired with system logrotate for compression, archival, and cleanup.

