Data Retention Policies in Python — Deep Dive

Declarative retention policy engine

A production retention system needs a policy engine that’s configured declaratively — not buried in application code where it’s hard to audit or modify.

from dataclasses import dataclass
from datetime import timedelta, datetime
from enum import Enum
from typing import Callable, Any

class RetentionAction(Enum):
    """What the retention executor does to a record once it has expired."""

    # Remove the rows from the primary table.
    DELETE = "delete"
    # Keep the rows but overwrite the policy's configured columns.
    ANONYMIZE = "anonymize"
    # Copy the rows into the "<table>_archive" table, then delete them.
    ARCHIVE = "archive"

@dataclass
class RetentionPolicy:
    """Declarative description of one retention rule for a single table.

    ``table``, ``timestamp_column``, ``conditions`` and ``anonymize_columns``
    are interpolated directly into SQL by the executor, so they must come
    from trusted configuration and never from user input.

    Raises:
        ValueError: if ``batch_size`` is not positive, ``retention_period``
            is negative, or an ANONYMIZE policy names no columns.
    """

    name: str  # identifier used in logs and reports
    table: str  # table the policy applies to
    timestamp_column: str  # column compared against the retention cutoff
    retention_period: timedelta  # how long a record is kept
    action: RetentionAction  # what to do with expired records
    legal_basis: str  # documented justification for the retention period
    conditions: str | None = None  # SQL WHERE clause for scoping
    batch_size: int = 5000  # maximum rows selected per batch
    anonymize_columns: list[str] | None = None  # columns to overwrite

    def __post_init__(self) -> None:
        # Fail at configuration time rather than mid-run with malformed SQL.
        if self.batch_size <= 0:
            raise ValueError(
                f"Policy {self.name!r}: batch_size must be positive, "
                f"got {self.batch_size}"
            )
        # A negative period puts the cutoff in the future, which would
        # select records that have not expired yet.
        if self.retention_period < timedelta(0):
            raise ValueError(
                f"Policy {self.name!r}: retention_period must not be negative"
            )
        # Without columns the UPDATE would have an empty SET clause.
        if (
            self.action == RetentionAction.ANONYMIZE
            and not self.anonymize_columns
        ):
            raise ValueError(
                f"Policy {self.name!r}: ANONYMIZE requires anonymize_columns"
            )

# Retention rules live here as plain data so they can be audited and changed
# without touching the executor.
POLICIES = [
    # Security session logs: purge after 90 days.
    RetentionPolicy(
        name="session_logs",
        table="session_logs",
        timestamp_column="created_at",
        retention_period=timedelta(days=90),
        action=RetentionAction.DELETE,
        legal_basis="Legitimate interest (security monitoring)",
    ),
    # Analytics events: purge after roughly 13 months.
    RetentionPolicy(
        name="analytics_events",
        table="analytics_events",
        timestamp_column="event_time",
        retention_period=timedelta(days=365 + 30),  # 395 days
        action=RetentionAction.DELETE,
        legal_basis="Consent (analytics)",
    ),
    # Orders: keep the row, scrub personal columns after roughly 7 years.
    RetentionPolicy(
        name="purchase_records",
        table="orders",
        timestamp_column="created_at",
        retention_period=timedelta(days=7 * 365 + 2),  # 2557 days
        action=RetentionAction.ANONYMIZE,
        legal_basis="Legal obligation (tax records)",
        anonymize_columns=[
            "customer_name",
            "email",
            "shipping_address",
            "phone",
        ],
    ),
    # Support tickets: purge two years after resolution; open tickets stay.
    RetentionPolicy(
        name="support_tickets",
        table="support_tickets",
        timestamp_column="resolved_at",
        retention_period=timedelta(days=730),
        action=RetentionAction.DELETE,
        legal_basis="Legitimate interest (service improvement)",
        conditions="resolved_at IS NOT NULL",
    ),
    # Shopping carts: purge abandoned carts untouched for 30 days.
    RetentionPolicy(
        name="abandoned_carts",
        table="shopping_carts",
        timestamp_column="updated_at",
        retention_period=timedelta(days=30),
        action=RetentionAction.DELETE,
        legal_basis="Contract (service provision)",
        conditions="status = 'abandoned'",
    ),
]

