Python Async Performance Tuning — Deep Dive
Async Python’s performance ceiling is surprisingly high — uvicorn + uvloop can handle 50,000+ requests per second on a single core. But reaching that ceiling requires understanding the event loop’s internals and knowing exactly where cycles are wasted.
Event loop internals
The asyncio event loop cycle
Each iteration of the event loop follows this sequence:
1. Run all ready callbacks: tasks that were scheduled with call_soon()
2. Poll for I/O: call epoll/kqueue/IOCP with a calculated timeout
3. Process I/O events: invoke callbacks for ready file descriptors
4. Run scheduled callbacks: call_later() and call_at() timers that have expired
The key insight: if step 1 takes too long (because a callback is CPU-heavy), steps 2-4 are delayed, and all pending I/O operations experience added latency.
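The effect can be reproduced in a few lines. The sketch below is only an illustration: measure_timer_delay is a hypothetical helper, and time.sleep stands in for a CPU-heavy callback. It schedules a 10ms timer, then blocks the loop for 100ms and reports how late the timer actually fired.

```python
import asyncio
import time


async def measure_timer_delay(block_for=0.1, timer_delay=0.01):
    """Return how long a short timer really took while the loop was blocked."""
    loop = asyncio.get_running_loop()
    fired = loop.create_future()
    scheduled_at = loop.time()

    # Ask the loop to fire a timer after timer_delay seconds.
    loop.call_later(timer_delay, lambda: fired.set_result(loop.time()))

    # Stand-in for a CPU-heavy ready callback: it blocks the whole loop.
    loop.call_soon(time.sleep, block_for)

    fired_at = await fired
    return fired_at - scheduled_at


if __name__ == "__main__":
    delay = asyncio.run(measure_timer_delay())
    print(f"10ms timer fired after {delay * 1000:.0f}ms")
```

The timer cannot fire until the blocking callback returns, so the measured delay tracks block_for rather than timer_delay.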
Measuring event loop lag
import asyncio
import time

async def monitor_loop_lag(interval=1.0):
    """Report event loop responsiveness."""
    while True:
        t0 = time.monotonic()
        await asyncio.sleep(interval)
        actual = time.monotonic() - t0
        lag_ms = (actual - interval) * 1000
        if lag_ms > 10:  # 10ms threshold
            print(f"Event loop lag: {lag_ms:.1f}ms")

# Run as a background task. Call this from inside a running coroutine,
# and keep a reference so the task is not garbage-collected.
lag_task = asyncio.create_task(monitor_loop_lag())
This coroutine sleeps for 1 second and measures how much longer it actually took. Consistent lag above 10-20ms indicates blocking callbacks.
uvloop: the drop-in accelerator
uvloop replaces asyncio’s default event loop with one built on libuv (the same library powering Node.js). It’s typically 2-4× faster for I/O-heavy workloads.
import asyncio
import uvloop

# Option 1: Set as default policy, then call asyncio.run(main()) as usual
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# Option 2: Use uvloop.run() (available since uvloop 0.18)
uvloop.run(main())
Benchmark comparison
Measured on a 4-core machine, handling HTTP requests with aiohttp:
| Event loop | Requests/sec | P99 latency |
|---|---|---|
| asyncio (default) | 12,400 | 8.2ms |
| uvloop | 31,200 | 3.1ms |
The speedup comes from uvloop’s C implementation of the event loop, avoiding Python overhead for every I/O poll cycle.
When uvloop doesn’t help
- CPU-bound callbacks (the bottleneck isn’t in the loop)
- Very few concurrent connections (loop overhead is negligible)
- Code that uses asyncio internals incompatible with uvloop
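Because uvloop is an optional dependency and is not available on every platform (it does not support Windows), one common arrangement is to fall back to the default loop when it cannot be imported. The following is a minimal sketch under that assumption; the names run and main are illustrative.

```python
import asyncio

try:
    import uvloop  # optional dependency

    # uvloop.run() exists in uvloop 0.18+; on older versions this falls
    # back to asyncio.run(), which uses the default event loop.
    run = getattr(uvloop, "run", asyncio.run)
except ImportError:
    run = asyncio.run


async def main():
    await asyncio.sleep(0)
    return "ok"


if __name__ == "__main__":
    print(run(main()))
```

The application code stays identical either way, which also makes it easy to benchmark both loops against the same workload.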
Concurrency control patterns
Adaptive rate limiting
Fixed semaphore values waste capacity or cause overload. Adaptive approaches adjust based on feedback:
import asyncio

class AdaptiveLimiter:
    def __init__(self, initial=10, min_val=1, max_val=200):
        self.current = initial  # current concurrency limit
        self.min_val = min_val
        self.max_val = max_val
        self.active = 0         # permits currently held
        self.success_count = 0
        self.error_count = 0
        # A counter guarded by a Condition replaces the Semaphore: swapping
        # in a new Semaphore would strand tasks waiting on the old one and
        # let releases inflate the new one past its limit.
        self._cond = asyncio.Condition()

    async def acquire(self):
        async with self._cond:
            # Wait for a free slot under the limit in force right now
            await self._cond.wait_for(lambda: self.active < self.current)
            self.active += 1

    async def release(self, success=True):
        async with self._cond:
            self.active -= 1
            if success:
                self.success_count += 1
            else:
                self.error_count += 1
            self._maybe_adjust()
            # A slot was freed and the limit may have grown: wake waiters
            self._cond.notify_all()

    def _maybe_adjust(self):
        total = self.success_count + self.error_count
        if total < 100:
            return
        error_rate = self.error_count / total
        if error_rate > 0.1 and self.current > self.min_val:
            # Too many errors: halve concurrency
            self.current = max(self.min_val, self.current // 2)
        elif error_rate < 0.01 and self.current < self.max_val:
            # Very few errors: increase concurrency
            self.current = min(self.max_val, self.current + 10)
        # Start a fresh measurement window
        self.success_count = 0
        self.error_count = 0
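A caller has to pair every acquire() with exactly one release() and report whether the work succeeded. One possible wrapper is sketched below; limited_call, CountingLimiter, and flaky are hypothetical names introduced for the illustration, and CountingLimiter is only a stand-in exposing the same acquire()/release(success) shape as the limiter above.

```python
import asyncio


class CountingLimiter:
    """Hypothetical stand-in with the same acquire()/release(success) shape."""

    def __init__(self):
        self.successes = 0
        self.errors = 0

    async def acquire(self):
        pass  # a real limiter would wait for a free slot here

    async def release(self, success=True):
        if success:
            self.successes += 1
        else:
            self.errors += 1


async def limited_call(limiter, coro_fn, *args):
    """Run coro_fn(*args) under the limiter and report the outcome."""
    await limiter.acquire()
    success = False
    try:
        result = await coro_fn(*args)
        success = True
        return result
    finally:
        # Always give the slot back; failures and cancellations count as errors.
        await limiter.release(success=success)


async def flaky(n):
    await asyncio.sleep(0)
    if n % 2:
        raise RuntimeError(f"request {n} failed")
    return n


async def main():
    limiter = CountingLimiter()
    results = await asyncio.gather(
        *(limited_call(limiter, flaky, n) for n in range(4)),
        return_exceptions=True,
    )
    return limiter.successes, limiter.errors, results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Releasing in a finally block matters: a slot that is never returned permanently lowers the effective concurrency.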
Task batching with asyncio.TaskGroup
Python 3.11+ provides structured concurrency via TaskGroup:
import asyncio

async def process_batch(items, batch_size=50):
    # process() is the caller's per-item coroutine function
    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        # The block exits only after every task in the batch has finished
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(process(item)) for item in batch]
        results.extend(t.result() for t in tasks)
    return results
TaskGroup automatically cancels remaining tasks if one raises an exception, preventing resource leaks.
Connection pool optimization
Sizing database pools
The optimal pool size for database connections follows this formula:
pool_size = (concurrent_queries × avg_query_time) / target_response_time
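As a worked example of the formula, the helper below plugs in illustrative numbers: 100 concurrent queries, a 20ms average query time, and a 200ms response-time target. The function name and the inputs are assumptions for the sketch, not measurements.

```python
import math


def estimate_pool_size(concurrent_queries, avg_query_ms, target_response_ms):
    """Apply the sizing formula, rounding up to a whole connection."""
    raw = concurrent_queries * avg_query_ms / target_response_ms
    return max(1, math.ceil(raw))


if __name__ == "__main__":
    # 100 * 20 / 200 = 10 connections
    print(estimate_pool_size(100, 20, 200))
```

Treat the result as a starting point and confirm it against observed pool usage under load.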
In practice:
import asyncpg

async def init_pool():
    # create_pool() must be awaited inside a coroutine
    pool = await asyncpg.create_pool(
        dsn='postgresql://...',
        min_size=5,    # keep 5 connections warm
        max_size=20,   # never exceed 20
        max_inactive_connection_lifetime=300,  # close idle connections after 5min
        command_timeout=30,  # time out slow queries after 30s
    )
    return pool
Monitor pool usage to detect saturation:
async def pool_stats(pool):
    return {
        'size': pool.get_size(),
        'free': pool.get_idle_size(),
        'used': pool.get_size() - pool.get_idle_size(),
        'min': pool.get_min_size(),
        'max': pool.get_max_size(),
    }
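A stats dictionary of this shape can feed a simple saturation check. The sketch below assumes the 'used' and 'max' keys shown above; is_saturated and the 90% threshold are illustrative choices, not part of asyncpg.

```python
def is_saturated(stats, threshold=0.9):
    """Return True when the share of connections in use reaches the threshold."""
    return stats['max'] > 0 and stats['used'] / stats['max'] >= threshold


if __name__ == "__main__":
    sample = {'size': 20, 'free': 1, 'used': 19, 'min': 5, 'max': 20}
    print(is_saturated(sample))
```

Sustained saturation suggests either raising max_size or shortening the queries that hold connections.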
HTTP connection pooling with aiohttp
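import aiohttp

# Create the connector and session inside a coroutine and reuse them
# for the lifetime of the application; close with await session.close()
connector = aiohttp.TCPConnector(
    limit=100,           # total connections
    limit_per_host=10,   # per-host limit
    ttl_dns_cache=300,   # DNS cache TTL in seconds
    enable_cleanup_closed=True,
    keepalive_timeout=30,
)
session = aiohttp.ClientSession(
    connector=connector,
    timeout=aiohttp.ClientTimeout(total=30, connect=5),
)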
Profiling async applications
Using yappi for async-aware profiling
import asyncio
import yappi

yappi.set_clock_type('wall')  # wall clock for I/O-heavy code
yappi.start()
asyncio.run(main())
yappi.stop()

# Get coroutine-level stats
func_stats = yappi.get_func_stats()
func_stats.sort('ttot', 'desc')
# columns maps position -> (field name, width)
func_stats.print_all(columns={
    0: ('name', 60), 1: ('ncall', 10), 2: ('ttot', 10), 3: ('tavg', 10)
})
Tracing task lifecycle
import asyncio
import logging

logger = logging.getLogger('async_tasks')

class TracingTaskFactory:
    def __call__(self, loop, coro, **kwargs):
        # Forward name/context and any other keyword arguments to Task
        task = asyncio.Task(coro, loop=loop, **kwargs)
        created_at = loop.time()

        def done_callback(t):
            elapsed = loop.time() - created_at
            if elapsed > 1.0:
                # exception() raises CancelledError on a cancelled task
                exc = None if t.cancelled() else t.exception()
                logger.warning(
                    f"Slow task {t.get_name()}: {elapsed:.2f}s "
                    f"exception={exc}"
                )

        task.add_done_callback(done_callback)
        return task

# Install from inside a running coroutine so the factory is set on the
# loop that actually runs the application
loop = asyncio.get_running_loop()
loop.set_task_factory(TracingTaskFactory())
Memory optimization for high-concurrency
Each coroutine frame consumes ~1-3KB. At 100,000 concurrent coroutines, that’s 100-300MB just for frames.
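The per-coroutine figure depends on the interpreter version and on how much local state each coroutine holds, so it is worth checking on your own setup. The sketch below is one rough way to do that with tracemalloc; idle and measure are hypothetical names, and the number it reports includes the Task object and the waiter future as well as the coroutine frame, so it is an upper bound on the frame cost alone.

```python
import asyncio
import tracemalloc


async def idle(event):
    await event.wait()


async def measure(n=10_000):
    """Return the approximate bytes allocated per suspended task."""
    event = asyncio.Event()
    tracemalloc.start()
    before, _ = tracemalloc.get_traced_memory()

    tasks = [asyncio.create_task(idle(event)) for _ in range(n)]
    await asyncio.sleep(0)  # let every task start and suspend on the event

    after, _ = tracemalloc.get_traced_memory()
    tracemalloc.stop()

    event.set()  # release the tasks so they can finish
    await asyncio.gather(*tasks)
    return (after - before) / n


if __name__ == "__main__":
    print(f"{asyncio.run(measure()):.0f} bytes per suspended task")
```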
Strategies to reduce memory:
- Limit concurrent tasks: use semaphores to cap active coroutines
- Stream large responses: don't buffer entire response bodies in memory
async def stream_download(url, dest):
    # session is assumed to be a shared aiohttp.ClientSession
    async with session.get(url) as resp:
        with open(dest, 'wb') as f:
            async for chunk in resp.content.iter_chunked(8192):
                f.write(chunk)
- Use __slots__ on frequently created objects: reduces per-instance memory
- Release references early: del large_object within long-running coroutines
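The first strategy in the list can be sketched as a semaphore-bounded gather. The names bounded_gather and demo are illustrative, and the 1ms sleep stands in for real I/O; the demo records the peak number of jobs running at once.

```python
import asyncio


async def bounded_gather(coro_fns, limit=10):
    """Run zero-argument coroutine functions with at most `limit` active."""
    sem = asyncio.Semaphore(limit)

    async def run_one(fn):
        async with sem:  # the coroutine is only created once a slot is free
            return await fn()

    return await asyncio.gather(*(run_one(fn) for fn in coro_fns))


async def demo(n=50, limit=5):
    active = 0
    peak = 0

    async def job():
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.001)  # stand-in for real I/O
        active -= 1
        return 1

    results = await bounded_gather([job] * n, limit=limit)
    return sum(results), peak


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Passing coroutine functions rather than coroutine objects delays creating the expensive inner coroutine until a slot is free. The lightweight wrapper tasks still exist for every item, so for very large inputs a fixed set of worker tasks reading from a queue keeps the task count itself bounded.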
Production checklist
| Area | Optimization | Expected impact |
|---|---|---|
| Event loop | Switch to uvloop | 2-4× throughput |
| Concurrency | Add semaphore limits | Prevents resource exhaustion |
| Connections | Pool HTTP and DB connections | Eliminates connection overhead |
| Blocking calls | Offload to executor | Removes event loop stalls |
| Parallelism | Use gather for independent I/O | Reduces latency by parallelism factor |
| Batching | Group small operations | Reduces round-trips |
| Monitoring | Track event loop lag | Early warning for degradation |
| Memory | Stream large payloads | Prevents OOM at scale |
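The "offload to executor" row can be illustrated with asyncio.to_thread (Python 3.9+). In this sketch blocking_io, heartbeat, and main are hypothetical names, and time.sleep stands in for a blocking library call; the heartbeat keeps ticking while the blocking call runs in a worker thread.

```python
import asyncio
import time


def blocking_io(seconds):
    time.sleep(seconds)  # stand-in for a blocking library call
    return "done"


async def heartbeat(ticks, stop):
    # Keeps appending timestamps as long as the loop stays responsive.
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def main():
    ticks = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(ticks, stop))

    # Runs in the default thread pool, so the event loop is not stalled.
    result = await asyncio.to_thread(blocking_io, 0.1)

    stop.set()
    await hb
    return result, len(ticks)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Calling blocking_io directly inside a coroutine would freeze the heartbeat for the full duration. Threads help with blocking I/O; CPU-bound work is usually better sent to a process pool via loop.run_in_executor.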
The one thing to remember: async performance is bounded by the slowest synchronous operation in your event loop — find it with lag monitoring, eliminate it with executors or async libraries, then scale concurrency with proper pooling and rate limiting.