Data Retention Policies in Python — Deep Dive
Declarative retention policy engine
A production retention system needs a policy engine that’s configured declaratively — not buried in application code where it’s hard to audit or modify.
from dataclasses import dataclass
from datetime import timedelta, datetime
from enum import Enum
from typing import Callable, Any
class RetentionAction(Enum):
    """What happens to a row once its retention period has elapsed."""

    DELETE = "delete"  # remove the row entirely
    ANONYMIZE = "anonymize"  # overwrite configured columns, keep the row
    ARCHIVE = "archive"  # copy into "<table>_archive", then delete from the source
@dataclass
class RetentionPolicy:
    """Declarative description of how long rows in one table may be kept.

    ``table``, ``timestamp_column`` and ``conditions`` are interpolated verbatim
    into SQL by the retention executor, so they must come from trusted
    configuration, never from user input.

    Raises:
        ValueError: if the configuration could never run correctly (non-positive
            retention period or batch size, or ANONYMIZE without columns).
    """

    name: str  # unique policy identifier used in logs and run reports
    table: str  # table the policy applies to
    timestamp_column: str  # rows older than now - retention_period are eligible
    retention_period: timedelta
    action: RetentionAction
    legal_basis: str  # documented justification for keeping the data this long
    conditions: str | None = None  # SQL WHERE clause for scoping
    batch_size: int = 5000  # rows handled per committed batch
    anonymize_columns: list[str] | None = None  # columns overwritten by ANONYMIZE

    def __post_init__(self) -> None:
        """Reject configurations that would fail, or silently misbehave, at run time."""
        if self.retention_period <= timedelta(0):
            # A zero or negative period puts the cutoff at or after "now",
            # which would make every existing row eligible.
            raise ValueError(
                f"Policy {self.name!r}: retention_period must be positive, "
                f"got {self.retention_period!r}"
            )
        if self.batch_size <= 0:
            # LIMIT 0 would select nothing, so the policy would never purge anything.
            raise ValueError(
                f"Policy {self.name!r}: batch_size must be positive, got {self.batch_size}"
            )
        if self.action == RetentionAction.ANONYMIZE and not self.anonymize_columns:
            # An empty SET clause is invalid SQL.
            raise ValueError(
                f"Policy {self.name!r}: ANONYMIZE requires non-empty anonymize_columns"
            )
# Define policies as data, not code: the retention rules can be audited here
# without reading the executor. Each entry names the table, the age column,
# the action taken, how long rows are kept, and the legal basis for keeping them.
POLICIES = [
    # Security/session logs: short-lived, deleted outright.
    RetentionPolicy(
        name="session_logs",
        legal_basis="Legitimate interest (security monitoring)",
        table="session_logs",
        timestamp_column="created_at",
        action=RetentionAction.DELETE,
        retention_period=timedelta(days=90),
    ),
    # Analytics events: kept for 13 months (395 days).
    RetentionPolicy(
        name="analytics_events",
        legal_basis="Consent (analytics)",
        table="analytics_events",
        timestamp_column="event_time",
        action=RetentionAction.DELETE,
        retention_period=timedelta(days=395),
    ),
    # Orders must survive for tax purposes (~7 years), but personal
    # fields are scrubbed instead of the row being deleted.
    RetentionPolicy(
        name="purchase_records",
        legal_basis="Legal obligation (tax records)",
        table="orders",
        timestamp_column="created_at",
        action=RetentionAction.ANONYMIZE,
        retention_period=timedelta(days=2557),
        anonymize_columns=["customer_name", "email", "shipping_address", "phone"],
    ),
    # Support tickets: age is measured from resolution; open tickets are never purged.
    RetentionPolicy(
        name="support_tickets",
        legal_basis="Legitimate interest (service improvement)",
        table="support_tickets",
        timestamp_column="resolved_at",
        action=RetentionAction.DELETE,
        retention_period=timedelta(days=730),
        conditions="resolved_at IS NOT NULL",
    ),
    # Shopping carts: only abandoned ones, 30 days after their last update.
    RetentionPolicy(
        name="abandoned_carts",
        legal_basis="Contract (service provision)",
        table="shopping_carts",
        timestamp_column="updated_at",
        action=RetentionAction.DELETE,
        retention_period=timedelta(days=30),
        conditions="status = 'abandoned'",
    ),
]
Batch purge executor
The executor processes each policy in configurable batches, respecting legal holds and logging every action for audit:
import logging
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
# Shared logger for every retention component; a fixed name (rather than
# __name__) lets operators route all retention logs with one handler.
logger = logging.getLogger(name="retention")
@dataclass
class PurgeResult:
    """Summary of one execution of a single retention policy."""

    policy_name: str  # name of the RetentionPolicy that ran
    action: RetentionAction  # action the policy applies
    records_processed: int  # rows deleted / anonymized / archived
    records_skipped_legal_hold: int  # candidate rows left untouched because of a legal hold
    duration_seconds: float  # wall-clock duration of the run
    errors: list[str]  # error messages collected during the run; empty on success