Batch purge executor

The executor processes each policy in configurable batches, skipping records under legal hold and logging a summary of each policy run for audit:

import logging
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

# Shared logger for the retention snippets in this file.
logger = logging.getLogger("retention")

@dataclass
class PurgeResult:
    """Outcome of running a single retention policy."""

    policy_name: str  # name of the policy that was executed
    action: RetentionAction  # action the policy applied
    records_processed: int  # records the executor acted on
    records_skipped_legal_hold: int  # records left alone due to legal hold
    duration_seconds: float  # wall time of the run, rounded by the executor
    errors: list[str]  # stringified exceptions raised while processing

class RetentionExecutor:
    """Apply retention policies to a database in committed batches.

    Each batch selects expired record ids, drops the ones under legal hold,
    applies the policy's action to the rest and commits. Table and column
    names are interpolated into SQL, so policies must come from trusted
    configuration.
    """

    def __init__(self, session: AsyncSession, legal_hold_checker):
        """Store the session and the legal-hold checker.

        ``legal_hold_checker`` must provide an awaitable
        ``check_ids(table, ids)`` returning the subset of ``ids`` on hold.
        """
        self.session = session
        self.legal_hold = legal_hold_checker

    async def execute_policy(self, policy: RetentionPolicy) -> PurgeResult:
        """Run one policy to completion and report what happened.

        Batches are walked in ascending ``id`` order using keyset pagination
        so every candidate is visited exactly once. This matters because
        held rows and anonymized rows stay in the table and would otherwise
        be selected again on every iteration.

        NOTE(review): pagination assumes ``id`` is unique and orderable in
        the database — confirm for each policy's table.

        Errors raised while applying an action are rolled back, recorded in
        the result, and stop the run for this policy. Errors from the
        candidate query or the legal-hold check propagate to the caller.
        """
        import time

        start = time.monotonic()
        cutoff = datetime.utcnow() - policy.retention_period
        errors: list[str] = []
        total_processed = 0
        total_held = 0

        # The scoping predicate is the same for every batch; build it once.
        where_clause = f"{policy.timestamp_column} < :cutoff"
        if policy.conditions:
            where_clause += f" AND ({policy.conditions})"

        last_id = None  # highest id already visited; None before first batch

        while True:
            params = {"cutoff": cutoff, "batch_size": policy.batch_size}
            page_clause = where_clause
            if last_id is not None:
                page_clause += " AND id > :last_id"
                params["last_id"] = last_id

            select_sql = text(
                f"SELECT id FROM {policy.table} "
                f"WHERE {page_clause} "
                f"ORDER BY id "
                f"LIMIT :batch_size"
            )

            result = await self.session.execute(select_sql, params)
            candidate_ids = [row[0] for row in result.fetchall()]

            if not candidate_ids:
                break

            # Advance the cursor before filtering so a batch made up only of
            # held records cannot stall the run.
            last_id = candidate_ids[-1]

            # Filter out legal holds
            held_ids = set(
                await self.legal_hold.check_ids(policy.table, candidate_ids)
            )
            actionable_ids = [
                id_ for id_ in candidate_ids if id_ not in held_ids
            ]
            total_held += len(held_ids)

            if not actionable_ids:
                # Everything here is on hold; later batches may not be.
                continue

            try:
                if policy.action == RetentionAction.DELETE:
                    await self._delete_batch(policy.table, actionable_ids)
                elif policy.action == RetentionAction.ANONYMIZE:
                    await self._anonymize_batch(
                        policy.table,
                        actionable_ids,
                        policy.anonymize_columns or [],
                    )
                elif policy.action == RetentionAction.ARCHIVE:
                    await self._archive_batch(policy.table, actionable_ids)

                await self.session.commit()
                # Count only after the commit succeeded, so a failed commit
                # is never reported as processed.
                total_processed += len(actionable_ids)

            except Exception as e:
                errors.append(str(e))
                await self.session.rollback()
                logger.error(f"Retention error for {policy.name}: {e}")
                break

        duration = time.monotonic() - start

        logger.info(
            f"Retention: {policy.name} | action={policy.action.value} | "
            f"processed={total_processed} | held={total_held} | "
            f"duration={duration:.1f}s"
        )

        return PurgeResult(
            policy_name=policy.name,
            action=policy.action,
            records_processed=total_processed,
            records_skipped_legal_hold=total_held,
            duration_seconds=round(duration, 2),
            errors=errors,
        )

    @staticmethod
    def _bind_ids(ids: list) -> tuple[str, dict]:
        """Return an ``IN`` placeholder list and its bind parameters."""
        params = {f"id_{i}": id_ for i, id_ in enumerate(ids)}
        placeholders = ", ".join(f":{key}" for key in params)
        return placeholders, params

    async def _delete_batch(self, table: str, ids: list) -> None:
        """Delete the rows of ``table`` whose id is in ``ids``."""
        if not ids:
            return  # "IN ()" is not valid SQL
        placeholders, params = self._bind_ids(ids)
        await self.session.execute(
            text(f"DELETE FROM {table} WHERE id IN ({placeholders})"),
            params,
        )

    async def _anonymize_batch(
        self, table: str, ids: list, columns: list[str]
    ) -> None:
        """Overwrite ``columns`` with ``'[ANONYMIZED]'`` for the given ids.

        Raises:
            ValueError: if ``columns`` is empty, since the UPDATE would have
                no SET clause.
        """
        if not columns:
            raise ValueError(
                f"No columns configured to anonymize for table {table!r}"
            )
        if not ids:
            return
        set_clause = ", ".join(f"{col} = '[ANONYMIZED]'" for col in columns)
        placeholders, params = self._bind_ids(ids)
        await self.session.execute(
            text(
                f"UPDATE {table} SET {set_clause} "
                f"WHERE id IN ({placeholders})"
            ),
            params,
        )

    async def _archive_batch(self, table: str, ids: list) -> None:
        """Copy the rows into ``<table>_archive``, then delete the originals."""
        if not ids:
            return
        placeholders, params = self._bind_ids(ids)
        await self.session.execute(
            text(
                f"INSERT INTO {table}_archive SELECT * FROM {table} "
                f"WHERE id IN ({placeholders})"
            ),
            params,
        )
        await self._delete_batch(table, ids)

