Python Log Rotation Management — Deep Dive
The multi-process rotation problem
Python’s RotatingFileHandler is not safe for multi-process applications. When two processes write to the same log file and one triggers rotation, the rename operation can cause the other process to lose its file handle or write to the wrong file.
Solutions for multi-process logging
Option 1: WatchedFileHandler + external logrotate
The safest approach for multi-process Python apps. Each process uses WatchedFileHandler, which checks on every log write whether the file has been renamed or deleted:
import logging
from logging.handlers import WatchedFileHandler
import os
def setup_process_safe_logging(log_path: str) -> logging.Logger:
    """Return this process's logger, writing to ``log_path`` through a WatchedFileHandler.

    The logger is named ``worker-<pid>``. Calling the function again in the
    same process with the same path reuses the handler that is already
    attached instead of stacking a second one (which would write every line
    twice).

    Args:
        log_path: File that all worker processes append to.

    Returns:
        The per-process logger, with its level set to INFO.
    """
    logger = logging.getLogger(f'worker-{os.getpid()}')

    # FileHandler stores the absolute path, so compare against that.
    target = os.path.abspath(log_path)
    already_attached = any(
        isinstance(existing, WatchedFileHandler)
        and existing.baseFilename == target
        for existing in logger.handlers
    )
    if not already_attached:
        handler = WatchedFileHandler(log_path)
        handler.setFormatter(logging.Formatter(
            '%(asctime)s [PID %(process)d] %(levelname)s %(name)s: %(message)s'
        ))
        logger.addHandler(handler)

    logger.setLevel(logging.INFO)
    return logger
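When logrotate renames the file, WatchedFileHandler detects the device/inode change on the next write and reopens the file at the original path. Lines written between the rename and the reopen are not dropped: they land in the renamed (rotated) file. Pair this with logrotate's delaycompress so that file is left untouched until the next cycle; if it is compressed immediately, those late lines can be lost.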
Option 2: QueueHandler with a dedicated logging process
For applications that cannot tolerate any log loss:
import logging
import logging.handlers
from multiprocessing import Queue, Process
def logging_listener(queue: Queue, log_path: str):
    """Dedicated process that handles all log writes and rotation.

    Pulls ``LogRecord`` objects off ``queue`` and writes them to ``log_path``
    through a single RotatingFileHandler (50 MiB per file, 10 backups), so
    only this process ever rotates the file.

    Args:
        queue: Queue that the worker processes put their records on.
        log_path: Destination log file.

    A ``None`` item on the queue is the shutdown sentinel. The handler is
    always closed on the way out, whether the loop ends on the sentinel or
    on an exception, so the log file is flushed and released.
    """
    handler = logging.handlers.RotatingFileHandler(
        log_path, maxBytes=50 * 1024 * 1024, backupCount=10
    )
    handler.setFormatter(logging.Formatter(
        '%(asctime)s %(levelname)s %(name)s: %(message)s'
    ))
    try:
        while True:
            record = queue.get()
            if record is None:
                break
            # handle() (rather than emit()) applies handler-level filters and
            # takes the handler lock before writing.
            handler.handle(record)
    finally:
        handler.close()
def setup_queue_logging(queue: Queue) -> logging.Logger:
    """Set up a worker process to send logs through the queue.

    The logger is named ``worker-<pid>``. A QueueHandler for ``queue`` is
    attached only if the logger does not already have one, so calling this
    more than once in a process does not enqueue every record twice.

    Args:
        queue: Queue consumed by the dedicated logging process.

    Returns:
        The per-process logger, with its level set to INFO.
    """
    logger = logging.getLogger(f'worker-{os.getpid()}')

    already_attached = any(
        isinstance(existing, logging.handlers.QueueHandler)
        and existing.queue is queue
        for existing in logger.handlers
    )
    if not already_attached:
        logger.addHandler(logging.handlers.QueueHandler(queue))

    logger.setLevel(logging.INFO)
    return logger
# Main process
# The __main__ guard is required: with the "spawn" start method (the default
# on Windows and macOS) the child re-imports this module, and an unguarded
# Process(...).start() would run again inside the child.
if __name__ == '__main__':
    log_queue = Queue()
    listener = Process(
        target=logging_listener,
        args=(log_queue, '/var/log/myapp/app.log'),
    )
    listener.start()
    # Worker processes use setup_queue_logging(log_queue)
    # At shutdown, stop the listener with its sentinel:
    #     log_queue.put(None); listener.join()
Option 3: concurrent-log-handler
A third-party library that uses file locking for process-safe rotation:
from concurrent_log_handler import ConcurrentRotatingFileHandler
# Process-safe rotation: 10 MiB per file, five backups, rotated files gzipped.
handler = ConcurrentRotatingFileHandler(
    'app.log', maxBytes=10 * 1024 ** 2, backupCount=5, use_gzip=True
)
This uses portalocker for cross-platform file locking, adding slight overhead per log write but guaranteeing correctness.
Building a compressing rotation handler
Python’s built-in handlers do not compress rotated files. Here is a custom handler that gzips old logs:
import gzip
import os
import shutil
import logging
from logging.handlers import RotatingFileHandler
class CompressingRotatingFileHandler(RotatingFileHandler):
    """Size-based rotating handler whose backups are stored gzip-compressed.

    Backups are named ``<file>.1.gz``, ``<file>.2.gz``, ... and compression
    runs synchronously as part of the rollover.
    """

    def rotation_filename(self, default_name: str) -> str:
        """Return the backup name: the default name with ``.gz`` appended."""
        return f'{default_name}.gz'

    def rotate(self, source: str, dest: str):
        """Gzip ``source`` into ``dest`` and delete ``source``.

        Does nothing when ``source`` does not exist.
        """
        if not os.path.exists(source):
            return
        with open(source, 'rb') as raw, gzip.open(dest, 'wb') as packed:
            shutil.copyfileobj(raw, packed)
        os.remove(source)
# Usage: rotate at 50 MiB and keep ten gzip-compressed backups.
handler = CompressingRotatingFileHandler(
    'app.log', maxBytes=50 * 1024 ** 2, backupCount=10
)
This produces files like app.log.1.gz, app.log.2.gz, etc. The compression happens synchronously during rotation, so there is a brief latency spike. For very large files, consider deferring compression to a background thread.
Async compression for high-throughput applications
import threading
class AsyncCompressingHandler(RotatingFileHandler):
    """Compress rotated files in a background thread.

    On rollover the live file is renamed to ``<backup>.tmp`` (cheap, done on
    the logging thread) and a daemon thread gzips it into the backup name.

    At most one compression is in flight: the next rollover, and ``close()``,
    first wait for the previous compression to finish. Without that, a second
    rollover could overwrite the ``.tmp`` file still being read, or two
    threads could write the same backup. Logging therefore only blocks when
    rollovers happen faster than a file can be compressed.

    Backups are named ``<file>.1.gz``, ``<file>.2.gz``, ... so the suffix
    matches their gzip content.
    """

    # Thread started by the most recent rollover, or None when idle.
    _compressor = None

    def rotation_filename(self, default_name: str) -> str:
        """Return the backup name: the default name with ``.gz`` appended."""
        return default_name + '.gz'

    def doRollover(self):
        """Wait for any pending compression, then do the normal rollover."""
        # The stock rollover renames existing backups; make sure none of them
        # is still being written.
        self._wait_for_compression()
        super().doRollover()

    def rotate(self, source: str, dest: str):
        """Rename ``source`` aside and gzip it into ``dest`` in the background.

        Does nothing when ``source`` does not exist.
        """
        if not os.path.exists(source):
            return

        # First, do a fast rename so logging can continue immediately.
        temp_dest = dest + '.tmp'
        os.rename(source, temp_dest)

        # Compress in background.
        worker = threading.Thread(
            target=self._compress, args=(temp_dest, dest), daemon=True
        )
        self._compressor = worker
        worker.start()

    def close(self):
        """Finish any pending compression, then close the handler."""
        # Daemon threads are killed at interpreter exit; joining here keeps a
        # half-written backup from being left behind.
        self._wait_for_compression()
        super().close()

    def _wait_for_compression(self):
        """Block until the compression started by the last rollover is done."""
        worker = self._compressor
        if worker is not None:
            worker.join()
            self._compressor = None

    @staticmethod
    def _compress(temp_dest, dest):
        """Gzip ``temp_dest`` into ``dest`` and delete ``temp_dest``."""
        with open(temp_dest, 'rb') as f_in:
            with gzip.open(dest, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        os.remove(temp_dest)
Time-based rotation with size limits
A combined approach that rotates daily but also triggers rotation if a file grows too large mid-day:
import time
from logging.handlers import BaseRotatingHandler
class SizeAndTimeRotatingHandler(BaseRotatingHandler):
    """Rotate by time (daily, UTC) and by size (whichever comes first).

    A rotated file is renamed to ``<file>.<YYYY-MM-DD>.<seq>`` and then
    gzip-compressed to ``<file>.<YYYY-MM-DD>.<seq>.gz`` by a background
    thread. ``<YYYY-MM-DD>`` is the day the contents were written, and
    ``<seq>`` increases with every rotation on that day.

    Compression and retention cleanup run together on the background thread,
    one at a time, so cleanup never deletes a file in the middle of being
    compressed. ``close()`` waits for outstanding background work.

    Args:
        filename: Path of the live log file.
        max_bytes: Rotate when the file would reach this size; ``0`` or less
            disables size-based rotation.
        backup_count: Number of rotated files to keep; ``0`` or less keeps
            every rotated file.
    """

    def __init__(self, filename, max_bytes=50*1024*1024, backup_count=30):
        super().__init__(filename, mode='a')
        self.max_bytes = max_bytes
        self.backup_count = backup_count
        self.rotation_date = self._current_date()
        # Serialises background compression and cleanup.
        self._maintenance_lock = threading.Lock()
        # Background threads that may still be running.
        self._workers = []

    def _current_date(self):
        """Return today's UTC date as ``YYYY-MM-DD``."""
        return time.strftime('%Y-%m-%d', time.gmtime())

    def _list_backups(self):
        """Map ``(date, seq)`` to the existing paths of that rotated file.

        Each rotated file exists as a plain file, a ``.gz`` file, or briefly
        both while it is being compressed. Files that do not follow the
        naming scheme of this handler are ignored.
        """
        import glob
        import re

        suffix = re.compile(r'\.(\d{4}-\d{2}-\d{2})\.(\d+)(?:\.gz)?')
        base = self.baseFilename
        backups = {}
        for path in glob.glob(glob.escape(base) + '.*'):
            match = suffix.fullmatch(path[len(base):])
            if match:
                key = (match.group(1), int(match.group(2)))
                backups.setdefault(key, []).append(path)
        return backups

    def _rotation_filename(self):
        """Return the (uncompressed) name for the file about to be rotated."""
        # Name the backup after the day its contents belong to, not the day
        # the rollover happens to run.
        date = self.rotation_date
        # Continue after the highest sequence in use. Counting up from 1
        # would reuse numbers freed by cleanup and make the newest backup
        # look like the oldest.
        used = [seq for day, seq in self._list_backups() if day == date]
        return f"{self.baseFilename}.{date}.{max(used, default=0) + 1}"

    def shouldRollover(self, record):
        """Return True when the day changed or ``record`` would overflow the file."""
        today = self._current_date()
        if today != self.rotation_date:
            return True
        if self.max_bytes > 0:
            if self.stream is None:
                self.stream = self._open()
            self.stream.seek(0, 2)
            position = self.stream.tell()
            # Never rotate an empty file: a single oversized record would
            # otherwise produce an empty backup on every write.
            if position and position + len(self.format(record)) >= self.max_bytes:
                return True
        return False

    def doRollover(self):
        """Rename the live file aside, reopen it, and archive in the background."""
        if self.stream:
            self.stream.close()
            self.stream = None

        if os.path.exists(self.baseFilename):
            dest = self._rotation_filename()
            os.rename(self.baseFilename, dest)

            # Compress and clean up in background
            worker = threading.Thread(
                target=self._archive, args=(dest,), daemon=True
            )
            self._workers = [w for w in self._workers if w.is_alive()]
            self._workers.append(worker)
            worker.start()

        self.rotation_date = self._current_date()
        self.stream = self._open()

    def close(self):
        """Wait for background compression and cleanup, then close the file."""
        with self.lock:
            pending, self._workers = self._workers, []
        # Daemon threads are killed at interpreter exit; joining here keeps a
        # half-written backup from being left behind.
        for worker in pending:
            worker.join()
        super().close()

    def _archive(self, filepath):
        """Background job: compress ``filepath``, then apply retention."""
        with self._maintenance_lock:
            try:
                self._compress(filepath)
            finally:
                self._cleanup()

    def _compress(self, filepath):
        """Gzip ``filepath`` to ``filepath + '.gz'`` and delete the original."""
        gz_path = filepath + '.gz'
        try:
            f_in = open(filepath, 'rb')
        except FileNotFoundError:
            # Already removed by retention cleanup: nothing left to compress.
            return
        with f_in, gzip.open(gz_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
        os.remove(filepath)

    def _cleanup(self):
        """Delete rotated files beyond the newest ``backup_count``."""
        if self.backup_count <= 0:
            return
        backups = self._list_backups()
        # Keys sort by (date, numeric sequence), so ".10" ranks after ".9".
        for key in sorted(backups, reverse=True)[self.backup_count:]:
            for old_file in backups[key]:
                try:
                    os.remove(old_file)
                except FileNotFoundError:
                    pass
Logrotate integration patterns
Signal-based reopening
Instead of copytruncate (which can lose lines written between the copy and the truncate), have logrotate send a signal and reopen the log files in a signal handler:
import signal
import logging
# File handlers whose streams must be reopened after logrotate moves the files.
_file_handlers: list[logging.FileHandler] = []


def reopen_logs(signum, frame):
    """Reopen all file handlers — called by logrotate via postrotate.

    For every handler in ``_file_handlers`` the current stream is flushed and
    closed, and a new one is opened on the handler's path. The swap happens
    under the handler's lock, so a thread that is emitting never sees a
    closed stream.

    ``handler.close()`` is deliberately not used: it marks the handler as
    closed and drops it from logging's handler registry, which is meant for
    final shutdown rather than a reopen.

    Args:
        signum: Signal number (unused).
        frame: Interrupted stack frame (unused).
    """
    for handler in _file_handlers:
        handler.acquire()
        try:
            stream = handler.stream
            if stream is not None:
                handler.stream = None
                try:
                    stream.flush()
                finally:
                    stream.close()
            handler.stream = handler._open()
        finally:
            handler.release()


signal.signal(signal.SIGUSR1, reopen_logs)
Logrotate config:
# Applies to every *.log file under /var/log/myapp.
/var/log/myapp/*.log {
# Rotate once a day and keep 14 rotated files.
daily
rotate 14
# Gzip rotated files, but leave the newest one uncompressed until the next cycle.
compress
delaycompress
# A missing log file is not an error; empty log files are not rotated.
missingok
notifempty
# After rotating, send SIGUSR1 to the PID stored in /var/run/myapp.pid.
# Errors from kill are discarded so they never fail the rotation.
postrotate
kill -USR1 $(cat /var/run/myapp.pid) 2>/dev/null || true
endscript
}
Generating logrotate configs from Python
For applications that manage their own deployment:
def generate_logrotate_config(
    log_paths: list[str],
    rotate_count: int = 14,
    frequency: str = 'daily',
    compress: bool = True,
    pid_file: str = '/var/run/myapp.pid',
    create_spec: str = '0640 appuser appuser',
) -> str:
    """Build a logrotate stanza that rotates ``log_paths`` and signals the app.

    Args:
        log_paths: Log files or glob patterns covered by the stanza. A single
            string is treated as one path.
        rotate_count: Number of rotated files to keep (``rotate``).
        frequency: Rotation frequency directive, e.g. ``daily`` or ``weekly``.
        compress: Emit ``compress`` and ``delaycompress`` when true; emit
            neither (and no blank lines) when false.
        pid_file: File holding the PID that receives SIGUSR1 after rotation.
        create_spec: Mode, owner and group for the ``create`` directive; an
            empty string omits the directive.

    Returns:
        The stanza as text, without a trailing newline.

    Raises:
        ValueError: If ``log_paths`` is empty.
    """
    import shlex

    if isinstance(log_paths, str):
        log_paths = [log_paths]
    if not log_paths:
        raise ValueError('log_paths must contain at least one path')

    # logrotate splits the path list on whitespace, so quote paths that
    # contain any.
    paths = ' '.join(
        f'"{path}"' if any(ch.isspace() for ch in path) else path
        for path in log_paths
    )

    directives = [frequency, f'rotate {rotate_count}']
    if compress:
        directives += ['compress', 'delaycompress']
    directives += ['missingok', 'notifempty']
    if create_spec:
        directives.append(f'create {create_spec}')

    lines = [f'{paths} {{']
    lines += [f'    {directive}' for directive in directives]
    lines += [
        '    postrotate',
        # The script is run by a shell; quoting keeps unusual pid-file paths
        # intact and leaves ordinary paths unchanged.
        f'        kill -USR1 $(cat {shlex.quote(pid_file)}) 2>/dev/null || true',
        '    endscript',
        '}',
    ]
    return '\n'.join(lines)
Log lifecycle management
In production, logs go through a lifecycle: write → rotate → compress → archive → delete. Building this pipeline:
from pathlib import Path
from datetime import datetime, timedelta
class LogLifecycleManager:
    """Drive rotated logs through compress -> archive -> delete.

    File age is always judged by modification time. Compression copies the
    original modification time onto the ``.gz`` file so that the archive and
    purge thresholds measure the age of the log, not of the compression run.

    Args:
        log_dir: Directory containing the rotated logs.
        archive_dir: Directory that receives archived ``.gz`` files; created
            if it does not exist.
    """

    def __init__(self, log_dir: str, archive_dir: str):
        self.log_dir = Path(log_dir)
        self.archive_dir = Path(archive_dir)
        self.archive_dir.mkdir(parents=True, exist_ok=True)

    def compress_old_logs(self, older_than_hours: int = 1):
        """Compress plain-text rotated logs older than ``older_than_hours``."""
        threshold = time.time() - (older_than_hours * 3600)
        # Snapshot the listing first: new .gz files are created while we work.
        for log_file in list(self.log_dir.glob('*.log.[0-9]*')):
            # The pattern also matches "app.log.1.gz"; never re-compress those.
            if log_file.suffix == '.gz' or not log_file.is_file():
                continue
            try:
                info = log_file.stat()
            except FileNotFoundError:
                continue  # rotated or removed since the listing
            if info.st_mtime >= threshold:
                continue
            gz_path = log_file.with_name(log_file.name + '.gz')
            with open(log_file, 'rb') as f_in, gzip.open(gz_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
            # Keep the log's own timestamp on the compressed copy.
            os.utime(gz_path, (info.st_atime, info.st_mtime))
            log_file.unlink()

    def archive_old_logs(self, older_than_days: int = 7):
        """Move compressed logs older than ``older_than_days`` to the archive."""
        threshold = time.time() - (older_than_days * 86400)
        for gz_file in list(self.log_dir.glob('*.gz')):
            try:
                mtime = gz_file.stat().st_mtime
            except FileNotFoundError:
                continue
            if mtime < threshold:
                dest = self._archive_destination(gz_file.name, mtime)
                shutil.move(str(gz_file), str(dest))

    def _archive_destination(self, name: str, mtime: float) -> Path:
        """Return a path in the archive that does not overwrite an existing file.

        Numbered rotation reuses names such as ``app.log.7.gz``. On a
        collision the file's UTC timestamp (and, if still needed, a counter)
        is inserted before ``.gz``.
        """
        dest = self.archive_dir / name
        if not dest.exists():
            return dest
        stem = name[:-len('.gz')] if name.endswith('.gz') else name
        stamp = time.strftime('%Y%m%dT%H%M%S', time.gmtime(mtime))
        dest = self.archive_dir / f'{stem}.{stamp}.gz'
        counter = 1
        while dest.exists():
            dest = self.archive_dir / f'{stem}.{stamp}.{counter}.gz'
            counter += 1
        return dest

    def purge_archives(self, older_than_days: int = 90):
        """Delete archived logs older than ``older_than_days``."""
        threshold = time.time() - (older_than_days * 86400)
        for archive in list(self.archive_dir.glob('*.gz')):
            try:
                expired = archive.stat().st_mtime < threshold
            except FileNotFoundError:
                continue
            if expired:
                archive.unlink()

    def run_lifecycle(self):
        """Execute the full log lifecycle pipeline."""
        self.compress_old_logs(older_than_hours=1)
        self.archive_old_logs(older_than_days=7)
        self.purge_archives(older_than_days=90)
Container logging considerations
In containerized environments, the best practice shifts dramatically:
import json
import logging
import sys
class _JsonLineFormatter(logging.Formatter):
    """Format each record as a single-line JSON object."""

    def format(self, record):
        """Return ``record`` as JSON with time, level, logger and message keys.

        Exception and stack information, when present, are added under the
        ``exception`` and ``stack`` keys.
        """
        payload = {
            "time": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        if record.stack_info:
            payload["stack"] = self.formatStack(record.stack_info)
        # json.dumps escapes quotes, backslashes and newlines, so every
        # record stays one valid JSON line whatever the message contains.
        return json.dumps(payload, separators=(',', ':'), ensure_ascii=False)


def setup_container_logging():
    """For containers: log to stdout, let the runtime handle rotation.

    Attaches a JSON-lines StreamHandler on ``sys.stdout`` to the root logger
    and sets the root level to INFO. Calling it again does not add a second
    handler.
    """
    logger = logging.getLogger()

    already_attached = any(
        isinstance(existing, logging.StreamHandler)
        and getattr(existing, 'stream', None) is sys.stdout
        and isinstance(existing.formatter, _JsonLineFormatter)
        for existing in logger.handlers
    )
    if not already_attached:
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(_JsonLineFormatter())
        logger.addHandler(handler)

    logger.setLevel(logging.INFO)
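Docker’s json-file logging driver does not rotate by default: max-size defaults to -1 (unlimited) and max-file to 1, so container logs grow without bound until you set limits. Configure rotation in daemon.json: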
{
"log-driver": "json-file",
"log-opts": {
"max-size": "50m",
"max-file": "5"
}
}
In this model, Python should not do any rotation at all. The container runtime owns the log lifecycle.
One thing to remember: The right log rotation strategy depends on your deployment model — multi-process apps need process-safe handlers or external rotation, containers should log to stdout, and traditional server deployments benefit from WatchedFileHandler paired with system logrotate for compression, archival, and cleanup.
See Also
- Python Crontab Management: How Python can set up automatic timers on your computer — like programming an alarm clock that runs tasks instead of waking you up.
- Python Disk Usage Monitoring: How Python helps you keep an eye on your computer's storage — like a fuel gauge that warns you before you run out of space.
- Python Network Interface Monitoring: How Python watches your computer's network connections — like having a traffic counter on every road leading to your house.
- Python Process Management: How Python lets you see and control all the programs running on your computer — like being the manager of a busy office.
- Python Psutil System Monitoring: How Python's psutil library lets your program check on your computer's health — like a doctor with a stethoscope for your machine.