class RetentionExecutor:
    """Apply retention policies to the database in separately committed batches.

    Every candidate batch is filtered through the legal-hold checker before any
    row is touched. Each batch is committed on its own, so a failure rolls back
    only the batch in flight.

    Table names, column names and ``policy.conditions`` are interpolated into the
    SQL text; only values are bound parameters. They must therefore come from
    trusted policy configuration, never from user input.
    """

    def __init__(self, session: AsyncSession, legal_hold_checker):
        # legal_hold_checker must expose ``async check_ids(table, ids)`` returning
        # the ids from ``ids`` that are under legal hold.
        self.session = session
        self.legal_hold = legal_hold_checker

    async def execute_policy(self, policy: RetentionPolicy) -> PurgeResult:
        """Apply *policy* to every eligible row and return a summary.

        A row is eligible when ``policy.timestamp_column`` is older than
        ``now - policy.retention_period`` and ``policy.conditions`` (if any) holds.

        Candidates are paged by ``id`` (keyset pagination): each batch only
        considers ids greater than the largest id already examined. This
        guarantees forward progress even for rows that survive the action. Such
        rows include:
          - rows under legal hold;
          - ANONYMIZE rows, which are updated in place and still match the cutoff.

        Errors are not raised. The failing batch is rolled back, the message is
        recorded in ``PurgeResult.errors``, and the policy stops. Batches
        committed earlier stay committed.

        Assumes ``policy.table`` has an orderable ``id`` primary-key column.
        """
        import time

        start = time.monotonic()
        # NOTE(review): naive UTC datetime; assumes the timestamp columns store
        # naive UTC values — confirm against the schema.
        cutoff = datetime.utcnow() - policy.retention_period
        errors: list[str] = []
        total_processed = 0
        total_held = 0
        last_id = None  # largest id examined so far; None before the first batch

        while True:
            try:
                candidate_ids = await self._select_candidates(policy, cutoff, last_id)
                if not candidate_ids:
                    break
                # Candidates are ordered by id, so the last one is the page boundary.
                last_id = candidate_ids[-1]

                held_ids = await self.legal_hold.check_ids(policy.table, candidate_ids)
                # Compare by string form: hold records may keep ids as strings
                # (LegalHold.record_ids is dict[str, list[str]]) while primary keys
                # may be ints, and a type mismatch must never bypass a hold.
                held_keys = {str(h) for h in held_ids}
                actionable_ids = [
                    id_ for id_ in candidate_ids if str(id_) not in held_keys
                ]
                # Count only candidates actually skipped in this batch.
                total_held += len(candidate_ids) - len(actionable_ids)
                if not actionable_ids:
                    continue  # entire batch is held; move on to the next page

                await self._apply_action(policy, actionable_ids)
                await self.session.commit()
                # Count only once the commit has succeeded.
                total_processed += len(actionable_ids)
            except Exception as e:
                errors.append(str(e))
                await self.session.rollback()
                logger.exception(f"Retention error for {policy.name}: {e}")
                break

        duration = time.monotonic() - start
        logger.info(
            f"Retention: {policy.name} | action={policy.action.value} | "
            f"processed={total_processed} | held={total_held} | "
            f"duration={duration:.1f}s"
        )
        return PurgeResult(
            policy_name=policy.name,
            action=policy.action,
            records_processed=total_processed,
            records_skipped_legal_hold=total_held,
            duration_seconds=round(duration, 2),
            errors=errors,
        )

    async def _select_candidates(self, policy: RetentionPolicy, cutoff, after_id) -> list:
        """Return up to ``policy.batch_size`` eligible ids in ascending order.

        When *after_id* is not None, only ids greater than it are returned.
        """
        clauses = [f"{policy.timestamp_column} < :cutoff"]
        params = {"cutoff": cutoff, "batch_size": policy.batch_size}
        if policy.conditions:
            clauses.append(f"({policy.conditions})")
        if after_id is not None:
            clauses.append("id > :after_id")
            params["after_id"] = after_id
        where_clause = " AND ".join(clauses)
        result = await self.session.execute(
            text(
                f"SELECT id FROM {policy.table} "
                f"WHERE {where_clause} "
                f"ORDER BY id "
                f"LIMIT :batch_size"
            ),
            params,
        )
        return [row[0] for row in result.fetchall()]

    async def _apply_action(self, policy: RetentionPolicy, ids: list) -> None:
        """Run the batch operation that matches ``policy.action`` on *ids*."""
        if policy.action == RetentionAction.DELETE:
            await self._delete_batch(policy.table, ids)
        elif policy.action == RetentionAction.ANONYMIZE:
            await self._anonymize_batch(
                policy.table, ids, policy.anonymize_columns or []
            )
        elif policy.action == RetentionAction.ARCHIVE:
            await self._archive_batch(policy.table, ids)
        else:
            raise ValueError(f"Unsupported retention action: {policy.action!r}")

    @staticmethod
    def _id_params(ids: list) -> tuple[str, dict]:
        """Build ``:id_0, :id_1, ...`` placeholders and the matching bound values."""
        placeholders = ", ".join(f":id_{i}" for i in range(len(ids)))
        params = {f"id_{i}": id_ for i, id_ in enumerate(ids)}
        return placeholders, params

    async def _delete_batch(self, table: str, ids: list) -> None:
        """Delete the rows of *table* whose id is in *ids*."""
        placeholders, params = self._id_params(ids)
        await self.session.execute(
            text(f"DELETE FROM {table} WHERE id IN ({placeholders})"),
            params,
        )

    async def _anonymize_batch(
        self, table: str, ids: list, columns: list[str]
    ) -> None:
        """Overwrite *columns* with the literal '[ANONYMIZED]' for rows in *ids*.

        Raises:
            ValueError: if *columns* is empty, since an empty SET clause is invalid SQL.
        """
        if not columns:
            raise ValueError(f"No anonymize_columns configured for table {table}")
        # NOTE(review): writes a text literal, so every listed column is assumed
        # to accept strings — confirm against the schema.
        set_clause = ", ".join(f"{col} = '[ANONYMIZED]'" for col in columns)
        placeholders, params = self._id_params(ids)
        await self.session.execute(
            text(
                f"UPDATE {table} SET {set_clause} "
                f"WHERE id IN ({placeholders})"
            ),
            params,
        )

    async def _archive_batch(self, table: str, ids: list) -> None:
        """Copy the rows into ``<table>_archive``, then delete them from *table*.

        ``INSERT ... SELECT *`` requires the archive table to have the same
        column layout as the source table.
        """
        placeholders, params = self._id_params(ids)
        await self.session.execute(
            text(
                f"INSERT INTO {table}_archive SELECT * FROM {table} "
                f"WHERE id IN ({placeholders})"
            ),
            params,
        )
        await self._delete_batch(table, ids)
