Python Push Notifications — Deep Dive
System-level framing
A production notification system must handle three distinct push protocols (APNs, FCM, Web Push), manage millions of device tokens with constant churn, respect per-service rate limits, and degrade gracefully when push services are unavailable. The Python backend is typically the orchestration layer — it decides who gets notified, constructs payloads per platform, and dispatches requests through the appropriate channel.
APNs with Python
Authentication
Apple supports two authentication methods:
Token-based (JWT) — preferred for most deployments:
import jwt
import time
def create_apns_token(key_id: str, team_id: str, private_key: str) -> str:
payload = {
"iss": team_id,
"iat": int(time.time()),
}
headers = {
"alg": "ES256",
"kid": key_id,
}
return jwt.encode(payload, private_key, algorithm="ES256", headers=headers)
Tokens are valid for up to 60 minutes. Cache and rotate them to avoid signing on every request.
Certificate-based — uses a .p12 or .pem file for mutual TLS. Simpler setup but certificates expire annually.
Sending via HTTP/2
APNs requires HTTP/2. The httpx library handles this natively:
import httpx
async def send_apns(
device_token: str,
payload: dict,
auth_token: str,
bundle_id: str,
production: bool = True,
):
base = "https://api.push.apple.com" if production else "https://api.sandbox.push.apple.com"
url = f"{base}/3/device/{device_token}"
headers = {
"authorization": f"bearer {auth_token}",
"apns-topic": bundle_id,
"apns-push-type": "alert",
"apns-priority": "10",
"apns-expiration": "0",
}
async with httpx.AsyncClient(http2=True) as client:
response = await client.post(url, json={"aps": payload}, headers=headers)
if response.status_code == 410:
await mark_token_invalid(device_token)
elif response.status_code != 200:
raise APNsError(response.status_code, response.json())
Key headers:
apns-push-type— “alert”, “background”, “voip”, or “complication”apns-priority— 10 for immediate, 5 for power-conscious deliveryapns-expiration— 0 means do not store if device is offline
APNs payload structure
{
"aps": {
"alert": {
"title": "New Message",
"body": "Hey, are you coming tonight?",
"loc-key": "MSG_FORMAT",
"loc-args": ["Sarah"]
},
"badge": 3,
"sound": "default",
"mutable-content": 1
},
"custom_data": {
"conversation_id": "abc123"
}
}
The total payload must not exceed 4 KB. Use mutable-content when your app’s notification service extension needs to modify the notification before display (for example, downloading an image).
FCM with Python
Authentication with service accounts
from google.oauth2 import service_account
from google.auth.transport.requests import Request
credentials = service_account.Credentials.from_service_account_file(
"service-account.json",
scopes=["https://www.googleapis.com/auth/firebase.messaging"],
)
credentials.refresh(Request())
access_token = credentials.token
Sending via FCM v1 API
The legacy API is deprecated. Use the v1 HTTP API:
import httpx
async def send_fcm(
device_token: str,
title: str,
body: str,
data: dict | None = None,
project_id: str = "my-project",
):
url = f"https://fcm.googleapis.com/v1/projects/{project_id}/messages:send"
message = {
"message": {
"token": device_token,
"notification": {"title": title, "body": body},
"android": {
"priority": "high",
"notification": {"channel_id": "default"},
},
"apns": {
"payload": {"aps": {"badge": 1, "sound": "default"}},
},
}
}
if data:
message["message"]["data"] = {k: str(v) for k, v in data.items()}
async with httpx.AsyncClient() as client:
response = await client.post(
url,
json=message,
headers={"Authorization": f"Bearer {access_token}"},
)
if response.status_code == 404:
await mark_token_invalid(device_token)
return response.json()
FCM v1 supports platform-specific overrides in a single request, so you can target Android and iOS with one call.
Topic messaging
message = {
"message": {
"topic": "breaking-news",
"notification": {"title": "Breaking", "body": "Major event occurred"},
}
}
Topics scale to millions of subscribers without managing individual tokens for broadcasts.
Web Push with Python
VAPID key generation
from py_vapid import Vapid
vapid = Vapid()
vapid.generate_keys()
vapid.save_key("private_key.pem")
vapid.save_public_key("public_key.pem")
Sending web push
from pywebpush import webpush
def send_web_push(subscription_info: dict, message: str, vapid_private_key: str):
webpush(
subscription_info=subscription_info,
data=message,
vapid_private_key=vapid_private_key,
vapid_claims={"sub": "mailto:admin@example.com"},
ttl=86400,
)
The subscription_info comes from the browser’s Push API and contains the endpoint URL, public key, and auth secret. All payloads are encrypted end-to-end using the browser’s keys.
Token lifecycle management
from datetime import datetime, timedelta
class TokenManager:
async def register_token(self, user_id: str, token: str, platform: str):
await self.db.tokens.upsert({
"user_id": user_id,
"token": token,
"platform": platform,
"registered_at": datetime.utcnow(),
"last_success": None,
"failure_count": 0,
})
async def handle_send_result(self, token: str, success: bool):
if success:
await self.db.tokens.update(
{"token": token},
{"last_success": datetime.utcnow(), "failure_count": 0},
)
else:
await self.db.tokens.increment({"token": token}, "failure_count")
async def cleanup_stale_tokens(self):
cutoff = datetime.utcnow() - timedelta(days=30)
await self.db.tokens.delete_many({
"$or": [
{"failure_count": {"$gte": 3}},
{"last_success": {"$lt": cutoff}, "last_success": {"$ne": None}},
{"registered_at": {"$lt": cutoff}, "last_success": None},
]
})
Run cleanup daily. Tokens that fail three times consecutively or have not succeeded in 30 days are almost certainly dead.
Scaling notification dispatch
For high-volume systems (millions of notifications per hour), the synchronous request-per-token approach breaks down:
Queue-based architecture:
Event trigger → Task queue (Celery/RQ) → Worker pool → Push services
↓
Feedback processor → Token cleanup
Batching: FCM supports sending to up to 500 tokens in one request via the batch API. APNs benefits from HTTP/2 multiplexing — keep connections alive and send multiple requests over a single connection.
Rate limiting per service:
| Service | Rate limit |
|---|---|
| APNs | No published hard limit, but connection-based throttling occurs |
| FCM | 600K messages/minute per project (adjustable) |
| Web Push | Varies by browser vendor endpoint |
Connection pooling: Reuse HTTP/2 connections to APNs. Opening a new TLS+HTTP/2 connection takes 100-300ms. A connection pool with 5-10 persistent connections handles most workloads.
Notification priority and user experience
Not every notification deserves to buzz the user’s pocket:
class NotificationPriority:
CRITICAL = "critical" # Immediate delivery, bypasses DND
HIGH = "high" # Immediate delivery
NORMAL = "normal" # May be batched by OS
LOW = "low" # Silent, delivered when convenient
def get_priority(event_type: str) -> str:
priority_map = {
"security_alert": NotificationPriority.CRITICAL,
"direct_message": NotificationPriority.HIGH,
"like": NotificationPriority.NORMAL,
"weekly_digest": NotificationPriority.LOW,
}
return priority_map.get(event_type, NotificationPriority.NORMAL)
Android’s notification channels and iOS’s notification categories let users control which types they receive. Map your priority levels to platform-specific settings.
Testing notifications
Testing push is notoriously difficult because it involves external services and physical devices:
- APNs sandbox — use the sandbox URL during development; requires a development provisioning profile.
- FCM test messages — target specific device tokens in the Firebase console.
- Web push testing — Chrome DevTools > Application > Service Workers has a “Push” button for local testing.
- Mock services — for unit tests, mock the HTTP calls and verify payload structure rather than delivery.
@pytest.mark.asyncio
async def test_apns_payload_structure():
payload = build_notification_payload(
title="Test", body="Hello", badge=5, custom={"thread": "123"}
)
assert payload["aps"]["alert"]["title"] == "Test"
assert payload["aps"]["badge"] == 5
assert payload["custom"]["thread"] == "123"
assert len(json.dumps(payload).encode()) <= 4096
One thing to remember: A production notification system is a multi-protocol dispatcher with a token lifecycle problem. Get the token management right — registration, feedback processing, and stale cleanup — and the actual sending is the easy part.
See Also
- Python Discord Bot Development Learn how Python creates Discord bots that moderate servers, play music, and respond to commands — explained for total beginners.
- Python Email Templating Jinja Discover how Jinja templates let Python create personalized emails for thousands of people without writing each one by hand.
- Python Imap Reading Emails See how Python reads your inbox using IMAP — explained with a mailbox-and-key analogy anyone can follow.
- Python Slack Bot Development Find out how Python builds Slack bots that read messages, reply to commands, and automate team workflows — no Slack expertise needed.
- Python Smtplib Sending Emails Understand how Python sends emails through smtplib using the simplest real-world analogy you will ever need.