Python Object Pooling — Deep Dive

CPython’s Internal Object Pools

Before building your own pool, it’s worth understanding what CPython already does. The interpreter maintains several internal pools and free lists:

The Small Object Allocator (pymalloc)

CPython’s pymalloc manages allocations up to 512 bytes using a three-tier system:

  1. Arenas — 256 KB chunks obtained from the OS via mmap or malloc.
  2. Pools — 4 KB blocks within arenas, each dedicated to a specific size class (8, 16, 24, …, 512 bytes).
  3. Blocks — Individual allocation units within a pool.

When you create a small object and then delete it, the memory block goes back to its pool’s free list — not back to the OS. The next small object of the same size class reuses that block. This is why creating and destroying millions of small Python objects is faster than you’d expect.

Type-Specific Free Lists

CPython maintains separate free lists for commonly created types:

TypeFree list size (CPython 3.12)Notes
float100 objectsPyFloat_MAXFREELIST
tuple2000 (up to size 20)Per-size free lists
list80 objectsEmpty list shells only
dict80 objectsEmpty dict shells only
frameVariesFunction call frames

When you delete a float, CPython doesn’t deallocate it — it parks it on the float free list. The next float() call grabs a pre-allocated shell from the list and just fills in the value. This is why code that creates/destroys millions of floats in a loop performs better than expected.

Thread-Safe Pool with Context Manager

A production pool needs thread safety, health checks, and clean error handling:

import threading
import time
from collections import deque
from contextlib import contextmanager

class Pool:
    def __init__(self, factory, *, max_size=10, max_idle=300,
                 validate=None):
        self._factory = factory
        self._max_size = max_size
        self._max_idle = max_idle
        self._validate = validate or (lambda obj: True)
        self._lock = threading.Lock()
        self._available = deque()  # (object, last_used_timestamp)
        self._total_count = 0
        self._waiters = threading.Condition(self._lock)

    @contextmanager
    def acquire(self, timeout=30):
        obj = self._get(timeout)
        try:
            yield obj
        except Exception:
            self._discard(obj)
            raise
        else:
            self._return(obj)

    def _get(self, timeout):
        deadline = time.monotonic() + timeout
        with self._lock:
            while True:
                # Try to get an existing idle object
                while self._available:
                    obj, last_used = self._available.popleft()
                    age = time.monotonic() - last_used
                    if age > self._max_idle:
                        self._total_count -= 1
                        self._destroy(obj)
                        continue
                    if self._validate(obj):
                        return obj
                    self._total_count -= 1
                    self._destroy(obj)

                # Create a new one if under limit
                if self._total_count < self._max_size:
                    self._total_count += 1
                    return self._factory()

                # Wait for one to be returned
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise TimeoutError("Pool exhausted")
                self._waiters.wait(timeout=remaining)

    def _return(self, obj):
        with self._lock:
            self._available.append((obj, time.monotonic()))
            self._waiters.notify()

    def _discard(self, obj):
        self._destroy(obj)
        with self._lock:
            self._total_count -= 1
            self._waiters.notify()

    def _destroy(self, obj):
        try:
            if hasattr(obj, 'close'):
                obj.close()
        except Exception:
            pass

Key design decisions:

  • Context manager — Guarantees objects are returned even if exceptions occur.
  • Discard on exception — If the borrowed object was involved in an error, it may be in a bad state. Better to destroy and create fresh.
  • Idle timeout — Objects sitting unused longer than max_idle seconds are destroyed, preventing stale connections.
  • Validation — A callback checks object health before handing it out (e.g., connection.ping() for database connections).

Async Pool for asyncio

Async applications need non-blocking pools:

import asyncio
import time

class AsyncPool:
    def __init__(self, factory, *, max_size=10):
        self._factory = factory  # async callable
        self._max_size = max_size
        self._semaphore = asyncio.Semaphore(max_size)
        self._available = asyncio.Queue()
        self._total = 0
        self._lock = asyncio.Lock()

    async def acquire(self):
        await self._semaphore.acquire()
        try:
            obj = self._available.get_nowait()
        except asyncio.QueueEmpty:
            async with self._lock:
                self._total += 1
            obj = await self._factory()
        return obj

    async def release(self, obj):
        await self._available.put(obj)
        self._semaphore.release()

    async def __aenter__(self):
        self._obj = await self.acquire()
        return self._obj

    async def __aexit__(self, *exc):
        await self.release(self._obj)

