Tornado Framework — Deep Dive
Application architecture
A production Tornado application uses the Application class to configure routing, settings, and middleware-like transforms:
import tornado.web
import tornado.ioloop
class Application(tornado.web.Application):
def __init__(self):
handlers = [
(r"/", MainHandler),
(r"/api/users/(\d+)", UserHandler),
(r"/ws", ChatWebSocket),
]
settings = {
"debug": False,
"cookie_secret": os.environ["COOKIE_SECRET"],
"xsrf_cookies": True,
"compress_response": True,
}
super().__init__(handlers, **settings)
# Initialize shared resources
self.db = motor.motor_tornado.MotorClient()["mydb"]
self.redis = aioredis.from_url("redis://localhost")
Notice the regex-based routing — (\d+) captures a numeric user ID and passes it as an argument to the handler method. This is more flexible than path-parameter routing but less readable for simple cases.
RequestHandler lifecycle
Understanding the handler lifecycle is essential for proper Tornado development:
class UserHandler(tornado.web.RequestHandler):
def initialize(self, db):
"""Called per-request with kwargs from URLSpec."""
self.db = db
async def prepare(self):
"""Runs before any HTTP method handler. Good for auth."""
token = self.request.headers.get("Authorization", "")
self.current_user = await self.verify_token(token)
if not self.current_user:
self.set_status(401)
self.finish({"error": "unauthorized"})
async def get(self, user_id):
user = await self.db.users.find_one({"_id": int(user_id)})
if not user:
raise tornado.web.HTTPError(404)
self.write({"id": user["_id"], "name": user["name"]})
def on_finish(self):
"""Called after response is sent. Cleanup, metrics, logging."""
elapsed = self.request.request_time()
logger.info(f"{self.request.method} {self.request.uri} {self._status_code} {elapsed:.3f}s")
def write_error(self, status_code, **kwargs):
"""Override to customize error responses."""
if "exc_info" in kwargs:
exc = kwargs["exc_info"][1]
if isinstance(exc, tornado.web.HTTPError) and exc.reason:
self.write({"error": exc.reason})
return
self.write({"error": "internal server error"})
The lifecycle order: initialize() → prepare() → get/post/put/delete() → on_finish(). If prepare() calls self.finish(), the HTTP method handler is skipped.
WebSocket implementation
Tornado’s WebSocket support is mature and battle-tested:
import json
from tornado.websocket import WebSocketHandler
class ChatWebSocket(WebSocketHandler):
# Class-level set of all connected clients
connections = set()
def check_origin(self, origin):
"""Control CORS for WebSocket connections."""
allowed = ["https://myapp.com", "https://staging.myapp.com"]
return origin in allowed
def open(self):
"""Called when WebSocket connection is established."""
self.connections.add(self)
self.user_id = self.get_argument("user_id", None)
logger.info(f"WebSocket opened: {self.user_id}")
def on_message(self, message):
"""Called when a message is received."""
try:
data = json.loads(message)
except json.JSONDecodeError:
self.write_message({"error": "invalid JSON"})
return
# Broadcast to all connected clients
for conn in self.connections:
if conn is not self:
try:
conn.write_message({
"from": self.user_id,
"text": data.get("text", ""),
})
except Exception:
logger.warning("Failed to send to connection")
def on_close(self):
"""Called when WebSocket is closed."""
self.connections.discard(self)
logger.info(f"WebSocket closed: {self.user_id}, code: {self.close_code}")
def on_ping(self, data):
"""Custom ping handling for connection health."""
pass # Tornado auto-responds with pong
For production WebSocket servers, add heartbeat detection:
class HeartbeatWebSocket(WebSocketHandler):
HEARTBEAT_INTERVAL = 30 # seconds
HEARTBEAT_TIMEOUT = 10
def open(self):
self.last_pong = time.time()
self.heartbeat_handle = tornado.ioloop.PeriodicCallback(
self.send_heartbeat, self.HEARTBEAT_INTERVAL * 1000
)
self.heartbeat_handle.start()
def send_heartbeat(self):
if time.time() - self.last_pong > self.HEARTBEAT_INTERVAL + self.HEARTBEAT_TIMEOUT:
logger.warning("Client heartbeat timeout, closing")
self.close()
return
self.ping(b"heartbeat")
def on_pong(self, data):
self.last_pong = time.time()
def on_close(self):
if hasattr(self, "heartbeat_handle"):
self.heartbeat_handle.stop()
Coroutine patterns and concurrency
Tornado supports native async/await coroutines and integrates with asyncio:
import asyncio
from tornado.httpclient import AsyncHTTPClient
class AggregatorHandler(tornado.web.RequestHandler):
async def get(self):
"""Fetch from multiple services concurrently."""
client = AsyncHTTPClient()
# Run requests in parallel
users_future = client.fetch("http://users-service/api/users")
orders_future = client.fetch("http://orders-service/api/orders")
users_resp, orders_resp = await asyncio.gather(
users_future, orders_future,
return_exceptions=True
)
result = {}
if not isinstance(users_resp, Exception):
result["users"] = json.loads(users_resp.body)
if not isinstance(orders_resp, Exception):
result["orders"] = json.loads(orders_resp.body)
self.write(result)
For CPU-bound work, offload to a thread pool:
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=4)
class ImageHandler(tornado.web.RequestHandler):
async def post(self):
image_data = self.request.body
# Run CPU-intensive work in thread pool
result = await asyncio.get_event_loop().run_in_executor(
executor, process_image, image_data
)
self.write({"processed": True, "size": len(result)})
Streaming responses
Tornado supports chunked transfer encoding for streaming:
class StreamHandler(tornado.web.RequestHandler):
async def get(self):
self.set_header("Content-Type", "text/event-stream")
self.set_header("Cache-Control", "no-cache")
while True:
event = await event_queue.get()
self.write(f"data: {json.dumps(event)}\n\n")
await self.flush()
def on_connection_close(self):
"""Called when client disconnects during streaming."""
logger.info("Client disconnected from stream")
The flush() call pushes buffered data to the client immediately. The on_connection_close() callback fires when the client disconnects, letting you clean up resources.
Multi-process deployment
Tornado provides built-in multi-process support:
import tornado.httpserver
import tornado.process
def main():
app = Application()
server = tornado.httpserver.HTTPServer(app)
server.bind(8000)
# Fork worker processes
tornado.process.fork_processes(num_processes=0) # 0 = one per CPU
# Each worker runs its own IOLoop
tornado.ioloop.IOLoop.current().start()
However, for production, running separate processes managed by systemd or a process manager is more robust:
# systemd service with multiple instances
# tornado@.service
[Service]
ExecStart=/usr/bin/python -m myapp --port=80%i
# Then enable tornado@01, tornado@02, tornado@03, tornado@04
Put nginx in front for load balancing:
upstream tornado_backend {
server 127.0.0.1:8001;
server 127.0.0.1:8002;
server 127.0.0.1:8003;
server 127.0.0.1:8004;
}
server {
listen 80;
location / {
proxy_pass http://tornado_backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
The proxy_set_header Upgrade and Connection "upgrade" lines are essential for WebSocket proxying.
Testing Tornado applications
Tornado provides a dedicated test framework:
from tornado.testing import AsyncHTTPTestCase
import tornado.testing
class TestUserAPI(AsyncHTTPTestCase):
def get_app(self):
return Application()
def test_get_user(self):
response = self.fetch("/api/users/1")
self.assertEqual(response.code, 200)
data = json.loads(response.body)
self.assertIn("name", data)
@tornado.testing.gen_test
async def test_websocket(self):
ws_url = f"ws://localhost:{self.get_http_port()}/ws?user_id=test"
conn = await tornado.websocket.websocket_connect(ws_url)
conn.write_message(json.dumps({"text": "hello"}))
response = await conn.read_message()
self.assertIsNotNone(response)
conn.close()
The AsyncHTTPTestCase starts a real Tornado server on a random port, so tests exercise the full networking stack.
Performance characteristics
Tornado’s performance profile differs from ASGI frameworks:
- Startup time: Fast — no auto-discovery, no import scanning
- Memory per connection: ~2-5KB for idle keep-alive, ~10-50KB for active WebSockets
- Throughput: Competitive for I/O-bound workloads; 20,000-50,000 requests/second for simple JSON responses on modern hardware
- Latency: P99 latency stays stable under load because the single-threaded event loop avoids thread contention
- Scalability wall: Hits CPU limits before memory limits, typically around 50,000-100,000 concurrent connections per process depending on message rate
The one thing to remember: Tornado’s strength lies in its battle-tested event loop, mature WebSocket support, and fine-grained control over the networking stack — making it the framework of choice when you need predictable behavior under high-concurrency, long-lived connection workloads.
See Also
- Python Aiohttp Client Understand Aiohttp Client through a practical analogy so your Python decisions become faster and clearer.
- Python Api Client Design Why building your own API client in Python is like creating a TV remote that only has the buttons you actually need.
- Python Api Documentation Swagger Swagger turns your Python API into an interactive playground where anyone can click buttons to try it out — no coding required.
- Python Api Mocking Responses Why testing with fake API responses is like rehearsing a play with stand-ins before the real actors show up.
- Python Api Pagination Clients Why APIs send data in pages, and how Python handles it — like reading a book one chapter at a time instead of swallowing the whole thing.