Legal hold management
Legal holds freeze retention for records involved in litigation, regulatory investigations, or audit proceedings. The hold must override automated deletion:
from datetime import datetime
@dataclass
class LegalHold:
id: str
reason: str
tables: list[str]
record_ids: dict[str, list[str]] # table -> list of record IDs
created_by: str
created_at: datetime
expires_at: datetime | None = None # None = indefinite
class LegalHoldManager:
    """Store legal holds and report which record ids they protect.

    Holds live in the ``legal_holds`` table, one row per hold:
      - ``tables`` is a comma-separated string of table names;
      - ``record_ids`` is a JSON object mapping table name to a list of ids.

    The lookup SQL uses PostgreSQL functions (``string_to_array``, ``ANY``,
    ``NOW()``).
    """

    def __init__(self, session: AsyncSession):
        self.session = session

    async def create_hold(self, hold: LegalHold) -> None:
        """Insert *hold* into ``legal_holds``.

        Does not commit; the caller owns the transaction.
        """
        await self.session.execute(
            text(
                "INSERT INTO legal_holds (id, reason, tables, record_ids, "
                "created_by, created_at, expires_at) "
                "VALUES (:id, :reason, :tables, :record_ids, "
                ":created_by, :created_at, :expires_at)"
            ),
            {
                "id": hold.id,
                "reason": hold.reason,
                "tables": ",".join(hold.tables),
                "record_ids": json.dumps(hold.record_ids),
                "created_by": hold.created_by,
                "created_at": hold.created_at,
                "expires_at": hold.expires_at,
            },
        )
        logger.info(f"Legal hold created: {hold.id} by {hold.created_by}")

    async def check_ids(self, table: str, ids: list) -> set:
        """Return the members of *ids* that are under an active hold for *table*.

        A hold is active when it lists *table* and either has no expiry or
        expires in the future.

        Stored ids come back from JSON as strings, while callers may pass int or
        UUID primary keys, so ids are matched by their ``str`` form. The returned
        set contains the caller's own id objects, so ``id_ in result`` works
        directly.
        """
        if not ids:
            return set()
        result = await self.session.execute(
            text(
                "SELECT record_ids FROM legal_holds "
                "WHERE :table = ANY(string_to_array(tables, ',')) "
                "AND (expires_at IS NULL OR expires_at > NOW())"
            ),
            {"table": table},
        )
        held_keys: set[str] = set()
        for row in result.fetchall():
            raw = row[0]
            # JSON/JSONB columns may already be decoded by the driver.
            record_map = (
                json.loads(raw) if isinstance(raw, (str, bytes, bytearray)) else raw
            )
            held_keys.update(str(r) for r in record_map.get(table, []))
        return {id_ for id_ in ids if str(id_) in held_keys}
