Robyn Rust-Python Framework — Deep Dive
Internal architecture
Understanding Robyn requires understanding how Rust and Python communicate:
The PyO3 bridge
PyO3 is a Rust crate that provides bindings to the Python interpreter. When Robyn starts:
- The Rust binary initializes a Tokio async runtime
- It starts a Python interpreter via PyO3
- Your Python app file is loaded, registering routes with the Rust router
- The Rust HTTP server starts listening on the specified port
When a request arrives:
- Rust’s actix runtime accepts the TCP connection (zero Python involvement)
- Rust parses the HTTP request (headers, path, query string)
- Rust matches the URL against compiled route patterns
- The matched Python handler is called via PyO3
- If the handler is async, it runs on Python’s asyncio event loop
- The Python return value is passed back to Rust
- Rust serializes the HTTP response and writes it to the socket
Steps 1-3 and 6-7 happen entirely in Rust. Only step 4-5 involves Python. For a simple JSON endpoint, the Python portion might be 20% of the total request time.
Application structure
A well-organized Robyn application:
from robyn import Robyn, Request, Response
from robyn.robyn import Headers
app = Robyn(__file__)
# Configuration
app.config = {
"DATABASE_URL": os.environ["DATABASE_URL"],
"JWT_SECRET": os.environ["JWT_SECRET"],
}
# Lifecycle management
@app.startup_handler
async def startup():
import asyncpg
app.state["db_pool"] = await asyncpg.create_pool(
app.config["DATABASE_URL"],
min_size=5,
max_size=20,
)
@app.shutdown_handler
async def shutdown():
await app.state["db_pool"].close()
Routing and handlers
Robyn supports path parameters, query parameters, and request body parsing:
@app.get("/api/v1/users")
async def list_users(request: Request):
"""Query parameters from request."""
page = int(request.query_params.get("page", "1"))
per_page = min(int(request.query_params.get("per_page", "20")), 100)
pool = app.state["db_pool"]
async with pool.acquire() as conn:
rows = await conn.fetch(
"SELECT id, name, email FROM users LIMIT $1 OFFSET $2",
per_page, (page - 1) * per_page
)
total = await conn.fetchval("SELECT COUNT(*) FROM users")
return Response(
status_code=200,
headers=Headers({"Content-Type": "application/json"}),
body=json.dumps({
"data": [dict(r) for r in rows],
"pagination": {"page": page, "total": total},
}),
)
@app.post("/api/v1/users")
async def create_user(request: Request):
"""JSON body parsing."""
data = request.json()
# Validation
if not data.get("name") or not data.get("email"):
return Response(
status_code=422,
body=json.dumps({"error": "name and email required"}),
)
pool = app.state["db_pool"]
async with pool.acquire() as conn:
user_id = await conn.fetchval(
"INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
data["name"], data["email"],
)
return Response(
status_code=201,
body=json.dumps({"id": user_id, "name": data["name"]}),
)
@app.put("/api/v1/users/:user_id")
async def update_user(request: Request):
user_id = int(request.path_params["user_id"])
data = request.json()
pool = app.state["db_pool"]
async with pool.acquire() as conn:
result = await conn.execute(
"UPDATE users SET name = $1 WHERE id = $2",
data.get("name"), user_id,
)
if result == "UPDATE 0":
return Response(status_code=404, body='{"error": "not found"}')
return Response(status_code=200, body=json.dumps({"updated": True}))
Middleware chain
Robyn’s middleware system supports request and response interception:
@app.before_request()
async def request_timer(request: Request):
"""Attach timing info to request."""
request.headers["X-Start-Time"] = str(time.perf_counter())
return request
@app.before_request("/api/*")
async def authenticate(request: Request):
"""Auth middleware for API routes only."""
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
return Response(
status_code=401,
body='{"error": "missing token"}',
)
token = auth_header[7:]
try:
payload = jwt.decode(token, app.config["JWT_SECRET"], algorithms=["HS256"])
request.headers["X-User-ID"] = str(payload["user_id"])
except jwt.InvalidTokenError:
return Response(
status_code=401,
body='{"error": "invalid token"}',
)
return request
@app.after_request()
async def add_timing_header(response: Response):
"""Add response timing header."""
response.headers.set("X-Powered-By", "Robyn")
return response
When a before_request handler returns a Response instead of a Request, the middleware chain short-circuits and the response is sent immediately.
WebSocket support
Robyn provides WebSocket handling through the Rust layer:
from robyn import WebSocket
websocket = WebSocket(app, "/ws")
@websocket.on("connect")
async def ws_connect(ws):
print(f"Client connected: {ws.id}")
return "Connected"
@websocket.on("message")
async def ws_message(ws, message):
"""Handle incoming WebSocket messages."""
try:
data = json.loads(message)
except json.JSONDecodeError:
return "Invalid JSON"
action = data.get("action")
if action == "broadcast":
# Send to all connected clients
for client_id in websocket.active_connections:
websocket.send_to(client_id, json.dumps({
"from": ws.id,
"text": data.get("text", ""),
}))
elif action == "echo":
return json.dumps({"echo": data.get("text", "")})
return ""
@websocket.on("close")
async def ws_close(ws):
print(f"Client disconnected: {ws.id}")
return "Disconnected"
WebSocket connections are managed by the Rust runtime, so connection handling (accept, close, frame parsing) happens at Rust speed.
Multi-process scaling
Robyn supports running multiple worker processes:
app.start(
host="0.0.0.0",
port=8000,
processes=4, # Number of worker processes
workers=2, # Number of Rust threads per process
)
Each process has its own:
- Rust async runtime with
workersthreads - Python interpreter (via fork)
- Connection pool (initialize in
startup_handler)
This sidesteps the GIL entirely — each process has an independent GIL, and within each process, the Rust threads handle I/O without touching the GIL.
The recommended formula:
processes: Number of CPU coresworkers: 1-2 per process for I/O-bound workloads
Static file serving
Robyn serves static files directly from Rust, bypassing Python entirely:
from robyn import Robyn, ALLOW_CORS
app = Robyn(__file__)
# Serve static directory
app.serve_directory(
route="/static",
directory_path="./public",
index_file="index.html",
)
This is significantly faster than Python-based static file serving because the file I/O and HTTP response construction happen in Rust.
Testing
from robyn.test import TestClient
import pytest
@pytest.fixture
def client():
return TestClient(app)
def test_list_users(client):
response = client.get("/api/v1/users")
assert response.status_code == 200
data = response.json()
assert "data" in data
def test_create_user(client):
response = client.post(
"/api/v1/users",
json={"name": "Alice", "email": "alice@test.com"},
)
assert response.status_code == 201
assert "id" in response.json()
def test_unauthorized(client):
response = client.get("/api/v1/users")
# Without auth header
assert response.status_code == 401
Production deployment
Robyn includes its own production-ready server (the Rust layer), so no separate ASGI/WSGI server is needed:
# Direct
python app.py --processes 4 --workers 2 --log-level WARNING
# With environment variables
DATABASE_URL=postgresql://... JWT_SECRET=... python app.py
Behind nginx:
upstream robyn {
server 127.0.0.1:8000;
server 127.0.0.1:8001;
}
server {
listen 443 ssl http2;
server_name api.example.com;
location / {
proxy_pass http://robyn;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
}
Benchmarking reality
Real-world performance testing reveals where Robyn’s Rust layer matters:
# Simple JSON response (framework overhead dominates)
Robyn: ~85,000 req/s
FastAPI: ~25,000 req/s
Flask: ~8,000 req/s
# Database query endpoint (I/O dominates)
Robyn: ~4,200 req/s
FastAPI: ~3,800 req/s
Flask: ~2,100 req/s (threaded)
# CPU-heavy endpoint (Python speed dominates)
Robyn: ~1,200 req/s
FastAPI: ~1,100 req/s
Flask: ~900 req/s
The Rust advantage is most visible in the simple JSON case where framework overhead is the primary cost. As handler complexity increases, the difference narrows because Python execution dominates.
Tradeoffs to understand
Advantages:
- Dramatically faster than pure-Python frameworks for simple endpoints
- Built-in production server (no uvicorn/gunicorn needed)
- Multi-process scaling without external process managers
- Growing ecosystem with active development
Tradeoffs:
- Smaller community than FastAPI or Flask
- Debugging framework internals requires Rust knowledge
- Binary distribution means platform-specific wheels
- No ASGI compatibility (can’t use ASGI middleware ecosystem)
- Fewer production case studies to learn from
The one thing to remember: Robyn represents a new approach to Python web frameworks where the performance-critical path runs in Rust — delivering 2-5x throughput improvements for framework-bound workloads while keeping the developer experience firmly in Python territory.
See Also
- Python Aiohttp Client Understand Aiohttp Client through a practical analogy so your Python decisions become faster and clearer.
- Python Api Client Design Why building your own API client in Python is like creating a TV remote that only has the buttons you actually need.
- Python Api Documentation Swagger Swagger turns your Python API into an interactive playground where anyone can click buttons to try it out — no coding required.
- Python Api Mocking Responses Why testing with fake API responses is like rehearsing a play with stand-ins before the real actors show up.
- Python Api Pagination Clients Why APIs send data in pages, and how Python handles it — like reading a book one chapter at a time instead of swallowing the whole thing.