Nomad Job Scheduling with Python — Deep Dive
Nomad’s scheduling algorithm
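Nomad creates an evaluation whenever something that affects placement changes — a job is submitted, updated, or stopped, or a node joins, drains, or fails — and the scheduler processes each evaluation in four steps: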
- Feasibility check — filter nodes that meet constraints (datacenter, CPU, memory, affinity rules)
- Ranking — score feasible nodes using bin-packing (maximize utilization) or spread (maximize availability)
- Plan — create allocation proposals and check for conflicts
- Apply — commit allocations and start tasks on selected nodes
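The default bin-packing algorithm fills nodes efficiently, reducing the number of machines needed; the spread algorithm distributes load across nodes for better fault tolerance. The algorithm is a cluster-wide scheduler setting (`scheduler_algorithm` in the servers' `default_scheduler_config`, or `nomad operator scheduler set-config`), not a per-job choice. Individual jobs can still ask for allocations to be distributed across datacenters, racks, or other node attributes with a `spread` block.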
Programmatic job submission from Python
The python-nomad library enables full lifecycle management:
import nomad
import json
# Module-level Nomad API client for the examples in this file.
# timeout=30 is passed straight through to python-nomad.
client = nomad.Nomad(timeout=30, host="nomad.internal")
def submit_python_service(
    name: str,
    image: str,
    count: int = 3,
    cpu: int = 500,
    memory: int = 256,
    env: dict | None = None,
    datacenters: list[str] | None = None,
) -> dict:
    """Plan, then register, a Docker-based Python HTTP service job with Nomad.

    The job runs ``count`` copies of ``image`` in a single task group. A
    dynamic host port labelled ``http`` is mapped to container port 8000. A
    service with an HTTP health check on ``/health`` is registered for that
    port. Updates roll out one allocation at a time, with automatic revert.

    Talks to Nomad through the module-level ``client``.

    Args:
        name: Job ID; also used as the job name, task-group name and
            service name (the task is named ``f"{name}-app"``).
        image: Docker image to run.
        count: Number of allocations in the task group.
        cpu: Value for the task's ``Resources.CPU``.
        memory: Value for the task's ``Resources.MemoryMB``.
        env: Extra environment variables for the task (none by default).
        datacenters: Datacenters the job may be placed in. Defaults to
            ``["dc1"]``.

    Returns:
        ``{"eval_id": <EvalID from registration>, "job_id": name,
        "status": "submitted"}``.
    """
    # The Nomad JSON API expresses durations as integer nanoseconds.
    second_ns = 1_000_000_000
    job_spec = {
        "Job": {
            "ID": name,
            "Name": name,
            "Type": "service",
            "Datacenters": ["dc1"] if datacenters is None else list(datacenters),
            "TaskGroups": [{
                "Name": name,
                "Count": count,
                "Networks": [{
                    "DynamicPorts": [{"Label": "http", "To": 8000}]
                }],
                "Services": [{
                    "Name": name,
                    "PortLabel": "http",
                    "Checks": [{
                        "Type": "http",
                        "Path": "/health",
                        "Interval": 10 * second_ns,
                        "Timeout": 3 * second_ns,
                    }],
                }],
                "Tasks": [{
                    "Name": f"{name}-app",
                    "Driver": "docker",
                    "Config": {
                        "image": image,
                        "ports": ["http"],
                    },
                    "Env": env or {},
                    "Resources": {
                        "CPU": cpu,
                        "MemoryMB": memory,
                    },
                }],
                "Update": {
                    "MaxParallel": 1,
                    "MinHealthyTime": 30 * second_ns,
                    "HealthyDeadline": 300 * second_ns,  # 5 minutes
                    "AutoRevert": True,
                },
            }],
        }
    }

    # Dry run first. diff=True is required: python-nomad's plan_job sends
    # Diff=false by default, in which case the response's "Diff" is null.
    plan = client.job.plan_job(name, job_spec, diff=True)
    diff_type = (plan.get("Diff") or {}).get("Type", "Unknown")
    created_evals = plan.get("CreatedEvals") or []  # may be JSON null
    print(f"Plan: {diff_type} — "
          f"creating {len(created_evals)} evaluations")

    # Placement failures do not stop the registration below, so surface them.
    failed_groups = plan.get("FailedTGAllocs") or {}
    if failed_groups:
        print("Plan warning: could not place all allocations for task "
              f"group(s): {', '.join(sorted(failed_groups))}")

    response = client.job.register_job(name, job_spec)
    return {
        "eval_id": response["EvalID"],
        "job_id": name,
        "status": "submitted",
    }
Rolling deployment strategies
Nomad’s update stanza controls how changes roll out:
update {
  # Canary phase: start one canary, then wait for a manual promotion.
  canary       = 1
  auto_promote = false

  # Health gating: an allocation must stay healthy for 30s before it counts,
  # and must reach that state within 5 minutes or it is marked unhealthy.
  min_healthy_time = "30s"
  healthy_deadline = "5m"

  # Rollout pacing and failure handling: one allocation at a time, and roll
  # back automatically if the deployment fails.
  max_parallel = 1
  auto_revert  = true
}
Python-driven canary promotion
def _canaries_healthy(deployment: dict) -> bool:
    """Return True if every task group has at least as many healthy allocs as desired canaries."""
    # Groups without canaries (DesiredCanaries missing or 0) are trivially satisfied.
    return all(
        group.get("HealthyAllocs", 0) >= group.get("DesiredCanaries", 0)
        for group in (deployment.get("TaskGroups") or {}).values()
    )


def monitor_canary_and_promote(
    job_id: str,
    client: nomad.Nomad,
    check_interval: int = 15,
    max_checks: int = 20,
    min_healthy_checks: int = 3,
) -> dict:
    """Wait for a job's canaries to stay healthy, then promote its deployment.

    First looks for a running deployment of ``job_id``. It polls up to 10
    times, 5 seconds apart. It then polls that deployment every
    ``check_interval`` seconds, at most ``max_checks`` times.

    The deployment is promoted once the canaries have looked healthy on
    ``min_healthy_checks`` consecutive polls. Healthy here means every task
    group has at least as many healthy allocations as desired canaries. With
    the defaults (3 polls, 15 s apart) that is 30 s of observed health.

    Args:
        job_id: Nomad job whose deployment should be promoted.
        client: python-nomad client.
        check_interval: Seconds to sleep between deployment polls.
        max_checks: Maximum number of deployment polls before giving up.
        min_healthy_checks: Consecutive healthy polls required before
            promoting.

    Returns:
        One of:

        - ``{"success": True, "deployment_id", "action": "promoted"}``
        - ``{"success", "deployment_id", "status"}`` when the deployment left
          the "running" state. ``success`` is True only for "successful".
        - ``{"success": False, "reason"}``
    """
    import time

    deployment = None
    for _ in range(10):
        running = [
            d for d in client.job.get_deployments(job_id)
            if d["Status"] == "running"
        ]
        if running:
            deployment = running[0]
            break
        time.sleep(5)
    if deployment is None:
        return {"success": False, "reason": "No active deployment found"}

    deploy_id = deployment["ID"]
    healthy_streak = 0
    for _ in range(max_checks):
        status = client.deployment.get_deployment(deploy_id)

        # Check for a terminal state before anything else so we never try to
        # promote a deployment that is no longer running.
        if status["Status"] != "running":
            return {
                "success": status["Status"] == "successful",
                "deployment_id": deploy_id,
                "status": status["Status"],
            }

        # Health must be sustained: any unhealthy poll resets the streak.
        healthy_streak = healthy_streak + 1 if _canaries_healthy(status) else 0
        if healthy_streak >= min_healthy_checks:
            client.deployment.promote_deployment_all(deploy_id)
            return {
                "success": True,
                "deployment_id": deploy_id,
                "action": "promoted",
            }

        time.sleep(check_interval)

    return {"success": False, "reason": "Canary did not stabilize in time"}
Auto-scaling Python workloads
Nomad supports external auto-scalers. Here’s a Python implementation that scales based on queue depth:
import nomad
import redis
import time
import logging
# Module logger used by QueueBasedScaler; output follows the app's logging config.
log = logging.getLogger(name=__name__)
class QueueBasedScaler:
def __init__(
self,
nomad_host: str,
redis_url: str,
job_id: str,
group_name: str,
queue_name: str,
messages_per_worker: int = 100,
min_count: int = 1,
max_count: int = 20,
cooldown_seconds: int = 60,
):
self.client = nomad.Nomad(host=nomad_host)
self.redis = redis.from_url(redis_url)
self.job_id = job_id
self.group_name = group_name
self.queue_name = queue_name
self.messages_per_worker = messages_per_worker
self.min_count = min_count
self.max_count = max_count
self.cooldown = cooldown_seconds
self.last_scale_time = 0
def get_desired_count(self) -> int:
queue_length = self.redis.llen(self.queue_name)
desired = max(
self.min_count,
min(self.max_count, queue_length // self.messages_per_worker + 1),
)
return desired
def get_current_count(self) -> int:
job = self.client.job.get_job(self.job_id)
for group in job.get("TaskGroups", []):
if group["Name"] == self.group_name:
return group["Count"]
return 0
def scale(self) -> dict | None:
now = time.time()
if now - self.last_scale_time < self.cooldown:
return None
current = self.get_current_count()
desired = self.get_desired_count()
if current == desired:
return None
log.info(f"Scaling {self.job_id}/{self.group_name}: {current} -> {desired}")
self.client.job.scale_job(
self.job_id,
{self.group_name: desired},
message=f"Queue scaler: {self.redis.llen(self.queue_name)} messages",
)
self.last_scale_time = now
return {"previous": current, "new": desired}
def run(self, interval: int = 30):
"""Run the scaler loop."""
while True:
try:
result = self.scale()
if result:
log.info(f"Scaled: {result}")
except Exception as e:
log.error(f"Scaler error: {e}")
time.sleep(interval)
Batch job patterns for Python
Data processing pipeline
job "etl-pipeline" {
  datacenters = ["dc1"]
  type        = "batch"

  # Dispatch-only job: dispatches carry no payload, but every dispatch must
  # supply "date" and "source" metadata.
  parameterized {
    meta_required = ["date", "source"]
    payload       = "forbidden"
  }

  group "processor" {
    task "transform" {
      driver = "docker"

      config {
        image   = "ghcr.io/myorg/etl:latest"
        command = "python"
        # Dispatch metadata is interpolated into the command line.
        args = [
          "-m", "etl.run",
          "--date", "${NOMAD_META_date}",
          "--source", "${NOMAD_META_source}",
        ]
      }

      resources {
        memory = 1024
        cpu    = 2000
      }
    }
  }
}
Dispatch from Python:
# Trigger one run of the parameterized "etl-pipeline" job; the meta keys fill
# its meta_required list and reach the task as NOMAD_META_date / NOMAD_META_source.
client.job.dispatch_job(
    "etl-pipeline",
    meta={"date": "2026-03-28", "source": "clickstream"},
)
Nomad with Consul and Vault
The HashiCorp stack integrates tightly:
- Consul provides service discovery — your Python services register automatically and find each other by name
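- Vault injects secrets into task environments — database passwords, API keys, TLS certificates. When a secret rotates, Nomad re-renders the template and, by default, restarts the task (configurable with the template's `change_mode`), so no redeployment is needed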
- Nomad handles scheduling and lifecycle
task "api" {
  # Give the task a Vault token with the api-secrets policy so the template
  # below can read the database credentials.
  vault {
    policies = ["api-secrets"]
  }

  # Render the connection string into secrets/env and export its contents
  # as environment variables for the task (env = true).
  template {
    destination = "secrets/env"
    env         = true
    data        = <<EOF
DATABASE_URL=postgresql://{{ with secret "database/creds/api" }}{{ .Data.username }}:{{ .Data.password }}{{ end }}@db.service.consul:5432/app
EOF
  }
}
Multi-region federation
Nomad Enterprise supports multi-region deployments, where a single job spans several federated regions, each with its own datacenters:
multiregion {
  strategy {
    max_parallel = 1          # deploy to one region at a time
    on_failure   = "fail_all" # a failure in any region fails every region
  }

  # HCL2 one-line blocks may contain at most one argument (and no commas),
  # so each region is written as a multi-line block.
  region "us-east" {
    count       = 3
    datacenters = ["dc-east"]
  }

  region "eu-west" {
    count       = 2
    datacenters = ["dc-west"]
  }
}
Python orchestration can coordinate cross-region deployments by monitoring deployment status across regions.
Observability
def get_cluster_status(client: nomad.Nomad) -> dict:
    """Build a snapshot of node, job and allocation health.

    Returns:
        A dict with three keys:

        - ``"nodes"``: ``{"total", "ready", "down"}``
        - ``"jobs"``: ``{"total", "running", "dead"}``
        - ``"allocations"``: the counts from ``_count_allocation_states``
    """
    node_list = client.nodes.get_nodes()
    job_list = client.jobs.get_jobs()

    def with_status(records, wanted):
        # Number of records whose "Status" field equals ``wanted``.
        return len([r for r in records if r["Status"] == wanted])

    node_summary = {
        "total": len(node_list),
        "ready": with_status(node_list, "ready"),
        "down": with_status(node_list, "down"),
    }
    job_summary = {
        "total": len(job_list),
        "running": with_status(job_list, "running"),
        "dead": with_status(job_list, "dead"),
    }
    return {
        "nodes": node_summary,
        "jobs": job_summary,
        "allocations": _count_allocation_states(client, job_list),
    }
def _count_allocation_states(client, jobs):
states = {"running": 0, "pending": 0, "failed": 0}
for job in jobs[:20]: # Limit to avoid API overload
allocs = client.job.get_allocations(job["ID"])
for alloc in allocs:
status = alloc.get("ClientStatus", "unknown")
if status in states:
states[status] += 1
return states
The one thing to remember: Nomad’s clean HTTP API and Python SDK make it uniquely scriptable — from job submission through canary promotion to custom auto-scaling — giving Python teams full programmatic control over their workload scheduling.
See Also
- Python Ansible Automation — How Python powers Ansible to automatically set up and manage hundreds of servers without logging into each one
- Python Docker Compose Orchestration — How Python developers use Docker Compose to run multiple services together like a conductor leading an orchestra
- Python Etcd Distributed Config — How Python applications use etcd to share configuration across many servers and react to changes instantly
- Python Helm Charts — Why Python developers use Helm charts to package and deploy their apps to Kubernetes clusters
- Python Pulumi Infrastructure — How Python developers use Pulumi to build cloud infrastructure using the same language they already know