Scheduling with APScheduler
Run retention jobs during off-peak hours with monitoring:
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
async def nightly_retention_run():
    """Execute every policy in POLICIES, then write a summary to retention_run_log.

    A policy that raises is logged and recorded as a failed result. The
    remaining policies therefore still run, and the compliance log row is
    still written.
    """
    async with get_session() as session:
        hold_manager = LegalHoldManager(session)
        executor = RetentionExecutor(session, hold_manager)
        results = []
        for policy in POLICIES:
            try:
                result = await executor.execute_policy(policy)
            except Exception as exc:
                logger.exception(f"Retention policy {policy.name} failed")
                # Clear the failed transaction so later policies and the run log can proceed.
                await session.rollback()
                # Rows committed by earlier batches of this policy are not known here.
                result = PurgeResult(
                    policy_name=policy.name,
                    action=policy.action,
                    records_processed=0,
                    records_skipped_legal_hold=0,
                    duration_seconds=0.0,
                    errors=[str(exc)],
                )
            results.append(result)

        # Generate summary report
        total_processed = sum(r.records_processed for r in results)
        total_held = sum(r.records_skipped_legal_hold for r in results)
        total_errors = sum(len(r.errors) for r in results)
        logger.info(
            f"Retention run complete: {total_processed} records processed, "
            f"{total_held} held, {total_errors} errors"
        )

        # Store run metadata for compliance reporting
        await session.execute(
            text(
                "INSERT INTO retention_run_log "
                "(run_date, total_processed, total_held, total_errors, details) "
                "VALUES (:date, :processed, :held, :errors, :details)"
            ),
            {
                "date": datetime.utcnow(),
                "processed": total_processed,
                "held": total_held,
                "errors": total_errors,
                "details": json.dumps([
                    {
                        "policy": r.policy_name,
                        "action": r.action.value,
                        "processed": r.records_processed,
                        "held": r.records_skipped_legal_hold,
                        "duration": r.duration_seconds,
                        "errors": r.errors,
                    }
                    for r in results
                ]),
            },
        )
        await session.commit()
# NOTE(review): this runs at import time. AsyncIOScheduler dispatches jobs on
# the asyncio event loop, so the job only fires while that loop is running —
# confirm the application keeps the loop alive.
scheduler = AsyncIOScheduler()
nightly_trigger = CronTrigger(minute=0, hour=3)  # 03:00 daily, in the scheduler's timezone
scheduler.add_job(
    nightly_retention_run,
    trigger=nightly_trigger,
    id="retention_run",
    max_instances=1,  # never let two retention runs overlap
    misfire_grace_time=3600,  # still run if the scheduled time was missed by up to 1 h
)
scheduler.start()
Cross-service deletion with events
In a microservice architecture, retention can’t be handled by a single database query. Use an event-driven approach:
from dataclasses import dataclass
import json
@dataclass
class RetentionEvent:
    """Message telling other services that an entity was purged under a policy."""

    event_type: str  # "delete" | "anonymize"
    entity_type: str  # kind of entity, e.g. "user" or "order"
    entity_id: str  # identifier of the affected entity
    policy_name: str  # retention policy that triggered the event
    initiated_at: str  # timestamp string; NOTE(review): presumably ISO 8601 — confirm