Legal holds freeze retention for records involved in litigation, regulatory investigations, or audit proceedings. The hold must override automated deletion:

import json
from datetime import datetime

@dataclass
class LegalHold:
    """A hold that exempts specific records from automated retention."""

    id: str  # identifier of the hold
    reason: str  # why the hold exists
    tables: list[str]  # tables the hold applies to
    record_ids: dict[str, list[str]]  # table -> list of record IDs
    created_by: str  # who placed the hold
    created_at: datetime  # when the hold was placed
    expires_at: datetime | None = None  # None = indefinite

class LegalHoldManager:
    """Store legal holds and answer which record ids they protect."""

    def __init__(self, session: AsyncSession):
        self.session = session

    async def create_hold(self, hold: LegalHold) -> None:
        """Insert a legal hold that exempts the given records from purging.

        ``tables`` is stored as a comma-joined string and ``record_ids`` as
        JSON text. This method does not commit; the caller owns the
        transaction.
        """
        await self.session.execute(
            text(
                "INSERT INTO legal_holds (id, reason, tables, record_ids, "
                "created_by, created_at, expires_at) "
                "VALUES (:id, :reason, :tables, :record_ids, "
                ":created_by, :created_at, :expires_at)"
            ),
            {
                "id": hold.id,
                "reason": hold.reason,
                "tables": ",".join(hold.tables),
                "record_ids": json.dumps(hold.record_ids),
                "created_by": hold.created_by,
                "created_at": hold.created_at,
                "expires_at": hold.expires_at,
            },
        )
        logger.info(f"Legal hold created: {hold.id} by {hold.created_by}")

    async def check_ids(self, table: str, ids: list[str]) -> set[str]:
        """Return the members of ``ids`` that are under an active legal hold.

        Held ids come back from JSON as strings, while candidate ids read
        from the database may be integers or other types. Ids are therefore
        compared by their string form, and the returned set contains the
        caller's original objects so membership tests keep working.
        """
        if not ids:
            return set()

        result = await self.session.execute(
            text(
                "SELECT record_ids FROM legal_holds "
                "WHERE :table = ANY(string_to_array(tables, ',')) "
                "AND (expires_at IS NULL OR expires_at > NOW())"
            ),
            {"table": table},
        )

        held_keys: set[str] = set()
        for row in result.fetchall():
            raw = row[0]
            # Some drivers hand JSON columns back already decoded.
            record_map = raw if isinstance(raw, dict) else json.loads(raw)
            held_keys.update(str(rid) for rid in record_map.get(table, []))

        return {id_ for id_ in ids if str(id_) in held_keys}

