Python asyncpg Database — Deep Dive
Binary Protocol Deep Dive
PostgreSQL's wire protocol can carry values in two formats, text and binary (often called the text and binary protocols). Most drivers (psycopg2, pg8000) use the text format by default, where every value is serialized as a string. asyncpg uses the binary format by default.
Why Binary Is Faster
Consider a bigint column with value 9223372036854775807:
- Text protocol: PostgreSQL serializes the value to "9223372036854775807" (19 bytes) and sends it; the driver then parses the string back into an integer.
- Binary protocol: PostgreSQL sends the raw 8-byte big-endian representation, and asyncpg reads it directly into a Python int.
The binary format avoids formatting values as strings on the server and parsing them back on the client, which matters most for numeric types, timestamps, UUIDs, and arrays. The savings compound with wide rows and large result sets.
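To make the size difference concrete, here is a small sketch using only Python's struct module. It models the two encodings of the bigint value above; it is not asyncpg's decoder, which is implemented in Cython.

```python
import struct

value = 9223372036854775807  # largest bigint, 2**63 - 1

# Text format: the value travels as its decimal digits
text_wire = str(value).encode("ascii")

# Binary format: 8 bytes, big-endian ("network order"), two's complement
binary_wire = struct.pack("!q", value)

# Decoding: text needs digit-by-digit parsing, binary is a fixed-width unpack
assert int(text_wire) == value
assert struct.unpack("!q", binary_wire)[0] == value

print(len(text_wire), len(binary_wire))  # 19 8
```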
Protocol Message Flow
A typical query cycle:
Client → Server: Parse (SQL + parameter types)
Client → Server: Bind (parameter values in binary)
Client → Server: Execute
Client → Server: Sync
Server → Client: ParseComplete
Server → Client: BindComplete
Server → Client: DataRow (binary values) × N
Server → Client: CommandComplete
Server → Client: ReadyForQuery
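asyncpg pipelines these messages, sending each group in a single TCP write instead of waiting for a reply between them. The first time it runs a query, it also needs the parameter and result types to choose binary codecs, so it prepares the statement (Parse, Describe, Sync) and caches it; later executions of the same query send Bind, Execute, and Sync together and cost a single round trip.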
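The sketch below frames the extended-query messages from the flow above by hand, following the message layouts in the PostgreSQL protocol documentation. The helper names are hypothetical and this is not asyncpg's code (its protocol layer is written in Cython); it only illustrates why pipelining is cheap: each message is a byte string, so several can be concatenated and written to the socket at once.

```python
import struct


def frame(tag: bytes, body: bytes) -> bytes:
    # Frontend message: 1-byte tag + int32 length (counts itself, not the tag) + body
    return tag + struct.pack("!i", len(body) + 4) + body


def cstring(s: str) -> bytes:
    return s.encode("utf-8") + b"\x00"


def parse_msg(query: str, param_oids: list, name: str = "") -> bytes:
    body = cstring(name) + cstring(query) + struct.pack("!h", len(param_oids))
    body += b"".join(struct.pack("!i", oid) for oid in param_oids)
    return frame(b"P", body)


def bind_msg(params: list, stmt: str = "", portal: str = "") -> bytes:
    body = cstring(portal) + cstring(stmt)
    body += struct.pack("!hh", 1, 1)  # one format code for all params: 1 = binary
    body += struct.pack("!h", len(params))
    for p in params:
        body += struct.pack("!i", len(p)) + p  # length-prefixed binary value
    body += struct.pack("!hh", 1, 1)  # request all result columns in binary
    return frame(b"B", body)


def execute_msg(portal: str = "", max_rows: int = 0) -> bytes:
    return frame(b"E", cstring(portal) + struct.pack("!i", max_rows))  # 0 = no limit


SYNC = frame(b"S", b"")
INT8_OID = 20  # pg_type OID of bigint

# One buffer, one write (e.g. writer.write(batch) on an asyncio StreamWriter)
batch = (
    parse_msg("SELECT $1::int8 + 1", [INT8_OID])
    + bind_msg([struct.pack("!q", 41)])
    + execute_msg()
    + SYNC
)
```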
Connection Pool Architecture
Pool Sizing
The optimal pool size depends on your PostgreSQL server configuration and workload:
pool = await asyncpg.create_pool(
    dsn=DATABASE_URL,
    min_size=5,  # Connections opened when the pool is created
    max_size=20,  # Maximum total connections
    max_inactive_connection_lifetime=300,  # Close connections idle for 5 min
    command_timeout=60  # Default per-query timeout in seconds
)
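Sizing rule of thumb: max_size multiplied by the number of application instances should stay below PostgreSQL's max_connections, leaving headroom for superuser_reserved_connections, migrations, and admin sessions. Each connection is a separate server process, and PostgreSQL tends to perform poorly beyond roughly 100-200 active connections.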
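The rule of thumb is plain arithmetic, so a small hypothetical helper can make the budget explicit. The headroom of 10 connections is an assumption to adjust for your environment; superuser_reserved_connections defaults to 3 and max_connections to 100.

```python
def per_instance_max_size(
    max_connections: int,
    instances: int,
    reserved: int = 3,   # superuser_reserved_connections (PostgreSQL default)
    headroom: int = 10,  # assumption: migrations, cron jobs, psql sessions
) -> int:
    """Largest max_size each app instance can use without exhausting the server."""
    available = max_connections - reserved - headroom
    if available < instances:
        raise ValueError("not enough connections for even one per instance")
    return available // instances


# Default max_connections = 100, four application instances
print(per_instance_max_size(100, 4))  # 21
```

Rounding down keeps the combined pools strictly under the limit even when every instance is saturated at once.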
Pool Lifecycle Hooks
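import json
import asyncpg
async def init_connection(conn):
    """Called once for each new connection the pool opens."""
    # Type codecs live on the client side, so they persist for the
    # lifetime of the connection
    await conn.set_type_codec(
        'jsonb',
        encoder=json.dumps,
        decoder=json.loads,
        schema='pg_catalog'
    )
pool = await asyncpg.create_pool(
    dsn=DATABASE_URL,
    init=init_connection,
    # Sent as startup parameters. A SET issued inside init would be undone
    # by the RESET ALL that the pool runs when a connection is released.
    server_settings={'timezone': 'UTC', 'statement_timeout': '30s'}
)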
Connection Health Checks
asyncpg's pool does not ping connections when they are checked out; it only replaces connections it already knows are closed. A manual check looks like this:
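async def get_healthy_connection(pool, attempts=3):
    """Acquire a connection that has just answered a round trip.
    The caller must return it with `await pool.release(conn)`.
    """
    for _ in range(attempts):
        conn = await pool.acquire()
        try:
            await conn.fetchval("SELECT 1")
            return conn
        except Exception:
            # Drop the broken connection; the pool replaces closed
            # connections on a later acquire()
            conn.terminate()
    raise ConnectionError("no healthy connection after retries")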
Or use the setup callback pattern to validate connections when they’re checked out.
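A sketch of the setup-callback variant. setup is a real create_pool argument that runs on every acquire(); the failure behaviour noted in the comments (the connection is discarded and the error propagates out of acquire()) reflects current asyncpg versions, and acquire_validated is a hypothetical retry helper. The trade-off is one extra round trip on every checkout.

```python
import asyncio


async def validate(conn):
    """Pass as setup= to asyncpg.create_pool(); runs on every acquire().

    If it raises, the pool discards this connection and acquire() re-raises.
    """
    await conn.fetchval("SELECT 1")


async def acquire_validated(pool, attempts=3, backoff=0.1):
    """Hypothetical helper: retry acquire() when setup validation fails."""
    for attempt in range(1, attempts + 1):
        try:
            return await pool.acquire()
        except Exception:
            if attempt == attempts:
                raise
            # Linear backoff gives a restarting server a moment to recover
            await asyncio.sleep(backoff * attempt)


# Usage (inside an async function):
#   pool = await asyncpg.create_pool(DATABASE_URL, setup=validate)
#   conn = await acquire_validated(pool)
#   try:
#       ...
#   finally:
#       await pool.release(conn)
```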
COPY Operations
For bulk data loading, asyncpg supports PostgreSQL’s COPY protocol:
async with pool.acquire() as conn:
    # Import from a file: source may be a path, a file-like object,
    # or an async iterable of bytes
    status = await conn.copy_to_table(
        'users',
        source='users.csv',
        format='csv',
        header=True
    )
    # The return value is the command status string, e.g. 'COPY 3'
    print(f"Imported {int(status.split()[-1])} rows")
    # Copy from Python records (uses binary COPY)
    records = [(1, 'Alice'), (2, 'Bob'), (3, 'Charlie')]
    await conn.copy_records_to_table(
        'users',
        records=records,
        columns=['id', 'name']
    )
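COPY is typically far faster than individual INSERTs for bulk loading, because rows stream through a single command instead of each paying per-statement overhead. Throughput of 100,000+ rows/second is achievable, though real numbers depend on row width, indexes, and hardware.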
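Because copy_records_to_table accepts any iterable of tuples, records can be produced lazily instead of being built as one large list first. The sketch below assumes rows arrive as dicts (a hypothetical input shape, e.g. from a parser) and converts each one to a tuple in column order; copy_dicts is a hypothetical helper name.

```python
from typing import Any, Iterable, Mapping


async def copy_dicts(
    conn, table: str, columns: list, rows: Iterable[Mapping[str, Any]]
) -> int:
    """Hypothetical helper: bulk-load dict rows with COPY, returning the row count."""
    # Generator: each tuple is built only when COPY consumes the next record
    records = (tuple(row[col] for col in columns) for row in rows)
    status = await conn.copy_records_to_table(table, records=records, columns=columns)
    return int(status.split()[-1])  # status string looks like 'COPY 3'
```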
Streaming Large Result Sets
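async with pool.acquire() as conn:
    # Server-side cursors only exist inside a transaction
    async with conn.transaction():
        async for record in conn.cursor("SELECT * FROM big_table"):
            process(record)  # your per-row handler
Using a server-side cursor avoids loading the entire result set into memory; rows arrive in small batches as the loop consumes them.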
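The loop above streams rows through a server-side cursor, not through COPY. For an actual COPY out, asyncpg provides copy_from_query and copy_from_table, whose output argument can be a path, a file-like object, or a coroutine function that receives each chunk of bytes. A sketch writing CSV to a file, with a hypothetical helper name:

```python
async def export_query_csv(conn, query, path, *args):
    """Hypothetical helper: stream a query's result into a CSV file with COPY."""
    with open(path, "wb") as f:

        async def write_chunk(chunk: bytes) -> None:
            # Called for each chunk of COPY data as it arrives,
            # so the full result never has to sit in memory
            f.write(chunk)

        status = await conn.copy_from_query(
            query, *args, output=write_chunk, format="csv", header=True
        )
    return int(status.split()[-1])  # status string looks like 'COPY 1000'
```

Passing path directly as output is simpler when no per-chunk processing (compression, upload) is needed.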
LISTEN/NOTIFY
asyncpg supports PostgreSQL’s pub/sub mechanism:
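import asyncio
import asyncpg
async def notification_handler(conn, pid, channel, payload):
    # pid is the server process ID of the backend that sent the NOTIFY
    print(f"Received: {payload} on {channel}")
async def listen_for_events():
    conn = await asyncpg.connect(DATABASE_URL)
    try:
        await conn.add_listener('events', notification_handler)
        # The connection must stay open for notifications to arrive
        while True:
            await asyncio.sleep(1)
    finally:
        await conn.close()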
From another connection (or psql):
NOTIFY events, '{"type": "user_created", "id": 42}';
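The NOTIFY statement does not accept bind parameters, so application code usually sends payloads through the pg_notify(channel, payload) function instead. A sketch with a hypothetical helper name; the 8000-byte check mirrors PostgreSQL's default payload limit.

```python
import json

MAX_PAYLOAD_BYTES = 8000  # default limit: the payload must be shorter than this


async def publish(conn, channel: str, event: dict) -> None:
    """Hypothetical helper: send a JSON event to LISTENers on `channel`."""
    payload = json.dumps(event)
    if len(payload.encode("utf-8")) >= MAX_PAYLOAD_BYTES:
        raise ValueError("payload too large for NOTIFY; send an id and look it up")
    # pg_notify() takes ordinary parameters, unlike the NOTIFY statement
    await conn.execute("SELECT pg_notify($1, $2)", channel, payload)
```

Inside a transaction, the notification is delivered only when the transaction commits.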
Production LISTEN Pattern
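import json
import logging
import asyncpg
logger = logging.getLogger(__name__)
class EventListener:
    def __init__(self, dsn, channels):
        self.dsn = dsn
        self.channels = channels
        self.handlers = {}
        self.conn = None
    def on(self, channel, handler):
        self.handlers.setdefault(channel, []).append(handler)
    async def start(self):
        self.conn = await asyncpg.connect(self.dsn)
        for channel in self.channels:
            await self.conn.add_listener(channel, self._dispatch)
    async def _dispatch(self, conn, pid, channel, payload):
        try:
            data = json.loads(payload)
        except json.JSONDecodeError:
            logger.warning("Ignoring non-JSON payload on %s: %r", channel, payload)
            return
        for handler in self.handlers.get(channel, []):
            try:
                await handler(data)
            except Exception:
                # One failing handler must not stop the others
                logger.exception("Handler failed for channel %s", channel)
    async def stop(self):
        for channel in self.channels:
            await self.conn.remove_listener(channel, self._dispatch)
        await self.conn.close()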
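EventListener has no recovery path if its connection drops, for example on a server restart. One option, sketched below with a hypothetical supervise() helper, is to poll the connection's is_closed() and call start() again, which reconnects and re-registers every channel. The stop event and the 5-second interval are assumptions. Notifications sent while the listener is disconnected are not queued for it, so consumers should resynchronize state after a reconnect.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def supervise(listener, stop: asyncio.Event, interval: float = 5.0) -> None:
    """Hypothetical helper: keep an EventListener connected until `stop` is set."""
    while not stop.is_set():
        conn = getattr(listener, "conn", None)
        if conn is None or conn.is_closed():
            try:
                # start() opens a fresh connection and re-registers every channel
                await listener.start()
                logger.info("listener connected")
            except Exception:
                logger.exception("listener (re)connect failed; retrying in %.1fs", interval)
        try:
            # Sleep for `interval`, but wake immediately if stop is set
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass
```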
Advanced Query Patterns
Returning Clauses
row = await conn.fetchrow(
    """
    INSERT INTO users (name, email)
    VALUES ($1, $2)
    RETURNING id, created_at
    """,
    'Alice', 'alice@example.com'
)
user_id = row['id']
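RETURNING combines well with ON CONFLICT for upserts, and when only one value comes back, fetchval() returns the first column of the first row directly. A sketch with a hypothetical helper name, assuming a UNIQUE constraint on users.email:

```python
async def upsert_user(conn, name: str, email: str) -> int:
    """Hypothetical helper: insert or update a user, returning its id either way.

    Assumes a UNIQUE constraint on users.email.
    """
    # DO UPDATE makes RETURNING produce a row in both cases; with DO NOTHING,
    # a conflict returns no row and fetchval() gives None
    return await conn.fetchval(
        """
        INSERT INTO users (name, email)
        VALUES ($1, $2)
        ON CONFLICT (email) DO UPDATE SET name = EXCLUDED.name
        RETURNING id
        """,
        name,
        email,
    )
```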
Batch Operations with executemany
data = [(f"user_{i}", f"user_{i}@example.com") for i in range(10000)]
await conn.executemany(
    "INSERT INTO users (name, email) VALUES ($1, $2)",
    data
)
executemany pipelines its statements: it sends the messages for many argument tuples without waiting for a round trip after each one. That makes it much faster than calling execute() in a loop, but still slower than COPY for large datasets.
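executemany() does not return rows, so it cannot report generated ids. A common alternative is a single INSERT ... SELECT over unnest(), passing each column as an array; asyncpg encodes Python lists as PostgreSQL arrays. A sketch with a hypothetical helper name:

```python
async def insert_users_bulk(conn, rows):
    """Hypothetical helper: insert (name, email) pairs in ONE statement
    and map each email to its generated id."""
    names = [name for name, _ in rows]
    emails = [email for _, email in rows]
    # unnest() with several arrays zips them into rows, one per index
    records = await conn.fetch(
        """
        INSERT INTO users (name, email)
        SELECT * FROM unnest($1::text[], $2::text[])
        RETURNING id, email
        """,
        names,
        emails,
    )
    # RETURNING order is not formally guaranteed to follow input order,
    # so key the result by email instead of relying on position
    return {r["email"]: r["id"] for r in records}
```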
Server-Side Cursors
For large result sets:
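async with conn.transaction():
    # Iterating a cursor yields one record at a time
    async for record in conn.cursor(
        "SELECT * FROM huge_table",
        prefetch=5000
    ):
        process_record(record)
The prefetch parameter (default 50) controls how many rows the iterator fetches per network round trip; it does not change what each loop iteration receives.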
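To work on explicit batches instead of single records, a cursor object can be created with await conn.cursor(...) and its fetch(n) method called in a loop. A sketch with a hypothetical helper name:

```python
async def iter_batches(conn, query, *args, batch_size=5000):
    """Hypothetical helper: yield lists of up to batch_size records."""
    # The cursor (and therefore the transaction) stays open while the caller
    # consumes batches, so keep per-batch work short
    async with conn.transaction():
        cur = await conn.cursor(query, *args)
        while True:
            rows = await cur.fetch(batch_size)
            if not rows:
                break
            yield rows


# Usage (inside an async function):
#   async for batch in iter_batches(conn, "SELECT * FROM huge_table"):
#       process_batch(batch)
```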
Integration Patterns
With FastAPI
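from contextlib import asynccontextmanager
import asyncpg
from fastapi import Depends, FastAPI, HTTPException, Request
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Create the pool at startup and close it at shutdown
    # (replaces the deprecated @app.on_event hooks)
    app.state.pool = await asyncpg.create_pool(DATABASE_URL, min_size=5, max_size=20)
    yield
    await app.state.pool.close()
app = FastAPI(lifespan=lifespan)
async def get_db(request: Request):
    # Borrow one connection per request; it is released when the request finishes
    async with request.app.state.pool.acquire() as conn:
        yield conn
@app.get("/users/{user_id}")
async def get_user(user_id: int, db=Depends(get_db)):
    row = await db.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
    if row is None:
        raise HTTPException(status_code=404, detail="User not found")
    return dict(row)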
With SQLAlchemy Async
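from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine
# "+asyncpg" selects asyncpg as the driver; SQLAlchemy then manages its own
# pool, so pool_size/max_overflow replace asyncpg's create_pool settings
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/mydb",
    pool_size=20,
    max_overflow=0
)
async def count_users():
    async with engine.connect() as conn:
        result = await conn.execute(text("SELECT count(*) FROM users"))
        return result.scalar_one()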
Monitoring and Debugging
Query Logging
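import logging
logger = logging.getLogger("queries")
def log_query(record):
    """Query logger callback; `record` is an asyncpg LoggedQuery."""
    # elapsed is in seconds; exception is None when the query succeeded
    logger.debug("%.1f ms: %s", record.elapsed * 1000, record.query)
async def setup_logging(conn):
    """Pass as init= to create_pool so every connection logs its queries."""
    # Connection.add_query_logger() requires asyncpg 0.29 or newer
    conn.add_query_logger(log_query)
On the server side, log_min_duration_statement = 100 logs every statement that runs longer than 100 ms. Set it in postgresql.conf or with ALTER ROLE / ALTER DATABASE; changing it per session with SET requires superuser rights (or, on PostgreSQL 15+, a granted SET privilege).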
Pool Statistics
def pool_stats(pool):
    return {
        "size": pool.get_size(),
        "free": pool.get_idle_size(),
        "used": pool.get_size() - pool.get_idle_size(),
        "min": pool.get_min_size(),
        "max": pool.get_max_size(),
    }
Error Handling
asyncpg maps PostgreSQL errors to specific exception classes:
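try:
    await conn.execute("INSERT INTO users (email) VALUES ($1)", email)
except asyncpg.UniqueViolationError:
    # Duplicate email (SQLSTATE 23505)
    pass
except asyncpg.ForeignKeyViolationError:
    # Referenced record doesn't exist (SQLSTATE 23503)
    pass
except asyncpg.PostgresConnectionError:
    # Connection lost, e.g. ConnectionDoesNotExistError mid-query;
    # must come before PostgresError, which is its base class
    pass
except asyncpg.PostgresError as e:
    # Any other error reported by PostgreSQL
    print(f"PG error {e.sqlstate}: {e.message}")
except asyncpg.InterfaceError:
    # Client-side misuse, e.g. using a closed connection or passing
    # an argument of the wrong type (DataError)
    pass
except OSError:
    # Network failure while connecting or talking to the server
    pass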
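Some errors are meant to be retried: PostgreSQL reports serialization failures (SQLSTATE 40001) and deadlocks (40P01) so that clients can rerun the whole transaction. A sketch of a retry wrapper keyed on the sqlstate attribute that asyncpg's PostgresError subclasses carry; run_in_transaction is a hypothetical helper, and the serializable default is an assumption to adjust per workload.

```python
import asyncio

# SQLSTATEs PostgreSQL expects clients to handle by rerunning the transaction
RETRYABLE_SQLSTATES = {"40001", "40P01"}  # serialization_failure, deadlock_detected


async def run_in_transaction(pool, work, *, attempts=3, backoff=0.05,
                             isolation="serializable"):
    """Hypothetical helper: run `await work(conn)` in a transaction, retrying
    serialization failures and deadlocks with a fresh transaction each time."""
    for attempt in range(1, attempts + 1):
        try:
            async with pool.acquire() as conn:
                async with conn.transaction(isolation=isolation):
                    return await work(conn)
        except Exception as exc:
            # Errors raised at COMMIT also land here, so they are retried too
            retryable = getattr(exc, "sqlstate", None) in RETRYABLE_SQLSTATES
            if not retryable or attempt == attempts:
                raise
            await asyncio.sleep(backoff * attempt)
```

work must be safe to run more than once, because a retried transaction repeats it from the start.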
One thing to remember: asyncpg’s power comes from three design choices: binary protocol (faster serialization), native async (true non-blocking I/O), and Cython internals (minimal overhead). In production, size your pool conservatively, use COPY for bulk operations, leverage LISTEN/NOTIFY for real-time events, and always set command_timeout to prevent runaway queries.