API Pagination Clients — Deep Dive

Building a universal pagination consumer

Many Python codebases talk to several paginated APIs, each with its own scheme. A reusable pagination abstraction removes that duplication and avoids the bugs that hand-rolled pagination loops tend to pick up.

Cursor-based pagination with generators

Cursor-based pagination is the most common modern pattern. The generator below fetches pages lazily and yields items one at a time:

import httpx
from typing import Iterator, Any


def paginate_cursor(
    client: httpx.Client,
    url: str,
    params: dict[str, Any] | None = None,
    page_size: int = 100,
    cursor_field: str = "next_cursor",
    data_field: str = "data",
) -> Iterator[dict]:
    params = dict(params or {})
    params["limit"] = page_size
    cursor: str | None = None

    while True:
        if cursor:
            params["cursor"] = cursor

        resp = client.get(url, params=params)
        resp.raise_for_status()
        body = resp.json()

        items = body.get(data_field, [])
        yield from items

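        cursor = body.get(cursor_field)
        # Stop on a missing cursor or an empty page. A short page alone is not
        # a reliable end marker: a server may cap page size below `limit`.
        if not cursor or not items:
            break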

Usage: for user in paginate_cursor(client, "/users"): ... Because pages are fetched only when the caller asks for more items, memory stays proportional to one page, and the caller never has to deal with pagination mechanics.
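
To make the laziness concrete without a network, the sketch below swaps the HTTP call for a hypothetical in-memory fetch_page function (an illustration only, not part of httpx or of the generator above) and records which pages are requested when the caller stops early.

```python
from itertools import islice
from typing import Any, Callable, Iterator, Optional

# Hypothetical stand-in for the API: maps a cursor to one page of results.
PAGES = {
    None: {"data": [{"id": 1}, {"id": 2}], "next_cursor": "c1"},
    "c1": {"data": [{"id": 3}, {"id": 4}], "next_cursor": "c2"},
    "c2": {"data": [{"id": 5}], "next_cursor": None},
}
requests_made: list = []


def fetch_page(cursor: Optional[str]) -> dict:
    requests_made.append(cursor)  # record every simulated request
    return PAGES[cursor]


def paginate(fetch: Callable[[Optional[str]], dict]) -> Iterator[Any]:
    cursor: Optional[str] = None
    while True:
        body = fetch(cursor)
        yield from body["data"]
        cursor = body["next_cursor"]
        if not cursor:
            break


# Taking three items touches only two pages; the third page is never fetched.
first_three = list(islice(paginate(fetch_page), 3))
print(first_three)
print(requests_made)
```

The same early-exit behaviour should hold for paginate_cursor, since a generator only runs as far as its consumer pulls.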

Async cursor pagination

For httpx.AsyncClient, use async generators:

import httpx
from typing import AsyncIterator, Any


async def async_paginate_cursor(
    client: httpx.AsyncClient,
    url: str,
    params: dict[str, Any] | None = None,
    page_size: int = 100,
    cursor_field: str = "next_cursor",
    data_field: str = "data",
) -> AsyncIterator[dict]:
    params = dict(params or {})
    params["limit"] = page_size
    cursor: str | None = None

    while True:
        if cursor:
            params["cursor"] = cursor

        resp = await client.get(url, params=params)
        resp.raise_for_status()
        body = resp.json()

        items = body.get(data_field, [])
        for item in items:
            yield item

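        cursor = body.get(cursor_field)
        # Stop on a missing cursor or an empty page. A short page alone is not
        # a reliable end marker: a server may cap page size below `limit`.
        if not cursor or not items:
            break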

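Usage inside a coroutine: async for user in async_paginate_cursor(client, "/users"): ... The explicit for loop over items replaces yield from, which Python does not allow inside async generators.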
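
An async generator can only be consumed inside a running event loop. The sketch below shows one way to drive it end to end; fetch_page and PAGES are hypothetical stand-ins for the HTTP call so that the example runs without a server.

```python
import asyncio
from typing import Any, AsyncIterator, Optional

# Hypothetical in-memory pages standing in for an HTTP API.
PAGES = {
    None: {"data": [{"id": 1}, {"id": 2}], "next_cursor": "c1"},
    "c1": {"data": [{"id": 3}], "next_cursor": None},
}


async def fetch_page(cursor: Optional[str]) -> dict:
    await asyncio.sleep(0)  # hand control to the event loop, as a real request would
    return PAGES[cursor]


async def paginate() -> AsyncIterator[Any]:
    cursor: Optional[str] = None
    while True:
        body = await fetch_page(cursor)
        for item in body["data"]:
            yield item
        cursor = body["next_cursor"]
        if not cursor:
            break


async def main() -> list:
    # An async comprehension drains the generator inside the event loop.
    return [user["id"] async for user in paginate()]


ids = asyncio.run(main())
print(ids)
```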

Link header pagination

GitHub’s API puts pagination URLs in the Link response header. Parse them to follow the chain:

import httpx
import re
from typing import Iterator


def parse_link_header(header: str) -> dict[str, str]:
    links: dict[str, str] = {}
    for part in header.split(","):
        match = re.match(r'\s*<([^>]+)>;\s*rel="(\w+)"', part.strip())
        if match:
            links[match.group(2)] = match.group(1)
    return links


def paginate_link_header(
    client: httpx.Client,
    url: str,
    params: dict | None = None,
) -> Iterator[dict]:
    resp = client.get(url, params=params)
    resp.raise_for_status()
    yield from resp.json()

    while True:
        link_header = resp.headers.get("Link", "")
        links = parse_link_header(link_header)
        next_url = links.get("next")

        if not next_url:
            break

        resp = client.get(next_url)
        resp.raise_for_status()
        yield from resp.json()

This follows rel="next" links until none remain. The client never constructs pagination URLs manually — it follows whatever the server provides.
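
As a quick check of the parsing step, the sketch below runs the same regex approach over a header in the comma-separated shape described above. The URLs are made up for illustration.

```python
import re


def parse_link_header(header: str) -> dict:
    """Map each rel value to its URL, using the same approach as above."""
    links = {}
    for part in header.split(","):
        match = re.match(r'\s*<([^>]+)>;\s*rel="(\w+)"', part)
        if match:
            links[match.group(2)] = match.group(1)
    return links


# Illustrative header; api.example.com is a placeholder host.
header = (
    '<https://api.example.com/items?page=2>; rel="next", '
    '<https://api.example.com/items?page=5>; rel="last"'
)
links = parse_link_header(header)
print(links["next"])

# On the last page there is no Link header (or no rel="next"), so the loop ends.
print(parse_link_header(""))
```

Splitting on commas is a simplification that would break on a URL containing a comma. httpx also exposes parsed Link headers through the Response.links property, which may be a sturdier choice than a hand-written regex.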

Offset-based pagination with consistency guard

Offset pagination is unreliable for changing datasets, but some APIs only offer it. Add a consistency check:

import httpx
from typing import Iterator, Any
import logging

logger = logging.getLogger(__name__)


def paginate_offset(
    client: httpx.Client,
    url: str,
    params: dict[str, Any] | None = None,
    page_size: int = 100,
    max_pages: int = 1000,
) -> Iterator[dict]:
    params = dict(params or {})
    params["limit"] = page_size
    seen_ids: set[str] = set()
    offset = 0

    for page_num in range(max_pages):
        params["offset"] = offset
        resp = client.get(url, params=params)
        resp.raise_for_status()
        items = resp.json().get("data", [])

        if not items:
            break

        duplicates = 0
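        for item in items:
            raw_id = item.get("id")
            if raw_id is None:
                # No id to compare against, so pass the item through unchecked.
                yield item
                continue
            item_id = str(raw_id)
            if item_id in seen_ids:
                duplicates += 1
                continue
            seen_ids.add(item_id)
            yield item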

        if duplicates > 0:
            logger.warning(
                "Page %d had %d duplicate items (dataset changed during pagination)",
                page_num,
                duplicates,
            )

        if len(items) < page_size:
            break
        offset += page_size

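The seen_ids set detects and drops duplicates caused by insertions shifting the offset window. It cannot detect items that were skipped because a deletion shifted the window the other way, and it grows with the number of items seen, so memory is no longer bounded by one page. The max_pages guard prevents infinite pagination on buggy APIs.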
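
The window shift is easier to see on a toy dataset. The sketch below simulates a server with a plain list and a hypothetical fetch helper; it is an illustration of the failure mode, not part of the client code.

```python
def fetch(dataset: list, offset: int, limit: int) -> list:
    """Hypothetical server: return one offset/limit window of the current data."""
    return dataset[offset:offset + limit]


# Case 1: an insertion between requests produces a duplicate.
rows = [5, 4, 3, 2, 1]            # newest first
page1 = fetch(rows, 0, 2)         # [5, 4]
rows.insert(0, 6)                 # a new row arrives; everything shifts right
page2 = fetch(rows, 2, 2)         # [4, 3]
duplicated = set(page1) & set(page2)

# Case 2: a deletion between requests silently skips an item.
rows = [5, 4, 3, 2, 1]
seen = fetch(rows, 0, 2)          # [5, 4]
rows.remove(5)                    # a row is deleted; everything shifts left
seen += fetch(rows, 2, 2)         # [2, 1]
seen += fetch(rows, 4, 2)         # []
skipped = set(rows) - set(seen)

print(duplicated)
print(skipped)
```

A client-side set of seen ids catches the first case but has no way to notice the second.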

Rate-limit-aware pagination

Aggressive pagination can burn through rate limits. Add adaptive delays:

import httpx
import time
from typing import Iterator, Any


def paginate_with_rate_limit(
    client: httpx.Client,
    url: str,
    params: dict[str, Any] | None = None,
    page_size: int = 100,
    min_delay: float = 0.1,
) -> Iterator[dict]:
    params = dict(params or {})
    params["limit"] = page_size
    cursor: str | None = None

    while True:
        if cursor:
            params["cursor"] = cursor

        resp = client.get(url, params=params)

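        # Rate limit headers are not standardized. This assumes GitHub-style
        # names, with X-RateLimit-Reset given as a Unix timestamp in seconds.
        remaining = int(resp.headers.get("X-RateLimit-Remaining", 100))
        reset_at = float(resp.headers.get("X-RateLimit-Reset", 0))

        if resp.status_code == 429:
            # Assumes Retry-After is in seconds; it can also be an HTTP date.
            retry_after = float(resp.headers.get("Retry-After", 60))
            time.sleep(retry_after)
            continue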

        resp.raise_for_status()
        body = resp.json()
        items = body.get("data", [])
        yield from items

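        cursor = body.get("next_cursor")
        # Stop on a missing cursor or an empty page. A short page alone is not
        # a reliable end marker: a server may cap page size below `limit`.
        if not cursor or not items:
            break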

        # Adaptive delay: slow down as rate limit depletes
        if remaining < 10:
            wait = max(reset_at - time.time(), 0) / max(remaining, 1)
            time.sleep(max(wait, min_delay))
        else:
            time.sleep(min_delay)

The adaptive delay distributes remaining requests evenly across the rate limit window. When X-RateLimit-Remaining drops below 10, the code slows down proportionally instead of hitting a hard 429.
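
The delay rule can be pulled out into a pure function to check the arithmetic. The adaptive_delay name and the explicit now parameter are assumptions made for this sketch so that the result does not depend on the wall clock.

```python
def adaptive_delay(
    remaining: int, reset_at: float, now: float, min_delay: float = 0.1
) -> float:
    """Seconds to sleep before the next request, using the rule described above."""
    if remaining < 10:
        # Spread the remaining requests over the time left in the window.
        wait = max(reset_at - now, 0) / max(remaining, 1)
        return max(wait, min_delay)
    return min_delay


now = 1_000.0
plenty_left = adaptive_delay(remaining=50, reset_at=now + 30, now=now)
running_low = adaptive_delay(remaining=5, reset_at=now + 30, now=now)
exhausted = adaptive_delay(remaining=0, reset_at=now + 30, now=now)
window_over = adaptive_delay(remaining=5, reset_at=now - 10, now=now)

print(plenty_left, running_low, exhausted, window_over)
```

With 5 requests left and 30 seconds until reset, the delay is 30 / 5 = 6 seconds per request. With none left, the function waits out the whole window, and once the reset time has passed it falls back to min_delay.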

Universal paginator class

Combine all schemes into a reusable class:

import httpx
from typing import Iterator, Any, Literal
from dataclasses import dataclass

# Reuses parse_link_header, defined earlier alongside paginate_link_header.


@dataclass
class PaginationConfig:
    scheme: Literal["cursor", "offset", "link"]
    page_size: int = 100
    cursor_field: str = "next_cursor"
    data_field: str = "data"
    max_pages: int = 10000


class Paginator:
    def __init__(self, client: httpx.Client, config: PaginationConfig):
        self._client = client
        self._config = config

    def paginate(
        self, url: str, params: dict[str, Any] | None = None
    ) -> Iterator[dict]:
        if self._config.scheme == "cursor":
            yield from self._cursor(url, params)
        elif self._config.scheme == "offset":
            yield from self._offset(url, params)
        elif self._config.scheme == "link":
            yield from self._link(url, params)

    def _cursor(
        self, url: str, params: dict[str, Any] | None
    ) -> Iterator[dict]:
        params = dict(params or {})
        params["limit"] = self._config.page_size
        cursor: str | None = None
        cfg = self._config

        for _ in range(cfg.max_pages):
            if cursor:
                params["cursor"] = cursor
            resp = self._client.get(url, params=params)
            resp.raise_for_status()
            body = resp.json()
            items = body.get(cfg.data_field, [])
            yield from items
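            cursor = body.get(cfg.cursor_field)
            # Stop on a missing cursor or an empty page; a short page alone is
            # not a reliable end marker if the server caps page size.
            if not cursor or not items:
                break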

    def _offset(
        self, url: str, params: dict[str, Any] | None
    ) -> Iterator[dict]:
        params = dict(params or {})
        params["limit"] = self._config.page_size
        offset = 0
        cfg = self._config

        for _ in range(cfg.max_pages):
            params["offset"] = offset
            resp = self._client.get(url, params=params)
            resp.raise_for_status()
            items = resp.json().get(cfg.data_field, [])
            if not items:
                break
            yield from items
            if len(items) < cfg.page_size:
                break
            offset += cfg.page_size

    def _link(
        self, url: str, params: dict[str, Any] | None
    ) -> Iterator[dict]:
        resp = self._client.get(url, params=params)
        resp.raise_for_status()
        yield from resp.json()

        for _ in range(self._config.max_pages):
            links = parse_link_header(resp.headers.get("Link", ""))
            next_url = links.get("next")
            if not next_url:
                break
            resp = self._client.get(next_url)
            resp.raise_for_status()
            yield from resp.json()

Teams configure pagination once per API and consume it uniformly.

Testing pagination consumers

Test with deterministic multi-page fixtures:

import httpx

# Paginator and PaginationConfig are the classes from the previous section.


def make_paginated_handler(
    items: list[dict], page_size: int = 2
) -> httpx.MockTransport:
    pages = [
        items[i:i + page_size]
        for i in range(0, len(items), page_size)
    ]

    call_count = {"n": 0}

    def handler(request: httpx.Request) -> httpx.Response:
        page_idx = call_count["n"]
        call_count["n"] += 1

        page_items = pages[page_idx] if page_idx < len(pages) else []
        has_next = page_idx + 1 < len(pages)
        cursor = f"cursor_{page_idx + 1}" if has_next else None

        return httpx.Response(
            200,
            json={
                "data": page_items,
                "next_cursor": cursor,
            },
        )

    return httpx.MockTransport(handler)


def test_cursor_pagination():
    test_items = [{"id": i, "name": f"user_{i}"} for i in range(5)]
    transport = make_paginated_handler(test_items, page_size=2)
    client = httpx.Client(
        transport=transport, base_url="https://test.com"
    )

    config = PaginationConfig(scheme="cursor", page_size=2)
    paginator = Paginator(client, config)
    results = list(paginator.paginate("/users"))

    assert len(results) == 5
    assert results[0]["id"] == 0
    assert results[4]["id"] == 4

Test edge cases: an empty first page, single-item pages, a null cursor returned alongside a full page, and 429 responses during pagination.

The one thing to remember: A reusable pagination consumer that supports cursor, offset, and link-header schemes — with rate-limit awareness and lazy iteration — eliminates one of the most common sources of boilerplate and bugs in Python API client code.