Scheduling with APScheduler

Run retention jobs during off-peak hours with monitoring:

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

async def nightly_retention_run():
    """Execute all retention policies and report results.

    Each policy runs in isolation. If one raises, the session is rolled
    back, the failure is recorded as that policy's result, and the remaining
    policies still run, so the compliance log row is always written.
    """
    async with get_session() as session:
        hold_manager = LegalHoldManager(session)
        executor = RetentionExecutor(session, hold_manager)

        results = []
        for policy in POLICIES:
            try:
                result = await executor.execute_policy(policy)
            except Exception as exc:
                # Job boundary: one broken policy must not block the others.
                await session.rollback()
                logger.exception("Retention policy %s aborted", policy.name)
                result = PurgeResult(
                    policy_name=policy.name,
                    action=policy.action,
                    records_processed=0,
                    records_skipped_legal_hold=0,
                    duration_seconds=0.0,
                    errors=[str(exc)],
                )
            results.append(result)

        # Generate summary report
        total_processed = sum(r.records_processed for r in results)
        total_held = sum(r.records_skipped_legal_hold for r in results)
        total_errors = sum(len(r.errors) for r in results)

        logger.info(
            f"Retention run complete: {total_processed} records processed, "
            f"{total_held} held, {total_errors} errors"
        )

        # Store run metadata for compliance reporting
        await session.execute(
            text(
                "INSERT INTO retention_run_log "
                "(run_date, total_processed, total_held, total_errors, details) "
                "VALUES (:date, :processed, :held, :errors, :details)"
            ),
            {
                "date": datetime.utcnow(),
                "processed": total_processed,
                "held": total_held,
                "errors": total_errors,
                "details": json.dumps([
                    {
                        "policy": r.policy_name,
                        "action": r.action.value,
                        "processed": r.records_processed,
                        "held": r.records_skipped_legal_hold,
                        "duration": r.duration_seconds,
                        "errors": r.errors,
                    }
                    for r in results
                ]),
            },
        )
        await session.commit()

# Register the retention run as a daily 03:00 cron job, then start the
# scheduler.
scheduler = AsyncIOScheduler()
scheduler.add_job(
    func=nightly_retention_run,
    trigger=CronTrigger(hour=3, minute=0),
    id="retention_run",
    max_instances=1,  # at most one concurrently running instance
    misfire_grace_time=3600,  # seconds a late run is still allowed to start
)
scheduler.start()

Cross-service deletion with events

In a microservice architecture, retention can’t be handled by a single database query. Use an event-driven approach:

from dataclasses import dataclass
import json

@dataclass
class RetentionEvent:
    """Message describing a retention action for other services to mirror."""

    event_type: str  # "delete" | "anonymize"
    entity_type: str  # "user" | "order" | etc.
    entity_id: str  # identifier of the affected entity
    policy_name: str  # policy that triggered the event
    initiated_at: str  # presumably a timestamp string; format not shown here

