Python Async Testing Patterns — Deep Dive
Controlling the Event Loop in Tests
By default, pytest-asyncio creates a new event loop per test. You can customize this for shared state or special loop implementations:
@pytest.fixture(scope="session")
def event_loop_policy():
    """Supply the uvloop event loop policy, created once per test session."""
    # Imported inside the fixture, so uvloop is loaded only when it runs.
    import uvloop

    uvloop_policy = uvloop.EventLoopPolicy()
    return uvloop_policy
@pytest.fixture
def event_loop(event_loop_policy):
    """Yield a new event loop built from the configured policy.

    When ``event_loop_policy`` is falsy, ``asyncio.DefaultEventLoopPolicy``
    is used instead. The loop is closed during fixture teardown.
    """
    if event_loop_policy:
        active_policy = event_loop_policy
    else:
        active_policy = asyncio.DefaultEventLoopPolicy()

    fresh_loop = active_policy.new_event_loop()
    yield fresh_loop
    # Teardown: release the loop once the consuming test is finished.
    fresh_loop.close()
For tests that need precise control over scheduling:
@pytest.fixture
def controlled_loop():
    """Provide an event loop whose clock only moves when the test says so.

    The loop's ``time()`` is replaced by a function returning a manually
    controlled value that starts at ``0.0``. The loop also gains an
    ``advance(seconds)`` attribute that moves that value forward and then
    runs a single loop iteration so callbacks that are now due get executed.

    The loop is closed during fixture teardown.
    """
    loop = asyncio.new_event_loop()
    fake_now = [0.0]  # one-element list so the closures below can mutate it

    def time():
        return fake_now[0]

    def advance(seconds):
        """Move the fake clock forward and process callbacks that are due."""
        fake_now[0] += seconds
        if loop.is_running():
            # Called from code already running on this loop, where
            # run_forever() would raise. Queue a no-op so the nested
            # iteration polls with a zero timeout instead of blocking
            # while it waits for I/O or a far-away timer.
            loop.call_soon(lambda: None)
            loop._run_once()
        else:
            # Public-API way to run exactly one iteration: the queued
            # stop() keeps the iteration from blocking and ends
            # run_forever() right after the due callbacks have run.
            loop.call_soon(loop.stop)
            loop.run_forever()

    loop.time = time
    loop.advance = advance
    yield loop
    # Teardown: do not leak the loop (and its selector) between tests.
    loop.close()
Deterministic Scheduling with Manual Stepping
For tests that need to verify exact interleaving:
import asyncio
async def step_through(coros, steps=100):
    """Drive ``coros`` forward in small, repeatable scheduling rounds.

    Each awaitable is wrapped with ``asyncio.ensure_future``. The function
    then yields to the event loop up to ``steps`` times, stopping early as
    soon as every wrapped task reports ``done()``.

    Returns the list of tasks, in the same order as ``coros``. Tasks may
    still be pending if ``steps`` rounds were not enough.
    """
    scheduled = [asyncio.ensure_future(coro) for coro in coros]
    for _round in range(steps):
        # A zero-length sleep hands control back to the loop for one round.
        await asyncio.sleep(0)
        if not any(not task.done() for task in scheduled):
            return scheduled
    return scheduled
A more powerful approach uses a custom scheduler:
class DeterministicScheduler:
    """Helper for draining the running event loop in a repeatable way.

    ``run_until_idle`` inspects the private ``_ready`` and ``_scheduled``
    attributes of the running loop, so it only works with loops that
    expose them.
    """

    def __init__(self):
        self._ready = []
        self._time = 0.0

    async def run_until_idle(self):
        """Yield to the loop until nothing is ready and no timer is due."""
        while True:
            await asyncio.sleep(0)
            loop = asyncio.get_running_loop()
            if loop._ready:
                # Callbacks are still queued; give the loop another round.
                continue
            timer_due = False
            for handle in loop._scheduled:
                if handle._when <= loop.time():
                    timer_due = True
                    break
            if not timer_due:
                return

    def advance_time(self, seconds):
        """Add ``seconds`` to this scheduler's own time counter."""
        self._time = self._time + seconds
