Vault Integration Patterns in Python — Deep Dive

Connecting to Vault with hvac

The hvac library is the standard Python client for HashiCorp Vault. Install with pip install hvac.

import hvac

# Basic token authentication
client = hvac.Client(
    url="https://vault.example.com:8200",
    token="hvs.CAESIJlE..."  # In practice, never hardcode — use env or file
)

# Verify connection. An explicit check is used instead of `assert` because
# assertions are removed when Python runs with -O, which would let an
# unauthenticated client slip through silently.
if not client.is_authenticated():
    raise RuntimeError("Vault authentication failed: the token was not accepted")
print(f"Connected to Vault, token accessor: {client.lookup_token()['data']['accessor']}")

AppRole authentication for production services

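AppRole is the standard authentication method for automated services. The Role ID identifies the application, while the Secret ID acts as its password; it can be restricted to a limited number of uses or a short TTL, which is how it is made to behave like a one-time password.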

import hvac
import os

class VaultClient:
    """Wrapper around ``hvac.Client`` that logs in with AppRole on creation.

    After a successful login the client token is installed on the wrapped
    client, and the token's lease duration and renewable flag are kept on
    the instance.

    NOTE(review): those two values are only recorded; no method in this
    class renews the token.
    """

    def __init__(self, vault_url: str, role_id: str, secret_id: str):
        # Create the unauthenticated client first, then log in right away.
        self.client = hvac.Client(url=vault_url)
        self._authenticate(role_id, secret_id)

    def _authenticate(self, role_id: str, secret_id: str):
        """Log in through the AppRole auth method and keep the resulting token."""
        login = self.client.auth.approle.login(role_id=role_id, secret_id=secret_id)
        auth = login["auth"]
        self.client.token = auth["client_token"]
        self.token_lease_duration = auth["lease_duration"]
        self.token_renewable = auth["renewable"]
        print(f"Authenticated, token valid for {self.token_lease_duration}s")

    def read_secret(self, path: str, mount_point: str = "secret") -> dict:
        """Return the key/value payload of the KV v2 secret at ``path``."""
        kv_v2 = self.client.secrets.kv.v2
        payload = kv_v2.read_secret_version(path=path, mount_point=mount_point)
        # The first "data" level is the response envelope; the second one
        # holds the stored key/value pairs.
        return payload["data"]["data"]

    def write_secret(self, path: str, data: dict, mount_point: str = "secret"):
        """Create or update the KV v2 secret at ``path`` with ``data``."""
        kv_v2 = self.client.secrets.kv.v2
        kv_v2.create_or_update_secret(path=path, secret=data, mount_point=mount_point)

# Usage: all connection settings come from the environment.
_approle_settings = {
    "vault_url": os.environ["VAULT_ADDR"],
    "role_id": os.environ["VAULT_ROLE_ID"],
    "secret_id": os.environ["VAULT_SECRET_ID"],
}
vault = VaultClient(**_approle_settings)

# Read the application's database settings from the KV store.
db_config = vault.read_secret(path="myapp/database")
print(f"DB host: {db_config['host']}, user: {db_config['username']}")

Dynamic database credentials

Instead of storing static passwords, Vault generates unique, time-limited database credentials for each requester:

class DynamicDatabaseCredentials:
    """Fetch, cache, renew and revoke dynamic database credentials from Vault.

    Credentials are requested from the database secrets engine for
    ``db_role`` and cached together with their lease id. The cache is
    treated as expired ``_EXPIRY_BUFFER_SECONDS`` before the lease actually
    ends so callers never receive credentials that are about to lapse.

    Args:
        vault_client: An authenticated hvac client.
        db_role: Name of the database role to generate credentials for.
        mount_point: Mount path of the database secrets engine.
    """

    # Safety margin subtracted from every lease so the cache expires early.
    _EXPIRY_BUFFER_SECONDS = 30

    def __init__(self, vault_client: "hvac.Client", db_role: str = "myapp-readonly",
                 mount_point: str = "database"):
        self.client = vault_client
        self.db_role = db_role
        self.mount_point = mount_point
        self._lease_id = None
        self._credentials = None
        self._expires_at = None

    def get_credentials(self) -> dict:
        """Return the cached credentials, fetching new ones if they expired.

        Returns:
            A dict with ``"username"`` and ``"password"`` keys.
        """
        import time
        if self._credentials and self._expires_at and time.time() < self._expires_at:
            return self._credentials

        return self._fetch_new_credentials()

    def _fetch_new_credentials(self) -> dict:
        """Request fresh credentials from Vault and replace the cached ones."""
        import time
        # hvac exposes the database secrets engine as `secrets.database`
        # (singular); `secrets.databases` does not exist.
        response = self.client.secrets.database.generate_credentials(
            name=self.db_role,
            mount_point=self.mount_point
        )

        self._credentials = {
            "username": response["data"]["username"],
            "password": response["data"]["password"]
        }
        self._lease_id = response["lease_id"]
        self._expires_at = (
            time.time() + response["lease_duration"] - self._EXPIRY_BUFFER_SECONDS
        )

        print(f"New DB credentials: user={self._credentials['username']}, "
              f"lease={response['lease_duration']}s")
        return self._credentials

    def renew_lease(self):
        """Extend the current lease and push the cache expiry out to match.

        Does nothing when no lease is held. The new expiry is taken from the
        duration Vault reports back, which may be shorter than the requested
        increment.
        """
        import time
        if not self._lease_id:
            return

        response = self.client.sys.renew_lease(
            lease_id=self._lease_id,
            increment=3600  # Request 1 hour extension
        )
        # Without this update the cache would still expire at the original
        # deadline and discard credentials whose lease was just extended.
        granted = response.get("lease_duration") if isinstance(response, dict) else None
        if granted is not None:
            self._expires_at = time.time() + granted - self._EXPIRY_BUFFER_SECONDS
        print(f"Lease renewed: {self._lease_id[:20]}...")

    def revoke_on_shutdown(self):
        """Revoke the held lease and forget the now-invalid credentials."""
        if self._lease_id:
            self.client.sys.revoke_lease(self._lease_id)
            # Clear the cache so revoked credentials are never handed out
            # and a later renew_lease() does not target a dead lease.
            self._lease_id = None
            self._credentials = None
            self._expires_at = None
            print("Database credentials revoked")

Integrating dynamic credentials with SQLAlchemy

The following class combines Vault dynamic credentials with a SQLAlchemy connection pool and rebuilds the pool when the credentials have to be rotated:

from sqlalchemy import create_engine, event
from sqlalchemy.pool import QueuePool
import threading

class VaultManagedDatabase:
    """SQLAlchemy engine backed by Vault-issued, rotating credentials.

    The engine is built lazily on the first ``get_engine()`` call. A single
    daemon thread then tries to renew the credential lease every
    ``RENEWAL_INTERVAL_SECONDS``; if renewal fails, fresh credentials are
    fetched and the engine is replaced. ``shutdown()`` stops that thread,
    disposes the engine and revokes the credentials.

    Callers should call ``get_engine()`` each time they need the engine
    instead of keeping a reference, because the engine object is replaced
    when credentials rotate.
    """

    # Seconds between lease renewal attempts.
    RENEWAL_INTERVAL_SECONDS = 300

    def __init__(self, vault_client: hvac.Client, db_host: str,
                 db_name: str, db_role: str):
        self.creds = DynamicDatabaseCredentials(vault_client, db_role)
        self.db_host = db_host
        self.db_name = db_name
        self._engine = None
        self._renewal_thread = None
        # The renewal thread replaces the engine while application threads
        # read it, so every access to the engine state goes through this
        # lock. It is re-entrant because the methods below call each other.
        self._lock = threading.RLock()
        self._stop_event = threading.Event()

    def get_engine(self):
        """Return the current engine, creating it on first use."""
        with self._lock:
            if self._engine is None:
                self._engine = self._build_engine(self.creds.get_credentials())
                self._start_renewal_loop()
            return self._engine

    def _build_engine(self, creds: dict):
        """Create an engine for ``creds`` and attach the connect listener."""
        import time
        from urllib.parse import quote_plus

        # Generated usernames/passwords may contain characters that are
        # special inside a URL, so both parts are escaped.
        url = (f"postgresql://{quote_plus(creds['username'])}:"
               f"{quote_plus(creds['password'])}"
               f"@{self.db_host}/{self.db_name}")

        engine = create_engine(
            url,
            pool_size=5,
            pool_recycle=1800,   # Recycle connections before credential expiry
            pool_pre_ping=True   # Verify connections are alive
        )

        # Record when each DBAPI connection was opened.
        @event.listens_for(engine, "connect")
        def on_connect(dbapi_connection, connection_record):
            connection_record.info["created_at"] = time.time()

        return engine

    def _start_renewal_loop(self):
        """Start the background renewal thread unless one is already active."""
        with self._lock:
            running = (self._renewal_thread is not None
                       and self._renewal_thread.is_alive())
            if running and not self._stop_event.is_set():
                # Without this guard every engine recreation would add one
                # more never-ending renewal thread.
                return

            # Each thread gets its own stop flag so that stopping an old
            # loop can never stop a newer one.
            stop_event = threading.Event()
            self._stop_event = stop_event

            def renewal_loop():
                # wait() doubles as the sleep and returns True as soon as
                # shutdown() sets the flag.
                while not stop_event.wait(self.RENEWAL_INTERVAL_SECONDS):
                    try:
                        self.creds.renew_lease()
                    except Exception as e:
                        print(f"Lease renewal failed: {e}")
                        with self._lock:
                            if stop_event.is_set():
                                break
                            try:
                                self._recreate_engine()
                            except Exception as recreate_error:
                                # Keep the loop alive; the next cycle retries.
                                print(f"Engine recreation failed: {recreate_error}")

            self._renewal_thread = threading.Thread(target=renewal_loop, daemon=True)
            self._renewal_thread.start()

    def _recreate_engine(self):
        """Replace the engine with one built from freshly issued credentials."""
        with self._lock:
            # Force a new credential request: the cached credentials can
            # still look valid even though their lease could not be renewed.
            fresh = self.creds._fetch_new_credentials()
            previous = self._engine
            # Build the replacement before disposing the old engine so a
            # failure here leaves the existing engine in place.
            self._engine = self._build_engine(fresh)
            if previous is not None:
                previous.dispose()  # Close all existing connections
            self._start_renewal_loop()

    def shutdown(self):
        """Graceful shutdown: stop renewing, close connections, revoke credentials."""
        with self._lock:
            # Stop the loop first so it cannot mint new credentials after
            # the revocation below.
            self._stop_event.set()
            if self._engine is not None:
                self._engine.dispose()
            self.creds.revoke_on_shutdown()

Transit encryption — encrypt without managing keys

Vault’s Transit engine provides encryption-as-a-service. Your application sends plaintext to Vault and receives ciphertext. The encryption key never leaves Vault.

class TransitEncryption:
    """Helper for the Transit secrets engine mounted at ``transit``.

    All operations use the named key ``key_name`` and go through the
    supplied client; this class holds no key material itself.
    """

    def __init__(self, vault_client: hvac.Client, key_name: str = "myapp-data"):
        self.client = vault_client
        self.key_name = key_name

    def encrypt(self, plaintext: str) -> str:
        """Return the Vault ciphertext (``vault:v1:...``) for ``plaintext``."""
        import base64
        # The plaintext is sent base64-encoded.
        raw_bytes = plaintext.encode()
        b64_plaintext = base64.b64encode(raw_bytes).decode()
        result = self.client.secrets.transit.encrypt_data(
            name=self.key_name, plaintext=b64_plaintext, mount_point="transit"
        )
        return result["data"]["ciphertext"]

    def decrypt(self, ciphertext: str) -> str:
        """Return the original string for a Vault ``ciphertext``."""
        import base64
        result = self.client.secrets.transit.decrypt_data(
            name=self.key_name, ciphertext=ciphertext, mount_point="transit"
        )
        # The response carries the plaintext base64-encoded; undo that here.
        b64_plaintext = result["data"]["plaintext"]
        return base64.b64decode(b64_plaintext).decode()

    def rotate_key(self):
        """Ask Vault to rotate the key; existing ciphertexts stay decryptable."""
        transit = self.client.secrets.transit
        transit.rotate_key(name=self.key_name, mount_point="transit")
        print(f"Key '{self.key_name}' rotated to new version")

    def rewrap(self, ciphertext: str) -> str:
        """Return ``ciphertext`` re-encrypted under the latest key version.

        The rewrap is done by Vault, so the plaintext is never returned to
        this process.
        """
        result = self.client.secrets.transit.rewrap_data(
            name=self.key_name, ciphertext=ciphertext, mount_point="transit"
        )
        return result["data"]["ciphertext"]

# Usage: round-trip a string through the Transit helper.
transit = TransitEncryption(vault_client=vault.client)

encrypted = transit.encrypt(plaintext="sensitive patient data")
print(f"Encrypted: {encrypted}")  # e.g. vault:v1:8SDd3...

decrypted = transit.decrypt(ciphertext=encrypted)
print(f"Decrypted: {decrypted}")  # prints the original text again

Kubernetes integration with Vault Agent Injector

In Kubernetes, the Vault Agent Injector automatically adds a sidecar that handles authentication and secret injection:

# deployment.yaml with Vault annotations
# The pod-template annotations below are what the Vault Agent Injector reads.
# NOTE(review): only the injector-related parts are shown; confirm that the
# real manifest also carries the selector/labels a Deployment needs.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: myapp
spec:
  template:
    metadata:
      annotations:
        # Ask the injector to add the Vault Agent sidecar to this pod.
        vault.hashicorp.com/agent-inject: "true"
        # Role the agent logs in with — presumably bound to the service
        # account named further down; verify against the Vault config.
        vault.hashicorp.com/role: "myapp"
        # Inject the secret at this Vault path under the name "db"; the
        # application later reads it from /vault/secrets/db.
        vault.hashicorp.com/agent-inject-secret-db: "database/creds/myapp-role"
        # Template for the "db" file: two KEY=value lines (DB_USER, DB_PASS).
        vault.hashicorp.com/agent-inject-template-db: |
          {{- with secret "database/creds/myapp-role" -}}
          DB_USER={{ .Data.username }}
          DB_PASS={{ .Data.password }}
          {{- end }}
    spec:
      serviceAccountName: myapp
      containers:
        - name: myapp
          image: myapp:latest

The Python application reads injected secrets from files:

import os

def read_vault_injected_secret(secret_name: str, secrets_dir: str = "/vault/secrets") -> str:
    """Read a secret file written by the Vault Agent sidecar.

    Args:
        secret_name: File name of the secret inside ``secrets_dir``.
        secrets_dir: Directory the agent renders secrets into. Defaults to
            ``/vault/secrets``; override it for a different mount or tests.

    Returns:
        The file content with leading and trailing whitespace removed.

    Raises:
        FileNotFoundError: If the secret file does not exist.
    """
    secret_path = f"{secrets_dir}/{secret_name}"
    # Fix the encoding explicitly so the result does not depend on the
    # container's locale settings.
    with open(secret_path, encoding="utf-8") as f:
        return f.read().strip()

# Parse key=value format from template
def parse_vault_env_file(filename: str, secrets_dir: str = "/vault/secrets") -> dict:
    """Parse a ``KEY=value`` file rendered by a Vault Agent template.

    Blank lines, lines starting with ``#``, lines without ``=`` and lines
    with an empty key are ignored. Only the first ``=`` splits a line, so
    values may themselves contain ``=``.

    Args:
        filename: File name of the rendered template inside ``secrets_dir``.
        secrets_dir: Directory the agent renders secrets into. Defaults to
            ``/vault/secrets``.

    Returns:
        A dict mapping each key to its value; empty if the file is missing.
    """
    config = {}
    secret_path = f"{secrets_dir}/{filename}"
    try:
        # Open directly instead of checking for existence first: the file
        # could disappear between the check and the open.
        with open(secret_path, encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith("#"):
                    continue
                key, separator, value = line.partition("=")
                key = key.strip()
                if not separator or not key:
                    continue
                config[key] = value
    except FileNotFoundError:
        # A missing file means nothing was injected: keep returning an
        # empty mapping, as callers expect.
        return {}
    return config

# Application startup: load the injected "db" file and pick out both values.
# A key that is absent from the file yields None.
db_config = parse_vault_env_file("db")
db_user, db_pass = (db_config.get(key) for key in ("DB_USER", "DB_PASS"))

Multi-cloud secret access pattern

When running across AWS, GCP, and on-prem, abstract secret access behind a provider interface:

from abc import ABC, abstractmethod

class SecretProvider(ABC):
    """Common interface for secret backends: look a secret up by name."""

    @abstractmethod
    def get_secret(self, name: str) -> str:
        """Return the secret stored under ``name``; subclasses implement this."""

class HashiCorpVaultProvider(SecretProvider):
    """Secret provider that reads from a Vault KV v2 mount."""

    def __init__(self, client: hvac.Client, mount: str = "secret"):
        self.client = client
        self.mount = mount

    def get_secret(self, name: str) -> str:
        """Return the payload of the KV v2 secret stored at path ``name``.

        NOTE(review): the value returned is ``response["data"]["data"]``,
        i.e. the stored key/value payload rather than a plain string as the
        annotation says — confirm what callers expect.
        """
        kv_v2 = self.client.secrets.kv.v2
        response = kv_v2.read_secret_version(path=name, mount_point=self.mount)
        payload = response["data"]["data"]
        return payload

class AWSSecretsManagerProvider(SecretProvider):
    """Secret provider backed by AWS Secrets Manager via boto3."""

    def __init__(self):
        # Imported here so boto3 is only needed when this provider is used.
        import boto3
        self.client = boto3.client("secretsmanager")

    def get_secret(self, name: str) -> str:
        """Fetch the secret ``name`` and return its JSON-decoded string value.

        NOTE(review): the result of ``json.loads`` is returned as-is, so the
        value is whatever the JSON encodes, not necessarily the annotated
        ``str`` — confirm what callers expect.
        """
        import json
        secret_string = self.client.get_secret_value(SecretId=name)["SecretString"]
        decoded = json.loads(secret_string)
        return decoded

class GCPSecretManagerProvider(SecretProvider):
    """Secret provider backed by Google Cloud Secret Manager."""

    def __init__(self, project_id: str):
        # Imported here so the Google client is only needed for this provider.
        from google.cloud import secretmanager
        self.client = secretmanager.SecretManagerServiceClient()
        self.project = project_id

    def get_secret(self, name: str) -> str:
        """Return the decoded payload of the latest version of secret ``name``."""
        version_name = f"projects/{self.project}/secrets/{name}/versions/latest"
        version = self.client.access_secret_version(request={"name": version_name})
        payload_bytes = version.payload.data
        return payload_bytes.decode()

def get_secret_provider() -> SecretProvider:
    """Build the secret provider selected by ``SECRET_PROVIDER``.

    ``"aws"`` and ``"gcp"`` select the cloud providers; any other value,
    including an unset variable, selects the Vault provider, which is
    configured from ``VAULT_ADDR`` and ``VAULT_TOKEN``.
    """
    selected = os.environ.get("SECRET_PROVIDER", "vault")

    if selected == "aws":
        return AWSSecretsManagerProvider()

    if selected == "gcp":
        project_id = os.environ["GCP_PROJECT"]
        return GCPSecretManagerProvider(project_id)

    # Fallback: everything else is served by Vault.
    vault_client = hvac.Client(url=os.environ["VAULT_ADDR"], token=os.environ["VAULT_TOKEN"])
    return HashiCorpVaultProvider(vault_client)

The one thing to remember: Production Vault integration in Python centers on three patterns — AppRole authentication with hvac for service identity, dynamic credentials with lease renewal for databases, and Transit encryption for data protection without key management — with Kubernetes sidecar injection providing the simplest deployment model.

Tags: python, security, vault, secrets-management

See Also