Python systemd Integration — Deep Dive
Socket activation
Socket activation is one of systemd’s most powerful features. Instead of your Python application binding to a port itself, systemd creates the socket and passes it to your application. This enables:
- Zero-downtime restarts: The socket stays open during service restart, and connections queue in the kernel
- On-demand startup: The service starts only when a connection arrives
- Privilege separation: systemd binds to privileged ports (< 1024) and passes the socket to an unprivileged process
Setting up socket activation
Create two unit files. First, /etc/systemd/system/myapp.socket:
[Unit]
Description=My App Socket
[Socket]
ListenStream=8080
ReusePort=true
[Install]
WantedBy=sockets.target
Then modify the service file:
[Service]
Type=notify
User=appuser
ExecStart=/opt/myapp/venv/bin/python -m myapp.server
NonBlocking=true
Receiving activated sockets in Python
import socket
import os
LISTEN_FDS_START = 3 # systemd convention
def get_systemd_socket() -> socket.socket | None:
"""Retrieve a socket passed by systemd socket activation."""
listen_fds = int(os.environ.get('LISTEN_FDS', 0))
listen_pid = int(os.environ.get('LISTEN_PID', 0))
if listen_fds == 0 or listen_pid != os.getpid():
return None
# File descriptor 3 is the first activated socket
sock = socket.fromfd(LISTEN_FDS_START, socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(False)
return sock
def start_server():
sock = get_systemd_socket()
if sock:
print("Using systemd-activated socket")
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('0.0.0.0', 8080))
sock.listen(128)
print("Using self-created socket")
return sock
For frameworks that accept a socket, pass it directly. For example, with uvicorn:
import uvicorn
sock = get_systemd_socket()
if sock:
uvicorn.run("myapp:app", fd=sock.fileno())
else:
uvicorn.run("myapp:app", host="0.0.0.0", port=8080)
Resource controls with cgroups
systemd manages each service in its own cgroup, giving you fine-grained resource control:
[Service]
# Memory limits
MemoryMax=512M
MemoryHigh=400M # Throttle before hard limit
# CPU limits
CPUQuota=200% # Max 2 CPU cores worth
CPUWeight=100 # Relative weight vs other services
# I/O limits
IOWeight=100
IOReadBandwidthMax=/dev/sda 50M
# Process limits
LimitNOFILE=65536
TasksMax=256
# Security hardening
PrivateTmp=true
ProtectSystem=strict
ProtectHome=true
NoNewPrivileges=true
ReadWritePaths=/opt/myapp/data /var/log/myapp
Monitoring resource usage
import subprocess
import json
def get_service_resources(service_name: str) -> dict:
"""Get current resource usage for a systemd service."""
result = subprocess.run(
['systemctl', 'show', service_name,
'--property=MemoryCurrent,CPUUsageNSec,TasksCurrent'],
capture_output=True, text=True
)
props = {}
for line in result.stdout.strip().split('\n'):
key, _, value = line.partition('=')
if value and value != '[not set]':
props[key] = int(value) if value.isdigit() else value
return props
# Example: {'MemoryCurrent': 134217728, 'CPUUsageNSec': 5432000000, 'TasksCurrent': 12}
Programmatic service management
Using D-Bus to control services
import dbus
def manage_service(name: str, action: str):
"""Start, stop, or restart a systemd service via D-Bus."""
bus = dbus.SystemBus()
systemd = bus.get_object(
'org.freedesktop.systemd1',
'/org/freedesktop/systemd1'
)
manager = dbus.Interface(systemd, 'org.freedesktop.systemd1.Manager')
if action == 'start':
manager.StartUnit(f'{name}.service', 'replace')
elif action == 'stop':
manager.StopUnit(f'{name}.service', 'replace')
elif action == 'restart':
manager.RestartUnit(f'{name}.service', 'replace')
elif action == 'status':
unit_path = manager.GetUnit(f'{name}.service')
unit = bus.get_object('org.freedesktop.systemd1', unit_path)
props = dbus.Interface(unit, 'org.freedesktop.DBus.Properties')
return {
'ActiveState': str(props.Get('org.freedesktop.systemd1.Unit', 'ActiveState')),
'SubState': str(props.Get('org.freedesktop.systemd1.Unit', 'SubState')),
'MainPID': int(props.Get('org.freedesktop.systemd1.Service', 'MainPID')),
}
Using subprocess for simpler cases
import subprocess
def service_is_active(name: str) -> bool:
result = subprocess.run(
['systemctl', 'is-active', '--quiet', name],
capture_output=True
)
return result.returncode == 0
def service_status(name: str) -> dict:
result = subprocess.run(
['systemctl', 'show', name, '--no-pager',
'--property=ActiveState,SubState,MainPID,MemoryCurrent'],
capture_output=True, text=True
)
props = {}
for line in result.stdout.strip().split('\n'):
if '=' in line:
key, _, value = line.partition('=')
props[key] = value
return props
Zero-downtime deployment pattern
Combining socket activation with a deployment script:
import subprocess
import time
def zero_downtime_deploy(service: str, new_version_path: str):
"""Deploy a new version with zero downtime using socket activation."""
# 1. Update the application code
subprocess.run(['rsync', '-a', new_version_path, '/opt/myapp/'], check=True)
# 2. Run database migrations
subprocess.run(
['/opt/myapp/venv/bin/python', '-m', 'myapp.migrate'],
check=True
)
# 3. Restart the service (socket stays open, connections queue)
subprocess.run(['systemctl', 'restart', f'{service}.service'], check=True)
# 4. Wait for readiness
for attempt in range(30):
result = subprocess.run(
['systemctl', 'is-active', '--quiet', f'{service}.service'],
capture_output=True
)
if result.returncode == 0:
print(f"Service ready after {attempt + 1} attempts")
return True
time.sleep(1)
raise RuntimeError("Service failed to become ready within 30 seconds")
Structured journal logging
Writing structured fields
The systemd journal supports arbitrary key-value fields, which makes log analysis far more powerful than plain text:
from systemd import journal
def log_request(method: str, path: str, status: int, duration_ms: float, user_id: str):
journal.send(
f"{method} {path} → {status} ({duration_ms:.0f}ms)",
PRIORITY=journal.LOG_INFO if status < 400 else journal.LOG_WARNING,
HTTP_METHOD=method,
HTTP_PATH=path,
HTTP_STATUS=str(status),
DURATION_MS=f"{duration_ms:.2f}",
USER_ID=user_id,
SYSLOG_IDENTIFIER="myapp",
)
Querying the journal programmatically
from systemd import journal
def get_recent_errors(service: str, hours: int = 1) -> list[dict]:
"""Fetch recent error logs for a service."""
reader = journal.Reader()
reader.add_match(_SYSTEMD_UNIT=f"{service}.service")
reader.add_match(PRIORITY=str(journal.LOG_ERR))
reader.seek_realtime(time.time() - (hours * 3600))
errors = []
for entry in reader:
errors.append({
'timestamp': entry['__REALTIME_TIMESTAMP'],
'message': entry.get('MESSAGE', ''),
'pid': entry.get('_PID'),
})
return errors
Templated services for multi-instance deployment
systemd template units let you run multiple instances of the same service:
Create /etc/systemd/system/myapp@.service:
[Unit]
Description=My App Worker %i
[Service]
Type=notify
User=appuser
ExecStart=/opt/myapp/venv/bin/python -m myapp.worker --instance %i
Environment=WORKER_ID=%i
[Install]
WantedBy=multi-user.target
Then manage instances:
def scale_workers(count: int):
"""Scale worker instances to the desired count."""
# Start desired workers
for i in range(count):
subprocess.run(
['systemctl', 'enable', '--now', f'myapp@{i}.service'],
check=True
)
# Stop excess workers
result = subprocess.run(
['systemctl', 'list-units', 'myapp@*.service', '--no-legend', '--plain'],
capture_output=True, text=True
)
for line in result.stdout.strip().split('\n'):
if not line:
continue
unit = line.split()[0]
instance = unit.split('@')[1].split('.')[0]
if int(instance) >= count:
subprocess.run(['systemctl', 'disable', '--now', unit], check=True)
Security hardening checklist
A production service unit should include these hardening options:
[Service]
# Filesystem isolation
PrivateTmp=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths=/opt/myapp/data
# Capability restrictions
NoNewPrivileges=true
CapabilityBoundingSet=
AmbientCapabilities=
# Network restrictions (if only localhost needed)
RestrictAddressFamilies=AF_INET AF_INET6 AF_UNIX
# System call filtering
SystemCallFilter=@system-service
SystemCallArchitectures=native
# Misc hardening
PrivateDevices=true
ProtectKernelTunables=true
ProtectKernelModules=true
ProtectControlGroups=true
Verify hardening with: systemd-analyze security myapp.service
One thing to remember: systemd integration is not just about starting your Python app — it is about building a deployment-ready service with socket activation for zero-downtime restarts, cgroup controls for resource isolation, structured journal logging for observability, and security hardening that locks down the attack surface.
See Also
- Python Crontab Management How Python can set up automatic timers on your computer — like programming an alarm clock that runs tasks instead of waking you up.
- Python Disk Usage Monitoring How Python helps you keep an eye on your computer's storage — like a fuel gauge that warns you before you run out of space.
- Python Log Rotation Management Why your program's diary needs page limits — and how Python keeps log files from eating all your disk space.
- Python Network Interface Monitoring How Python watches your computer's network connections — like having a traffic counter on every road leading to your house.
- Python Process Management How Python lets you see and control all the programs running on your computer — like being the manager of a busy office.