Tornado Framework — Deep Dive

Application architecture

A production Tornado application uses the Application class to configure routing, settings, and middleware-like transforms:

import tornado.web
import tornado.ioloop

class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            (r"/", MainHandler),
            (r"/api/users/(\d+)", UserHandler),
            (r"/ws", ChatWebSocket),
        ]
        settings = {
            "debug": False,
            "cookie_secret": os.environ["COOKIE_SECRET"],
            "xsrf_cookies": True,
            "compress_response": True,
        }
        super().__init__(handlers, **settings)
        
        # Initialize shared resources
        self.db = motor.motor_tornado.MotorClient()["mydb"]
        self.redis = aioredis.from_url("redis://localhost")

Notice the regex-based routing — (\d+) captures a numeric user ID and passes it as an argument to the handler method. This is more flexible than path-parameter routing but less readable for simple cases.

RequestHandler lifecycle

Understanding the handler lifecycle is essential for proper Tornado development:

class UserHandler(tornado.web.RequestHandler):
    def initialize(self, db):
        """Called per-request with kwargs from URLSpec."""
        self.db = db
    
    async def prepare(self):
        """Runs before any HTTP method handler. Good for auth."""
        token = self.request.headers.get("Authorization", "")
        self.current_user = await self.verify_token(token)
        if not self.current_user:
            self.set_status(401)
            self.finish({"error": "unauthorized"})
    
    async def get(self, user_id):
        user = await self.db.users.find_one({"_id": int(user_id)})
        if not user:
            raise tornado.web.HTTPError(404)
        self.write({"id": user["_id"], "name": user["name"]})
    
    def on_finish(self):
        """Called after response is sent. Cleanup, metrics, logging."""
        elapsed = self.request.request_time()
        logger.info(f"{self.request.method} {self.request.uri} {self._status_code} {elapsed:.3f}s")
    
    def write_error(self, status_code, **kwargs):
        """Override to customize error responses."""
        if "exc_info" in kwargs:
            exc = kwargs["exc_info"][1]
            if isinstance(exc, tornado.web.HTTPError) and exc.reason:
                self.write({"error": exc.reason})
                return
        self.write({"error": "internal server error"})

The lifecycle order: initialize()prepare()get/post/put/delete()on_finish(). If prepare() calls self.finish(), the HTTP method handler is skipped.

WebSocket implementation

Tornado’s WebSocket support is mature and battle-tested:

import json
from tornado.websocket import WebSocketHandler

class ChatWebSocket(WebSocketHandler):
    # Class-level set of all connected clients
    connections = set()
    
    def check_origin(self, origin):
        """Control CORS for WebSocket connections."""
        allowed = ["https://myapp.com", "https://staging.myapp.com"]
        return origin in allowed
    
    def open(self):
        """Called when WebSocket connection is established."""
        self.connections.add(self)
        self.user_id = self.get_argument("user_id", None)
        logger.info(f"WebSocket opened: {self.user_id}")
    
    def on_message(self, message):
        """Called when a message is received."""
        try:
            data = json.loads(message)
        except json.JSONDecodeError:
            self.write_message({"error": "invalid JSON"})
            return
        
        # Broadcast to all connected clients
        for conn in self.connections:
            if conn is not self:
                try:
                    conn.write_message({
                        "from": self.user_id,
                        "text": data.get("text", ""),
                    })
                except Exception:
                    logger.warning("Failed to send to connection")
    
    def on_close(self):
        """Called when WebSocket is closed."""
        self.connections.discard(self)
        logger.info(f"WebSocket closed: {self.user_id}, code: {self.close_code}")
    
    def on_ping(self, data):
        """Custom ping handling for connection health."""
        pass  # Tornado auto-responds with pong

For production WebSocket servers, add heartbeat detection:

class HeartbeatWebSocket(WebSocketHandler):
    HEARTBEAT_INTERVAL = 30  # seconds
    HEARTBEAT_TIMEOUT = 10
    
    def open(self):
        self.last_pong = time.time()
        self.heartbeat_handle = tornado.ioloop.PeriodicCallback(
            self.send_heartbeat, self.HEARTBEAT_INTERVAL * 1000
        )
        self.heartbeat_handle.start()
    
    def send_heartbeat(self):
        if time.time() - self.last_pong > self.HEARTBEAT_INTERVAL + self.HEARTBEAT_TIMEOUT:
            logger.warning("Client heartbeat timeout, closing")
            self.close()
            return
        self.ping(b"heartbeat")
    
    def on_pong(self, data):
        self.last_pong = time.time()
    
    def on_close(self):
        if hasattr(self, "heartbeat_handle"):
            self.heartbeat_handle.stop()

Coroutine patterns and concurrency

Tornado supports native async/await coroutines and integrates with asyncio:

import asyncio
from tornado.httpclient import AsyncHTTPClient

