Nomad Job Scheduling with Python — Deep Dive

Nomad’s scheduling algorithm

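Nomad creates an evaluation whenever something happens that may require scheduling work — a job is registered, updated or stopped, a node changes status, or an allocation fails. A scheduler worker then processes the evaluation: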

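  1. Feasibility check — filter out nodes that cannot run the task group: unhealthy nodes, nodes outside the job’s datacenters, nodes missing the required task driver, and nodes that fail the job’s constraints
  2. Ranking — score the feasible nodes that have room for the allocation, primarily by bin-packing (maximize utilization) or spread (maximize availability), adjusted by affinities and anti-affinity
  3. Plan — submit the proposed allocations as a plan to the leader, which checks it against the latest cluster state for conflicts
  4. Apply — the leader commits the accepted allocations, and the clients on the selected nodes start the tasks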

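The default bin-packing algorithm fills nodes as tightly as possible, reducing the number of machines needed. The spread algorithm distributes load across nodes for better fault tolerance. The algorithm is a cluster-wide scheduler setting (`scheduler_algorithm`), not a per-job choice. An individual job can still ask to be distributed across a node attribute, such as datacenter or rack, with a `spread` block.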

Programmatic job submission from Python

The python-nomad library enables full lifecycle management:

import nomad
import json

# Shared python-nomad client for the examples in this section.
client = nomad.Nomad(
    timeout=30,
    host="nomad.internal",
)

def submit_python_service(
    name: str,
    image: str,
    count: int = 3,
    cpu: int = 500,
    memory: int = 256,
    env: dict | None = None,
) -> dict:
    """Submit a Python service job to Nomad."""
    job_spec = {
        "Job": {
            "ID": name,
            "Name": name,
            "Type": "service",
            "Datacenters": ["dc1"],
            "TaskGroups": [{
                "Name": name,
                "Count": count,
                "Networks": [{
                    "DynamicPorts": [{"Label": "http", "To": 8000}]
                }],
                "Services": [{
                    "Name": name,
                    "PortLabel": "http",
                    "Checks": [{
                        "Type": "http",
                        "Path": "/health",
                        "Interval": 10_000_000_000,  # 10s in nanoseconds
                        "Timeout": 3_000_000_000,
                    }],
                }],
                "Tasks": [{
                    "Name": f"{name}-app",
                    "Driver": "docker",
                    "Config": {
                        "image": image,
                        "ports": ["http"],
                    },
                    "Env": env or {},
                    "Resources": {
                        "CPU": cpu,
                        "MemoryMB": memory,
                    },
                }],
                "Update": {
                    "MaxParallel": 1,
                    "MinHealthyTime": 30_000_000_000,
                    "HealthyDeadline": 300_000_000_000,
                    "AutoRevert": True,
                },
            }],
        }
    }

    # Plan first (dry run)
    plan = client.job.plan_job(name, job_spec)
    print(f"Plan: {plan['Diff']['Type']} — "
          f"creating {len(plan.get('CreatedEvals', []))} evaluations")

    # Submit
    response = client.job.register_job(name, job_spec)
    return {
        "eval_id": response["EvalID"],
        "job_id": name,
        "status": "submitted",
    }

Rolling deployment strategies

Nomad’s update stanza controls how changes roll out:

update {
  # Start with a single canary and wait for a manual promotion before the
  # rest of the rollout proceeds, one allocation at a time.
  canary       = 1
  auto_promote = false
  max_parallel = 1

  # An allocation must stay healthy for 30s to count as healthy, and must
  # reach that state within 5m; a failed deployment is rolled back.
  min_healthy_time = "30s"
  healthy_deadline = "5m"
  auto_revert      = true
}

Python-driven canary promotion

def monitor_canary_and_promote(
    job_id: str,
    client: "nomad.Nomad",
    check_interval: int = 15,
    max_checks: int = 20,
    min_healthy_checks: int = 3,
    discovery_attempts: int = 10,
    discovery_interval: float = 5,
) -> dict:
    """Watch a job's canary deployment and promote it once it is stable.

    Steps:
      1. Poll ``client.job.get_deployments(job_id)`` up to
         *discovery_attempts* times, *discovery_interval* seconds apart,
         until a deployment with status ``"running"`` appears.
      2. Poll that deployment every *check_interval* seconds, at most
         *max_checks* times. Stop early as soon as its status is no longer
         ``"running"``.
      3. Promote all canaries once every task group that requested canaries
         reports at least that many healthy allocations on
         *min_healthy_checks* consecutive polls.

    Args:
        job_id: Nomad job ID whose deployment to watch.
        client: python-nomad client.
        check_interval: Seconds between deployment health polls.
        max_checks: Maximum number of health polls before giving up.
        min_healthy_checks: Consecutive healthy polls required before
            promoting (must be >= 1). The default of 3 means health must
            hold across two full intervals (30s at the default interval).
        discovery_attempts: How many times to look for a running deployment.
        discovery_interval: Seconds between those lookups.

    Returns:
        On promotion:
        ``{"success": True, "deployment_id": ..., "action": "promoted"}``.
        Otherwise ``{"success": False, ...}`` with either a ``"reason"`` or
        the deployment's non-running ``"status"``.

    Raises:
        ValueError: If *min_healthy_checks* is less than 1.
    """
    import time

    if min_healthy_checks < 1:
        raise ValueError("min_healthy_checks must be at least 1")

    deployment = None
    for attempt in range(discovery_attempts):
        deployments = client.job.get_deployments(job_id)
        active = [d for d in deployments if d["Status"] == "running"]
        if active:
            deployment = active[0]
            break
        if attempt < discovery_attempts - 1:  # no point sleeping after the last try
            time.sleep(discovery_interval)

    if not deployment:
        return {"success": False, "reason": "No active deployment found"}

    deploy_id = deployment["ID"]
    healthy_streak = 0

    for _ in range(max_checks):
        status = client.deployment.get_deployment(deploy_id)

        # Check the status first: a deployment that is no longer running must
        # not be promoted, even if its health counters still look good.
        if status["Status"] != "running":
            return {
                "success": False,
                "deployment_id": deploy_id,
                "status": status["Status"],
            }

        groups = (status.get("TaskGroups") or {}).values()
        canary_groups = [g for g in groups if g.get("DesiredCanaries", 0) > 0]
        if not canary_groups:
            # Nothing to promote. Without this guard every group would pass
            # the health test vacuously and promotion would be attempted.
            return {
                "success": False,
                "deployment_id": deploy_id,
                "reason": "Deployment has no canaries to promote",
            }

        if all(g.get("HealthyAllocs", 0) >= g["DesiredCanaries"] for g in canary_groups):
            healthy_streak += 1
        else:
            # Health must be continuous; any unhealthy poll restarts the count.
            healthy_streak = 0

        if healthy_streak >= min_healthy_checks:
            # Promote every canary group of the deployment.
            client.deployment.promote_deployment_all(deploy_id)
            return {
                "success": True,
                "deployment_id": deploy_id,
                "action": "promoted",
            }

        time.sleep(check_interval)

    return {"success": False, "reason": "Canary did not stabilize in time"}

Auto-scaling Python workloads

Nomad supports external auto-scalers. Here’s a Python implementation that scales based on queue depth:

import nomad
import redis
import time
import logging

log = logging.getLogger(__name__)

class QueueBasedScaler:
    def __init__(
        self,
        nomad_host: str,
        redis_url: str,
        job_id: str,
        group_name: str,
        queue_name: str,
        messages_per_worker: int = 100,
        min_count: int = 1,
        max_count: int = 20,
        cooldown_seconds: int = 60,
    ):
        self.client = nomad.Nomad(host=nomad_host)
        self.redis = redis.from_url(redis_url)
        self.job_id = job_id
        self.group_name = group_name
        self.queue_name = queue_name
        self.messages_per_worker = messages_per_worker
        self.min_count = min_count
        self.max_count = max_count
        self.cooldown = cooldown_seconds
        self.last_scale_time = 0

    def get_desired_count(self) -> int:
        queue_length = self.redis.llen(self.queue_name)
        desired = max(
            self.min_count,
            min(self.max_count, queue_length // self.messages_per_worker + 1),
        )
        return desired

    def get_current_count(self) -> int:
        job = self.client.job.get_job(self.job_id)
        for group in job.get("TaskGroups", []):
            if group["Name"] == self.group_name:
                return group["Count"]
        return 0

    def scale(self) -> dict | None:
        now = time.time()
        if now - self.last_scale_time < self.cooldown:
            return None

        current = self.get_current_count()
        desired = self.get_desired_count()

        if current == desired:
            return None

        log.info(f"Scaling {self.job_id}/{self.group_name}: {current} -> {desired}")

        self.client.job.scale_job(
            self.job_id,
            {self.group_name: desired},
            message=f"Queue scaler: {self.redis.llen(self.queue_name)} messages",
        )

        self.last_scale_time = now
        return {"previous": current, "new": desired}

    def run(self, interval: int = 30):
        """Run the scaler loop."""
        while True:
            try:
                result = self.scale()
                if result:
                    log.info(f"Scaled: {result}")
            except Exception as e:
                log.error(f"Scaler error: {e}")
            time.sleep(interval)

Batch job patterns for Python

Data processing pipeline

job "etl-pipeline" {
  datacenters = ["dc1"]
  type        = "batch"

  # Dispatch-only job: each run is started by a dispatch call, takes no
  # payload, and must supply both meta keys listed here.
  parameterized {
    meta_required = ["date", "source"]
    payload       = "forbidden"
  }

  group "processor" {
    task "transform" {
      driver = "docker"

      config {
        image   = "ghcr.io/myorg/etl:latest"
        command = "python"
        # Dispatch meta values reach the task as NOMAD_META_<key>.
        args = [
          "-m", "etl.run",
          "--date", "${NOMAD_META_date}",
          "--source", "${NOMAD_META_source}",
        ]
      }

      resources {
        memory = 1024
        cpu    = 2000
      }
    }
  }
}

Dispatch from Python:

# Both meta keys are mandatory: the job's parameterized block lists them in
# meta_required.
client.job.dispatch_job(
    "etl-pipeline",
    meta=dict(date="2026-03-28", source="clickstream"),
)

Nomad with Consul and Vault

The HashiCorp stack integrates tightly:

  • Consul provides service discovery — your Python services register automatically and find each other by name
  • Vault injects secrets into task environments — database passwords, API keys and TLS certificates can be rotated without a redeployment (the template re-renders and, by default, Nomad restarts the task)
  • Nomad handles scheduling and lifecycle
task "api" {
  # NOTE: fragment only; a runnable task also needs driver and config.
  # Requests a Vault token with the "api-secrets" policy for this task; the
  # template below reads its secret through that token.
  vault {
    policies = ["api-secrets"]
  }

  # Renders DATABASE_URL into secrets/env and, because env = true, exposes it
  # to the task as an environment variable. Username and password are read
  # inside one `with secret` block, so both come from the same secret read.
  # change_mode is not set, so Nomad's default applies when the value changes.
  template {
    data        = <<EOF
DATABASE_URL=postgresql://{{ with secret "database/creds/api" }}{{ .Data.username }}:{{ .Data.password }}{{ end }}@db.service.consul:5432/app
EOF
    destination = "secrets/env"
    env         = true
  }
}

Multi-region federation

Nomad Enterprise supports multi-region deployments, in which a single job is rolled out across multiple federated regions:

multiregion {
  # Deploy to one region at a time; a failure in any region fails them all.
  strategy {
    max_parallel = 1
    on_failure   = "fail_all"
  }

  # HCL one-line blocks may hold at most one argument (and no commas), so
  # each region is written as a regular multi-line block.
  region "us-east" {
    count       = 3
    datacenters = ["dc-east"]
  }

  region "eu-west" {
    count       = 2
    datacenters = ["dc-west"]
  }
}

Python orchestration can coordinate cross-region deployments by monitoring deployment status across regions.

Observability

def get_cluster_status(client: nomad.Nomad) -> dict:
    """Aggregate cluster health metrics."""
    nodes = client.nodes.get_nodes()
    jobs = client.jobs.get_jobs()

    return {
        "nodes": {
            "total": len(nodes),
            "ready": sum(1 for n in nodes if n["Status"] == "ready"),
            "down": sum(1 for n in nodes if n["Status"] == "down"),
        },
        "jobs": {
            "total": len(jobs),
            "running": sum(1 for j in jobs if j["Status"] == "running"),
            "dead": sum(1 for j in jobs if j["Status"] == "dead"),
        },
        "allocations": _count_allocation_states(client, jobs),
    }

def _count_allocation_states(client, jobs):
    states = {"running": 0, "pending": 0, "failed": 0}
    for job in jobs[:20]:  # Limit to avoid API overload
        allocs = client.job.get_allocations(job["ID"])
        for alloc in allocs:
            status = alloc.get("ClientStatus", "unknown")
            if status in states:
                states[status] += 1
    return states

The one thing to remember: Nomad’s clean HTTP API and Python SDK make it uniquely scriptable — from job submission through canary promotion to custom auto-scaling — giving Python teams full programmatic control over their workload scheduling.

Tags: python, nomad, scheduling, orchestration

See Also