async def publish_retention_event(event: RetentionEvent, publisher):
    """Publish *event* as JSON on the "data.retention" topic.

    *publisher* must provide an awaitable ``publish(topic=..., message=...)``.
    """
    payload = {
        "type": event.event_type,
        "entity_type": event.entity_type,
        "entity_id": event.entity_id,
        "policy": event.policy_name,
        "initiated_at": event.initiated_at,
    }
    body = json.dumps(payload)
    await publisher.publish(topic="data.retention", message=body)
# Each microservice subscribes to retention events
async def handle_retention_event(message: dict):
    """Apply a retention event to this service's data (e.g. notification service).

    Only "user" entities are handled. "delete" removes the user's notifications
    and preferences; "anonymize" anonymizes their notifications. Any other
    event type is ignored.
    """
    if message["entity_type"] != "user":
        return
    user_id = message["entity_id"]
    event_type = message["type"]
    if event_type == "delete":
        await delete_user_notifications(user_id)
        await delete_user_preferences(user_id)
    elif event_type == "anonymize":
        await anonymize_user_notifications(user_id)
Monitoring and compliance reporting
Track retention health with metrics:
from prometheus_client import Counter, Gauge, Histogram
# Prometheus metrics describing retention health.
retention_records_processed = Counter(
    "retention_records_processed_total",
    "Total records processed by retention",
    ["policy", "action"],
)
retention_records_held = Counter(
    "retention_records_held_total",
    "Records skipped due to legal hold",
    ["policy"],
)
retention_duration = Histogram(
    "retention_run_duration_seconds",
    "Duration of retention policy execution",
    ["policy"],
    # The default buckets stop at 10 s. A batch purge over a large table can
    # take far longer, which would push every observation into +Inf. These
    # buckets span 1 s to 2 h; the library appends +Inf itself.
    buckets=(1, 5, 15, 30, 60, 120, 300, 600, 1200, 1800, 3600, 7200),
)
# Should stay near zero; growth means a policy is falling behind.
oldest_unprocessed = Gauge(
    "retention_oldest_unprocessed_days",
    "Age in days of oldest record that should have been purged",
    ["policy"],
)
The oldest_unprocessed gauge is the most important — if it’s growing, retention is falling behind. Set an alert when any policy’s oldest unpurged record exceeds its retention period plus a grace window.
Tradeoffs
Granularity vs. complexity: Per-table policies are simple. Per-column policies (anonymize email but keep order total) are more precise but significantly more complex to implement and audit.
Deletion speed vs. system impact: Deleting millions of rows in one transaction holds locks for the whole transaction, bloats the transaction log, and can stall replicas. Batch processing with commits between batches is slower overall, but it keeps each lock short-lived, so the impact on production traffic stays small. Size batches based on your database’s write throughput.
Compliance vs. analytics: Data scientists want maximum history. Privacy requires minimum retention. The compromise is aggregation pipelines that compute and store statistics before raw data is purged. Run aggregation jobs before retention jobs in your nightly schedule.
Backup lag: If your backup retention is 30 days, a user’s data persists in backups for up to 30 days after purge. Document this as your maximum deletion delay. Some regulators accept this; others may require shorter backup cycles.
The one thing to remember: Production data retention requires a declarative policy engine with legal hold overrides, batch purge execution with audit logging, cross-service event propagation in microservice architectures, and continuous monitoring to ensure policies are actually being enforced.
See Also
- Python Compliance Audit Trails — Why your Python app needs a tamper-proof diary that records every important action, like a security camera for your data
- Python Consent Management — How Python apps ask permission like a polite guest, and remember exactly what you said yes and no to
- Python Data Anonymization — How Python can disguise personal information so well that nobody, not even the original collector, can figure out who it belongs to
- Python Differential Privacy — How adding a pinch of random noise to data lets companies learn from millions of people without knowing anything about any single person
- Python GDPR Compliance — Why Europe's privacy law is like a restaurant that must tell you every ingredient, and how Python apps follow the recipe