Vault Integration Patterns in Python — Deep Dive
Connecting to Vault with hvac
The hvac library is the standard Python client for HashiCorp Vault. Install with pip install hvac.
import os

import hvac

# Basic token authentication. The token is read from the environment so it
# never ends up in source control.
client = hvac.Client(
    url="https://vault.example.com:8200",
    token=os.environ["VAULT_TOKEN"],
)

# Verify the connection. An explicit check is used instead of `assert`
# because assertions are stripped when Python runs with -O.
if not client.is_authenticated():
    raise RuntimeError("Vault authentication failed: token is missing, expired, or invalid")

token_accessor = client.lookup_token()["data"]["accessor"]
print(f"Connected to Vault, token accessor: {token_accessor}")
AppRole authentication for production services
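AppRole is the standard authentication method for automated services. The Role ID identifies the application, while the Secret ID acts like a password for that role; it can be restricted to a limited number of uses or a short TTL so that a leaked value quickly becomes useless.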
import hvac
import os
class VaultClient:
    """Small wrapper around an hvac client that logs in through AppRole.

    The login happens once, in the constructor. The token's lease duration
    and renewable flag are stored on the instance; this class does not renew
    the token itself.
    """

    def __init__(self, vault_url: str, role_id: str, secret_id: str):
        """Create the hvac client for ``vault_url`` and log in immediately."""
        self.client = hvac.Client(url=vault_url)
        self._authenticate(role_id, secret_id)

    def _authenticate(self, role_id: str, secret_id: str):
        """Log in with the AppRole credentials and keep the token details."""
        auth_info = self.client.auth.approle.login(
            role_id=role_id,
            secret_id=secret_id,
        )["auth"]
        self.client.token = auth_info["client_token"]
        self.token_lease_duration = auth_info["lease_duration"]
        self.token_renewable = auth_info["renewable"]
        print(f"Authenticated, token valid for {self.token_lease_duration}s")

    def read_secret(self, path: str, mount_point: str = "secret") -> dict:
        """Return the payload stored at ``path`` in a KV v2 mount."""
        kv_v2 = self.client.secrets.kv.v2
        envelope = kv_v2.read_secret_version(path=path, mount_point=mount_point)
        # KV v2 responses nest the payload under data -> data.
        return envelope["data"]["data"]

    def write_secret(self, path: str, data: dict, mount_point: str = "secret"):
        """Create or update the KV v2 secret at ``path`` with ``data``."""
        kv_v2 = self.client.secrets.kv.v2
        kv_v2.create_or_update_secret(path=path, secret=data, mount_point=mount_point)
# Usage: the Vault address and AppRole credentials come from the environment
vault = VaultClient(
    os.environ["VAULT_ADDR"],
    os.environ["VAULT_ROLE_ID"],
    os.environ["VAULT_SECRET_ID"],
)
db_config = vault.read_secret(path="myapp/database")
print(f"DB host: {db_config['host']}, user: {db_config['username']}")
Dynamic database credentials
Instead of storing static passwords, Vault generates unique, time-limited database credentials for each requester:
class DynamicDatabaseCredentials:
    """Fetch, cache, renew and revoke dynamic database credentials from Vault.

    Credentials are requested from the database secrets engine on first use
    and cached until shortly before their lease runs out.
    """

    def __init__(self, vault_client: "hvac.Client", db_role: str = "myapp-readonly",
                 mount_point: str = "database", expiry_buffer: float = 30):
        """Store the client and settings; no request is made to Vault here.

        Args:
            vault_client: hvac client used for all Vault calls.
            db_role: Name of the database role to request credentials for.
            mount_point: Mount path of the database secrets engine.
            expiry_buffer: Seconds before lease expiry at which the cached
                credentials are already treated as expired.
        """
        self.client = vault_client
        self.db_role = db_role
        self.mount_point = mount_point
        self.expiry_buffer = expiry_buffer
        self._lease_id = None
        self._credentials = None
        self._expires_at = None  # deadline on the time.monotonic() clock

    def get_credentials(self) -> dict:
        """Return the cached credentials, fetching new ones once they expire.

        Returns:
            A dict with ``username`` and ``password`` keys.
        """
        import time
        still_valid = (
            self._credentials
            and self._expires_at is not None
            and time.monotonic() < self._expires_at
        )
        if still_valid:
            return self._credentials
        return self._fetch_new_credentials()

    def _deadline(self, lease_duration: float) -> float:
        """Return the monotonic time at which a lease should be considered expired."""
        import time
        # Never let the safety margin swallow the whole lease; otherwise a
        # short lease would trigger a new Vault request on every call.
        margin = min(self.expiry_buffer, lease_duration / 2)
        return time.monotonic() + lease_duration - margin

    def _fetch_new_credentials(self) -> dict:
        """Request fresh credentials from Vault's database secrets engine."""
        # hvac exposes the engine as `secrets.database` (singular).
        response = self.client.secrets.database.generate_credentials(
            name=self.db_role,
            mount_point=self.mount_point,
        )
        self._credentials = {
            "username": response["data"]["username"],
            "password": response["data"]["password"],
        }
        self._lease_id = response["lease_id"]
        self._expires_at = self._deadline(response["lease_duration"])
        print(f"New DB credentials: user={self._credentials['username']}, "
              f"lease={response['lease_duration']}s")
        return self._credentials

    def renew_lease(self, increment: int = 3600):
        """Extend the current credential lease; do nothing if there is none.

        Args:
            increment: Requested extension in seconds.
        """
        if not self._lease_id:
            return
        response = self.client.sys.renew_lease(
            lease_id=self._lease_id,
            increment=increment,
        )
        # Move the local deadline too, otherwise the renewed credentials
        # would still be thrown away at the original expiry time.
        granted = response.get("lease_duration") if isinstance(response, dict) else None
        if granted is not None:
            self._expires_at = self._deadline(granted)
        print(f"Lease renewed: {self._lease_id[:20]}...")

    def revoke_on_shutdown(self):
        """Revoke the current lease and forget the cached credentials."""
        if not self._lease_id:
            return
        self.client.sys.revoke_lease(self._lease_id)
        # Revoked credentials must never be handed out again.
        self._lease_id = None
        self._credentials = None
        self._expires_at = None
        print("Database credentials revoked")
Integrating dynamic credentials with SQLAlchemy
Combining Vault dynamic credentials with a connection pool that handles credential rotation:
from sqlalchemy import create_engine, event
from sqlalchemy.pool import QueuePool
import threading
class VaultManagedDatabase:
    """SQLAlchemy engine backed by Vault-issued, rotating credentials.

    A single daemon thread periodically renews the credential lease. When a
    renewal fails, fresh credentials are requested and the engine is rebuilt.
    """

    def __init__(self, vault_client: "hvac.Client", db_host: str,
                 db_name: str, db_role: str, renewal_interval: float = 300):
        """Prepare the credential source; the engine is created lazily.

        Args:
            vault_client: hvac client handed to ``DynamicDatabaseCredentials``.
            db_host: Database host (and optional port) used in the URL.
            db_name: Database name used in the URL.
            db_role: Vault database role to request credentials for.
            renewal_interval: Seconds between lease renewal attempts.
        """
        self.creds = DynamicDatabaseCredentials(vault_client, db_role)
        self.db_host = db_host
        self.db_name = db_name
        self._renewal_interval = renewal_interval
        self._engine = None
        self._renewal_thread = None
        self._stop_event = threading.Event()
        # Reentrant because _recreate_engine() calls get_engine() while
        # already holding the lock.
        self._lock = threading.RLock()

    def get_engine(self):
        """Return the shared engine, creating it on first use."""
        with self._lock:
            if self._engine is None:
                self._engine = self._build_engine()
                self._start_renewal_loop()
            return self._engine

    def _build_engine(self):
        """Create a new engine from the current credentials."""
        import time
        from urllib.parse import quote_plus

        creds = self.creds.get_credentials()
        # Percent-encode the login so characters such as "@", ":" or "/" in
        # generated credentials cannot corrupt the URL.
        url = (f"postgresql://{quote_plus(creds['username'])}:"
               f"{quote_plus(creds['password'])}"
               f"@{self.db_host}/{self.db_name}")
        engine = create_engine(
            url,
            pool_size=5,
            pool_recycle=1800,   # Recycle pooled connections after 30 minutes
            pool_pre_ping=True,  # Verify connections are alive
        )

        # Record when each DBAPI connection was opened.
        @event.listens_for(engine, "connect")
        def on_connect(dbapi_connection, connection_record):
            connection_record.info["created_at"] = time.time()

        return engine

    def _start_renewal_loop(self):
        """Start the background renewal thread unless one is already running."""
        with self._lock:
            running = self._renewal_thread
            if running is not None and running.is_alive():
                # Rebuilding the engine must not spawn an additional thread.
                return

            stop = threading.Event()
            interval = self._renewal_interval

            def renewal_loop():
                # Event.wait() returns True as soon as shutdown() sets the
                # event, which ends the loop without waiting out the interval.
                while not stop.wait(interval):
                    try:
                        self.creds.renew_lease()
                    except Exception as e:
                        print(f"Lease renewal failed: {e}")
                        if stop.is_set():
                            break
                        try:
                            # Fetch new credentials and recreate engine
                            self._recreate_engine()
                        except Exception as rebuild_error:
                            # Keep the thread alive so the next cycle retries.
                            print(f"Engine rebuild failed: {rebuild_error}")

            self._stop_event = stop
            self._renewal_thread = threading.Thread(target=renewal_loop, daemon=True)
            self._renewal_thread.start()

    def _recreate_engine(self):
        """Request fresh credentials and rebuild the engine with them."""
        # Ask for the replacement credentials first: if Vault is unreachable
        # the existing engine stays in place instead of being torn down.
        # get_credentials() alone could hand back the cached pair.
        self.creds._fetch_new_credentials()
        with self._lock:
            if self._engine is not None:
                self._engine.dispose()  # Close pooled connections
                self._engine = None
            self.get_engine()  # Rebuild with new credentials

    def shutdown(self):
        """Graceful shutdown: stop renewing, dispose the engine, revoke credentials."""
        self._stop_event.set()
        with self._lock:
            self._renewal_thread = None
            if self._engine is not None:
                self._engine.dispose()
                self._engine = None
        self.creds.revoke_on_shutdown()
Transit encryption — encrypt without managing keys
Vault’s Transit engine provides encryption-as-a-service. Your application sends plaintext to Vault and receives ciphertext. The encryption key never leaves Vault.
class TransitEncryption:
    """Encrypt/decrypt data using Vault's Transit secret engine."""

    def __init__(self, vault_client: "hvac.Client", key_name: str = "myapp-data",
                 mount_point: str = "transit"):
        """Store the client and key settings; no request is made here.

        Args:
            vault_client: hvac client used for all Transit calls.
            key_name: Name of the Transit key to use.
            mount_point: Mount path of the Transit secrets engine.
        """
        self.client = vault_client
        self.key_name = key_name
        self.mount_point = mount_point

    def encrypt(self, plaintext: str) -> str:
        """Encrypt a plaintext string and return the Vault ciphertext."""
        import base64
        # The plaintext is sent base64-encoded; text is encoded as UTF-8 first.
        encoded = base64.b64encode(plaintext.encode("utf-8")).decode("ascii")
        response = self.client.secrets.transit.encrypt_data(
            name=self.key_name,
            plaintext=encoded,
            mount_point=self.mount_point,
        )
        return response["data"]["ciphertext"]  # vault:v1:xxxxx

    def decrypt(self, ciphertext: str) -> str:
        """Decrypt Vault ciphertext back to the original plaintext string."""
        import base64
        response = self.client.secrets.transit.decrypt_data(
            name=self.key_name,
            ciphertext=ciphertext,
            mount_point=self.mount_point,
        )
        return base64.b64decode(response["data"]["plaintext"]).decode("utf-8")

    def rotate_key(self):
        """Rotate the encryption key. Old ciphertexts remain decryptable."""
        self.client.secrets.transit.rotate_key(
            name=self.key_name,
            mount_point=self.mount_point,
        )
        print(f"Key '{self.key_name}' rotated to new version")

    def rewrap(self, ciphertext: str) -> str:
        """Re-encrypt with the latest key version without exposing plaintext."""
        response = self.client.secrets.transit.rewrap_data(
            name=self.key_name,
            ciphertext=ciphertext,
            mount_point=self.mount_point,
        )
        return response["data"]["ciphertext"]
# Usage: encrypt a value, then decrypt it again to confirm the round trip
transit = TransitEncryption(vault_client=vault.client)
encrypted = transit.encrypt(plaintext="sensitive patient data")
print(f"Encrypted: {encrypted}")  # vault:v1:8SDd3...
decrypted = transit.decrypt(ciphertext=encrypted)
print(f"Decrypted: {decrypted}")  # sensitive patient data
Kubernetes integration with Vault Agent Injector
In Kubernetes, the Vault Agent Injector automatically adds a sidecar that handles authentication and secret injection:
# deployment.yaml with Vault annotations
apiVersion: apps/v1
kind: Deployment
metadata:
  name: myapp
spec:
  # apps/v1 requires a selector that matches the pod template labels.
  selector:
    matchLabels:
      app: myapp
  template:
    metadata:
      labels:
        app: myapp
      annotations:
        # Ask the injector to add the Vault Agent sidecar to this pod.
        vault.hashicorp.com/agent-inject: "true"
        vault.hashicorp.com/role: "myapp"
        # The "-db" suffix names the rendered file: /vault/secrets/db
        vault.hashicorp.com/agent-inject-secret-db: "database/creds/myapp-role"
        vault.hashicorp.com/agent-inject-template-db: |
          {{- with secret "database/creds/myapp-role" -}}
          DB_USER={{ .Data.username }}
          DB_PASS={{ .Data.password }}
          {{- end }}
    spec:
      serviceAccountName: myapp
      containers:
        - name: myapp
          image: myapp:latest
The Python application reads injected secrets from files:
import os
def read_vault_injected_secret(secret_name: str,
                               secrets_dir: str = "/vault/secrets") -> str:
    """Read a secret file written by the Vault Agent sidecar.

    Args:
        secret_name: File name of the secret inside ``secrets_dir``.
        secrets_dir: Directory holding the injected secret files.

    Returns:
        The file contents with leading and trailing whitespace removed.

    Raises:
        OSError: If the file cannot be opened (for example, it does not exist).
    """
    secret_path = f"{secrets_dir}/{secret_name}"
    # Explicit encoding so the result does not depend on the container locale.
    with open(secret_path, encoding="utf-8") as f:
        return f.read().strip()
# Parse key=value format from template
def parse_vault_env_file(filename: str,
                         secrets_dir: str = "/vault/secrets") -> dict:
    """Parse Vault Agent template output written as ``KEY=value`` lines.

    Lines starting with ``#`` and lines without ``=`` are skipped. Each line
    is split at the first ``=`` only, so values may contain ``=`` themselves.

    Args:
        filename: File name of the rendered template inside ``secrets_dir``.
        secrets_dir: Directory holding the injected secret files.

    Returns:
        A dict mapping keys to values; empty if the file does not exist.
    """
    secret_path = f"{secrets_dir}/{filename}"
    config = {}
    try:
        # Open directly instead of checking for existence first: the file
        # could disappear between the check and the open.
        with open(secret_path, encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if line.startswith("#"):
                    continue
                key, separator, value = line.partition("=")
                if separator:
                    config[key] = value
    except (FileNotFoundError, NotADirectoryError):
        # A missing secret file is reported as an empty configuration.
        return {}
    return config
# Use in application startup: pull the database login out of the injected file
db_config = parse_vault_env_file(filename="db")
db_user, db_pass = (db_config.get(key) for key in ("DB_USER", "DB_PASS"))
Multi-cloud secret access pattern
When running across AWS, GCP, and on-prem, abstract secret access behind a provider interface so application code does not depend on a specific backend:
from abc import ABC, abstractmethod
class SecretProvider(ABC):
    """Common interface for backends that can look up a secret by name."""

    @abstractmethod
    def get_secret(self, name: str) -> "str | dict":
        """Return the secret stored under ``name``.

        The return type is backend-specific: the providers in this module
        return either a decoded string or parsed structured data.
        """
        ...
class HashiCorpVaultProvider(SecretProvider):
    """Secret provider backed by a HashiCorp Vault KV v2 mount."""

    def __init__(self, client: "hvac.Client", mount: str = "secret"):
        """Store the hvac client and the KV v2 mount point to read from."""
        self.client = client
        self.mount = mount

    def get_secret(self, name: str) -> dict:
        """Return the key/value payload stored at path ``name``."""
        response = self.client.secrets.kv.v2.read_secret_version(
            path=name, mount_point=self.mount
        )
        # KV v2 nests the payload under data -> data; the result is the
        # secret's mapping, not a single string.
        return response["data"]["data"]
class AWSSecretsManagerProvider(SecretProvider):
    """Secret provider backed by AWS Secrets Manager."""

    def __init__(self):
        """Create the Secrets Manager client with boto3's default configuration."""
        import boto3
        self.client = boto3.client("secretsmanager")

    def get_secret(self, name: str) -> "str | dict | bytes":
        """Return the current value of the secret ``name``.

        String secrets that contain JSON are returned parsed; any other
        string secret is returned unchanged. Binary secrets are returned as
        stored under ``SecretBinary``.
        """
        import json
        response = self.client.get_secret_value(SecretId=name)
        secret_string = response.get("SecretString")
        if secret_string is None:
            # Binary secrets carry their value under SecretBinary instead.
            return response["SecretBinary"]
        try:
            return json.loads(secret_string)
        except json.JSONDecodeError:
            # A plain (non-JSON) string secret is a valid value, not an error.
            return secret_string
class GCPSecretManagerProvider(SecretProvider):
    """Secret provider backed by Google Cloud Secret Manager."""

    def __init__(self, project_id: str):
        """Create the Secret Manager client and remember the project id."""
        from google.cloud import secretmanager
        self.client = secretmanager.SecretManagerServiceClient()
        self.project = project_id

    def get_secret(self, name: str) -> str:
        """Return the decoded payload of the ``latest`` version of ``name``."""
        request = {
            "name": f"projects/{self.project}/secrets/{name}/versions/latest"
        }
        payload = self.client.access_secret_version(request=request).payload
        return payload.data.decode()
def get_secret_provider() -> SecretProvider:
    """Build the provider named by the ``SECRET_PROVIDER`` environment variable.

    ``aws`` and ``gcp`` select the matching cloud provider; any other value,
    including the default ``vault``, selects HashiCorp Vault.
    """
    selected = os.environ.get("SECRET_PROVIDER", "vault")
    if selected == "aws":
        return AWSSecretsManagerProvider()
    if selected == "gcp":
        return GCPSecretManagerProvider(os.environ["GCP_PROJECT"])
    vault_client = hvac.Client(
        url=os.environ["VAULT_ADDR"],
        token=os.environ["VAULT_TOKEN"],
    )
    return HashiCorpVaultProvider(vault_client)
The one thing to remember: Production Vault integration in Python centers on three patterns — AppRole authentication with hvac for service identity, dynamic credentials with lease renewal for databases, and Transit encryption for data protection without key management — with Kubernetes sidecar injection providing the simplest deployment model.
See Also
- Python Certificate Management: How websites prove they are who they say they are — like a digital passport checked every time you visit
- Python Data Masking Techniques: How companies hide real names, emails, and credit card numbers while keeping data useful for testing and analytics
- Python Homomorphic Encryption: How you can do math on locked data without ever unlocking it — like solving a puzzle inside a sealed box
- Python Key Management Practices: Why the key to your encryption is more important than the encryption itself — and how to keep it safe
- Python Secure Multiparty Computation: How a group of friends can figure out who earns the most without anyone revealing their actual salary