Python Crypto Trading Bots — Deep Dive

Production bot architecture

A toy bot polls prices in a loop. A production bot is an event-driven system with separated concerns:

WebSocket Feed ──→ Market Data Store ──→ Signal Generator
                         ↓                      ↓
                   Feature Engine         Order Manager
                                                ↓
                                         Risk Engine
                                                ↓
                                    Exchange Connector (REST/WS)
                                                ↓
                                      Execution Reporter
                                                ↓
                                   Logging / Monitoring / Alerts

Each component communicates through async queues or an internal event bus. This isolation means a bug in the feature engine doesn’t crash the order manager.
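As a minimal sketch of that wiring, assuming asyncio.Queue as the channel between stages. The stage functions, the sentinel convention, and the toy threshold rule are illustrative assumptions, not part of any library:

```python
import asyncio

async def feed(out_q: asyncio.Queue, prices):
    """Stand-in for a market data feed: pushes ticks, then a sentinel."""
    for price in prices:
        await out_q.put({"symbol": "BTC/USDT", "price": price})
    await out_q.put(None)  # sentinel: no more data

async def signal_generator(in_q: asyncio.Queue, out_q: asyncio.Queue, threshold: float):
    """Toy rule (hypothetical): emit a buy signal when price drops below threshold."""
    while (tick := await in_q.get()) is not None:
        try:
            if tick["price"] < threshold:
                await out_q.put({"symbol": tick["symbol"], "side": "buy"})
        except Exception as exc:
            # A bad tick is reported and skipped; downstream stages keep running.
            print(f"signal error: {exc!r}")
    await out_q.put(None)  # pass the sentinel downstream

async def order_manager(in_q: asyncio.Queue, submitted: list):
    """Stand-in order manager: records signals instead of placing orders."""
    while (signal := await in_q.get()) is not None:
        submitted.append(signal)

async def run_pipeline(prices, threshold=100.0):
    # Bounded queues apply backpressure if a consumer falls behind
    ticks = asyncio.Queue(maxsize=1000)
    signals = asyncio.Queue(maxsize=1000)
    submitted = []
    await asyncio.gather(
        feed(ticks, prices),
        signal_generator(ticks, signals, threshold),
        order_manager(signals, submitted),
    )
    return submitted
```

Calling asyncio.run(run_pipeline([101.0, 99.5, 102.0, 98.0])) runs all three stages concurrently. Because each stage only sees its input queue, any of them can be replaced or restarted without touching the others.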

Real-time data with WebSockets

REST polling adds latency (every update waits for the next poll) and burns through API rate limits. WebSocket feeds push updates as they happen:

import asyncio
import logging

import ccxt.pro as ccxtpro

logger = logging.getLogger(__name__)

class MarketDataFeed:
    def __init__(self, exchange_id, symbols):
        self.exchange = getattr(ccxtpro, exchange_id)({
            "apiKey": "...", "secret": "...",
        })
        self.symbols = symbols
        self.orderbooks = {}
        self.trades = {}

    async def stream_orderbooks(self):
        while True:
            for symbol in self.symbols:
                try:
                    ob = await self.exchange.watch_order_book(symbol)
                    self.orderbooks[symbol] = {
                        "bids": ob["bids"][:20],
                        "asks": ob["asks"][:20],
                        "timestamp": ob["timestamp"],
                    }
                except Exception as e:
                    logger.error(f"OrderBook error {symbol}: {e}")
                    await asyncio.sleep(1)

    async def stream_trades(self):
        while True:
            for symbol in self.symbols:
                try:
                    trades = await self.exchange.watch_trades(symbol)
                    self.trades[symbol] = trades[-100:]
                except Exception as e:
                    logger.error(f"Trades error {symbol}: {e}")
                    await asyncio.sleep(1)

ccxt.pro (the WebSocket extension of ccxt, built on asyncio) provides unified WebSocket wrappers for many major exchanges, with automatic reconnection.
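One caveat with the loops above: symbols are awaited one after another, so a quiet symbol can delay updates for the others. A possible alternative is one task per symbol. The sketch below assumes only that the exchange object exposes an async watch_order_book(symbol) like the one used above. FakeExchange, watch_symbol, stream_all, and max_updates are hypothetical names introduced here so the example runs without a network connection:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

class FakeExchange:
    """Test double (hypothetical): mimics an async watch_order_book call."""
    def __init__(self, delays):
        self.delays = delays  # symbol -> seconds to wait before each update

    async def watch_order_book(self, symbol):
        await asyncio.sleep(self.delays[symbol])
        return {"bids": [[100.0, 1.0]], "asks": [[101.0, 1.0]], "timestamp": 0}

async def watch_symbol(exchange, symbol, store, max_updates=None):
    """Stream one symbol in its own loop so slow symbols cannot block fast ones."""
    count = 0
    while max_updates is None or count < max_updates:
        try:
            ob = await exchange.watch_order_book(symbol)
            store[symbol] = {
                "bids": ob["bids"][:20],
                "asks": ob["asks"][:20],
                "timestamp": ob["timestamp"],
            }
            count += 1
        except Exception as exc:
            logger.error("order book error %s: %s", symbol, exc)
            await asyncio.sleep(1)  # brief pause before retrying

async def stream_all(exchange, symbols, max_updates=None):
    """Run one watcher task per symbol and return the shared snapshot store."""
    store = {}
    await asyncio.gather(
        *(watch_symbol(exchange, s, store, max_updates) for s in symbols)
    )
    return store
```

With max_updates=None the watchers run until cancelled, which is the production shape. The cap exists only to let the sketch terminate in a test.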

Order management system

A proper order manager tracks the full lifecycle of every order:

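import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum

logger = logging.getLogger(__name__)

def _utcnow() -> datetime:
    # Timezone-aware replacement for datetime.utcnow(), deprecated since Python 3.12
    return datetime.now(timezone.utc)

class OrderState(Enum):
    PENDING = "pending"
    SUBMITTED = "submitted"
    PARTIALLY_FILLED = "partially_filled"
    FILLED = "filled"
    CANCELLED = "cancelled"
    REJECTED = "rejected"

@dataclass
class ManagedOrder:
    id: str                # internal (client-side) order id
    symbol: str
    side: str              # "buy" or "sell"
    order_type: str        # e.g. "limit" or "market"
    quantity: float
    price: float
    state: OrderState = OrderState.PENDING
    filled_qty: float = 0
    avg_fill_price: float = 0
    exchange_id: str = ""  # id assigned by the exchange after submission
    created_at: datetime = field(default_factory=_utcnow)
    updated_at: datetime = field(default_factory=_utcnow)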

class OrderManager:
    def __init__(self, exchange, risk_engine):
        self.exchange = exchange
        self.risk = risk_engine
        self.orders = {}
        self.position = {}

    async def submit_order(self, order: ManagedOrder):
        # Risk check before submission
        approved, reason = self.risk.check(order, self.position)
        if not approved:
            order.state = OrderState.REJECTED
            logger.warning(f"Order rejected: {reason}")
            return order

        try:
            result = await self.exchange.create_order(
                order.symbol, order.order_type, order.side,
                order.quantity, order.price
            )
            order.exchange_id = result["id"]
            order.state = OrderState.SUBMITTED
            self.orders[order.id] = order
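        except Exception as e:
            # Caution: a timeout or dropped connection does not prove the exchange
            # refused the order; reconcile against open orders before retrying.
            order.state = OrderState.REJECTED
            logger.error(f"Order submission failed: {e}")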

        return order

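    async def sync_orders(self):
        """Poll exchange for order updates."""
        # Iterate over a snapshot: submit_order may add entries while we await
        for order_id, order in list(self.orders.items()):
            if order.state not in (OrderState.SUBMITTED, OrderState.PARTIALLY_FILLED):
                continue
            try:
                exchange_order = await self.exchange.fetch_order(
                    order.exchange_id, order.symbol
                )
            except Exception as e:
                logger.error(f"Order sync failed for {order_id}: {e}")
                continue

            # Either field can be present but None before the first fill
            order.filled_qty = exchange_order.get("filled") or 0
            order.avg_fill_price = exchange_order.get("average") or 0

            status = exchange_order["status"]
            if status == "closed":
                order.state = OrderState.FILLED
                self._update_position(order)
            elif status in ("canceled", "expired"):
                order.state = OrderState.CANCELLED
                if order.filled_qty > 0:
                    # A cancelled order may still carry a partial fill
                    self._update_position(order)
            elif status == "rejected":
                order.state = OrderState.REJECTED
            elif order.filled_qty > 0:
                order.state = OrderState.PARTIALLY_FILLED

    def _update_position(self, order: ManagedOrder):
        """Minimal bookkeeping (an assumption): signed quantity per symbol."""
        signed = order.filled_qty if order.side == "buy" else -order.filled_qty
        self.position[order.symbol] = self.position.get(order.symbol, 0.0) + signed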
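The order manager above relies on a risk engine whose check(order, position) returns an (approved, reason) pair, but the article does not show one. A minimal sketch of what it could look like follows. The RiskLimits fields, the default values, and the convention that position maps each symbol to a signed quantity are all assumptions made for illustration:

```python
from dataclasses import dataclass
from types import SimpleNamespace

@dataclass
class RiskLimits:
    # Hypothetical limits; real values depend on account size and strategy
    max_order_notional: float = 1_000.0  # max order value in quote currency
    max_position_qty: float = 0.5        # max absolute position per symbol

class RiskEngine:
    def __init__(self, limits: RiskLimits):
        self.limits = limits

    def check(self, order, position):
        """Return (approved, reason). `position` maps symbol -> signed quantity."""
        if order.quantity <= 0:
            return False, "quantity must be positive"

        # Notional check only applies when a price is known (limit orders)
        if order.price:
            notional = order.quantity * order.price
            if notional > self.limits.max_order_notional:
                return False, f"notional {notional:.2f} exceeds limit"

        # Project the position as if the order filled completely
        signed = order.quantity if order.side == "buy" else -order.quantity
        projected = position.get(order.symbol, 0.0) + signed
        if abs(projected) > self.limits.max_position_qty:
            return False, f"projected position {projected:.4f} exceeds limit"

        return True, "ok"

# Usage with a stand-in order object exposing the fields the check reads
engine = RiskEngine(RiskLimits())
small = SimpleNamespace(symbol="BTC/USDT", side="buy", quantity=0.01, price=50_000.0)
approved, reason = engine.check(small, {})
```

Keeping the check synchronous and free of I/O means it can run on every order without adding latency. Limits that depend on live data, such as available margin, would need the position snapshot extended accordingly.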

Advanced backtesting with vectorized operations

Simple loop-based backtesting is slow for long histories. Vectorized backtesting with NumPy and pandas can process millions of candles in seconds:

import numpy as np
import pandas as pd

class VectorizedBacktest:
    def __init__(self, data: pd.DataFrame):
        self.data = data
        self.close = data["close"].values

    def sma_crossover(self, fast_period=10, slow_period=30):
        fast_sma = pd.Series(self.close).rolling(fast_period).mean().values
        slow_sma = pd.Series(self.close).rolling(slow_period).mean().values

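        # Signal: 1 when fast > slow (bullish), -1 otherwise,
        # 0 while either SMA is still warming up (NaN), so the warm-up
        # period cannot produce a spurious crossover
        signals = np.where(fast_sma > slow_sma, 1, -1)
        signals[np.isnan(fast_sma) | np.isnan(slow_sma)] = 0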

        # Only trade on crossover points
        positions = np.diff(signals, prepend=signals[0])
        entries = np.where(positions == 2, 1, 0)   # Short to long
        exits = np.where(positions == -2, -1, 0)    # Long to short

        return entries + exits

    def calculate_returns(self, signals, commission=0.001):
        """Calculate strategy returns accounting for commissions."""
        price_returns = np.diff(self.close) / self.close[:-1]

        # Position: 1 = long, 0 = flat. Carry the last non-zero signal forward
        # without a Python loop; -1 (exit) maps to flat because this is long-only.
        position = (
            pd.Series(signals, dtype=float)
            .replace(0, np.nan)
            .ffill()
            .fillna(0)
            .clip(lower=0)
            .to_numpy()
        )

        strategy_returns = position[:-1] * price_returns

        # Deduct commissions on trades
        trades = np.abs(np.diff(position, prepend=0))
        strategy_returns -= trades[:-1] * commission

        cumulative = np.cumprod(1 + strategy_returns)
        return {
            "total_return": (cumulative[-1] - 1) * 100,
            "max_drawdown": self._max_drawdown(cumulative) * 100,
            "sharpe_ratio": self._sharpe(strategy_returns),
            "num_trades": int(trades.sum()),
        }

    def _max_drawdown(self, cumulative):
        peak = np.maximum.accumulate(cumulative)
        drawdown = (peak - cumulative) / peak
        return drawdown.max()

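    def _sharpe(self, returns, risk_free=0, periods=365):
        # periods=365 annualizes daily candles in a 24/7 market; adjust it
        # for other candle sizes (e.g. 365 * 24 for hourly data)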
        excess = returns - risk_free / periods
        if excess.std() == 0:
            return 0
        return np.sqrt(periods) * excess.mean() / excess.std()
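To make the crossover logic concrete, here is a small worked example on a hand-written regime series, so the arithmetic can be checked by eye. The forward-fill step is one loop-free way to turn entry and exit signals into a held position. It is a sketch under the same long-only assumption as the class above:

```python
import numpy as np
import pandas as pd

# Regime: +1 while the fast average is above the slow one, -1 otherwise
regime = np.array([-1, -1, 1, 1, 1, -1, -1, 1])

# A jump of +2 marks a bullish cross, -2 a bearish cross
change = np.diff(regime, prepend=regime[0])
signals = np.where(change == 2, 1, 0) + np.where(change == -2, -1, 0)
# signals  -> [0, 0, 1, 0, 0, -1, 0, 1]

# Carry the last non-zero signal forward; long-only, so -1 maps to flat (0)
position = (
    pd.Series(signals, dtype=float)
    .replace(0, np.nan)
    .ffill()
    .fillna(0)
    .clip(lower=0)
    .to_numpy()
)
# position -> [0, 0, 1, 1, 1, 0, 0, 1]
```

The position is long from the first bullish cross (index 2) until the bearish cross (index 5), then long again from index 7.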

Walk-forward optimization

Backtesting on the full dataset leads to overfitting. Walk-forward testing trains on past data and tests on unseen future data:

def walk_forward_test(data, strategy_class, train_window=180, test_window=30):
    """Windows are counted in rows, so the defaults assume daily candles."""
    results = []
    total_rows = len(data)

    # +1 so the last window that fits exactly is included
    for start in range(0, total_rows - train_window - test_window + 1, test_window):
        train_data = data.iloc[start:start + train_window]
        test_data = data.iloc[start + train_window:start + train_window + test_window]

        # Optimize parameters on training set
        strategy = strategy_class()
        best_params = strategy.optimize(train_data)

        # Test with optimized parameters on unseen data
        strategy.set_params(best_params)
        period_return = strategy.backtest(test_data)
        results.append({
            "period_start": test_data.index[0],
            "period_end": test_data.index[-1],
            "return_pct": period_return,
            "params": best_params,
        })

    return pd.DataFrame(results)
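The window arithmetic is the easiest part of walk-forward testing to get wrong, so it can help to isolate it. The helper below is a hypothetical sketch (walk_forward_windows is not part of any library) that yields positional, end-exclusive bounds for each train/test split:

```python
def walk_forward_windows(n_rows, train_window=180, test_window=30):
    """Yield (train_start, train_end, test_end) bounds, end-exclusive.

    The training slice is rows[train_start:train_end] and the test slice
    is rows[train_end:test_end]; consecutive test slices do not overlap.
    """
    last_start = n_rows - train_window - test_window
    # +1 so a window that ends exactly on the last row is still produced
    for start in range(0, last_start + 1, test_window):
        train_end = start + train_window
        yield start, train_end, train_end + test_window

windows = list(walk_forward_windows(270))
# -> [(0, 180, 210), (30, 210, 240), (60, 240, 270)]
```

With 270 rows and the default sizes, three splits fit, and the last one ends exactly at row 270. With fewer than 210 rows the generator yields nothing, which is worth checking for before running an optimization.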

Latency optimization

In competitive markets, milliseconds matter. Key optimizations:

  • Connection pooling: Reuse HTTP sessions and WebSocket connections.
  • Local order book maintenance: Instead of fetching the full order book, subscribe to incremental updates and maintain a local copy.
  • Pre-signed transactions: For DEX trading, pre-compute transaction calldata so you only need to broadcast when the signal fires.
  • Colocation: Run your bot on a server geographically close to the exchange’s data center.

class LocalOrderBook:
    def __init__(self):
        self.bids = {}  # price → quantity
        self.asks = {}

    def apply_update(self, update):
        for price, qty in update.get("bids", []):
            if qty == 0:
                self.bids.pop(price, None)
            else:
                self.bids[price] = qty
        for price, qty in update.get("asks", []):
            if qty == 0:
                self.asks.pop(price, None)
            else:
                self.asks[price] = qty

    def best_bid(self):
        return max(self.bids.keys()) if self.bids else 0

    def best_ask(self):
        return min(self.asks.keys()) if self.asks else float("inf")

    def spread(self):
        return self.best_ask() - self.best_bid()

Operational monitoring

Production bots need comprehensive monitoring:

from prometheus_client import Counter, Gauge, Histogram

orders_total = Counter("bot_orders_total", "Total orders", ["side", "status"])
portfolio_value = Gauge("bot_portfolio_value_usd", "Portfolio value in USD")
order_latency = Histogram("bot_order_latency_seconds", "Order round-trip time")
pnl_gauge = Gauge("bot_pnl_usd", "Unrealized + realized PnL")
drawdown_gauge = Gauge("bot_drawdown_pct", "Current drawdown percentage")

class MonitoredBot:
    def on_order_filled(self, order):
        orders_total.labels(side=order.side, status="filled").inc()
        self._update_pnl()

    def on_tick(self):
        portfolio_value.set(self.calculate_portfolio_value())
        pnl_gauge.set(self.calculate_pnl())
        drawdown_gauge.set(self.calculate_drawdown())

Set alerts for: drawdown exceeding a threshold, no trades for N hours (possible connectivity issue), the exchange API error rate exceeding a threshold, and unexpected position sizes.
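As a rough illustration of those four rules, the sketch below evaluates them in-process against a metrics snapshot. All dictionary keys and threshold names are hypothetical. In a Prometheus-based setup the same conditions would more typically be written as alerting rules rather than Python code:

```python
import time

def evaluate_alerts(snapshot, limits, now=None):
    """Return a list of alert messages for one metrics snapshot.

    Both dicts use hypothetical keys chosen for this sketch.
    """
    now = time.time() if now is None else now
    alerts = []

    if snapshot["drawdown_pct"] > limits["max_drawdown_pct"]:
        alerts.append("drawdown above threshold")

    hours_idle = (now - snapshot["last_trade_ts"]) / 3600
    if hours_idle > limits["max_idle_hours"]:
        alerts.append("no trades recently: check connectivity")

    if snapshot["api_errors_last_min"] > limits["max_api_errors_per_min"]:
        alerts.append("exchange API error rate too high")

    # Symbols without a configured limit are treated as "no position allowed"
    for symbol, qty in snapshot["positions"].items():
        if abs(qty) > limits["max_position_qty"].get(symbol, 0.0):
            alerts.append(f"unexpected position size in {symbol}")

    return alerts
```

Passing now explicitly keeps the function deterministic, which makes the idle-time rule easy to unit test.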

One thing to remember

A production crypto trading bot is an event-driven system where the execution infrastructure (WebSocket feeds, order management, position tracking, monitoring) is more complex and more critical than the trading strategy itself — most failures come from engineering problems, not bad signals.
