Python asyncpg Database — Deep Dive

Binary Protocol Deep Dive

PostgreSQL supports two wire protocols: text and binary. Most drivers (psycopg2, pg8000) use text protocol by default, where every value is serialized as a string. asyncpg uses the binary protocol exclusively.

Why Binary Is Faster

Consider a bigint column with value 9223372036854775807:

  • Text protocol: PostgreSQL serializes to "9223372036854775807" (19 bytes), sends it, the driver parses the string back to an integer.
  • Binary protocol: PostgreSQL sends the raw 8-byte big-endian representation. asyncpg reads it directly into a Python int.

The binary protocol eliminates serialization/deserialization overhead for numeric types, timestamps, UUIDs, and arrays. The savings compound with wide rows and large result sets.

Protocol Message Flow

A typical query cycle:

Client → Server: Parse (SQL + parameter types)
Client → Server: Bind (parameter values in binary)
Client → Server: Execute
Client → Server: Sync
Server → Client: ParseComplete
Server → Client: BindComplete
Server → Client: DataRow (binary values) × N
Server → Client: CommandComplete
Server → Client: ReadyForQuery

asyncpg pipelines these messages — it sends Parse+Bind+Execute+Sync in a single TCP write, reducing round trips.

Connection Pool Architecture

Pool Sizing

The optimal pool size depends on your PostgreSQL server configuration and workload:

pool = await asyncpg.create_pool(
    dsn=DATABASE_URL,
    min_size=5,        # Minimum idle connections
    max_size=20,       # Maximum total connections
    max_inactive_connection_lifetime=300,  # Close idle connections after 5 min
    command_timeout=60  # Query timeout in seconds
)

Sizing rule of thumb: max_size should not exceed PostgreSQL’s max_connections divided by the number of application instances. PostgreSQL performs poorly with more than ~100-200 active connections.

Pool Lifecycle Hooks

async def init_connection(conn):
    """Called when a new connection is created."""
    await conn.set_type_codec(
        'jsonb',
        encoder=json.dumps,
        decoder=json.loads,
        schema='pg_catalog'
    )
    await conn.execute("SET timezone = 'UTC'")
    await conn.execute("SET statement_timeout = '30s'")

pool = await asyncpg.create_pool(
    dsn=DATABASE_URL,
    init=init_connection
)

Connection Health Checks

asyncpg doesn’t have built-in connection health checks. Implement them manually:

async def get_healthy_connection(pool):
    conn = await pool.acquire()
    try:
        await conn.fetchval("SELECT 1")
    except Exception:
        await conn.close()
        conn = await pool.acquire()
    return conn

Or use the setup callback pattern to validate connections when they’re checked out.

COPY Operations

For bulk data loading, asyncpg supports PostgreSQL’s COPY protocol:

# Import from a file-like object
async with pool.acquire() as conn:
    result = await conn.copy_to_table(
        'users',
        source=open('users.csv', 'rb'),
        format='csv',
        header=True
    )
    print(f"Imported {result} rows")

# Copy from Python records
records = [(1, 'Alice'), (2, 'Bob'), (3, 'Charlie')]
await conn.copy_records_to_table(
    'users',
    records=records,
    columns=['id', 'name']
)

COPY is orders of magnitude faster than individual INSERTs for bulk loading — asyncpg benchmarks show 100,000+ rows/second for wide tables.

Streaming COPY Out

async with pool.acquire() as conn:
    async with conn.transaction():
        async for record in conn.cursor("SELECT * FROM big_table"):
            process(record)

Using a server-side cursor avoids loading the entire result set into memory.

LISTEN/NOTIFY

asyncpg supports PostgreSQL’s pub/sub mechanism:

async def notification_handler(conn, pid, channel, payload):
    print(f"Received: {payload} on {channel}")

async def listen_for_events():
    conn = await asyncpg.connect(DATABASE_URL)
    await conn.add_listener('events', notification_handler)
    
    # Keep connection alive
    while True:
        await asyncio.sleep(1)

From another connection (or psql):

NOTIFY events, '{"type": "user_created", "id": 42}';

Production LISTEN Pattern