async def publish_retention_event(event: RetentionEvent, publisher):
    """Serialize ``event`` to JSON and publish it on the retention topic.

    ``publisher`` must provide an awaitable ``publish(topic=..., message=...)``.
    """
    payload = {
        "type": event.event_type,
        "entity_type": event.entity_type,
        "entity_id": event.entity_id,
        "policy": event.policy_name,
        "initiated_at": event.initiated_at,
    }
    body = json.dumps(payload)
    await publisher.publish(topic="data.retention", message=body)

# Each microservice subscribes to retention events
async def handle_retention_event(message: dict):
    """Apply a retention event inside a downstream service.

    Only events for ``"user"`` entities are acted on. ``"delete"`` removes
    the user's notifications and preferences, ``"anonymize"`` anonymizes the
    notifications, and any other event type is ignored.
    """
    if message["entity_type"] != "user":
        return

    user_id = message["entity_id"]
    event_type = message["type"]

    if event_type == "delete":
        await delete_user_notifications(user_id)
        await delete_user_preferences(user_id)
    elif event_type == "anonymize":
        await anonymize_user_notifications(user_id)

Monitoring and compliance reporting

Track retention health with metrics:

from prometheus_client import Counter, Gauge, Histogram

# Records acted on, labelled by policy and action.
retention_records_processed = Counter(
    name="retention_records_processed_total",
    documentation="Total records processed by retention",
    labelnames=["policy", "action"],
)

# Records left untouched because a legal hold covered them.
retention_records_held = Counter(
    name="retention_records_held_total",
    documentation="Records skipped due to legal hold",
    labelnames=["policy"],
)

# Execution time per policy.
retention_duration = Histogram(
    name="retention_run_duration_seconds",
    documentation="Duration of retention policy execution",
    labelnames=["policy"],
)

# Backlog indicator: age of the oldest record that is overdue for purging.
oldest_unprocessed = Gauge(
    name="retention_oldest_unprocessed_days",
    documentation="Age in days of oldest record that should have been purged",
    labelnames=["policy"],
)

The oldest_unprocessed gauge is the most important — if it’s growing, retention is falling behind. Set an alert when any policy’s oldest unpurged record exceeds its retention period plus a grace window.

Tradeoffs

Granularity vs. complexity: Per-table policies are simple. Per-column policies (anonymize email but keep order total) are more precise but significantly more complex to implement and audit.

Deletion speed vs. system impact: Deleting millions of rows in one transaction locks tables. Batch processing with commits between batches is slower overall but doesn’t impact production traffic. Size batches based on your database’s write throughput.

Compliance vs. analytics: Data scientists want maximum history. Privacy requires minimum retention. The compromise is aggregation pipelines that compute and store statistics before raw data is purged. Run aggregation jobs before retention jobs in your nightly schedule.

Backup lag: If your backup retention is 30 days, a user’s data persists in backups for up to 30 days after purge. Document this as your maximum deletion delay. Some regulators accept this; others may require shorter backup cycles.

The one thing to remember: Production data retention requires a declarative policy engine with legal hold overrides, batch purge execution with audit logging, cross-service event propagation in microservice architectures, and continuous monitoring to ensure policies are actually being enforced.

Tags: python, privacy, data-retention, compliance

See Also

  • Python Compliance Audit Trails — Why your Python app needs a tamper-proof diary that records every important action — like a security camera for your data
  • Python Consent Management — How Python apps ask permission like a polite guest — and remember exactly what you said yes and no to
  • Python Data Anonymization — How Python can disguise personal information so well that nobody — not even the original collector — can figure out who it belongs to
  • Python Differential Privacy — How adding a pinch of random noise to data lets companies learn from millions of people without knowing anything about any single person
  • Python GDPR Compliance — Why Europe's privacy law is like a restaurant that must tell you every ingredient — and how Python apps follow the recipe