Testing Backpressure Behavior
Verify that your producer actually blocks when the consumer is slow:
@pytest.mark.asyncio
async def test_backpressure_blocks_producer():
    """Check that a bounded queue stalls the producer until an item is taken.

    The producer tries to push 100 items into a queue of size 5. It must
    stop after 5 and move on by exactly one item once one is consumed.
    """
    queue = asyncio.Queue(maxsize=5)
    producer_blocked = asyncio.Event()
    items_produced = 0

    async def tracked_producer():
        nonlocal items_produced
        for i in range(100):
            await queue.put(i)
            items_produced += 1
            if items_produced == 5:
                producer_blocked.set()

    task = asyncio.create_task(tracked_producer())
    try:
        # Bounded wait: a producer that never reaches 5 items fails the
        # test instead of hanging it.
        await asyncio.wait_for(producer_blocked.wait(), timeout=1.0)

        # Give the producer one more scheduling opportunity
        await asyncio.sleep(0)

        # Producer should be blocked at exactly 5 items
        assert items_produced == 5
        assert queue.full()

        # Consume one — producer should advance
        await queue.get()
        await asyncio.sleep(0)
        assert items_produced == 6
    finally:
        # Always stop the producer, even when an assertion above failed,
        # and wait for the cancellation to finish so no pending task is
        # left behind on the loop.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
Mocking Async Iterators
For testing code that consumes async streams:
class MockAsyncIterator:
    """Async iterator over a fixed collection, for use as a fake stream.

    ``items`` is copied into a list up front. When ``delay`` is truthy,
    each item is preceded by ``asyncio.sleep(delay)``.
    """

    def __init__(self, items, delay=0):
        self._items = list(items)
        self._delay = delay
        self._index = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        exhausted = self._index >= len(self._items)
        if exhausted:
            # End of data is signalled before any delay is applied.
            raise StopAsyncIteration
        if self._delay:
            await asyncio.sleep(self._delay)
        position = self._index
        current = self._items[position]
        self._index = position + 1
        return current
@pytest.mark.asyncio
async def test_stream_processing():
    """Feed five integers to ``process_stream`` and expect the result 15."""
    source = MockAsyncIterator([1, 2, 3, 4, 5])
    total = await process_stream(source)
    assert total == 15  # sum of the inputs: 1 + 2 + 3 + 4 + 5
Mocking Transports and Protocols
For testing network code at the transport level:
class MockTransport(asyncio.Transport):
    """Transport stand-in that records written bytes in memory.

    Everything passed to ``write`` accumulates in ``written``. ``close``
    only flips the ``closed`` / closing flags.
    """

    def __init__(self):
        super().__init__()
        self._paused = False
        self._closing = False
        self.closed = False
        self.written = bytearray()

    def write(self, data):
        """Append ``data`` to the recorded output."""
        self.written.extend(data)

    def close(self):
        """Mark the transport as closed and closing."""
        self.closed = self._closing = True

    def is_closing(self):
        """Return whether ``close`` has been called."""
        return self._closing

    def get_write_buffer_size(self):
        """Return the number of bytes recorded so far."""
        recorded = self.written
        return len(recorded)
@pytest.mark.asyncio
async def test_protocol_handling():
    """Drive ``MyProtocol`` through a mock transport and check its reply."""
    fake_transport = MockTransport()
    handler = MyProtocol()
    handler.connection_made(fake_transport)

    handler.data_received(b"HELLO\n")

    # Whatever the protocol wrote was captured by the mock transport.
    assert fake_transport.written == b"WORLD\n"
Testing Cancellation Robustness
A systematic approach to verify cancellation safety:
@pytest.mark.asyncio
async def test_cancellation_at_every_await():
    """Cancel the function at each possible suspension point.

    For every point 1-4 a fresh ``instrumented_target`` task is started
    together with a helper task that cancels it once the shared progress
    counter has reached that point.
    """
    cancel_points_hit = 0

    async def instrumented_target():
        nonlocal cancel_points_hit
        conn = await connect()  # point 1
        cancel_points_hit = 1
        data = await conn.read()  # point 2
        cancel_points_hit = 2
        await conn.write(process(data))  # point 3
        cancel_points_hit = 3
        await conn.close()  # point 4
        cancel_points_hit = 4

    async def cancel_after_point(task, target_point):
        # Poll the shared counter, yielding to the loop on every round,
        # until the target has progressed far enough to be cancelled.
        while cancel_points_hit < target_point:
            await asyncio.sleep(0)
        task.cancel()

    for cancel_at in range(1, 5):
        cancel_points_hit = 0

        # NOTE(review): this patches the name ``mymodule.connect``, while
        # ``instrumented_target`` looks up ``connect`` in this module's
        # namespace. Confirm both resolve to the same object.
        with patch("mymodule.connect", new_callable=AsyncMock):
            task = asyncio.create_task(instrumented_target())
            cancel_task = asyncio.create_task(
                cancel_after_point(task, cancel_at)
            )
            try:
                await task
            except asyncio.CancelledError:
                pass
            finally:
                # The helper may still be polling (for example when the
                # target finished first). Stop it and wait for it, so it
                # neither observes the counter of the next iteration nor
                # is left pending when the test ends.
                cancel_task.cancel()
                await asyncio.gather(cancel_task, return_exceptions=True)

        # Verify no resource leaks
