Python SSE Client Consumption — Deep Dive

System-level framing

An SSE consumer is a long-lived HTTP client that maintains a persistent connection to a server, parses a text-based event stream, and routes events to application handlers. Production consumers must handle connection drops with automatic reconnection, manage backpressure when events arrive faster than they can be processed, deal with servers that deviate from the SSE specification, and integrate with async Python application frameworks. The protocol is deceptively simple — the operational challenges come from maintaining reliability over hours or days of continuous streaming.

The SSE protocol in detail

The text/event-stream format is line-oriented:

: this is a comment (keep-alive ping)

event: update
data: {"temperature": 22.5}
id: evt-001
retry: 5000

data: plain text without event type
id: evt-002

event: multi-line
data: first line
data: second line
data: third line
id: evt-003

Parsing rules:

  • Lines starting with : are comments (used as keep-alive pings).
  • Fields take the form field: value. The single space after the colon is optional and, when present, is stripped from the value.
  • Events are delimited by blank lines.
  • Multiple data: lines are joined with \n.
  • Unknown fields are ignored.
  • The retry field sets the reconnection interval in milliseconds.
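
The field-splitting rule is the easiest one to get subtly wrong, so here is a minimal sketch of it in isolation. The helper names parse_field and join_data are hypothetical and exist only to illustrate the rules listed above; they are not part of any library.

```python
from __future__ import annotations


def parse_field(line: str) -> tuple[str, str] | None:
    """Split one non-blank stream line into (field, value).

    Returns None for comment lines. Illustrative helper only.
    """
    if line.startswith(":"):
        return None  # comment / keep-alive
    # partition() returns the whole line as the name when there is no colon
    name, _, value = line.partition(":")
    if value.startswith(" "):
        value = value[1:]  # strip exactly one leading space, no more
    return name, value


def join_data(lines: list[str]) -> str:
    """Join the values of all data fields in one event block with newlines."""
    values = []
    for line in lines:
        parsed = parse_field(line)
        if parsed and parsed[0] == "data":
            values.append(parsed[1])
    return "\n".join(values)
```

Note that only one leading space is removed, so "data:  x" yields the value " x", and a line with no colon at all is treated as a field name with an empty value.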

SSE event parser

from dataclasses import dataclass

@dataclass
class SSEEvent:
    event: str = "message"
    data: str = ""
    id: str = ""
    retry: int | None = None

class SSEParser:
    def __init__(self):
        self._event = SSEEvent()
        self._data_lines: list[str] = []

    def feed_line(self, line: str) -> SSEEvent | None:
        if not line:  # Blank line = dispatch event
            return self._dispatch()

        if line.startswith(":"):
            return None  # Comment / keep-alive

        if ":" in line:
            field_name, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]
        else:
            field_name = line
            value = ""

        if field_name == "event":
            self._event.event = value
        elif field_name == "data":
            self._data_lines.append(value)
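        elif field_name == "id":
            if "\0" not in value:  # The spec ignores ids that contain NUL
                self._event.id = value
        elif field_name == "retry":
            # The spec accepts ASCII digits only; int() alone would also
            # accept forms such as "-5" or "5_000".
            if value.isascii() and value.isdigit():
                self._event.retry = int(value)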

        return None

    def _dispatch(self) -> SSEEvent | None:
        if not self._data_lines:
            self._reset()
            return None

        self._event.data = "\n".join(self._data_lines)
        event = self._event
        self._reset()
        return event

    def _reset(self):
        self._event = SSEEvent()
        self._data_lines = []

Async SSE consumer with httpx

import httpx
import asyncio
import logging

logger = logging.getLogger("sse_consumer")

class SSEConsumer:
    def __init__(
        self,
        url: str,
        headers: dict | None = None,
        reconnect_delay: float = 3.0,
        max_reconnect_delay: float = 60.0,
    ):
        self.url = url
        self.headers = headers or {}
        self.reconnect_delay = reconnect_delay
        self.max_reconnect_delay = max_reconnect_delay
        self._last_event_id: str = ""
        self._current_retry: float = reconnect_delay
        self._running = False

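    async def connect(self, callback):
        self._running = True
        while self._running:
            try:
                await self._stream(callback)
                if self._running:
                    # The server closed the stream cleanly. Wait before
                    # reconnecting so an immediate close cannot cause a tight loop.
                    await asyncio.sleep(self.reconnect_delay)
            except httpx.TransportError as exc:
                # Covers connect/read errors, protocol errors, and timeouts
                # (including the ReadTimeout raised when a heartbeat is missed).
                logger.warning(f"Connection lost: {exc!r}. Reconnecting in {self._current_retry}s")
                await asyncio.sleep(self._current_retry)
                self._current_retry = min(
                    self._current_retry * 1.5,
                    self.max_reconnect_delay,
                )
            except asyncio.CancelledError:
                self._running = False
                raise  # Propagate cancellation so the task ends as cancelled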

    async def _stream(self, callback):
        headers = {
            **self.headers,
            "Accept": "text/event-stream",
            "Cache-Control": "no-cache",
        }
        if self._last_event_id:
            headers["Last-Event-ID"] = self._last_event_id

        parser = SSEParser()

        async with httpx.AsyncClient(timeout=httpx.Timeout(
            connect=10, read=None, write=10, pool=10
        )) as client:
            async with client.stream("GET", self.url, headers=headers) as response:
                response.raise_for_status()
                self._current_retry = self.reconnect_delay  # Reset on success
                logger.info(f"Connected to SSE stream: {self.url}")

                async for line in response.aiter_lines():
                    event = parser.feed_line(line)
                    if event:
                        if event.id:
                            self._last_event_id = event.id
                        if event.retry is not None:  # retry: 0 is a valid value
                            self.reconnect_delay = event.retry / 1000
                            self._current_retry = self.reconnect_delay
                        await callback(event)

    def stop(self):
        self._running = False

Key design decisions:

  • read=None timeout allows the connection to stay open indefinitely.
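  • The Last-Event-ID header lets a server that supports replay resume from the last event the client saw. Without server-side replay, events sent during the outage are lost.
  • Exponential backoff (factor 1.5, capped at max_reconnect_delay) prevents hammering a struggling server.
  • The retry field from the server overrides the default reconnection delay.
  • HTTP error statuses raised by raise_for_status() are not retried and propagate to the caller, since errors such as 401 or 404 rarely resolve on their own.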
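
To make the backoff schedule concrete, the sketch below generates the sequence of delays produced by the same rule as connect(): multiply by 1.5 after each failure and cap at the maximum. The function name backoff_delays is hypothetical; the default values mirror the SSEConsumer defaults above.

```python
from itertools import islice
from typing import Iterator


def backoff_delays(
    initial: float = 3.0, factor: float = 1.5, cap: float = 60.0
) -> Iterator[float]:
    """Yield the delay before each successive reconnect attempt, capped at `cap`."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * factor, cap)


# First nine delays with the defaults: 3.0, 4.5, 6.75, ... up to the 60 s cap
schedule = list(islice(backoff_delays(), 9))
```

With these defaults the delay reaches the 60-second cap on the ninth attempt. When many clients share one server, adding a small random jitter to each delay is a common refinement so that they do not all reconnect in lockstep.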

Using httpx-sse for simpler integration

The httpx-sse library provides a higher-level API:

import httpx
from httpx_sse import aconnect_sse

async def consume_with_httpx_sse(url: str):
    async with httpx.AsyncClient() as client:
        async with aconnect_sse(client, "GET", url) as source:
            async for event in source.aiter_sse():
                print(f"Event: {event.event}, Data: {event.data}")

This is cleaner for simple cases but offers less control over reconnection and error handling.

LLM streaming integration

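A very common SSE use case is streaming LLM responses token by token. The consumer below assumes an OpenAI-style chat completions endpoint, where each event carries a JSON chunk and the stream ends with a literal [DONE] event: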

import json

class LLMStreamConsumer:
    def __init__(self, api_url: str, api_key: str):
        self.api_url = api_url
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Accept": "text/event-stream",
        }

    async def stream_completion(
        self, messages: list[dict], on_token=None, on_complete=None
    ) -> str:
        full_response = []

        async with httpx.AsyncClient(timeout=httpx.Timeout(
            connect=10, read=120, write=10, pool=10
        )) as client:
            async with client.stream(
                "POST",
                f"{self.api_url}/chat/completions",
                headers=self.headers,
                json={"messages": messages, "stream": True},
            ) as response:
                response.raise_for_status()
                parser = SSEParser()

                async for line in response.aiter_lines():
                    event = parser.feed_line(line)
                    if not event:
                        continue

                    if event.data == "[DONE]":
                        break

                    try:
                        chunk = json.loads(event.data)
                        delta = chunk["choices"][0]["delta"]
                        content = delta.get("content", "")
                        if content:
                            full_response.append(content)
                            if on_token:
                                await on_token(content)
                    except (json.JSONDecodeError, KeyError, IndexError):
                        logger.warning(f"Unparseable SSE data: {event.data[:100]}")

        result = "".join(full_response)
        if on_complete:
            await on_complete(result)
        return result
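
The chunk handling inside stream_completion can be isolated into a small pure function, which makes it easy to test without a network connection. This is a sketch under an assumption: the payload shape and the [DONE] sentinel follow the OpenAI-style format that the code above implies, and other providers may differ. The name extract_content is hypothetical.

```python
from __future__ import annotations

import json


def extract_content(data: str) -> str | None:
    """Return the text delta of one chunk, "" if it has none, None at end of stream.

    Assumes an OpenAI-style payload: {"choices": [{"delta": {"content": "..."}}]}
    and a literal "[DONE]" sentinel. Other APIs may use a different shape.
    """
    if data.strip() == "[DONE]":
        return None
    try:
        chunk = json.loads(data)
        return chunk["choices"][0]["delta"].get("content") or ""
    except (json.JSONDecodeError, KeyError, IndexError, TypeError, AttributeError):
        return ""  # malformed or unexpected chunk: skip it rather than abort


# Sample event payloads (made up for illustration)
sample_events = [
    '{"choices": [{"delta": {"role": "assistant"}}]}',
    '{"choices": [{"delta": {"content": "Hel"}}]}',
    '{"choices": [{"delta": {"content": "lo"}}]}',
    "[DONE]",
]

tokens = []
for data in sample_events:
    content = extract_content(data)
    if content is None:  # end-of-stream sentinel
        break
    tokens.append(content)

text = "".join(tokens)
```

Keeping the parsing separate from the I/O loop also gives one place to adapt if a provider changes its chunk format.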

Backpressure handling

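When events arrive faster than your application can process them, something has to give. The consumer below buffers events in a bounded queue, hands them to a pool of workers, and drops new events once the queue is full: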

class BufferedSSEConsumer:
    def __init__(self, consumer: SSEConsumer, max_buffer: int = 10000):
        self.consumer = consumer
        self.buffer: asyncio.Queue = asyncio.Queue(maxsize=max_buffer)
        self._overflow_count = 0

    async def start(self, num_workers: int = 3):
        tasks = [
            asyncio.create_task(self.consumer.connect(self._enqueue)),
            *[asyncio.create_task(self._worker(i)) for i in range(num_workers)],
        ]
        await asyncio.gather(*tasks)

    async def _enqueue(self, event: SSEEvent):
        try:
            self.buffer.put_nowait(event)
        except asyncio.QueueFull:
            self._overflow_count += 1
            if self._overflow_count % 100 == 0:
                logger.warning(f"SSE buffer overflow: {self._overflow_count} events dropped")

    async def _worker(self, worker_id: int):
        while True:
            event = await self.buffer.get()
            try:
                await self._process(event)
            except Exception as exc:
                logger.error(f"Worker {worker_id} error: {exc}")
            finally:
                self.buffer.task_done()

    async def _process(self, event: SSEEvent):
        # Override in subclass or pass handler
        pass

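The bounded queue prevents memory exhaustion when the consumer cannot keep up. Drops are counted and logged every 100 events so the application can detect sustained overload. Note that this is load shedding rather than true backpressure: a dropped event's id has already been recorded as the last event ID, so it will not be replayed on reconnect. If every event matters, use await self.buffer.put(event) instead, which pauses reading from the stream until a worker frees a slot.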
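
For comparison, here is a minimal sketch of the blocking variant, where a full queue suspends the producer instead of dropping events. It is a self-contained simulation with integers standing in for events; demo_blocking_backpressure and its parameters are hypothetical names chosen for illustration.

```python
import asyncio


async def demo_blocking_backpressure(
    num_events: int = 20, max_buffer: int = 4
) -> tuple[list[int], int]:
    """Feed a slow worker through a small bounded queue without dropping events."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_buffer)
    processed: list[int] = []
    max_depth = 0

    async def enqueue(event: int) -> None:
        nonlocal max_depth
        # put() suspends the producer while the queue is full; in a real
        # consumer this would pause reading from the socket.
        await queue.put(event)
        max_depth = max(max_depth, queue.qsize())

    async def worker() -> None:
        while True:
            event = await queue.get()
            await asyncio.sleep(0.001)  # simulate slow processing
            processed.append(event)
            queue.task_done()

    worker_task = asyncio.create_task(worker())
    for event in range(num_events):
        await enqueue(event)
    await queue.join()    # wait until every queued event has been processed
    worker_task.cancel()  # the worker loops forever, so stop it explicitly
    await asyncio.gather(worker_task, return_exceptions=True)
    return processed, max_depth


processed, max_depth = asyncio.run(demo_blocking_backpressure())
```

Every event is processed in order and the queue never grows beyond max_buffer. The trade-off is that a stalled worker also stalls the stream, so a server may eventually disconnect a client that stops reading.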

Connection health monitoring

Long-lived SSE connections can silently die (half-open TCP connections):

import time

class MonitoredSSEConsumer(SSEConsumer):
    def __init__(self, *args, heartbeat_timeout: float = 60, **kwargs):
        super().__init__(*args, **kwargs)
        self.heartbeat_timeout = heartbeat_timeout
        self._last_event_time = time.monotonic()

    async def _stream(self, callback):
        headers = {
            **self.headers,
            "Accept": "text/event-stream",
        }
        if self._last_event_id:
            headers["Last-Event-ID"] = self._last_event_id

        parser = SSEParser()

        async with httpx.AsyncClient(timeout=httpx.Timeout(
            connect=10, read=self.heartbeat_timeout + 10, write=10, pool=10
        )) as client:
            async with client.stream("GET", self.url, headers=headers) as response:
                response.raise_for_status()
                self._current_retry = self.reconnect_delay

                async for line in response.aiter_lines():
                    self._last_event_time = time.monotonic()
                    event = parser.feed_line(line)
                    if event:
                        if event.id:
                            self._last_event_id = event.id
                        await callback(event)

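Setting the read timeout slightly above the expected heartbeat interval ensures dead connections are detected: if no bytes arrive within that window, httpx raises ReadTimeout, which connect() needs to catch alongside the other transport errors in order to reconnect. Well-behaved SSE servers send comment lines (such as : ping) as keep-alives, typically every 15-30 seconds.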
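
The same idle-detection idea can be sketched independently of the HTTP client, which may help when the stream comes from a source whose read timeout cannot be configured. The wrapper below is a hypothetical helper, not an httpx feature: it raises asyncio.TimeoutError when the wrapped async iterator produces nothing within the timeout. The fake_stream generator stands in for response.aiter_lines().

```python
import asyncio
from typing import AsyncIterator, TypeVar

T = TypeVar("T")


async def with_idle_timeout(source: AsyncIterator[T], timeout: float) -> AsyncIterator[T]:
    """Yield items from `source`; raise asyncio.TimeoutError after `timeout` s of silence."""
    iterator = source.__aiter__()
    while True:
        try:
            item = await asyncio.wait_for(iterator.__anext__(), timeout)
        except StopAsyncIteration:
            return  # the source ended normally
        yield item


async def fake_stream(gaps: list[float]) -> AsyncIterator[str]:
    """Stand-in for a line stream: emits a ping after each gap (in seconds)."""
    for gap in gaps:
        await asyncio.sleep(gap)
        yield ": ping"


async def demo() -> tuple[int, bool]:
    received, timed_out = 0, False
    try:
        # Two prompt pings, then a silence longer than the 0.05 s timeout
        async for _ in with_idle_timeout(fake_stream([0.0, 0.0, 0.5]), timeout=0.05):
            received += 1
    except asyncio.TimeoutError:
        timed_out = True
    return received, timed_out


received, timed_out = asyncio.run(demo())
```

Because comment lines count as activity here, keep-alive pings reset the timer even though they never produce an event.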

Testing SSE consumers

import asyncio

import pytest
from aiohttp import web

async def mock_sse_server(request):
    response = web.StreamResponse()
    response.headers["Content-Type"] = "text/event-stream"
    response.headers["Cache-Control"] = "no-cache"
    await response.prepare(request)

    events = [
        "event: update\ndata: {\"value\": 1}\nid: 1\n\n",
        "event: update\ndata: {\"value\": 2}\nid: 2\n\n",
        ": keepalive\n\n",
        "event: done\ndata: finished\nid: 3\n\n",
    ]
    for event_text in events:
        await response.write(event_text.encode())
        await asyncio.sleep(0.1)

    return response

@pytest.fixture
async def sse_server(aiohttp_server):
    app = web.Application()
    app.router.add_get("/events", mock_sse_server)
    server = await aiohttp_server(app)
    return server

@pytest.mark.asyncio
async def test_consumer_receives_all_events(sse_server):
    received = []
    consumer = SSEConsumer(f"http://localhost:{sse_server.port}/events")

    async def collect(event: SSEEvent):
        received.append(event)
        if event.event == "done":
            # Without this, the consumer reconnects when the server closes
            # the stream and receives the same events again.
            consumer.stop()

    # connect() returns once the server closes the stream after stop()
    await asyncio.wait_for(consumer.connect(collect), timeout=5)

    assert len(received) == 3  # Two updates + done (comments are filtered)
    assert received[0].event == "update"
    assert received[2].data == "finished"

Performance considerations

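Optimization                           Impact
-------------------------------------  ------------------------------------------------------------
Reuse one httpx.AsyncClient            Avoids rebuilding the connection pool and TLS context on every reconnect
Buffer events before parsing JSON      Amortizes per-event overhead across a batch
Use orjson for JSON parsing            Typically several times faster than the stdlib json module
Worker pool for processing             Decouples I/O from computation
Connection pooling for multi-stream    Shares TCP connections across streams (requires HTTP/2; under HTTP/1.1 each stream holds its own connection)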

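For high-throughput scenarios (thousands of events per second), consider switching from line-by-line parsing to chunk-based parsing, and measure before committing to it. Chunks do not align with line boundaries, so the trailing partial line must be carried over to the next chunk:

buffer = ""
async for chunk in response.aiter_text():  # decodes incrementally
    buffer += chunk
    *lines, buffer = buffer.split("\n")  # keep the trailing partial line
    for line in lines:
        event = parser.feed_line(line.rstrip("\r"))
        if event:
            await callback(event)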
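
When working from raw bytes, a chunk boundary can also fall in the middle of a multi-byte UTF-8 character. The sketch below shows one way to handle both problems with an incremental decoder and a pending-line buffer. LineBuffer is a hypothetical helper written for this illustration; it accepts LF and CRLF line endings but not bare CR.

```python
import codecs


class LineBuffer:
    """Turn arbitrary byte chunks into complete text lines (illustrative helper)."""

    def __init__(self) -> None:
        # The incremental decoder holds back incomplete multi-byte sequences
        self._decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        self._pending = ""

    def feed(self, chunk: bytes) -> list[str]:
        """Return the lines completed by this chunk; buffer the remainder."""
        text = self._pending + self._decoder.decode(chunk)
        # Everything after the last "\n" is an incomplete line
        *lines, self._pending = text.split("\n")
        # Accept CRLF as well as LF; bare-CR line endings are not handled here
        return [line[:-1] if line.endswith("\r") else line for line in lines]


raw = "data: café\n\ndata: ok\n\n".encode("utf-8")
# Split mid-character (inside the two-byte "é") and mid-line
chunks = [raw[:10], raw[10:15], raw[15:]]

buf = LineBuffer()
lines = []
for chunk in chunks:
    lines.extend(buf.feed(chunk))
```

Each completed line, including the empty strings that mark event boundaries, can then be passed to parser.feed_line() exactly as in the line-by-line version.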

One thing to remember: A production SSE consumer is a resilient, long-lived HTTP client with automatic reconnection via Last-Event-ID, backpressure management through bounded queues, and health monitoring via heartbeat timeouts. The protocol is simple text over HTTP — the engineering challenge is maintaining reliable operation over hours and days of continuous streaming.
