Ansible Automation with Python — Deep Dive
Ansible’s Python architecture
Ansible’s execution flow reveals its deep Python integration. When you run a playbook:
- The controller (your machine) parses YAML playbooks into Python data structures
- For each host, Ansible selects tasks based on conditionals and loops
- Each module is packaged as a self-contained Python script with
ansible.module_utilsbundled in - The script is transferred via SSH (or another connection plugin) and executed on the remote host
- The module outputs JSON, which the controller parses to determine success, change status, and results
This “push and execute” model means you can debug modules by running them directly on the target machine with JSON input — a significant advantage over opaque agent-based tools.
Writing production-grade custom modules
Beyond the basic module structure, production modules handle check mode, diff output, and complex state management:
#!/usr/bin/python
from ansible.module_utils.basic import AnsibleModule
import json
import os
DOCUMENTATION = r"""
---
module: app_config
short_description: Manage application configuration files
description:
- Ensures a JSON configuration file matches the desired state
- Supports check mode and diff output
options:
path:
description: Path to the configuration file
required: true
type: str
settings:
description: Dictionary of settings to ensure
required: true
type: dict
merge:
description: Whether to merge with existing settings or replace
default: true
type: bool
"""
def load_config(path):
if os.path.exists(path):
with open(path) as f:
return json.load(f)
return {}
def main():
module = AnsibleModule(
argument_spec={
"path": {"type": "str", "required": True},
"settings": {"type": "dict", "required": True},
"merge": {"type": "bool", "default": True},
},
supports_check_mode=True,
)
path = module.params["path"]
desired = module.params["settings"]
merge = module.params["merge"]
current = load_config(path)
if merge:
target = {**current, **desired}
else:
target = desired
changed = current != target
diff = {}
if module._diff:
diff = {
"before": json.dumps(current, indent=2),
"after": json.dumps(target, indent=2),
}
if changed and not module.check_mode:
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
json.dump(target, f, indent=2)
module.exit_json(
changed=changed,
diff=diff,
path=path,
config=target,
)
if __name__ == "__main__":
main()
Key patterns:
supports_check_mode=Trueenables dry-run execution — Ansible can preview changes without applying them- Diff output integrates with
ansible-playbook --diffto show what would change - Documentation string follows Ansible’s format and generates
ansible-docoutput
Ansible-runner: Python API integration
ansible-runner lets Python applications invoke Ansible programmatically:
import ansible_runner
def deploy_application(version: str, hosts: list[str]) -> dict:
"""Run deployment playbook and return structured results."""
inventory = {
"all": {
"hosts": {host: {} for host in hosts},
"vars": {
"app_version": version,
"ansible_python_interpreter": "/usr/bin/python3",
},
}
}
result = ansible_runner.run(
playbook="deploy.yml",
inventory=inventory,
project_dir="/opt/ansible/playbooks",
quiet=True,
)
summary = {
"status": result.status, # "successful", "failed", "timeout"
"rc": result.rc,
"stats": result.stats,
"host_results": {},
}
for event in result.events:
if event["event"] == "runner_on_ok":
host = event["event_data"]["host"]
task = event["event_data"]["task"]
summary["host_results"].setdefault(host, []).append({
"task": task,
"changed": event["event_data"].get("res", {}).get("changed", False),
})
return summary
This enables building deployment dashboards, ChatOps bots, or self-service portals that trigger Ansible behind the scenes.
Dynamic inventory from Python
Static inventory files break down when infrastructure is dynamic. Python inventory scripts query cloud APIs:
#!/usr/bin/env python3
"""Dynamic inventory for AWS EC2 instances."""
import json
import boto3
def get_inventory():
ec2 = boto3.client("ec2")
response = ec2.describe_instances(
Filters=[{"Name": "instance-state-name", "Values": ["running"]}]
)
inventory = {"_meta": {"hostvars": {}}}
for reservation in response["Reservations"]:
for instance in reservation["Instances"]:
ip = instance.get("PrivateIpAddress")
if not ip:
continue
# Group by tags
tags = {t["Key"]: t["Value"] for t in instance.get("Tags", [])}
role = tags.get("Role", "ungrouped")
env = tags.get("Environment", "unknown")
# Add to role group
inventory.setdefault(role, {"hosts": []})
inventory[role]["hosts"].append(ip)
# Add to environment group
inventory.setdefault(env, {"hosts": []})
inventory[env]["hosts"].append(ip)
# Host-specific variables
inventory["_meta"]["hostvars"][ip] = {
"instance_id": instance["InstanceId"],
"instance_type": instance["InstanceType"],
"ami_id": instance["ImageId"],
"availability_zone": instance["Placement"]["AvailabilityZone"],
}
return inventory
if __name__ == "__main__":
print(json.dumps(get_inventory(), indent=2))
Custom callback plugins for observability
Callback plugins intercept Ansible events for logging, metrics, or alerting:
from ansible.plugins.callback import CallbackBase
import requests
import time
class CallbackModule(CallbackBase):
CALLBACK_VERSION = 2.0
CALLBACK_TYPE = "notification"
CALLBACK_NAME = "slack_notify"
def __init__(self):
super().__init__()
self.start_time = None
self.task_results = {"ok": 0, "changed": 0, "failed": 0}
def v2_playbook_on_start(self, playbook):
self.start_time = time.time()
def v2_runner_on_ok(self, result):
if result._result.get("changed"):
self.task_results["changed"] += 1
else:
self.task_results["ok"] += 1
def v2_runner_on_failed(self, result, ignore_errors=False):
self.task_results["failed"] += 1
def v2_playbook_on_stats(self, stats):
duration = time.time() - self.start_time
emoji = "🔴" if self.task_results["failed"] > 0 else "🟢"
msg = (
f"{emoji} Ansible run complete in {duration:.0f}s | "
f"OK: {self.task_results['ok']} | "
f"Changed: {self.task_results['changed']} | "
f"Failed: {self.task_results['failed']}"
)
webhook_url = os.environ.get("SLACK_WEBHOOK")
if webhook_url:
requests.post(webhook_url, json={"text": msg})
Advanced playbook patterns with Python
Jinja2 filters in Python
Custom Jinja2 filters extend template logic:
# filter_plugins/custom_filters.py
def to_env_var(name):
"""Convert 'my-service-name' to 'MY_SERVICE_NAME'."""
return name.replace("-", "_").upper()
def mask_secret(value, visible=4):
"""Show last N characters of a secret."""
if len(value) <= visible:
return "*" * len(value)
return "*" * (len(value) - visible) + value[-visible:]
class FilterModule:
def filters(self):
return {
"to_env_var": to_env_var,
"mask_secret": mask_secret,
}
Used in playbooks:
- name: Set environment variable
lineinfile:
path: /etc/environment
line: "{{ service_name | to_env_var }}={{ api_key | mask_secret }}"
Lookup plugins for external data
from ansible.plugins.lookup import LookupBase
import hvac
class LookupModule(LookupBase):
def run(self, terms, variables=None, **kwargs):
client = hvac.Client(url=kwargs.get("url", "https://vault:8200"))
client.token = kwargs.get("token")
results = []
for term in terms:
secret = client.secrets.kv.v2.read_secret_version(path=term)
results.append(secret["data"]["data"])
return results
Testing Ansible code
Molecule for integration testing
Molecule creates ephemeral test environments:
# molecule/default/molecule.yml
driver:
name: docker
platforms:
- name: ubuntu-test
image: ubuntu:22.04
pre_build_image: true
provisioner:
name: ansible
verifier:
name: testinfra
Testinfra for verification (Python)
# molecule/default/tests/test_default.py
def test_nginx_installed(host):
nginx = host.package("nginx")
assert nginx.is_installed
def test_nginx_running(host):
service = host.service("nginx")
assert service.is_running
assert service.is_enabled
def test_config_file(host):
config = host.file("/etc/myapp/config.json")
assert config.exists
assert config.user == "myapp"
assert config.mode == 0o640
Testinfra uses pytest under the hood, so all pytest features (fixtures, parametrize, markers) work.
Performance optimization
- Pipelining: Set
pipelining = Trueinansible.cfgto reduce SSH operations — modules execute over the existing connection instead of creating new ones - Forks: Increase
forks(default 5) to run tasks on more hosts simultaneously - Mitogen: The Mitogen connection plugin can reduce Ansible execution time by 2-7x by replacing SSH file transfers with in-process Python execution
- Fact caching: Enable Redis or JSON file fact caching to avoid re-gathering facts on every run
The one thing to remember: Ansible’s value for Python developers lies in its extensibility — custom modules, dynamic inventories, callback plugins, and programmatic execution via ansible-runner turn it from a configuration tool into a full automation framework.
See Also
- Python Docker Compose Orchestration How Python developers use Docker Compose to run multiple services together like a conductor leading an orchestra
- Python Etcd Distributed Config How Python applications use etcd to share configuration across many servers and react to changes instantly
- Python Helm Charts Python Why Python developers use Helm charts to package and deploy their apps to Kubernetes clusters
- Python Nomad Job Scheduling How Python developers use HashiCorp Nomad to run their programs across many computers without managing each one
- Python Pulumi Infrastructure How Python developers use Pulumi to build cloud infrastructure using the same language they already know