Python Async Resource Management — Deep Dive
How async with Works at the Protocol Level
The async with statement behaves roughly like the following expansion (see PEP 492):
# async with expr as var:
# body
mgr = expr
value = await mgr.__aenter__()
try:
var = value
body
except BaseException as exc:
if not await mgr.__aexit__(type(exc), exc, exc.__traceback__):
raise
else:
await mgr.__aexit__(None, None, None)
The await in __aexit__ is critical — it means cleanup can perform I/O, but also that it’s subject to the same cancellation rules as any other coroutine.
Building a Production Connection Pool
Here’s a full-featured async connection pool with health checks, idle eviction, and acquire timeouts (leak detection is covered separately below):
import asyncio
import time
from collections import deque
from contextlib import asynccontextmanager
class AsyncPool:
    """Async connection pool with health checks, idle eviction and bounded waits.

    ``factory`` is an async callable returning a new connection. Connections
    must provide an async ``ping()`` (used by the health check) and an async
    ``close()``, and must be hashable because checked-out connections are
    kept in a set.
    """

    def __init__(self, factory, min_size=2, max_size=10,
                 max_idle_time=300, acquire_timeout=30):
        self._factory = factory  # async callable that returns a connection
        self._min_size = min_size
        self._max_size = max_size
        self._max_idle_time = max_idle_time  # seconds before an idle conn is evicted
        self._acquire_timeout = acquire_timeout  # seconds to wait when the pool is full
        self._idle = deque()  # (conn, last_used_time)
        self._in_use = set()
        self._size = 0  # connections owned by the pool: idle + checked out
        self._waiters = deque()  # Futures waiting for a connection
        self._closed = False

    async def start(self):
        """Pre-populate the pool with min_size connections."""
        for _ in range(self._min_size):
            conn = await self._factory()
            self._idle.append((conn, time.monotonic()))
            self._size += 1

    @asynccontextmanager
    async def acquire(self):
        """Check out a connection for the duration of an ``async with`` block.

        Raises RuntimeError if the pool is closed, or if no connection becomes
        available within ``acquire_timeout`` seconds.
        """
        conn = await self._get_connection()
        self._in_use.add(conn)
        try:
            yield conn
        finally:
            self._in_use.discard(conn)
            if self._closed:
                await self._destroy(conn)
            else:
                self._hand_off(conn)

    def _hand_off(self, conn):
        """Give *conn* to the oldest still-waiting caller, or park it as idle."""
        # Waiters that timed out or were cancelled are already done: skip them
        # rather than dropping the connection on the floor.
        while self._waiters:
            waiter = self._waiters.popleft()
            if not waiter.done():
                waiter.set_result(conn)
                return
        self._idle.append((conn, time.monotonic()))

    def _remove_waiter(self, waiter):
        """Drop *waiter* from the queue unless a releaser already popped it."""
        try:
            self._waiters.remove(waiter)  # deque has no discard()
        except ValueError:
            pass

    async def _get_connection(self):
        """Return a healthy idle connection, a new one, or wait for a release."""
        if self._closed:
            raise RuntimeError("Pool is closed")

        # Try to get an idle connection
        while self._idle:
            conn, last_used = self._idle.popleft()
            age = time.monotonic() - last_used
            if age > self._max_idle_time:
                await self._destroy(conn)
                continue
            if await self._health_check(conn):
                return conn
            await self._destroy(conn)

        # Create a new one if under limit; reserve the slot first so
        # concurrent callers cannot overshoot max_size, and give it back if
        # the factory fails.
        if self._size < self._max_size:
            self._size += 1
            try:
                return await self._factory()
            except BaseException:
                self._size -= 1
                raise

        # Wait for a connection to be returned
        waiter = asyncio.get_running_loop().create_future()
        self._waiters.append(waiter)
        try:
            return await asyncio.wait_for(
                waiter, timeout=self._acquire_timeout
            )
        except BaseException as exc:
            # Timed out or cancelled: make sure no releaser hands us a
            # connection we will never use.
            self._remove_waiter(waiter)
            if waiter.done() and not waiter.cancelled():
                # A connection was handed over in the same loop iteration the
                # timeout/cancellation hit; put it back instead of leaking it.
                self._hand_off(waiter.result())
            if isinstance(exc, asyncio.TimeoutError):
                raise RuntimeError(
                    f"Pool exhausted: {self._size} connections in use"
                ) from None
            raise

    async def _health_check(self, conn):
        """Return True if ``conn.ping()`` succeeds within 2 seconds."""
        try:
            await asyncio.wait_for(conn.ping(), timeout=2.0)
            return True
        except Exception:
            return False

    async def _destroy(self, conn):
        """Forget *conn* and close it; close errors are ignored."""
        self._size -= 1
        try:
            # Shielded so the close still completes if our caller is cancelled.
            await asyncio.shield(conn.close())
        except Exception:
            pass  # Best effort

    async def close(self):
        """Close the pool: cancel pending waiters and destroy idle connections.

        Connections currently checked out are destroyed when their
        ``acquire()`` block exits.
        """
        self._closed = True
        # Cancel waiters and drop them from the queue
        while self._waiters:
            self._waiters.popleft().cancel()
        # Close idle connections
        while self._idle:
            conn, _ = self._idle.popleft()
            await self._destroy(conn)
        # In-use connections will be destroyed when returned
Key design decisions:
- Health checks before reuse — stale connections are discarded
- Idle timeout — connections unused for too long are evicted
- Waiter queue — when the pool is full, callers block until a connection is returned
- Shield on close — connection cleanup must complete even during cancellation
AsyncExitStack Internals
AsyncExitStack maintains a stack of cleanup callbacks. Here is a simplified model of how it works:
class AsyncExitStack:
    """Simplified model of ``contextlib.AsyncExitStack``.

    Supports only async context managers (no callback/sync-context helpers).
    On exit, each registered ``__aexit__`` is called in LIFO order. An
    exception raised by one callback is passed to the remaining callbacks
    and re-raised at the end unless a later callback suppresses it.
    """

    def __init__(self):
        self._exit_callbacks = deque()

    async def __aenter__(self):
        return self

    async def enter_async_context(self, cm):
        """Enter *cm* and register its ``__aexit__``; return what it yields."""
        result = await cm.__aenter__()
        self._exit_callbacks.append(cm.__aexit__)
        return result

    async def __aexit__(self, *exc_info):
        received_exc = exc_info[0] is not None
        suppressed = False
        pending_raise = False  # a callback raised and nobody suppressed it yet
        # Pop and call callbacks in LIFO order
        while self._exit_callbacks:
            cb = self._exit_callbacks.pop()
            try:
                if await cb(*exc_info):
                    suppressed = True
                    pending_raise = False
                    exc_info = (None, None, None)
            except BaseException as new_exc:
                # Keep unwinding: later callbacks still run and see the new error.
                exc_info = (type(new_exc), new_exc, new_exc.__traceback__)
                pending_raise = True
        if pending_raise:
            raise exc_info[1]
        # True only when an incoming exception was swallowed by a callback.
        return received_exc and suppressed
The LIFO order ensures resources are cleaned up in reverse acquisition order — exactly like nested with blocks.
Cancellation-Safe Cleanup Patterns
Pattern 1: Synchronous Fallback
When possible, implement both sync and async cleanup:
class ManagedResource:
    """Resource whose async cleanup falls back to a synchronous path.

    Expects ``_async_cleanup()`` (coroutine function) and ``_sync_cleanup()``
    (plain function) to be provided elsewhere; neither is defined here.
    """

    async def __aexit__(self, *exc):
        # Run the async cleanup as its own task so shield() protects it from
        # cancellation of the caller, and so we can inspect it afterwards.
        cleanup = asyncio.ensure_future(self._async_cleanup())
        try:
            await asyncio.shield(cleanup)
        except asyncio.CancelledError:
            completed = (
                cleanup.done()
                and not cleanup.cancelled()
                and cleanup.exception() is None
            )
            if not completed:
                # Stop the async path before falling back to synchronous
                # cleanup, so the two never run concurrently (or both finish).
                cleanup.cancel()
                self._sync_cleanup()
            raise
Pattern 2: Cleanup Task
Spawn a dedicated cleanup task that isn’t subject to the caller’s cancellation:
# Strong references to in-flight cleanup tasks. The event loop keeps only weak
# references to tasks, so an unreferenced fire-and-forget task may be
# garbage-collected before it finishes.
_background_cleanups = set()


@asynccontextmanager
async def robust_resource():
    """Yield a resource, then close it in a background task.

    close() is not awaited inline, so cancellation of the caller cannot
    interrupt it; the trade-off is that the caller never observes when it
    completes or whether it failed.
    """
    resource = await acquire()
    try:
        yield resource
    finally:
        # Fire-and-forget cleanup in a separate task, kept alive until done
        task = asyncio.get_running_loop().create_task(resource.close())
        _background_cleanups.add(task)
        task.add_done_callback(_background_cleanups.discard)
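This trades strict ordering for reliability: the cleanup happens eventually but not inline, and the caller never sees errors raised by close(). Keep a reference to the task until it finishes, because the event loop holds tasks only weakly.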
Pattern 3: Deadline-Aware Cleanup
Give cleanup a bounded amount of time:
@asynccontextmanager
async def timed_cleanup_resource(cleanup_timeout=5.0):
    """Yield a resource; give its async close() at most *cleanup_timeout* seconds.

    close() runs in its own task and is shielded from cancellation of the
    caller. If it does not finish in time, it is cancelled and the
    synchronous force_close() is used instead.
    """
    resource = await acquire()
    try:
        yield resource
    finally:
        close_task = asyncio.ensure_future(resource.close())
        try:
            await asyncio.wait_for(
                asyncio.shield(close_task),
                timeout=cleanup_timeout
            )
        except asyncio.TimeoutError:
            # wait_for only cancelled the shield wrapper; stop the real
            # close() so it cannot race with the forced shutdown below.
            close_task.cancel()
            resource.force_close()  # Synchronous forced shutdown
Resource Leak Detection
Build a tracker that catches resources that aren’t properly closed:
import weakref
import traceback
class LeakDetector:
    """Track live resources and report those that have lived suspiciously long.

    Resources are held only through weak references, so an object that is
    garbage-collected drops out of the registry automatically. A tracked
    resource must support weak references and allow attribute assignment,
    because the detector stores its metadata on the object itself.
    """

    _instances = weakref.WeakValueDictionary()  # id(resource) -> resource

    @classmethod
    def track(cls, resource, label="resource"):
        """Register *resource* and record where and when it was created."""
        cls._instances[id(resource)] = resource
        # Drop the last frame: it is this track() call, not the caller's code.
        resource._creation_stack = traceback.format_stack()[:-1]
        resource._creation_time = time.monotonic()
        resource._leak_label = label

    @classmethod
    def untrack(cls, resource):
        """Stop tracking *resource* (e.g. once closed); no-op if not tracked."""
        cls._instances.pop(id(resource), None)

    @classmethod
    def report(cls, max_age=60):
        """Print every tracked resource older than *max_age* seconds.

        Returns a list of ``(label, age_seconds)`` tuples for the resources
        reported, so callers (tests, metrics) can act on them too.
        """
        now = time.monotonic()
        leaks = []
        # Snapshot first: the weak dict can shrink mid-iteration if the GC runs.
        for resource in list(cls._instances.values()):
            age = now - resource._creation_time
            if age <= max_age:
                continue
            leaks.append((resource._leak_label, age))
            print(f"Potential leak: {resource._leak_label} "
                  f"(age: {age:.0f}s)")
            print("Created at:")
            print("".join(resource._creation_stack[-5:]))
        return leaks
# Usage in a context manager:
@asynccontextmanager
async def tracked_connection():
    """Yield a connection registered with LeakDetector; close it on exit."""
    conn = await create_connection()
    try:
        # Register inside the try: if tracking raises (e.g. the object cannot
        # be weakly referenced), the connection is still closed.
        LeakDetector.track(conn, "db-connection")
        yield conn
    finally:
        await conn.close()
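Run LeakDetector.report() periodically (in a heartbeat task) to catch leaks early. Note that report() flags every tracked object that is still alive and older than max_age, whether or not it has been closed. Treat its output as candidates to investigate, not confirmed leaks.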
Composing Resources with Dependency Order
In complex applications, resources depend on each other. Use a dependency-aware setup:
async def application_lifecycle():
    """Bring up the application layer by layer and tear it down in reverse."""
    async with AsyncExitStack() as stack:
        enter = stack.enter_async_context

        # Layer 1: infrastructure
        config = await enter(config_loader())

        # Layer 2: connection pools, built from the loaded config
        db = await enter(db_pool(config.database_url))
        cache = await enter(redis_pool(config.redis_url))

        # Layer 3: services wired to the pools (plain objects, no cleanup)
        user_service = UserService(db, cache)

        # Layer 4: the server exposing those services
        server = await enter(web_server(user_service))

        # Block here until a shutdown is requested
        await shutdown_event.wait()
        # Leaving the stack unwinds LIFO: server → cache → db → config
Performance: Pool Sizing Rules of Thumb
| Application Type | Recommended Pool Size |
|---|---|
| Web API (per worker) | 2 × CPU cores |
| Background workers | 1 per concurrent job |
| Mixed workload | CPU cores + I/O wait ratio × cores |
| Connection to remote DB | Min: 2, Max: 20 (network latency adds up) |
Oversized pools waste memory and connections. Undersized pools cause acquire timeouts. Monitor _waiters length to find the right balance.
One thing to remember: Production async resource management requires three layers — acquisition via async context managers, cleanup protection with asyncio.shield() or synchronous fallbacks, and leak detection via tracking and monitoring — because in a concurrent system, resources that leak accumulate until the application fails.
See Also
- Python Actor Model — Why treating each piece of your program like a person with their own mailbox makes concurrency way less scary.
- Python Aiocache Caching — aiocache remembers expensive answers so your async Python app doesn't waste time asking the same question twice.
- Python Aiofiles Async IO — aiofiles lets your async Python program read and write files without freezing, because normal file operations secretly block everything.
- Python Aiohttp — Understand aiohttp through an everyday analogy so Python behavior feels intuitive, not random.
- Python AnyIO Portability — AnyIO lets your async Python code work with any async library: write once, run on asyncio or Trio without changes.