Async Test Fixtures with Cleanup Verification
Ensure test cleanup actually happens:
class ResourceTracker:
    """Track resource lifecycle across tests.

    Resources are keyed by ``id()`` so they need not be hashable. The
    tracker also keeps a reference to each registered resource. An ``id()``
    is only unique while its object is alive, so tracking the bare number
    would let a leaked, garbage-collected resource be "unregistered" by an
    unrelated object that happens to reuse its id. Holding the reference
    prevents that.

    A registered resource therefore stays alive until it is unregistered or
    the registry is cleared.
    """

    # Maps id(resource) -> resource for everything currently registered.
    _active = {}

    @classmethod
    def register(cls, resource):
        """Record ``resource`` as active."""
        cls._active[id(resource)] = resource

    @classmethod
    def unregister(cls, resource):
        """Forget ``resource``; unknown resources are ignored."""
        cls._active.pop(id(resource), None)

    @classmethod
    def assert_all_cleaned(cls):
        """Raise ``AssertionError`` if any resource is still registered."""
        assert len(cls._active) == 0, (
            f"{len(cls._active)} resources not cleaned up"
        )
@pytest.fixture(autouse=True)
async def check_resource_leaks():
    """Start each test with an empty tracker and verify it is empty after.

    After the test body, the fixture sleeps briefly and then asks
    ``ResourceTracker`` to assert that nothing is still registered.
    """
    registry = ResourceTracker._active
    registry.clear()
    yield
    # Small delay for cleanup tasks
    settle_seconds = 0.01
    await asyncio.sleep(settle_seconds)
    ResourceTracker.assert_all_cleaned()
Performance Testing Async Code
Measure throughput under controlled conditions:
@pytest.mark.asyncio
async def test_throughput():
    """Measure producer/consumer throughput through a bounded queue.

    A producer pushes ``target`` integers followed by a ``None`` sentinel.
    The consumer counts items until it sees the sentinel. The test fails
    if fewer than 50,000 items per second were processed.
    """
    queue = asyncio.Queue(maxsize=100)
    processed = 0
    target = 10_000

    async def producer():
        for i in range(target):
            await queue.put(i)
        await queue.put(None)  # sentinel: tells the consumer to stop

    async def consumer():
        nonlocal processed
        while True:
            item = await queue.get()
            if item is None:
                break
            processed += 1

    # Inside a coroutine the running loop is the one to time against.
    loop = asyncio.get_running_loop()
    start = loop.time()
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer())
        tg.create_task(consumer())
    elapsed = loop.time() - start

    assert processed == target
    # A coarse clock can report zero elapsed time; treat that as
    # "infinitely fast" rather than dividing by zero.
    throughput = target / elapsed if elapsed > 0 else float("inf")
    assert throughput > 50_000, f"Too slow: {throughput:.0f} items/s"
Test Organization for Large Async Codebases
tests/
├── conftest.py # Shared fixtures (db, event loop config)
├── unit/
│ ├── test_handlers.py # Mock all I/O
│ └── test_models.py
├── integration/
│ ├── test_db.py # Real database, test container
│ └── test_api.py # Real HTTP, mock external services
└── stress/
├── test_backpressure.py # High-volume producer-consumer
└── test_cancellation.py # Systematic cancellation testing
Key principles:
- Unit tests: Mock all async I/O, test logic only
- Integration tests: Real event loop, real resources (via test containers)
- Stress tests: High concurrency, timing-sensitive, run separately
One thing to remember: The foundation of reliable async testing is replacing time-based synchronization (sleep) with event-based synchronization (Event, Queue, wait_for) — and systematically testing cancellation at every await point to verify cleanup safety.
See Also
- Python Actor Model — Why treating each piece of your program like a person with their own mailbox makes concurrency way less scary.
- Python Aiocache Caching — aiocache remembers expensive answers so your async Python app doesn't waste time asking the same question twice.
- Python Aiofiles Async Io — aiofiles lets your async Python program read and write files without freezing — because normal file operations secretly block everything.
- Python Aiohttp — Understand Aiohttp through an everyday analogy so Python behavior feels intuitive, not random.
- Python Anyio Portability — AnyIO lets your async Python code work with any async library — write once, run on asyncio or Trio without changes.