Python Crypto Trading Bots — Deep Dive
Production bot architecture
A toy bot polls prices in a loop. A production bot is an event-driven system with separated concerns:
WebSocket Feed ──→ Market Data Store ──→ Signal Generator
                           ↓                    ↓
                    Feature Engine        Order Manager
                                                ↓
                                           Risk Engine
                                                ↓
                                  Exchange Connector (REST/WS)
                                                ↓
                                       Execution Reporter
                                                ↓
                                  Logging / Monitoring / Alerts
Each component communicates through async queues or an internal event bus. This isolation means a bug in the feature engine doesn’t crash the order manager.
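As a minimal sketch of that isolation, the two stages below talk only through `asyncio.Queue` objects. The stage names mirror the diagram, but the tick fields (`price`, `fair_value`), the `None` shutdown sentinel, and the toy "buy below fair value" rule are assumptions made for illustration.

```python
import asyncio


async def feature_engine(ticks, signals):
    """Consume ticks and emit signals; errors stay inside this stage."""
    while True:
        tick = await ticks.get()
        if tick is None:                      # sentinel: propagate shutdown
            await signals.put(None)
            return
        try:
            signal = "buy" if tick["price"] < tick["fair_value"] else "hold"
            await signals.put((tick["symbol"], signal))
        except (KeyError, TypeError) as exc:
            # A malformed tick is reported and skipped; downstream never sees it.
            print(f"feature engine skipped tick: {exc!r}")


async def order_manager(signals, submitted):
    """Consume signals and record the symbols it would submit orders for."""
    while True:
        item = await signals.get()
        if item is None:
            return
        symbol, signal = item
        if signal == "buy":
            submitted.append(symbol)


async def run_pipeline(raw_ticks):
    # Bounded queues apply backpressure if a consumer falls behind.
    ticks = asyncio.Queue(maxsize=100)
    signals = asyncio.Queue(maxsize=100)
    submitted = []
    workers = [
        asyncio.create_task(feature_engine(ticks, signals)),
        asyncio.create_task(order_manager(signals, submitted)),
    ]
    for tick in raw_ticks:
        await ticks.put(tick)
    await ticks.put(None)
    await asyncio.gather(*workers)
    return submitted
```

Feeding the pipeline a tick that lacks a `price` field produces one log line from the feature engine, while the order manager keeps processing the valid ticks around it.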
Real-time data with WebSockets
REST polling adds latency and spends API rate limit on every request. WebSocket feeds push updates as soon as the exchange publishes them:
import asyncio
import logging

import ccxt.pro as ccxtpro

logger = logging.getLogger(__name__)


class MarketDataFeed:
    def __init__(self, exchange_id, symbols):
        # Load real credentials from the environment or a secrets store.
        self.exchange = getattr(ccxtpro, exchange_id)({
            "apiKey": "...", "secret": "...",
        })
        self.symbols = symbols
        self.orderbooks = {}
        self.trades = {}

    async def stream_orderbooks(self):
        # Symbols are awaited in turn, so a quiet symbol delays the others;
        # running one task per symbol avoids that.
        while True:
            for symbol in self.symbols:
                try:
                    ob = await self.exchange.watch_order_book(symbol)
                    self.orderbooks[symbol] = {
                        "bids": ob["bids"][:20],  # top 20 levels per side
                        "asks": ob["asks"][:20],
                        "timestamp": ob["timestamp"],
                    }
                except Exception as e:
                    logger.error(f"OrderBook error {symbol}: {e}")
                    await asyncio.sleep(1)  # brief pause before retrying

    async def stream_trades(self):
        while True:
            for symbol in self.symbols:
                try:
                    trades = await self.exchange.watch_trades(symbol)
                    self.trades[symbol] = trades[-100:]  # keep the latest 100
                except Exception as e:
                    logger.error(f"Trades error {symbol}: {e}")
                    await asyncio.sleep(1)

    async def close(self):
        # Release the underlying WebSocket connections on shutdown.
        await self.exchange.close()
ccxt.pro is the WebSocket layer of ccxt (the plain async REST API lives in ccxt.async_support). It provides a unified watch_* interface for many major exchanges and handles reconnection for you.
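The loops above await symbols one after another, so a symbol with little activity can hold up updates for the rest. One way around that, sketched here, is a task per symbol with exponential backoff on errors. The sketch assumes only that `exchange` exposes an awaitable `watch_order_book(symbol)` as in the code above; `watch_symbol`, `stream_all`, `max_updates`, and `base_delay` are hypothetical names introduced for the example.

```python
import asyncio


async def watch_symbol(exchange, symbol, store, *, max_updates=None, base_delay=1.0):
    """Keep store[symbol] current; retry with exponential backoff on errors."""
    delay = base_delay
    received = 0
    # max_updates=None means "run forever"; a number makes the loop testable.
    while max_updates is None or received < max_updates:
        try:
            ob = await exchange.watch_order_book(symbol)
        except Exception as exc:
            print(f"{symbol}: {exc!r}; retrying in {delay:.2f}s")
            await asyncio.sleep(delay)
            delay = min(delay * 2, 30.0)      # cap the backoff at 30 seconds
            continue
        store[symbol] = {"bids": ob["bids"][:20], "asks": ob["asks"][:20]}
        received += 1
        delay = base_delay                    # reset after a success


async def stream_all(exchange, symbols, store, **kwargs):
    """Run one watcher task per symbol so a quiet market cannot stall the rest."""
    await asyncio.gather(
        *(watch_symbol(exchange, symbol, store, **kwargs) for symbol in symbols)
    )
```

Because `asyncio.CancelledError` does not derive from `Exception`, cancelling the gathered tasks still shuts the watchers down cleanly.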
Order management system
A proper order manager tracks the full lifecycle of every order:
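import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum

logger = logging.getLogger(__name__)


def _utcnow():
    # datetime.utcnow() is deprecated since Python 3.12; use an aware timestamp.
    return datetime.now(timezone.utc)


class OrderState(Enum):
    PENDING = "pending"
    SUBMITTED = "submitted"
    PARTIALLY_FILLED = "partially_filled"
    FILLED = "filled"
    CANCELLED = "cancelled"
    REJECTED = "rejected"


@dataclass
class ManagedOrder:
    id: str
    symbol: str
    side: str          # "buy" or "sell"
    order_type: str    # "limit" or "market"
    quantity: float
    price: float
    state: OrderState = OrderState.PENDING
    filled_qty: float = 0.0
    avg_fill_price: float = 0.0
    exchange_id: str = ""
    created_at: datetime = field(default_factory=_utcnow)
    updated_at: datetime = field(default_factory=_utcnow)


class OrderManager:
    def __init__(self, exchange, risk_engine):
        self.exchange = exchange
        self.risk = risk_engine
        self.orders = {}    # local order id -> ManagedOrder
        self.position = {}  # symbol -> signed net quantity

    async def submit_order(self, order: ManagedOrder):
        # Track every order, including rejected ones, so the lifecycle is complete.
        self.orders[order.id] = order
        # Risk check before submission
        approved, reason = self.risk.check(order, self.position)
        if not approved:
            order.state = OrderState.REJECTED
            order.updated_at = _utcnow()
            logger.warning(f"Order rejected: {reason}")
            return order
        try:
            result = await self.exchange.create_order(
                order.symbol, order.order_type, order.side,
                order.quantity, order.price
            )
            order.exchange_id = result["id"]
            order.state = OrderState.SUBMITTED
        except Exception as e:
            # Caution: after a timeout the order may still exist on the exchange.
            order.state = OrderState.REJECTED
            logger.error(f"Order submission failed: {e}")
        order.updated_at = _utcnow()
        return order

    async def sync_orders(self):
        """Poll exchange for order updates."""
        # Iterate over a copy: submit_order may add entries while we await.
        for order_id, order in list(self.orders.items()):
            if order.state not in (OrderState.SUBMITTED, OrderState.PARTIALLY_FILLED):
                continue
            try:
                exchange_order = await self.exchange.fetch_order(
                    order.exchange_id, order.symbol
                )
                previous_filled = order.filled_qty
                order.filled_qty = exchange_order.get("filled") or 0.0
                order.avg_fill_price = exchange_order.get("average") or 0.0
                # Apply only the newly filled quantity so partial fills count once.
                self._update_position(order, order.filled_qty - previous_filled)
                status = exchange_order["status"]
                if status == "closed":
                    order.state = OrderState.FILLED
                elif status in ("canceled", "expired"):
                    order.state = OrderState.CANCELLED
                elif status == "rejected":
                    order.state = OrderState.REJECTED
                elif order.filled_qty > 0:
                    order.state = OrderState.PARTIALLY_FILLED
                order.updated_at = _utcnow()
            except Exception as e:
                logger.error(f"Order sync failed for {order_id}: {e}")

    def _update_position(self, order, fill_delta):
        # Minimal assumed implementation: net signed quantity per symbol.
        signed = fill_delta if order.side == "buy" else -fill_delta
        self.position[order.symbol] = self.position.get(order.symbol, 0.0) + signed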
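The order manager above calls `self.risk.check(order, self.position)` and expects an `(approved, reason)` pair, but the risk engine itself is not shown. A minimal sketch of what such a check might look like follows. The limits, the `RiskLimits` and `OrderRequest` types, and the assumption that `position` maps each symbol to a signed net quantity are all hypothetical; a real engine would typically also cover daily loss limits, order rate, and price sanity checks.

```python
from dataclasses import dataclass


@dataclass
class RiskLimits:
    max_order_notional: float = 10_000.0   # quantity * price, in quote currency
    max_position_qty: float = 5.0          # absolute net quantity per symbol


@dataclass
class OrderRequest:
    """Stand-in with the fields the check reads from an order."""
    symbol: str
    side: str
    quantity: float
    price: float


class RiskEngine:
    def __init__(self, limits):
        self.limits = limits

    def check(self, order, position):
        """Return (approved, reason) for a proposed order."""
        if order.quantity <= 0:
            return False, "quantity must be positive"
        notional = order.quantity * order.price
        if notional > self.limits.max_order_notional:
            return False, f"notional {notional:.2f} exceeds limit"
        # Project the position as if the order filled completely.
        signed = order.quantity if order.side == "buy" else -order.quantity
        projected = position.get(order.symbol, 0.0) + signed
        if abs(projected) > self.limits.max_position_qty:
            return False, f"projected position {projected:.4f} exceeds limit"
        return True, "ok"
```

Checking the projected position rather than the current one means an order is rejected before it can breach the limit, not after.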
Advanced backtesting with vectorized operations
Loop-based backtesting is slow for long histories. Vectorized backtesting with NumPy and pandas can process millions of candles in seconds:
import numpy as np
import pandas as pd


class VectorizedBacktest:
    def __init__(self, data: pd.DataFrame):
        self.data = data
        self.close = data["close"].to_numpy(dtype=float)

    def sma_crossover(self, fast_period=10, slow_period=30):
        close = pd.Series(self.close)
        fast_sma = close.rolling(fast_period).mean().to_numpy()
        slow_sma = close.rolling(slow_period).mean().to_numpy()
        # Signal: 1 when fast > slow (bullish), -1 otherwise.
        # Warm-up bars (SMA is NaN) compare False and therefore read as -1.
        signals = np.where(fast_sma > slow_sma, 1, -1)
        # Only trade on crossover points
        positions = np.diff(signals, prepend=signals[0])
        entries = np.where(positions == 2, 1, 0)   # Short to long
        exits = np.where(positions == -2, -1, 0)   # Long to short
        return entries + exits

    def calculate_returns(self, signals, commission=0.001):
        """Calculate strategy returns accounting for commissions."""
        signals = np.asarray(signals)
        price_returns = np.diff(self.close) / self.close[:-1]
        # Position: 1 = long, 0 = flat. Entries set 1, exits set 0, and bars
        # without a signal carry the previous position forward (no Python loop).
        target = np.where(signals == 1, 1.0, np.where(signals == -1, 0.0, np.nan))
        target[0] = 0.0  # start flat
        position = pd.Series(target).ffill().to_numpy()
        # The position held at bar i earns the return from bar i to bar i + 1.
        strategy_returns = position[:-1] * price_returns
        # Deduct commissions on trades
        trades = np.abs(np.diff(position, prepend=0))
        strategy_returns -= trades[:-1] * commission
        cumulative = np.cumprod(1 + strategy_returns)
        return {
            "total_return": (cumulative[-1] - 1) * 100,
            "max_drawdown": self._max_drawdown(cumulative) * 100,
            "sharpe_ratio": self._sharpe(strategy_returns),
            "num_trades": int(trades.sum()),
        }

    def _max_drawdown(self, cumulative):
        peak = np.maximum.accumulate(cumulative)
        drawdown = (peak - cumulative) / peak
        return drawdown.max()

    def _sharpe(self, returns, risk_free=0, periods=365):
        # periods=365 assumes daily candles in a market that trades every day;
        # pass the number of bars per year for other timeframes.
        excess = returns - risk_free / periods
        if excess.std() == 0:
            return 0
        return np.sqrt(periods) * excess.mean() / excess.std()
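To make the drawdown and Sharpe formulas concrete, here is a small worked example on four made-up per-bar returns (hypothetical numbers, not taken from any real backtest). It applies the same expressions as `_max_drawdown` and `_sharpe` with a zero risk-free rate.

```python
import numpy as np

# Hypothetical per-bar strategy returns.
returns = np.array([0.20, -0.25, 0.20, 0.10])

equity = np.cumprod(1 + returns)         # about 1.2, 0.9, 1.08, 1.188
peak = np.maximum.accumulate(equity)     # running high-water mark: 1.2 throughout
drawdown = (peak - equity) / peak        # about 0, 0.25, 0.10, 0.01
max_drawdown = drawdown.max()            # about 0.25: the fall from 1.2 to 0.9

# Annualized Sharpe ratio, treating each bar as one day of a 365-day year.
# np.std uses the population formula (ddof=0) by default.
sharpe = np.sqrt(365) * returns.mean() / returns.std()
```

The maximum drawdown is measured from the running peak, so the recovery to 1.188 at the end does not reduce it: the equity curve never regains the earlier high of 1.2.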
Walk-forward optimization
Optimizing parameters on the full dataset overfits them to history the strategy will never trade again. Walk-forward testing optimizes on a rolling window of past data and evaluates on the unseen window that follows:
import pandas as pd


def walk_forward_test(data, strategy_class, train_window=180, test_window=30):
    """Window sizes are in bars (days when `data` holds daily candles).

    `strategy_class` is assumed to provide optimize(), set_params() and backtest().
    """
    results = []
    total_bars = len(data)
    # The + 1 keeps the final window when it fits the data exactly.
    for start in range(0, total_bars - train_window - test_window + 1, test_window):
        split = start + train_window
        train_data = data.iloc[start:split]
        test_data = data.iloc[split:split + test_window]
        # Optimize parameters on training set
        strategy = strategy_class()
        best_params = strategy.optimize(train_data)
        # Test with optimized parameters on unseen data
        strategy.set_params(best_params)
        period_return = strategy.backtest(test_data)
        results.append({
            "period_start": test_data.index[0],
            "period_end": test_data.index[-1],
            "return_pct": period_return,
            "params": best_params,
        })
    return pd.DataFrame(results)
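The window arithmetic is the part of walk-forward testing where off-by-one mistakes tend to hide, so it can help to isolate it. The helper below is a hypothetical sketch (`walk_forward_windows` is not part of any library) that yields half-open index ranges for each train/test pair and advances by one test window per step.

```python
def walk_forward_windows(n_bars, train_window, test_window):
    """Yield ((train_start, train_end), (test_start, test_end)) index pairs.

    Ranges are half-open, like Python slices, and each test window begins
    exactly where its training window ends, so the two never overlap.
    """
    start = 0
    while start + train_window + test_window <= n_bars:
        split = start + train_window
        yield (start, split), (split, split + test_window)
        start += test_window


# 240 bars with a 180-bar train window and a 30-bar test window:
windows = list(walk_forward_windows(240, 180, 30))
# windows == [((0, 180), (180, 210)), ((30, 210), (210, 240))]
```

With 240 bars the second window ends exactly at the last bar, which is the case a loop bound that is one step too short would silently drop.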
Latency optimization
In competitive markets, milliseconds matter. Key optimizations:
- Connection pooling: Reuse HTTP sessions and WebSocket connections.
- Local order book maintenance: Instead of fetching the full order book, subscribe to incremental updates and maintain a local copy.
- Pre-signed transactions: For DEX trading, pre-compute transaction calldata so you only need to broadcast when the signal fires.
- Colocation: Run your bot on a server geographically close to the exchange’s data center.
class LocalOrderBook:
    def __init__(self):
        self.bids = {}  # price → quantity
        self.asks = {}

    def apply_update(self, update):
        # A quantity of 0 removes the level; any other value overwrites it.
        # Values are cast to float so string prices from raw feeds compare numerically.
        for side, levels in (("bids", self.bids), ("asks", self.asks)):
            for price, qty in update.get(side, []):
                price, qty = float(price), float(qty)
                if qty == 0:
                    levels.pop(price, None)
                else:
                    levels[price] = qty

    def best_bid(self):
        return max(self.bids) if self.bids else 0

    def best_ask(self):
        return min(self.asks) if self.asks else float("inf")

    def spread(self):
        # Infinite while either side of the book is empty.
        return self.best_ask() - self.best_bid()
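A local copy is only trustworthy if no incremental update has been missed. Many exchange feeds number their updates for this reason, although field names and exact rules differ by venue. The sketch below assumes a hypothetical integer `seq` field that increases by one per update; `SequencedBook` and `SequenceGap` are illustrative names, not part of any exchange API.

```python
class SequenceGap(Exception):
    """Raised when an update does not directly follow the previous one."""


class SequencedBook:
    def __init__(self, snapshot_seq):
        self.last_seq = snapshot_seq   # sequence number of the initial snapshot
        self.bids = {}                 # price -> quantity
        self.asks = {}

    def apply(self, update):
        """Apply one incremental update; return False if it was stale."""
        seq = update["seq"]
        if seq <= self.last_seq:
            return False               # duplicate or older than the snapshot
        if seq != self.last_seq + 1:
            # At least one update was lost: the caller should resubscribe
            # and rebuild the book from a fresh snapshot.
            raise SequenceGap(f"expected {self.last_seq + 1}, got {seq}")
        for side in ("bids", "asks"):
            levels = getattr(self, side)
            for price, qty in update.get(side, []):
                if qty == 0:
                    levels.pop(price, None)
                else:
                    levels[price] = qty
        self.last_seq = seq
        return True
```

Raising on a gap instead of continuing keeps a silently corrupted book from feeding prices into the signal generator.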
Operational monitoring
Production bots need comprehensive monitoring:
from prometheus_client import Counter, Gauge, Histogram

orders_total = Counter("bot_orders_total", "Total orders", ["side", "status"])
portfolio_value = Gauge("bot_portfolio_value_usd", "Portfolio value in USD")
order_latency = Histogram("bot_order_latency_seconds", "Order round-trip time")
pnl_gauge = Gauge("bot_pnl_usd", "Unrealized + realized PnL")
drawdown_gauge = Gauge("bot_drawdown_pct", "Current drawdown percentage")


class MonitoredBot:
    # _update_pnl() and the calculate_*() methods are assumed to be
    # implemented elsewhere in the bot.

    def on_order_filled(self, order):
        orders_total.labels(side=order.side, status="filled").inc()
        self._update_pnl()

    def on_order_acknowledged(self, round_trip_seconds):
        # Record the time between sending an order and the exchange's reply.
        order_latency.observe(round_trip_seconds)

    def on_tick(self):
        portfolio_value.set(self.calculate_portfolio_value())
        pnl_gauge.set(self.calculate_pnl())
        drawdown_gauge.set(self.calculate_drawdown())
Set alerts for drawdown exceeding a threshold, no trades for N hours (a possible connectivity issue), an exchange API error rate above its normal level, and unexpected position sizes.
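In practice these rules usually live in the alerting system that reads the metrics, but the logic is simple enough to sketch in plain Python. The function below is a hypothetical illustration. Its name, parameters, and default thresholds are assumptions, not recommended values.

```python
from datetime import datetime, timedelta, timezone


def evaluate_alerts(now, last_trade_at, drawdown_pct, api_errors_per_min,
                    position_qty, *, max_drawdown_pct=10.0,
                    max_idle=timedelta(hours=6), max_api_errors_per_min=5.0,
                    max_position_qty=5.0):
    """Return a list of alert messages; the list is empty when all is well."""
    alerts = []
    if drawdown_pct > max_drawdown_pct:
        alerts.append(f"drawdown {drawdown_pct:.1f}% exceeds {max_drawdown_pct:.1f}%")
    if now - last_trade_at > max_idle:
        # Silence can mean a quiet market or a dead connection; a human decides.
        alerts.append(f"no trades since {last_trade_at.isoformat()}")
    if api_errors_per_min > max_api_errors_per_min:
        alerts.append(f"API errors at {api_errors_per_min:.1f}/min")
    if abs(position_qty) > max_position_qty:
        alerts.append(f"position {position_qty} outside +/-{max_position_qty}")
    return alerts


# Example: a bot that has been idle for seven hours and is deep in drawdown.
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
alerts = evaluate_alerts(now, now - timedelta(hours=7), 12.5, 0.0, 1.0)
```

Returning every triggered message, instead of stopping at the first, lets one notification show the whole picture when several things go wrong at once.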
One thing to remember
A production crypto trading bot is an event-driven system where the execution infrastructure (WebSocket feeds, order management, position tracking, monitoring) is more complex and more critical than the trading strategy itself — most failures come from engineering problems, not bad signals.