Sanic Framework — Deep Dive

Application structure

A well-organized Sanic application uses the application factory pattern and blueprints:

# server.py
from sanic import Sanic

def create_app() -> Sanic:
    app = Sanic("MyService")
    
    from blueprints.api import api_bp
    from blueprints.health import health_bp
    
    app.blueprint(api_bp)
    app.blueprint(health_bp)
    
    return app

app = create_app()

# blueprints/api.py
from sanic import Blueprint
from sanic.response import json

api_bp = Blueprint("api", url_prefix="/api/v1")

@api_bp.get("/users/<user_id:int>")
async def get_user(request, user_id: int):
    user = await db.fetch_user(user_id)
    if not user:
        return json({"error": "not found"}, status=404)
    return json(user.to_dict())

Blueprint groups let you nest blueprints with shared prefixes and middleware:

from sanic import Blueprint

v1 = Blueprint.group(users_bp, orders_bp, url_prefix="/api/v1")
v2 = Blueprint.group(users_v2_bp, orders_v2_bp, url_prefix="/api/v2")

app.blueprint(v1)
app.blueprint(v2)

Signal system

Sanic’s signal system goes beyond simple events. Signals follow a dotted namespace convention and support dynamic dispatching:

import time

from sanic import Sanic
from sanic.response import json
from sanic.signals import Event

app = Sanic("SignalApp")

# Built-in signals
@app.signal(Event.HTTP_LIFECYCLE_REQUEST)
async def on_request(request, **kwargs):
    request.ctx.start_time = time.perf_counter()

@app.signal(Event.HTTP_LIFECYCLE_RESPONSE)
async def on_response(request, response, **kwargs):
    elapsed = time.perf_counter() - request.ctx.start_time
    response.headers["X-Response-Time"] = f"{elapsed:.4f}"

# Custom signals
@app.signal("order.created.<status>")
async def handle_order(status: str, order_id: str, **kwargs):
    if status == "paid":
        await notify_warehouse(order_id)
    elif status == "cancelled":
        await process_refund(order_id)

# Dispatching custom signals
@app.post("/orders")
async def create_order(request):
    order = await process(request.json)
    await app.dispatch(
        f"order.created.{order.status}",
        context={"order_id": order.id}
    )
    return json({"id": order.id})

Signals with dynamic path segments (<status>) enable pattern-based event routing. This creates a lightweight in-process event bus.
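
To make the pattern-matching idea concrete, here is a toy, standard-library-only event bus. It is a sketch of the concept rather than Sanic's signal router: TinySignalBus, on, and dispatch are hypothetical names invented for this illustration, and real Sanic signals have extra rules (such as the fixed three-part name) that this model ignores.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Optional, Tuple

Handler = Callable[..., Awaitable[None]]


class TinySignalBus:
    """Toy in-process bus: matches dotted event names against <param> patterns."""

    def __init__(self) -> None:
        self._handlers: List[Tuple[List[str], Handler]] = []

    def on(self, pattern: str) -> Callable[[Handler], Handler]:
        def register(handler: Handler) -> Handler:
            self._handlers.append((pattern.split("."), handler))
            return handler
        return register

    async def dispatch(self, event: str, context: Optional[Dict[str, object]] = None) -> int:
        """Call every matching handler; return how many handlers ran."""
        parts = event.split(".")
        called = 0
        for pattern, handler in self._handlers:
            params = self._match(pattern, parts)
            if params is not None:
                # Dynamic segments and the context dict both arrive as keyword arguments.
                await handler(**params, **(context or {}))
                called += 1
        return called

    @staticmethod
    def _match(pattern: List[str], parts: List[str]) -> Optional[Dict[str, str]]:
        if len(pattern) != len(parts):
            return None
        params: Dict[str, str] = {}
        for expected, actual in zip(pattern, parts):
            if expected.startswith("<") and expected.endswith(">"):
                params[expected[1:-1]] = actual  # capture the dynamic segment
            elif expected != actual:
                return None
        return params


bus = TinySignalBus()
seen: List[Tuple[str, str]] = []


@bus.on("order.created.<status>")
async def handle_order(status: str, order_id: str, **kwargs) -> None:
    seen.append((status, order_id))


matched = asyncio.run(bus.dispatch("order.created.paid", {"order_id": "A1"}))
unmatched = asyncio.run(bus.dispatch("order.deleted.paid", {"order_id": "A2"}))
```

The handler receives the captured segment (status) alongside the dispatched context, which is the same shape as the handle_order signal handler shown earlier.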

Middleware execution order

Sanic middleware has explicit priority control:

@app.middleware("request", priority=10)
async def high_priority_auth(request):
    token = request.headers.get("Authorization")
    if not token:
        return json({"error": "unauthorized"}, status=401)

@app.middleware("request", priority=5)
async def low_priority_logging(request):
    logger.info(f"{request.method} {request.path}")

Higher priority numbers execute first for request middleware and last for response middleware. This lets you guarantee that authentication runs before logging, which runs before rate limiting — regardless of registration order.

Response middleware executes in reverse order, creating a clean onion-layer pattern:

Request → auth(10) → logging(5) → handler → logging(5) → auth(10) → Response
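
The ordering in the diagram can be modeled in a few lines of plain Python. This is only a sketch of the behavior as described in this section, not Sanic's implementation; it assumes each layer has both a request half and a response half, and the function names are made up for the illustration.

```python
from typing import List, Tuple

# (name, priority) pairs in registration order; logging is deliberately registered first.
registered: List[Tuple[str, int]] = [("logging", 5), ("auth", 10)]


def request_order(middleware: List[Tuple[str, int]]) -> List[str]:
    # Higher priority runs first; sorted() is stable, so ties keep registration order.
    return [name for name, _ in sorted(middleware, key=lambda m: m[1], reverse=True)]


def response_order(middleware: List[Tuple[str, int]]) -> List[str]:
    # Mirror image of the request order, which produces the onion shape.
    return list(reversed(request_order(middleware)))


def trace(middleware: List[Tuple[str, int]]) -> List[str]:
    return ["Request", *request_order(middleware), "handler",
            *response_order(middleware), "Response"]


print(" -> ".join(trace(registered)))
# Request -> auth -> logging -> handler -> logging -> auth -> Response
```

Because the sort key is the priority, swapping the registration order of the two entries leaves the trace unchanged.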

Streaming responses and requests

For large payloads, Sanic supports true streaming in both directions:

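# Note: the older sanic.response.stream() helper was deprecated and later removed;
# request.respond() is the streaming API in current releases.
@app.get("/export")
async def export_data(request):
    # respond() returns a response object that can be written to incrementally
    response = await request.respond(content_type="text/csv")
    async for batch in db.fetch_batches(1000):
        csv_chunk = format_csv(batch)
        await response.send(csv_chunk)
    await response.eof()  # signal the end of the stream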
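
The export handler relies on format_csv and db.fetch_batches, neither of which is defined in this article. The following standard-library sketch shows one plausible shape for them. The fieldnames parameter, the in-memory fetch_batches stand-in, and the sample rows are all assumptions made for illustration.

```python
import asyncio
import csv
import io
from typing import AsyncIterator, Dict, Iterable, List


def format_csv(batch: Iterable[Dict[str, object]], fieldnames: List[str]) -> str:
    """Render one batch of rows as CSV text (no header row)."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, lineterminator="\n")
    writer.writerows(batch)
    return buffer.getvalue()


async def fetch_batches(rows: List[Dict[str, object]], size: int) -> AsyncIterator[List[Dict[str, object]]]:
    """Stand-in for a database cursor: yields rows in fixed-size batches."""
    for start in range(0, len(rows), size):
        await asyncio.sleep(0)  # yield control, as a real database call would
        yield rows[start:start + size]


async def export_chunks(rows: List[Dict[str, object]], size: int) -> List[str]:
    # In a real handler each chunk would be written to the response instead of collected.
    chunks = []
    async for batch in fetch_batches(rows, size):
        chunks.append(format_csv(batch, ["id", "name"]))
    return chunks


rows = [{"id": 1, "name": "ada"}, {"id": 2, "name": "bob"}, {"id": 3, "name": "cy"}]
chunks = asyncio.run(export_chunks(rows, 2))
```

Each chunk is produced and handed off independently, so memory use is bounded by the batch size rather than the size of the full export.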

Request body streaming avoids buffering large uploads in memory:

# stream=True hands the handler the body as it arrives instead of buffering it first
@app.post("/upload", stream=True)
async def upload(request):
    total = 0
    async for chunk in request.stream:
        total += len(chunk)
        await storage.write(chunk)
    return json({"bytes_received": total})

This is critical for services handling file uploads or processing large JSON payloads — you can start processing before the full body arrives.
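
As a small illustration of processing a body before it has fully arrived, the sketch below hashes an upload chunk by chunk. It uses only the standard library: fake_body is a hypothetical stand-in for a streamed request body, and digest_stream is an invented helper name.

```python
import asyncio
import hashlib
from typing import AsyncIterator, List, Tuple


async def fake_body(chunks: List[bytes]) -> AsyncIterator[bytes]:
    """Stand-in for a streamed request body."""
    for chunk in chunks:
        await asyncio.sleep(0)  # simulate waiting for the next network read
        yield chunk


async def digest_stream(stream: AsyncIterator[bytes]) -> Tuple[int, str]:
    """Hash the body incrementally; only one chunk is held in memory at a time."""
    hasher = hashlib.sha256()
    total = 0
    async for chunk in stream:
        hasher.update(chunk)
        total += len(chunk)
    return total, hasher.hexdigest()


total, digest = asyncio.run(digest_stream(fake_body([b"hello ", b"world"])))
```

The resulting digest is identical to hashing the complete payload in one call, but the full body never has to exist in memory.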

Worker management and the Inspector

Sanic manages multiple worker processes internally:

# Run with 4 workers; the __main__ guard keeps worker processes from re-running app.run()
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000, workers=4, access_log=False)

Or from the CLI:

sanic server:app --workers=4 --no-access-log

The Inspector provides runtime introspection without restarting:

app.config.INSPECTOR = True
app.config.INSPECTOR_HOST = "localhost"
app.config.INSPECTOR_PORT = 6457

Then query it:

# Check server state
sanic inspect

# Reload the application
sanic inspect reload

# Trigger shutdown
sanic inspect shutdown

# Custom inspector commands (methods defined on a custom Inspector subclass)
sanic inspect my_command

This is invaluable for production operations — you can check health, reload configuration, or scale workers without downtime.

Dependency injection with extensions

Sanic Extensions adds dependency injection similar to FastAPI:

from sanic_ext import Extend, openapi

@app.get("/users/<user_id:int>")
@openapi.parameter("user_id", int, location="path")
async def get_user(request, user_id: int, db: Database):
    # db is automatically injected
    user = await db.fetch_user(user_id)
    return json(user.to_dict())

# Register the type so handlers annotated with Database receive an instance
Extend(app)
app.ext.add_dependency(Database)
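
To show what annotation-driven injection involves, here is a toy injector built on the standard library. It is a conceptual sketch, not how Sanic Extensions is implemented: the Injector class, its call method, and this Database stub are hypothetical and exist only for the illustration.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict, Optional, get_type_hints


class Database:
    """Stub dependency used only for this example."""

    async def fetch_user(self, user_id: int) -> dict:
        return {"id": user_id, "name": "example"}


class Injector:
    def __init__(self) -> None:
        self._providers: Dict[type, Callable[[], Any]] = {}

    def add_dependency(self, type_: type, constructor: Optional[Callable[[], Any]] = None) -> None:
        # Default to calling the type itself when no constructor is given.
        self._providers[type_] = constructor or type_

    async def call(self, handler: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        hints = get_type_hints(handler)
        for name in inspect.signature(handler).parameters:
            provider = self._providers.get(hints.get(name))
            if provider is not None and name not in kwargs:
                kwargs[name] = provider()  # build the dependency for this call
        return await handler(*args, **kwargs)


async def get_user(request, user_id: int, db: Database):
    # db is supplied by the injector, not by the caller
    return await db.fetch_user(user_id)


injector = Injector()
injector.add_dependency(Database)
result = asyncio.run(injector.call(get_user, object(), user_id=7))
```

The handler never constructs its own Database; the injector reads the type annotation and supplies one, which also makes it easy to substitute a fake in tests.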

Error handling patterns

Sanic’s error handling supports both exception-based and handler-based approaches:

from sanic.exceptions import SanicException

class NotFoundError(SanicException):
    status_code = 404
    message = "Resource not found"

@app.exception(NotFoundError)
async def handle_not_found(request, exception):
    return json({
        "error": exception.message,
        "path": request.path,
    }, status=exception.status_code)

# Fallback for unhandled exceptions
@app.exception(Exception)
async def handle_unexpected(request, exception):
    logger.exception("Unhandled error", exc_info=exception)
    return json({"error": "internal server error"}, status=500)
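
The interplay between a specific handler and the Exception fallback can be sketched without the framework. The registry below resolves handlers by walking the exception's class hierarchy, so the most specific match wins. This is a simplified model with invented names (on_exception, resolve), not Sanic's error handler.

```python
from typing import Callable, Dict, Tuple, Type

Response = Tuple[int, dict]
handlers: Dict[Type[BaseException], Callable[[BaseException], Response]] = {}


def on_exception(exc_type: Type[BaseException]):
    def register(func: Callable[[BaseException], Response]):
        handlers[exc_type] = func
        return func
    return register


class NotFoundError(Exception):
    status_code = 404
    message = "Resource not found"


@on_exception(NotFoundError)
def handle_not_found(exc) -> Response:
    return exc.status_code, {"error": exc.message}


@on_exception(Exception)
def handle_unexpected(exc) -> Response:
    return 500, {"error": "internal server error"}


def resolve(exc: BaseException) -> Response:
    # Walk from the most specific class to the most general one.
    for cls in type(exc).__mro__:
        if cls in handlers:
            return handlers[cls](exc)
    raise exc


not_found = resolve(NotFoundError())
unexpected = resolve(ValueError("boom"))
```

One consequence worth noting: a catch-all registered for Exception also receives every error that has no more specific handler, so it should log enough detail to debug them.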

Production deployment

For production, configure Sanic with performance-oriented settings:

app.config.update({
    "ACCESS_LOG": False,          # Disable for performance
    "REQUEST_TIMEOUT": 30,        # Seconds allowed for the full request to arrive
    "RESPONSE_TIMEOUT": 60,       # Seconds from handing the request to the app until the response is sent
    "KEEP_ALIVE_TIMEOUT": 120,    # Keep-alive connection timeout
    "REQUEST_MAX_SIZE": 10_000_000,  # 10MB max request body
    "GRACEFUL_SHUTDOWN_TIMEOUT": 15,
})

Behind a reverse proxy (nginx/Caddy), configure trusted proxies:

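app.config.PROXIES_COUNT = 1  # trust exactly one proxy hop in X-Forwarded-For
app.config.FORWARDED_FOR_HEADER = "X-Forwarded-For"  # the default header name
# Alternatively, if the proxy sets a single-address header:
# app.config.REAL_IP_HEADER = "X-Real-IP"
# Or, for the RFC 7239 Forwarded header validated by a shared secret:
# app.config.FORWARDED_SECRET = "your-secret"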
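
Why PROXIES_COUNT matters is easier to see with a small model. The function below picks the client address by counting trusted hops from the right-hand end of X-Forwarded-For. It is a simplified sketch of the idea, not Sanic's header parser, and client_ip is a hypothetical helper name.

```python
from typing import List, Optional


def client_ip(forwarded_for: List[str], proxies_count: int) -> Optional[str]:
    """Return the address added by the outermost trusted proxy, or None."""
    if proxies_count <= 0:
        return None
    # Flatten repeated headers and comma-separated entries into one list of hops.
    hops = [p.strip() for header in forwarded_for for p in header.split(",") if p.strip()]
    try:
        # Entries to the left of this position were supplied by untrusted parties.
        return hops[-proxies_count]
    except IndexError:
        return None


direct = client_ip(["203.0.113.7"], 1)
spoofed = client_ip(["10.9.9.9, 203.0.113.7"], 1)
two_hops = client_ip(["203.0.113.7, 10.0.0.2"], 2)
missing = client_ip([], 1)
```

In the spoofed case the client sent its own X-Forwarded-For value (10.9.9.9), but because only one hop is trusted, the address appended by the proxy is the one that is used.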

For TLS termination at the application level:

app.run(
    host="0.0.0.0",
    port=443,
    ssl={"cert": "/path/to/cert.pem", "key": "/path/to/key.pem"},
    workers=4,
)

Performance tuning

Key levers for Sanic performance:

  1. Install uvloop: pip install uvloop. Sanic uses it as the event loop when it is available; the often-quoted 2-4x throughput gain depends heavily on the workload, so measure your own service
  2. Disable access logging in production: otherwise every request pays for a synchronous log write on the event loop
  3. Worker count: Start with CPU_cores workers. Unlike sync frameworks, async workers don’t need the 2N+1 formula since each worker handles thousands of concurrent connections
  4. Connection pooling: Use asyncpg for PostgreSQL and redis.asyncio (the successor to aioredis, now part of redis-py) for Redis, and initialize the pools in the before_server_start listener
  5. Response caching: Cache expensive endpoint responses, for example in Redis or a small in-process TTL cache; check the Sanic Extensions documentation for your version before assuming a built-in @cache() decorator is available
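
For the endpoint-level caching in item 5, a minimal in-process TTL cache for async functions might look like the sketch below. It is a hand-rolled illustration using only the standard library, not a Sanic or Sanic Extensions API; ttl_cache and load_report are hypothetical names, and the cache is per process, so each worker would keep its own copy.

```python
import asyncio
import functools
import time
from typing import Any, Awaitable, Callable, Dict, List, Tuple


def ttl_cache(seconds: float):
    """Cache an async function's results per positional-argument tuple for `seconds`."""
    def decorator(func: Callable[..., Awaitable[Any]]):
        store: Dict[Tuple[Any, ...], Tuple[float, Any]] = {}

        @functools.wraps(func)
        async def wrapper(*args: Any) -> Any:
            now = time.monotonic()
            hit = store.get(args)
            if hit is not None and now - hit[0] < seconds:
                return hit[1]  # still fresh: skip the expensive call
            value = await func(*args)
            store[args] = (now, value)
            return value

        return wrapper
    return decorator


calls: List[int] = []  # records every real (uncached) invocation


@ttl_cache(seconds=60)
async def load_report(report_id: int) -> dict:
    calls.append(report_id)
    return {"id": report_id}


async def demo() -> Tuple[dict, dict, dict]:
    first = await load_report(1)
    second = await load_report(1)  # served from the cache
    other = await load_report(2)   # different key, so a real call
    return first, second, other


first, second, other = asyncio.run(demo())
```

This sketch has no size limit and does not deduplicate concurrent misses for the same key, so a shared store such as Redis is usually the better fit once several workers are involved.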

Lifecycle listeners

import asyncpg
import redis.asyncio as redis  # successor to aioredis, now part of redis-py

@app.before_server_start
async def setup(app, loop):
    app.ctx.db = await asyncpg.create_pool(dsn="postgresql://...")
    app.ctx.redis = redis.from_url("redis://localhost")

@app.after_server_stop
async def teardown(app, loop):
    await app.ctx.db.close()
    await app.ctx.redis.aclose()  # aclose() in redis-py 5+; older versions use close()

These listeners run once per worker process, so each worker gets its own connection pool — which is exactly what you want for database connections.

The one thing to remember: Sanic’s integrated server, signal-based architecture, and runtime Inspector create a self-contained async ecosystem where your framework, server, and operational tooling are one coordinated unit rather than a stack of loosely coupled pieces.