class AggregatorHandler(tornado.web.RequestHandler):
    async def get(self):
        """Fetch from multiple services concurrently."""
        client = AsyncHTTPClient()
        
        # Run requests in parallel
        users_future = client.fetch("http://users-service/api/users")
        orders_future = client.fetch("http://orders-service/api/orders")
        
        users_resp, orders_resp = await asyncio.gather(
            users_future, orders_future,
            return_exceptions=True
        )
        
        result = {}
        if not isinstance(users_resp, Exception):
            result["users"] = json.loads(users_resp.body)
        if not isinstance(orders_resp, Exception):
            result["orders"] = json.loads(orders_resp.body)
        
        self.write(result)

For CPU-bound work, offload to a thread pool:

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

class ImageHandler(tornado.web.RequestHandler):
    async def post(self):
        image_data = self.request.body
        # Run CPU-intensive work in thread pool
        result = await asyncio.get_event_loop().run_in_executor(
            executor, process_image, image_data
        )
        self.write({"processed": True, "size": len(result)})

Streaming responses

Tornado supports chunked transfer encoding for streaming:

class StreamHandler(tornado.web.RequestHandler):
    async def get(self):
        self.set_header("Content-Type", "text/event-stream")
        self.set_header("Cache-Control", "no-cache")
        
        while True:
            event = await event_queue.get()
            self.write(f"data: {json.dumps(event)}\n\n")
            await self.flush()
    
    def on_connection_close(self):
        """Called when client disconnects during streaming."""
        logger.info("Client disconnected from stream")

The flush() call pushes buffered data to the client immediately. The on_connection_close() callback fires when the client disconnects, letting you clean up resources.

Multi-process deployment

Tornado provides built-in multi-process support:

import tornado.httpserver
import tornado.process

def main():
    app = Application()
    server = tornado.httpserver.HTTPServer(app)
    server.bind(8000)
    
    # Fork worker processes
    tornado.process.fork_processes(num_processes=0)  # 0 = one per CPU
    
    # Each worker runs its own IOLoop
    tornado.ioloop.IOLoop.current().start()

However, for production, running separate processes managed by systemd or a process manager is more robust:

# systemd service with multiple instances
# tornado@.service
[Service]
ExecStart=/usr/bin/python -m myapp --port=80%i
# Then enable tornado@01, tornado@02, tornado@03, tornado@04

Put nginx in front for load balancing:

upstream tornado_backend {
    server 127.0.0.1:8001;
    server 127.0.0.1:8002;
    server 127.0.0.1:8003;
    server 127.0.0.1:8004;
}

server {
    listen 80;
    location / {
        proxy_pass http://tornado_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}

The proxy_set_header Upgrade and Connection "upgrade" lines are essential for WebSocket proxying.

Testing Tornado applications

Tornado provides a dedicated test framework:

from tornado.testing import AsyncHTTPTestCase
import tornado.testing

class TestUserAPI(AsyncHTTPTestCase):
    def get_app(self):
        return Application()
    
    def test_get_user(self):
        response = self.fetch("/api/users/1")
        self.assertEqual(response.code, 200)
        data = json.loads(response.body)
        self.assertIn("name", data)
    
    @tornado.testing.gen_test
    async def test_websocket(self):
        ws_url = f"ws://localhost:{self.get_http_port()}/ws?user_id=test"
        conn = await tornado.websocket.websocket_connect(ws_url)
        conn.write_message(json.dumps({"text": "hello"}))
        response = await conn.read_message()
        self.assertIsNotNone(response)
        conn.close()

The AsyncHTTPTestCase starts a real Tornado server on a random port, so tests exercise the full networking stack.

Performance characteristics

Tornado’s performance profile differs from ASGI frameworks:

  • Startup time: Fast — no auto-discovery, no import scanning
  • Memory per connection: ~2-5KB for idle keep-alive, ~10-50KB for active WebSockets
  • Throughput: Competitive for I/O-bound workloads; 20,000-50,000 requests/second for simple JSON responses on modern hardware
  • Latency: P99 latency stays stable under load because the single-threaded event loop avoids thread contention
  • Scalability wall: Hits CPU limits before memory limits, typically around 50,000-100,000 concurrent connections per process depending on message rate

The one thing to remember: Tornado’s strength lies in its battle-tested event loop, mature WebSocket support, and fine-grained control over the networking stack — making it the framework of choice when you need predictable behavior under high-concurrency, long-lived connection workloads.

pythonweb-frameworksasynctornado

See Also

  • Python Aiohttp Client Understand Aiohttp Client through a practical analogy so your Python decisions become faster and clearer.
  • Python Api Client Design Why building your own API client in Python is like creating a TV remote that only has the buttons you actually need.
  • Python Api Documentation Swagger Swagger turns your Python API into an interactive playground where anyone can click buttons to try it out — no coding required.
  • Python Api Mocking Responses Why testing with fake API responses is like rehearsing a play with stand-ins before the real actors show up.
  • Python Api Pagination Clients Why APIs send data in pages, and how Python handles it — like reading a book one chapter at a time instead of swallowing the whole thing.