Python Async Resource Management — Deep Dive

How async with Works at the Protocol Level

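The async with statement is roughly equivalent to:

# async with expr as var:
#     body

mgr = expr
value = await mgr.__aenter__()
exited = False
try:
    var = value
    body
except BaseException as exc:
    exited = True
    if not await mgr.__aexit__(type(exc), exc, exc.__traceback__):
        raise
finally:
    # finally (not else) so __aexit__ also runs when body leaves
    # via return, break, or continue
    if not exited:
        await mgr.__aexit__(None, None, None)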

The await in __aexit__ is critical — it means cleanup can perform I/O, but also that it’s subject to the same cancellation rules as any other coroutine.
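
To make the protocol concrete, here is a minimal sketch of a class-based async context manager that records each call. The Traced class and the events list are illustrative names, not part of any library; the sleeps only stand in for real I/O.

```python
import asyncio

events = []


class Traced:
    """Hypothetical manager that records each protocol call."""

    def __init__(self, suppress=False):
        self.suppress = suppress

    async def __aenter__(self):
        await asyncio.sleep(0)  # setup may await, e.g. opening a socket
        events.append("enter")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await asyncio.sleep(0)  # cleanup may await too
        events.append(("exit", exc_type.__name__ if exc_type else None))
        return self.suppress  # a true value swallows the exception


async def demo():
    events.clear()
    async with Traced():
        events.append("body")
    async with Traced(suppress=True):
        raise ValueError("swallowed by __aexit__")
    return list(events)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The second block shows the exception path: __aexit__ receives the exception type, and because it returns a true value the ValueError never reaches the caller.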

Building a Production Connection Pool

Here’s an async connection pool with health checks, idle eviction, acquire timeouts, and in-use tracking:

import asyncio
import time
from collections import deque
from contextlib import asynccontextmanager

class AsyncPool:
    def __init__(self, factory, min_size=2, max_size=10,
                 max_idle_time=300, acquire_timeout=30):
        self._factory = factory  # async callable that returns a connection
        self._min_size = min_size
        self._max_size = max_size
        self._max_idle_time = max_idle_time
        self._acquire_timeout = acquire_timeout

        self._idle = deque()          # (conn, last_used_time)
        self._in_use = set()
        self._size = 0
        self._waiters = deque()       # Futures waiting for a connection
        self._closed = False

    async def start(self):
        """Pre-populate the pool with min_size connections."""
        for _ in range(self._min_size):
            conn = await self._factory()
            self._idle.append((conn, time.monotonic()))
            self._size += 1

    @asynccontextmanager
    async def acquire(self):
        conn = await self._get_connection()
        self._in_use.add(conn)
        try:
            yield conn
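        finally:
            self._in_use.discard(conn)
            if self._closed:
                await self._destroy(conn)
            else:
                # Skip waiters that already timed out or were cancelled,
                # otherwise the connection would be lost with them
                while self._waiters and self._waiters[0].done():
                    self._waiters.popleft()
                if self._waiters:
                    self._waiters.popleft().set_result(conn)
                else:
                    self._idle.append((conn, time.monotonic()))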

    async def _get_connection(self):
        # Try to get an idle connection
        while self._idle:
            conn, last_used = self._idle.popleft()
            age = time.monotonic() - last_used
            if age > self._max_idle_time:
                await self._destroy(conn)
                continue
            if await self._health_check(conn):
                return conn
            await self._destroy(conn)

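        # Create a new one if under limit
        if self._size < self._max_size:
            self._size += 1  # reserve the slot before awaiting
            try:
                return await self._factory()
            except BaseException:
                self._size -= 1  # release the slot if creation fails
                raise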

        # Wait for a connection to be returned
        waiter = asyncio.get_running_loop().create_future()
        self._waiters.append(waiter)
        try:
            return await asyncio.wait_for(
                waiter, timeout=self._acquire_timeout
            )
        except asyncio.TimeoutError:
            # deque has no discard(); the waiter may already have been popped
            try:
                self._waiters.remove(waiter)
            except ValueError:
                pass
            raise RuntimeError(
                f"Pool exhausted: {self._size} connections in use"
            )

    async def _health_check(self, conn):
        try:
            await asyncio.wait_for(conn.ping(), timeout=2.0)
            return True
        except Exception:
            return False

    async def _destroy(self, conn):
        self._size -= 1
        try:
            await asyncio.shield(conn.close())
        except Exception:
            pass  # Best effort

    async def close(self):
        self._closed = True
        # Cancel waiters
        for waiter in self._waiters:
            waiter.cancel()
        # Close idle connections
        while self._idle:
            conn, _ = self._idle.popleft()
            await self._destroy(conn)
        # In-use connections will be destroyed when returned

Key design decisions:

  • Health checks before reuse — stale connections are discarded
  • Idle timeout — connections unused for too long are evicted
  • Waiter queue — when the pool is full, callers block until a connection is returned
  • Shield on close — connection cleanup must complete even during cancellation
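
The "shield on close" decision can be checked in isolation. The sketch below is a hedged illustration, not part of the pool: FakeConn, release, and cancel_during_close are hypothetical names, and the short sleeps stand in for network round trips. It cancels a task while it is closing a connection, once with asyncio.shield() and once without.

```python
import asyncio


class FakeConn:
    """Hypothetical connection whose close() needs time to finish."""

    def __init__(self):
        self.closed = False

    async def close(self):
        await asyncio.sleep(0.01)  # stands in for a protocol-level goodbye
        self.closed = True


async def release(conn, shielded):
    if shielded:
        await asyncio.shield(conn.close())
    else:
        await conn.close()


async def cancel_during_close(shielded):
    conn = FakeConn()
    task = asyncio.create_task(release(conn, shielded))
    await asyncio.sleep(0)      # let release() start closing
    task.cancel()               # cancellation arrives mid-cleanup
    try:
        await task
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(0.05)   # give a shielded close() time to finish
    return conn.closed


if __name__ == "__main__":
    print("shielded:", asyncio.run(cancel_during_close(True)))
    print("unshielded:", asyncio.run(cancel_during_close(False)))
```

Note that the shielded close() finishes in the background: the cancelled caller still sees CancelledError immediately, so the event loop has to keep running long enough for the close to complete.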

AsyncExitStack Internals

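AsyncExitStack maintains a stack of cleanup callbacks. A simplified sketch of the idea (the real contextlib implementation also supports synchronous callbacks and fixes up exception chaining):

import sys
from collections import deque

class AsyncExitStack:
    def __init__(self):
        self._exit_callbacks = deque()

    async def __aenter__(self):
        return self

    async def enter_async_context(self, cm):
        result = await cm.__aenter__()
        self._exit_callbacks.append(cm.__aexit__)
        return result

    async def __aexit__(self, *exc_info):
        received_exc = exc_info[0] is not None
        suppressed = False
        pending_raise = False
        # Pop and call callbacks in LIFO order
        while self._exit_callbacks:
            cb = self._exit_callbacks.pop()
            try:
                if await cb(*exc_info):
                    suppressed = True
                    pending_raise = False
                    exc_info = (None, None, None)
            except BaseException:
                # A failing callback replaces the in-flight exception
                exc_info = sys.exc_info()
                pending_raise = True
        if pending_raise:
            raise exc_info[1]
        return received_exc and suppressed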

The LIFO order ensures resources are cleaned up in reverse acquisition order — exactly like nested with blocks.
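
The ordering is easy to observe with the real contextlib.AsyncExitStack. In this small sketch, the resource() helper and the resource names are made up for illustration; each fake resource only logs when it opens and closes.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager


@asynccontextmanager
async def resource(name, log):
    """Hypothetical resource that only records its lifecycle."""
    log.append(f"open {name}")
    try:
        yield name
    finally:
        log.append(f"close {name}")


async def demo():
    log = []
    async with AsyncExitStack() as stack:
        for name in ("config", "db", "cache"):
            await stack.enter_async_context(resource(name, log))
        log.append("running")
    return log


if __name__ == "__main__":
    for line in asyncio.run(demo()):
        print(line)
```

The log shows the three resources opening in order and closing in reverse: cache, then db, then config.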

Cancellation-Safe Cleanup Patterns

Pattern 1: Synchronous Fallback

When possible, implement both sync and async cleanup:

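class ManagedResource:
    async def __aexit__(self, *exc):
        try:
            # Not shielded: a shielded cleanup would keep running in the
            # background and race with the synchronous fallback below
            await self._async_cleanup()
        except asyncio.CancelledError:
            # Fallback to synchronous cleanup; it must tolerate a
            # partially completed async cleanup
            self._sync_cleanup()
            raise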

Pattern 2: Cleanup Task

Spawn a dedicated cleanup task that isn’t subject to the caller’s cancellation:

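# The event loop keeps only weak references to tasks, so hold strong
# references until each cleanup task finishes
_cleanup_tasks = set()

@asynccontextmanager
async def robust_resource():
    resource = await acquire()
    try:
        yield resource
    finally:
        # Fire-and-forget cleanup in a separate task
        task = asyncio.create_task(resource.close())
        _cleanup_tasks.add(task)
        task.add_done_callback(_cleanup_tasks.discard)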

This trades strict ordering for reliability — the cleanup happens eventually but not inline.

Pattern 3: Deadline-Aware Cleanup

Give cleanup a bounded amount of time:

@asynccontextmanager
async def timed_cleanup_resource(cleanup_timeout=5.0):
    resource = await acquire()
    try:
        yield resource
    finally:
        try:
            # On timeout, wait_for cancels close() before the forced
            # shutdown, so the two paths never run concurrently
            await asyncio.wait_for(
                resource.close(),
                timeout=cleanup_timeout
            )
        except asyncio.TimeoutError:
            resource.force_close()  # Synchronous forced shutdown
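
A runnable sketch of the deadline-aware pattern, under stated assumptions: SlowResource and bounded_close are hypothetical names, and close_delay simulates a graceful shutdown that may hang. This version passes close() to asyncio.wait_for() directly, so a timeout cancels the graceful close before the forced one runs.

```python
import asyncio
from contextlib import asynccontextmanager


class SlowResource:
    """Hypothetical resource whose graceful close() may hang."""

    def __init__(self, close_delay):
        self.close_delay = close_delay
        self.state = "open"

    async def close(self):
        await asyncio.sleep(self.close_delay)
        self.state = "closed"

    def force_close(self):
        self.state = "force-closed"


@asynccontextmanager
async def bounded_close(resource, cleanup_timeout):
    try:
        yield resource
    finally:
        try:
            await asyncio.wait_for(resource.close(), timeout=cleanup_timeout)
        except asyncio.TimeoutError:
            resource.force_close()  # deadline passed: stop waiting


async def demo():
    fast, slow = SlowResource(0.0), SlowResource(10.0)
    async with bounded_close(fast, cleanup_timeout=0.05):
        pass
    async with bounded_close(slow, cleanup_timeout=0.05):
        pass
    return fast.state, slow.state


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The fast resource closes gracefully, while the slow one is force-closed once the 50 ms budget runs out instead of stalling shutdown for ten seconds.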

Resource Leak Detection

Build a tracker that catches resources that aren’t properly closed:

import time
import traceback
import weakref

class LeakDetector:
    _instances = weakref.WeakValueDictionary()

    @classmethod
    def track(cls, resource, label="resource"):
        cls._instances[id(resource)] = resource
        resource._creation_stack = traceback.format_stack()
        resource._creation_time = time.monotonic()
        resource._leak_label = label

    @classmethod
    def report(cls, max_age=60):
        now = time.monotonic()
        for rid, resource in list(cls._instances.items()):
            age = now - resource._creation_time
            if age > max_age:
                print(f"Potential leak: {resource._leak_label} "
                      f"(age: {age:.0f}s)")
                print("Created at:")
                print("".join(resource._creation_stack[-5:]))

# Usage in a context manager:
@asynccontextmanager
async def tracked_connection():
    conn = await create_connection()
    LeakDetector.track(conn, "db-connection")
    try:
        yield conn
    finally:
        await conn.close()

Run LeakDetector.report() periodically (in a heartbeat task) to catch leaks early.
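
One possible shape for that heartbeat task is sketched below. The names run_periodically and stop_event are assumptions made for this example; in an application the callback would be LeakDetector.report, while the demo passes a stand-in that only counts calls.

```python
import asyncio


async def run_periodically(callback, interval, stop_event):
    """Call callback() every `interval` seconds until stop_event is set."""
    while not stop_event.is_set():
        callback()
        try:
            # Sleep for one interval, but wake immediately on shutdown
            await asyncio.wait_for(stop_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # interval elapsed without a shutdown request


async def demo():
    calls = []
    stop = asyncio.Event()
    heartbeat = asyncio.create_task(
        run_periodically(lambda: calls.append("report"), 0.01, stop)
    )
    await asyncio.sleep(0.035)
    stop.set()          # request shutdown
    await heartbeat     # the loop exits promptly, without cancellation
    return calls


if __name__ == "__main__":
    print(len(asyncio.run(demo())), "reports")
```

Waiting on the event rather than calling asyncio.sleep(interval) lets the heartbeat stop as soon as shutdown is requested, so it does not delay application exit by up to one interval.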

Composing Resources with Dependency Order

In complex applications, resources depend on each other. Use a dependency-aware setup:

from contextlib import AsyncExitStack

async def application_lifecycle():
    async with AsyncExitStack() as stack:
        # Layer 1: Infrastructure
        config = await stack.enter_async_context(config_loader())

        # Layer 2: Connections (depend on config)
        db = await stack.enter_async_context(
            db_pool(config.database_url)
        )
        cache = await stack.enter_async_context(
            redis_pool(config.redis_url)
        )

        # Layer 3: Services (depend on connections)
        user_service = UserService(db, cache)

        # Layer 4: Server (depends on services)
        server = await stack.enter_async_context(
            web_server(user_service)
        )

        # Run until shutdown signal
        await shutdown_event.wait()

    # Cleanup order: server → cache → db → config

Performance: Pool Sizing Rules of Thumb

Application Type          Recommended Pool Size
Web API (per worker)      2 × CPU cores
Background workers        1 per concurrent job
Mixed workload            CPU cores + I/O wait ratio × cores
Connection to remote DB   Min: 2, Max: 20 (network latency adds up)

Oversized pools waste memory and connections. Undersized pools cause acquire timeouts. Monitor _waiters length to find the right balance.
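
The rules of thumb above can be written as small helpers. This is a sketch under assumptions: the function names are invented here, and "I/O wait ratio" is interpreted as time spent waiting on I/O divided by time spent computing, which the table itself does not define.

```python
import math


def web_api_pool_size(cpu_cores):
    """Web API (per worker): 2 x CPU cores."""
    return 2 * cpu_cores


def mixed_workload_pool_size(cpu_cores, io_wait_ratio):
    """Mixed workload: CPU cores + I/O wait ratio x cores, rounded up."""
    return math.ceil(cpu_cores + io_wait_ratio * cpu_cores)


def clamp_for_remote_db(size, minimum=2, maximum=20):
    """Keep a computed size within the remote-DB bounds from the table."""
    return max(minimum, min(size, maximum))


if __name__ == "__main__":
    cores = 4
    print(web_api_pool_size(cores))
    print(mixed_workload_pool_size(cores, io_wait_ratio=1.5))
    print(clamp_for_remote_db(mixed_workload_pool_size(cores, 8.0)))
```

Treat the result as a starting point and adjust it based on the observed waiter queue length rather than as a fixed answer.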

One thing to remember: Production async resource management requires three layers — acquisition via async context managers, cleanup protection with asyncio.shield() or synchronous fallbacks, and leak detection via tracking and monitoring — because in a concurrent system, resources that leak accumulate until the application fails.