class EventListener:
    def __init__(self, dsn, channels):
        self.dsn = dsn
        self.channels = channels
        self.handlers = {}
    
    def on(self, channel, handler):
        self.handlers.setdefault(channel, []).append(handler)
    
    async def start(self):
        self.conn = await asyncpg.connect(self.dsn)
        for channel in self.channels:
            await self.conn.add_listener(channel, self._dispatch)
    
    async def _dispatch(self, conn, pid, channel, payload):
        import json
        data = json.loads(payload)
        for handler in self.handlers.get(channel, []):
            await handler(data)
    
    async def stop(self):
        for channel in self.channels:
            await self.conn.remove_listener(channel, self._dispatch)
        await self.conn.close()

Advanced Query Patterns

Returning Clauses

row = await conn.fetchrow(
    """
    INSERT INTO users (name, email)
    VALUES ($1, $2)
    RETURNING id, created_at
    """,
    'Alice', 'alice@example.com'
)
user_id = row['id']

Batch Operations with executemany

data = [(f"user_{i}", f"user_{i}@example.com") for i in range(10000)]
await conn.executemany(
    "INSERT INTO users (name, email) VALUES ($1, $2)",
    data
)

executemany uses pipelining — it sends all operations in a batch, which is faster than individual execute calls but slower than COPY for large datasets.

Server-Side Cursors

For large result sets:

async with conn.transaction():
    async for batch in conn.cursor(
        "SELECT * FROM huge_table",
        prefetch=5000
    ):
        process_batch(batch)

The prefetch parameter controls how many rows are fetched per network round trip.

Integration Patterns

With FastAPI

from fastapi import FastAPI, Depends
import asyncpg

app = FastAPI()
pool: asyncpg.Pool = None

@app.on_event("startup")
async def startup():
    global pool
    pool = await asyncpg.create_pool(DATABASE_URL, min_size=5, max_size=20)

@app.on_event("shutdown")
async def shutdown():
    await pool.close()

async def get_db():
    async with pool.acquire() as conn:
        yield conn

@app.get("/users/{user_id}")
async def get_user(user_id: int, db=Depends(get_db)):
    return await db.fetchrow("SELECT * FROM users WHERE id = $1", user_id)

With SQLAlchemy Async

from sqlalchemy.ext.asyncio import create_async_engine

# SQLAlchemy uses asyncpg as its async PostgreSQL dialect
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/mydb",
    pool_size=20,
    max_overflow=0
)

Monitoring and Debugging

Query Logging

import logging

async def setup_logging(conn):
    """Log all queries with timing."""
    async def log_query(query):
        logging.debug(f"Query: {query}")
    
    # asyncpg doesn't have built-in query logging
    # Use PostgreSQL's log_min_duration_statement instead
    await conn.execute("SET log_min_duration_statement = 100")  # Log queries > 100ms

Pool Statistics

def pool_stats(pool):
    return {
        "size": pool.get_size(),
        "free": pool.get_idle_size(),
        "used": pool.get_size() - pool.get_idle_size(),
        "min": pool.get_min_size(),
        "max": pool.get_max_size(),
    }

Error Handling

asyncpg maps PostgreSQL errors to specific exception classes:

try:
    await conn.execute("INSERT INTO users (email) VALUES ($1)", email)
except asyncpg.UniqueViolationError:
    # Duplicate email
    pass
except asyncpg.ForeignKeyViolationError:
    # Referenced record doesn't exist
    pass
except asyncpg.PostgresError as e:
    # Any PostgreSQL error
    print(f"PG error {e.sqlstate}: {e.message}")
except asyncpg.InterfaceError:
    # Connection problem
    pass

One thing to remember: asyncpg’s power comes from three design choices: binary protocol (faster serialization), native async (true non-blocking I/O), and Cython internals (minimal overhead). In production, size your pool conservatively, use COPY for bulk operations, leverage LISTEN/NOTIFY for real-time events, and always set command_timeout to prevent runaway queries.

pythonasyncasyncpgdatabasespostgresql

See Also

  • Python Aioredis Understand Aioredis through a practical analogy so your Python decisions become faster and clearer.
  • Python Alembic Understand Alembic through a practical analogy so your Python decisions become faster and clearer.
  • Python Asyncpg Understand Asyncpg through a practical analogy so your Python decisions become faster and clearer.
  • Python Cassandra Python Understand Cassandra Python through a practical analogy so your Python decisions become faster and clearer.
  • Python Connection Pooling Understand Connection Pooling through a practical analogy so your Python decisions become faster and clearer.