Libraries like asyncpg implement sophisticated async pools internally. asyncpg.create_pool() handles connection lifecycle, health checks, and statement caching within the pool.

Real-World Pooling: Database Connections

SQLAlchemy’s QueuePool (the default) demonstrates production pooling:

from sqlalchemy import create_engine, event

engine = create_engine(
    "postgresql+psycopg2://user:pass@db:5432/mydb",
    pool_size=20,           # Steady-state connections
    max_overflow=10,        # Burst capacity
    pool_timeout=30,        # Wait time before TimeoutError
    pool_recycle=1800,      # Force reconnect after 30 min
    pool_pre_ping=True,     # Validate connections before use
)

# Monitor pool health
@event.listens_for(engine, "checkout")
def on_checkout(dbapi_conn, connection_record, connection_proxy):
    print(f"Connection checked out: {connection_record}")

@event.listens_for(engine, "checkin")
def on_checkin(dbapi_conn, connection_record):
    print(f"Connection returned: {connection_record}")

The pool_pre_ping=True setting sends a lightweight query (SELECT 1) before each checkout to detect broken connections. This adds ~1ms latency per checkout but prevents cryptic “connection reset” errors in production.

Buffer Pooling for High-Throughput I/O

When processing network data or file I/O at high throughput, allocating and deallocating buffers creates GC pressure. A buffer pool recycles bytearray objects:

class BufferPool:
    def __init__(self, buffer_size=65536, pool_size=32):
        self._buffer_size = buffer_size
        self._pool = [bytearray(buffer_size) for _ in range(pool_size)]
        self._lock = threading.Lock()

    def get(self):
        with self._lock:
            if self._pool:
                buf = self._pool.pop()
                # Zero out for security
                buf[:] = b'\x00' * self._buffer_size
                return buf
        return bytearray(self._buffer_size)

    def put(self, buf):
        with self._lock:
            if len(self._pool) < 64:  # Don't grow unbounded
                self._pool.append(buf)

This pattern is used in Python web servers like gunicorn and network libraries like asyncio’s transport layer, where buffer allocation is a measurable bottleneck under high concurrency.

Pooling with slots for Memory Efficiency

When pooling many instances of the same class, using __slots__ reduces per-object memory overhead:

class PooledWorker:
    __slots__ = ('id', 'buffer', 'state', '_pool_ref')

    def __init__(self, worker_id):
        self.id = worker_id
        self.buffer = bytearray(4096)
        self.state = "idle"
        self._pool_ref = None

    def reset(self):
        self.buffer[:] = b'\x00' * 4096
        self.state = "idle"

Without __slots__, each instance carries a __dict__ (56+ bytes). With __slots__, the attributes are stored in a compact fixed-layout array, saving ~30% memory per instance.

Monitoring and Tuning

Production pools need observability:

import dataclasses

@dataclasses.dataclass
class PoolStats:
    total_created: int = 0
    total_destroyed: int = 0
    current_borrowed: int = 0
    current_idle: int = 0
    wait_count: int = 0
    timeout_count: int = 0
    avg_borrow_ms: float = 0.0

Key metrics to track:

  • Pool utilizationcurrent_borrowed / max_size. Consistently >80% means you need a larger pool.
  • Wait count — Non-zero means threads are blocking on pool exhaustion.
  • Destroy rate — High destroy rate suggests objects are failing validation — investigate the root cause (network issues, database restarts).
  • Idle ratio — Consistently >90% means your pool is oversized, wasting memory and potentially database connections.

When to Build vs Use Existing Pools

NeedUse existingBuild custom
Database connectionsSQLAlchemy, asyncpg, psycopg pool
HTTP connectionsrequests.Session, httpx.Client
Redis connectionsredis.ConnectionPool
gRPC channelsgrpc.aio channel pool
Custom expensive objects
Pre-allocated buffers
Thread-local resources

Always prefer battle-tested library pools for standard protocols. Build custom pools only for domain-specific expensive objects.

The one thing to remember: Effective object pooling combines bounded sizing, health validation, idle cleanup, and context-managed borrowing — but the biggest wins come from using existing pooling in database drivers and HTTP clients rather than building your own.

pythonperformancememorydesign-patternsinternals

See Also