Python SSE Client Consumption — Deep Dive
System-level framing
An SSE consumer is a long-lived HTTP client that maintains a persistent connection to a server, parses a text-based event stream, and routes events to application handlers. Production consumers must handle connection drops with automatic reconnection, manage backpressure when events arrive faster than they can be processed, deal with servers that deviate from the SSE specification, and integrate with async Python application frameworks. The protocol is deceptively simple — the operational challenges come from maintaining reliability over hours or days of continuous streaming.
The SSE protocol in detail
The text/event-stream format is line-oriented:
: this is a comment (keep-alive ping)

event: update
data: {"temperature": 22.5}
id: evt-001
retry: 5000

data: plain text without event type
id: evt-002

event: multi-line
data: first line
data: second line
data: third line
id: evt-003

Parsing rules:
- Lines starting with a colon (:) are comments (used as keep-alive pings).
- Fields take the form field: value (the space after the colon is optional).
- Events are delimited by blank lines.
- Multiple data: lines are joined with \n.
- Unknown fields are ignored.
- The retry field sets the reconnection interval in milliseconds.
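The field-splitting rule is easy to get subtly wrong, so here is a minimal sketch of it in isolation. The helper name split_field is hypothetical, and it only handles a single line that is neither blank nor a comment.

```python
def split_field(line: str) -> tuple[str, str]:
    """Split one SSE line into (field, value). split_field is an illustrative name."""
    # Only the first colon separates field from value; later colons belong to the value.
    name, sep, value = line.partition(":")
    if not sep:
        # A line with no colon is a field name with an empty value.
        return line, ""
    # Exactly one leading space is stripped; any further spaces are kept.
    if value.startswith(" "):
        value = value[1:]
    return name, value


print(split_field("data: hello"))       # ('data', 'hello')
print(split_field("data:hello"))        # ('data', 'hello')
print(split_field("data:  indented"))   # ('data', ' indented')
print(split_field('data: {"a": 1}'))    # ('data', '{"a": 1}')
print(split_field("data"))              # ('data', '')
```

Splitting on the first colon only is what keeps JSON payloads and URLs in the value intact.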
SSE event parser
from dataclasses import dataclass


@dataclass
class SSEEvent:
    event: str = "message"
    data: str = ""
    id: str = ""
    retry: int | None = None  # Reconnection delay in ms (X | None needs Python 3.10+)


class SSEParser:
    """Incremental parser: feed one line at a time, get an event back on a blank line."""

    def __init__(self):
        self._event = SSEEvent()
        self._data_lines: list[str] = []

    def feed_line(self, line: str) -> SSEEvent | None:
        if not line:  # Blank line = dispatch event
            return self._dispatch()
        if line.startswith(":"):
            return None  # Comment / keep-alive
        # Split on the first colon only; a line without a colon is a bare
        # field name with an empty value (partition returns "" for the value).
        field_name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # Strip exactly one leading space
        if field_name == "event":
            self._event.event = value
        elif field_name == "data":
            self._data_lines.append(value)
        elif field_name == "id":
            self._event.id = value
        elif field_name == "retry":
            # Only ASCII digits are valid; anything else is ignored
            if value.isascii() and value.isdigit():
                self._event.retry = int(value)
        # Unknown fields fall through and are ignored
        return None

    def _dispatch(self) -> SSEEvent | None:
        if not self._data_lines:
            # A block without data is not dispatched; note that any id or
            # retry it carried is discarded by this simplified parser.
            self._reset()
            return None
        self._event.data = "\n".join(self._data_lines)
        event = self._event
        self._reset()
        return event

    def _reset(self):
        self._event = SSEEvent()
        self._data_lines = []
Async SSE consumer with httpx
import asyncio
import logging

import httpx

logger = logging.getLogger("sse_consumer")

# Transient transport failures worth retrying: timeouts, resets, truncated responses
RETRYABLE_ERRORS = (httpx.TimeoutException, httpx.NetworkError, httpx.RemoteProtocolError)


class SSEConsumer:
    def __init__(
        self,
        url: str,
        headers: dict | None = None,
        reconnect_delay: float = 3.0,
        max_reconnect_delay: float = 60.0,
    ):
        self.url = url
        self.headers = headers or {}
        self.reconnect_delay = reconnect_delay
        self.max_reconnect_delay = max_reconnect_delay
        self._last_event_id: str = ""
        self._current_retry: float = reconnect_delay
        self._running = False

    async def connect(self, callback):
        self._running = True
        while self._running:
            try:
                await self._stream(callback)
                if not self._running:
                    break
                # Clean end of stream: still wait, or we reconnect in a tight loop
                logger.info(f"Stream ended. Reconnecting in {self._current_retry}s")
            except RETRYABLE_ERRORS as exc:
                logger.warning(f"Connection lost: {exc}. Reconnecting in {self._current_retry}s")
            except asyncio.CancelledError:
                self._running = False
                raise  # Let cancellation propagate to the caller
            await asyncio.sleep(self._current_retry)
            self._current_retry = min(
                self._current_retry * 1.5,
                self.max_reconnect_delay,
            )

    async def _stream(self, callback):
        headers = {
            **self.headers,
            "Accept": "text/event-stream",
            "Cache-Control": "no-cache",
        }
        if self._last_event_id:
            headers["Last-Event-ID"] = self._last_event_id
        parser = SSEParser()
        async with httpx.AsyncClient(timeout=httpx.Timeout(
            connect=10, read=None, write=10, pool=10
        )) as client:
            async with client.stream("GET", self.url, headers=headers) as response:
                response.raise_for_status()  # HTTP errors are not retried here
                self._current_retry = self.reconnect_delay  # Reset on success
                logger.info(f"Connected to SSE stream: {self.url}")
                async for line in response.aiter_lines():
                    event = parser.feed_line(line)
                    if event:
                        if event.id:
                            self._last_event_id = event.id
                        if event.retry:
                            # Server-provided delay arrives in milliseconds
                            self.reconnect_delay = event.retry / 1000
                            self._current_retry = self.reconnect_delay
                        await callback(event)

    def stop(self):
        """Stop reconnecting; takes effect when the current stream or sleep ends."""
        self._running = False
Key design decisions:
- The read=None timeout allows the connection to stay open indefinitely.
- The Last-Event-ID header tells the server which event the client saw last, which gives gap-free reconnection when the server supports replay.
- Exponential backoff on reconnection prevents hammering a struggling server.
- The retry field from the server overrides the default reconnection delay.
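To make the backoff schedule concrete, the sketch below computes the delays produced by the defaults used above (3 s initial delay, factor 1.5, 60 s cap). The function names are hypothetical, and with_jitter is an assumption added for illustration: the consumer above does not randomize its delays, although doing so is a common way to avoid many clients reconnecting at the same moment.

```python
import random


def backoff_delays(initial: float = 3.0, factor: float = 1.5,
                   cap: float = 60.0, attempts: int = 10) -> list[float]:
    """Return the delay before each reconnect attempt, capped at `cap` seconds."""
    delays = []
    delay = initial
    for _ in range(attempts):
        delays.append(delay)
        delay = min(delay * factor, cap)
    return delays


def with_jitter(delay: float, spread: float = 0.2) -> float:
    """Randomize a delay by +/- spread (assumed addition, not in the consumer above)."""
    return delay * random.uniform(1 - spread, 1 + spread)


print(backoff_delays(attempts=5))  # [3.0, 4.5, 6.75, 10.125, 15.1875]
```

With these defaults the cap is reached on the ninth attempt, so a server that stays down sees at most one reconnect per minute from each client.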
Using httpx-sse for simpler integration
The httpx-sse library provides a higher-level API:
import httpx
from httpx_sse import aconnect_sse


async def consume_with_httpx_sse(url: str):
    async with httpx.AsyncClient() as client:
        async with aconnect_sse(client, "GET", url) as source:
            async for event in source.aiter_sse():
                print(f"Event: {event.event}, Data: {event.data}")
This is cleaner for simple cases, but it offers less control: httpx-sse does not reconnect on its own, so retries, backoff, and Last-Event-ID tracking remain the caller's responsibility.
LLM streaming integration
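Streaming LLM responses has become one of the most common SSE use cases. The consumer below assumes an OpenAI-style chat completions endpoint that ends the stream with a [DONE] sentinel: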
import json

import httpx

# SSEParser and logger are defined in the earlier sections


class LLMStreamConsumer:
    def __init__(self, api_url: str, api_key: str):
        self.api_url = api_url
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Accept": "text/event-stream",
        }

    async def stream_completion(
        self, messages: list[dict], on_token=None, on_complete=None
    ) -> str:
        full_response = []
        async with httpx.AsyncClient(timeout=httpx.Timeout(
            connect=10, read=120, write=10, pool=10
        )) as client:
            async with client.stream(
                "POST",
                f"{self.api_url}/chat/completions",
                headers=self.headers,
                json={"messages": messages, "stream": True},
            ) as response:
                response.raise_for_status()
                parser = SSEParser()
                async for line in response.aiter_lines():
                    event = parser.feed_line(line)
                    if not event:
                        continue
                    if event.data == "[DONE]":
                        break
                    try:
                        chunk = json.loads(event.data)
                        content = chunk["choices"][0]["delta"].get("content")
                    except (json.JSONDecodeError, KeyError, IndexError):
                        logger.warning(f"Unparseable SSE data: {event.data[:100]}")
                        continue
                    # Outside the try block so that errors raised by on_token
                    # are not misreported as unparseable data
                    if content:
                        full_response.append(content)
                        if on_token:
                            await on_token(content)
        result = "".join(full_response)
        if on_complete:
            await on_complete(result)
        return result
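The per-chunk extraction logic can be tested without any network access if it is pulled into a pure function. The following is a sketch under the same assumption as the consumer above, namely chunks shaped like {"choices": [{"delta": {"content": ...}}]} and a [DONE] sentinel; extract_content and DONE_SENTINEL are hypothetical names, and real providers may differ in both respects.

```python
import json
from typing import Optional

DONE_SENTINEL = "[DONE]"  # Assumed end-of-stream marker (OpenAI-style APIs)


def extract_content(data: str) -> Optional[str]:
    """Return the text fragment carried by one SSE data payload, or None.

    Hypothetical helper; assumes {"choices": [{"delta": {"content": ...}}]}.
    """
    if data == DONE_SENTINEL:
        return None
    try:
        chunk = json.loads(data)
        content = chunk["choices"][0]["delta"].get("content")
    except (json.JSONDecodeError, KeyError, IndexError, TypeError, AttributeError):
        # Malformed or unexpected shapes yield no text rather than an exception
        return None
    return content if isinstance(content, str) and content else None


payloads = [
    '{"choices": [{"delta": {"role": "assistant"}}]}',  # Role-only first chunk
    '{"choices": [{"delta": {"content": "Hel"}}]}',
    '{"choices": [{"delta": {"content": "lo"}}]}',
    '{"choices": []}',                                  # Empty choices list
    "[DONE]",
]
text = "".join(filter(None, map(extract_content, payloads)))
print(text)  # Hello
```

Keeping this step free of I/O means edge cases such as role-only chunks or empty choices lists can be covered by plain unit tests.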
Backpressure handling
When events arrive faster than your application can process them, you need backpressure management:
class BufferedSSEConsumer:
    def __init__(self, consumer: SSEConsumer, max_buffer: int = 10000):
        self.consumer = consumer
        self.buffer: asyncio.Queue = asyncio.Queue(maxsize=max_buffer)
        self._overflow_count = 0

    async def start(self, num_workers: int = 3):
        workers = [asyncio.create_task(self._worker(i)) for i in range(num_workers)]
        try:
            await self.consumer.connect(self._enqueue)  # Returns after consumer.stop()
            await self.buffer.join()  # Let workers drain what is already queued
        finally:
            # Workers loop forever, so they must be cancelled explicitly
            for task in workers:
                task.cancel()
            await asyncio.gather(*workers, return_exceptions=True)

    async def _enqueue(self, event: SSEEvent):
        try:
            self.buffer.put_nowait(event)
        except asyncio.QueueFull:
            # The newest event is dropped when the buffer is full
            self._overflow_count += 1
            if self._overflow_count % 100 == 1:  # Log the first drop, then every 100th
                logger.warning(f"SSE buffer overflow: {self._overflow_count} events dropped")

    async def _worker(self, worker_id: int):
        while True:
            event = await self.buffer.get()
            try:
                await self._process(event)
            except Exception:
                logger.exception(f"Worker {worker_id} failed to process event")
            finally:
                self.buffer.task_done()

    async def _process(self, event: SSEEvent):
        # Override in subclass or pass handler
        pass
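The bounded queue prevents memory exhaustion when the consumer cannot keep up. Note that put_nowait discards new events once the queue is full, which is load shedding rather than backpressure in the strict sense; the drop count is logged periodically so the application can detect sustained overload. If events must not be lost, awaiting buffer.put(event) instead suspends reading from the socket until a worker frees a slot, and TCP flow control then slows the sender.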
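The difference between dropping and blocking is easiest to see in a small simulation. This sketch is independent of the classes above: run_demo is a hypothetical name, the producer stands in for the SSE read loop, and the 1 ms sleep stands in for slow event processing.

```python
import asyncio


async def run_demo(total: int = 20, maxsize: int = 5) -> tuple[int, int]:
    """Push `total` items through a bounded queue; return (processed, peak queue size)."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    processed = 0
    peak = 0

    async def producer():
        nonlocal peak
        for i in range(total):
            # put() suspends while the queue is full, so the producer is
            # throttled to the consumer's pace instead of dropping items.
            await queue.put(i)
            peak = max(peak, queue.qsize())

    async def consumer():
        nonlocal processed
        while True:
            await queue.get()
            await asyncio.sleep(0.001)  # Simulate slow processing
            processed += 1
            queue.task_done()

    worker = asyncio.create_task(consumer())
    await producer()
    await queue.join()  # Wait until every queued item has been processed
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return processed, peak


processed, peak = asyncio.run(run_demo())
print(f"processed={processed}, peak queue size={peak}")
```

Every item is processed and the queue never grows beyond maxsize. The cost is that the read loop stalls while the queue is full, so this variant suits streams where completeness matters more than freshness.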
Connection health monitoring
Long-lived SSE connections can silently die (half-open TCP connections):
import time

import httpx


class MonitoredSSEConsumer(SSEConsumer):
    def __init__(self, *args, heartbeat_timeout: float = 60, **kwargs):
        super().__init__(*args, **kwargs)
        self.heartbeat_timeout = heartbeat_timeout
        self._last_event_time = time.monotonic()

    async def _stream(self, callback):
        headers = {
            **self.headers,
            "Accept": "text/event-stream",
        }
        if self._last_event_id:
            headers["Last-Event-ID"] = self._last_event_id
        parser = SSEParser()
        try:
            async with httpx.AsyncClient(timeout=httpx.Timeout(
                connect=10, read=self.heartbeat_timeout + 10, write=10, pool=10
            )) as client:
                async with client.stream("GET", self.url, headers=headers) as response:
                    response.raise_for_status()
                    self._current_retry = self.reconnect_delay
                    async for line in response.aiter_lines():
                        # Any line counts as a sign of life, including keep-alive comments
                        self._last_event_time = time.monotonic()
                        event = parser.feed_line(line)
                        if event:
                            if event.id:
                                self._last_event_id = event.id
                            await callback(event)
        except httpx.ReadTimeout as exc:
            # No bytes arrived within the window, so treat the connection as dead.
            # Re-raised as ReadError so the reconnect loop in connect() handles it.
            raise httpx.ReadError("No data received within the heartbeat window") from exc
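Setting the read timeout slightly above the expected heartbeat interval ensures dead connections are detected: httpx raises ReadTimeout when no bytes arrive within that window, and any received data, including a keep-alive comment, restarts it. Well-behaved SSE servers send comment lines (:ping) as keep-alives every 15-30 seconds.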
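The same idle detection can also be expressed at the application level, which helps when the HTTP client's read timeout is not configurable or when the line source is not httpx at all. This is a minimal sketch: with_idle_timeout and fake_stream are hypothetical names, and fake_stream merely stands in for response.aiter_lines().

```python
import asyncio
from typing import AsyncIterator, TypeVar

T = TypeVar("T")


async def with_idle_timeout(source: AsyncIterator[T], timeout: float) -> AsyncIterator[T]:
    """Yield items from `source`; raise asyncio.TimeoutError if it stays silent too long."""
    iterator = source.__aiter__()
    while True:
        try:
            # The deadline applies to each item separately, so it restarts on every line.
            item = await asyncio.wait_for(iterator.__anext__(), timeout)
        except StopAsyncIteration:
            return
        yield item


async def fake_stream(gaps):
    """Stand-in for response.aiter_lines(): waits `gap` seconds before each line."""
    for i, gap in enumerate(gaps):
        await asyncio.sleep(gap)
        yield f"data: {i}"


async def demo():
    seen = []
    try:
        async for line in with_idle_timeout(fake_stream([0.0, 0.01, 0.5]), timeout=0.1):
            seen.append(line)
    except asyncio.TimeoutError:
        seen.append("timed out")
    return seen


print(asyncio.run(demo()))  # ['data: 0', 'data: 1', 'timed out']
```

A reconnect loop would treat the timeout like any other dropped connection and resume with the last seen event ID.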
Testing SSE consumers
import asyncio

import pytest
from aiohttp import web

# Assumes pytest-aiohttp (provides the aiohttp_server fixture) and
# pytest-asyncio running with asyncio_mode = auto


async def mock_sse_server(request):
    response = web.StreamResponse()
    response.headers["Content-Type"] = "text/event-stream"
    response.headers["Cache-Control"] = "no-cache"
    await response.prepare(request)
    events = [
        "event: update\ndata: {\"value\": 1}\nid: 1\n\n",
        "event: update\ndata: {\"value\": 2}\nid: 2\n\n",
        ": keepalive\n\n",
        "event: done\ndata: finished\nid: 3\n\n",
    ]
    for event_text in events:
        await response.write(event_text.encode())
        await asyncio.sleep(0.1)
    return response  # Returning closes the stream


@pytest.fixture
async def sse_server(aiohttp_server):
    app = web.Application()
    app.router.add_get("/events", mock_sse_server)
    server = await aiohttp_server(app)
    return server


@pytest.mark.asyncio
async def test_consumer_receives_all_events(sse_server):
    received = []
    consumer = SSEConsumer(str(sse_server.make_url("/events")))

    async def collect(event: SSEEvent):
        received.append(event)
        if event.event == "done":
            # Without this the consumer reconnects after the server closes
            # the stream and receives the same events again
            consumer.stop()

    # connect() returns once the stream has ended and stop() was called
    await asyncio.wait_for(consumer.connect(collect), timeout=5)

    assert len(received) == 3  # Two updates + done (comments are filtered)
    assert received[0].event == "update"
    assert received[2].data == "finished"
Performance considerations
| Optimization | Impact |
|---|---|
| Reuse httpx client | Avoids rebuilding the connection pool and TLS configuration on every reconnect |
| Buffer events before parsing JSON | Reduces per-event overhead |
| Use orjson for JSON parsing | Often several times faster than stdlib json |
| Worker pool for processing | Decouples I/O from computation |
| Connection pooling for multi-stream | Shares TCP connections |
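For high-throughput scenarios (thousands of events per second), consider switching from line-by-line parsing to chunk-based parsing. Chunk boundaries do not line up with line boundaries, so the trailing partial line must be carried over to the next chunk:
pending = ""  # Partial line carried over between chunks
async for text in response.aiter_text():  # Decodes incrementally, safe for split UTF-8
    pending += text
    *lines, pending = pending.split("\n")  # The last piece may be incomplete
    for line in lines:
        event = parser.feed_line(line.rstrip("\r"))
        if event:
            await callback(event)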
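Chunk-based parsing is only correct if lines are reassembled across chunk boundaries, which can fall anywhere, including in the middle of a multi-byte character. The sketch below shows that reassembly step on its own with a synchronous generator. lines_from_chunks is a hypothetical name, and the sketch assumes \n or \r\n line endings; a bare \r terminator, which the SSE format also permits, is not handled.

```python
import codecs
from typing import Iterable, Iterator


def lines_from_chunks(chunks: Iterable[bytes]) -> Iterator[str]:
    """Reassemble complete lines from byte chunks that may be split anywhere."""
    # An incremental decoder buffers a multi-byte character split across chunks.
    decoder = codecs.getincrementaldecoder("utf-8")()
    pending = ""
    for chunk in chunks:
        pending += decoder.decode(chunk)
        # Everything before the last "\n" is complete; the tail waits for more data.
        *lines, pending = pending.split("\n")
        for line in lines:
            yield line.rstrip("\r")
    # An unterminated tail is discarded, as an incomplete event would be.


raw = "data: caf\u00e9\n\ndata: ok\r\n\r\n".encode("utf-8")
expected = ["data: caf\u00e9", "", "data: ok", ""]
# Split the stream at every byte offset, including inside the two-byte "é".
for cut in range(len(raw) + 1):
    assert list(lines_from_chunks([raw[:cut], raw[cut:]])) == expected
print("all split points produce the same lines")
```

Each yielded line can be passed to a line-oriented parser exactly as if it had come from a line iterator.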
One thing to remember: A production SSE consumer is a resilient, long-lived HTTP client with automatic reconnection via Last-Event-ID, backpressure management through bounded queues, and health monitoring via heartbeat timeouts. The protocol is simple text over HTTP — the engineering challenge is maintaining reliable operation over hours and days of continuous streaming.