Python Blockchain Data Analysis — Deep Dive
Building a blockchain data pipeline
Production blockchain analytics requires a pipeline that continuously ingests, decodes, and stores on-chain data for fast querying.
Node RPC ──→ Block Fetcher ──→ Event Decoder ──→ Transformer ──→ Database
                                     ↑                              ↓
                               ABI Registry                    Query Layer
                                                                    ↓
                                                             Dashboard / API
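One way to read the diagram is as a chain of small stages, each consuming the output of the previous one. The sketch below wires the stages together with generators. Every name in it (`fetch_blocks`, `decode_events`, `transform`, `run_pipeline`) is a hypothetical stand-in rather than a library function, the fetcher yields canned data instead of calling a node, and the "database" is just a list.

```python
from typing import Iterable, Iterator

# Hypothetical Block Fetcher: yields canned blocks instead of calling an RPC node.
def fetch_blocks(start: int, end: int) -> Iterator[dict]:
    for number in range(start, end + 1):
        yield {"number": number, "logs": [{"topic0": "0xabc", "data": number * 10}]}

# Hypothetical Event Decoder: looks up each log's topic in a tiny ABI registry.
def decode_events(blocks: Iterable[dict], registry: dict) -> Iterator[dict]:
    for block in blocks:
        for log in block["logs"]:
            name = registry.get(log["topic0"])
            if name is not None:  # skip events the registry does not know
                yield {"block": block["number"], "event": name, "raw": log["data"]}

# Hypothetical Transformer: converts raw values into analysis-friendly units.
def transform(events: Iterable[dict]) -> Iterator[dict]:
    for event in events:
        yield {**event, "value": event["raw"] / 10}

def run_pipeline(start: int, end: int, registry: dict) -> list:
    database = []  # in-memory stand-in for the Database stage
    for row in transform(decode_events(fetch_blocks(start, end), registry)):
        database.append(row)
    return database

rows = run_pipeline(1, 3, {"0xabc": "Transfer"})
```

Because each stage is lazy, blocks flow through one at a time, so memory use stays flat however long the block range is.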
Block ingestion with ethereum-etl
The ethereum-etl project exports blockchain data into structured formats:
# Export blocks and transactions to CSV
# ethereumetl export_blocks_and_transactions \
# --start-block 18000000 --end-block 18001000 \
# --provider-uri https://eth.llamarpc.com \
# --blocks-output blocks.csv --transactions-output txs.csv
import pandas as pd
blocks = pd.read_csv("blocks.csv")
txs = pd.read_csv("txs.csv")
# Gas analysis
txs["gas_price_gwei"] = txs["gas_price"] / 10**9
avg_gas = txs.groupby("block_number")["gas_price_gwei"].mean()
For continuous ingestion, run ethereumetl stream, which follows new blocks as they are produced and writes them to a sink such as Postgres or Kafka; with no output configured it prints to the console.
Event decoding with ABI registry
Raw event logs contain hex-encoded topics and data. Decoding them requires matching the topic hash to known event signatures:
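from eth_abi import decode
from web3 import Web3

# keccak256 of the ERC-20 Transfer event signature (topic 0), kept as raw bytes
TRANSFER_TOPIC = bytes(Web3.keccak(text="Transfer(address,address,uint256)"))

def decode_transfer_log(log):
    topics = [bytes(t) for t in log["topics"]]
    # ERC-721 Transfer has the same signature but indexes tokenId (4 topics),
    # so require exactly 3 topics to keep only ERC-20 transfers
    if len(topics) != 3 or topics[0] != TRANSFER_TOPIC:
        return None
    # Indexed addresses are left-padded to 32 bytes; keep the last 20
    from_addr = "0x" + topics[1][-20:].hex()
    to_addr = "0x" + topics[2][-20:].hex()
    # The non-indexed uint256 amount is ABI-encoded in the data field
    (value,) = decode(["uint256"], bytes(log["data"]))
    return {"from": from_addr, "to": to_addr, "value": value}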
For multi-protocol analysis, maintain an ABI registry that maps contract addresses to their ABIs, enabling automatic decoding of any event from any known contract.
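A registry like this can start out as a nested dictionary. The sketch below is a minimal illustration, not a production design: `AbiRegistry` and its methods are hypothetical names, the contract address is a placeholder, and it stores only event names rather than full ABIs. The two topic constants are the keccak256 hashes of the standard ERC-20 `Transfer` and `Approval` signatures.

```python
# topic0 values: keccak256 of the canonical ERC-20 event signatures
TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
APPROVAL_TOPIC = "0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925"

class AbiRegistry:
    """Hypothetical registry: contract address -> {topic0 -> event name}."""

    def __init__(self):
        self._events = {}

    def register(self, address: str, topic0: str, event_name: str) -> None:
        # Lower-case both keys so checksummed and plain addresses hit the same entry
        self._events.setdefault(address.lower(), {})[topic0.lower()] = event_name

    def lookup(self, address: str, topic0: str):
        """Return the event name for a log, or None if the contract or event is unknown."""
        return self._events.get(address.lower(), {}).get(topic0.lower())

registry = AbiRegistry()
token = "0x" + "11" * 20  # placeholder address, not a real contract
registry.register(token, TRANSFER_TOPIC, "Transfer")

known = registry.lookup(token.upper().replace("0X", "0x"), TRANSFER_TOPIC)
unknown = registry.lookup(token, APPROVAL_TOPIC)
```

Keying on the contract address as well as the topic matters because unrelated contracts can emit events with identical signatures but different meanings.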
MEV detection and analysis
Maximal Extractable Value (MEV) is the profit that validators and searchers can extract by including, excluding, or reordering transactions within a block. Detecting MEV patterns shows where that value is being captured and at whose expense.
Sandwich attack detection
A sandwich attack wraps a victim’s swap between two attacker transactions:
def detect_sandwiches(block_txs: pd.DataFrame, swaps: pd.DataFrame) -> list:
    """Find sandwich attacks within a block."""
    # block_txs is accepted for context but not used; detection relies on the decoded swaps
    sandwiches = []
    swaps_sorted = swaps.sort_values("transaction_index")
    # Slide a window of three consecutive swaps over the block
    for i in range(len(swaps_sorted) - 2):
        tx_a = swaps_sorted.iloc[i]
        tx_victim = swaps_sorted.iloc[i + 1]
        tx_b = swaps_sorted.iloc[i + 2]
        # Same pool, same attacker on both sides
        if (tx_a["pool"] == tx_victim["pool"] == tx_b["pool"] and
                tx_a["sender"] == tx_b["sender"] and
                tx_a["sender"] != tx_victim["sender"]):
            # Attacker buys before, sells after
            if tx_a["direction"] == "buy" and tx_b["direction"] == "sell":
                # Assumes amount_in and amount_out are denominated in the same token
                profit = tx_b["amount_out"] - tx_a["amount_in"]
                sandwiches.append({
                    "attacker": tx_a["sender"],
                    "victim": tx_victim["sender"],
                    "pool": tx_a["pool"],
                    "profit": profit,
                    "block": tx_a["block_number"],
                })
    return sandwiches
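The scan above only inspects three consecutive swaps, so it misses a sandwich when a swap on an unrelated pool lands between the attacker's transactions. One possible refinement, sketched here with plain dictionaries, is to group swaps by pool first and then look for the buy / victim / sell pattern within each pool. The function name `find_sandwiches_by_pool` is hypothetical, the field names follow the ones used above, and the input is assumed to contain swaps from a single block.

```python
from collections import defaultdict

def find_sandwiches_by_pool(swaps: list) -> list:
    """Scan each pool's swaps in block order for buy / victim / sell triples."""
    by_pool = defaultdict(list)
    for swap in sorted(swaps, key=lambda s: s["transaction_index"]):
        by_pool[swap["pool"]].append(swap)

    found = []
    for pool, pool_swaps in by_pool.items():
        # Neighbours within one pool may be far apart in the block itself
        for front, victim, back in zip(pool_swaps, pool_swaps[1:], pool_swaps[2:]):
            if (front["sender"] == back["sender"]
                    and front["sender"] != victim["sender"]
                    and front["direction"] == "buy"
                    and back["direction"] == "sell"):
                found.append({
                    "pool": pool,
                    "attacker": front["sender"],
                    "victim": victim["sender"],
                })
    return found

swaps = [
    {"transaction_index": 0, "pool": "A", "sender": "bot", "direction": "buy"},
    {"transaction_index": 1, "pool": "B", "sender": "carol", "direction": "buy"},  # unrelated pool
    {"transaction_index": 2, "pool": "A", "sender": "alice", "direction": "buy"},
    {"transaction_index": 3, "pool": "A", "sender": "bot", "direction": "sell"},
]
result = find_sandwiches_by_pool(swaps)
```

In this toy block the swap on pool B sits between the attacker's buy and the victim, which would break a purely adjacent scan but not the per-pool one.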
Arbitrage detection
Atomic arbitrage happens when a single transaction trades across multiple pools to profit from price differences:
from typing import Optional

def detect_arbitrage(tx_swaps: list) -> Optional[dict]:
    """Detect if a transaction's swaps form a profitable cycle."""
    if len(tx_swaps) < 2:
        return None
    first_token_in = tx_swaps[0]["token_in"]
    last_token_out = tx_swaps[-1]["token_out"]
    # Cycle detection: does the last output match the first input?
    if first_token_in != last_token_out:
        return None
    # Both amounts are in the same token, so the difference is the gross profit (before gas)
    profit = tx_swaps[-1]["amount_out"] - tx_swaps[0]["amount_in"]
    if profit <= 0:
        return None  # a cycle that loses money is not an arbitrage
    return {
        "type": "cyclic_arbitrage",
        "hops": len(tx_swaps),
        "profit": profit,
        "tokens": [s["token_in"] for s in tx_swaps],
    }
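The check above compares only the first input token with the last output token. A transaction that happens to contain two unrelated swaps could pass it, so it can help to verify that each hop actually feeds the next. A minimal sketch, using the same `token_in` / `token_out` fields; the helper name `is_connected_cycle` is an assumption, not part of any library:

```python
def is_connected_cycle(tx_swaps: list) -> bool:
    """True if every hop's output is the next hop's input and the route ends where it began."""
    if len(tx_swaps) < 2:
        return False
    for current, following in zip(tx_swaps, tx_swaps[1:]):
        if current["token_out"] != following["token_in"]:
            return False
    return tx_swaps[-1]["token_out"] == tx_swaps[0]["token_in"]

triangle = [
    {"token_in": "WETH", "token_out": "USDC"},
    {"token_in": "USDC", "token_out": "DAI"},
    {"token_in": "DAI", "token_out": "WETH"},
]
unrelated = [
    {"token_in": "WETH", "token_out": "USDC"},
    {"token_in": "DAI", "token_out": "WETH"},
]
```

Here `triangle` is a genuine three-hop cycle, while `unrelated` starts and ends in WETH without the hops being linked.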
Wallet profiling and entity labeling
Assigning labels to addresses transforms raw data into actionable intelligence:
KNOWN_LABELS = {
    "0xdead...": "burn_address",
    "0x28c6...": "binance_hot_wallet",
    "0x21a3...": "aave_v3_pool",
}

class WalletProfiler:
    def __init__(self, transactions_df, labels=None):
        self.txs = transactions_df
        self.labels = labels or KNOWN_LABELS

    def profile(self, address):
        addr_txs = self.txs[
            (self.txs["from"] == address) | (self.txs["to"] == address)
        ]
        sent = addr_txs[addr_txs["from"] == address]
        received = addr_txs[addr_txs["to"] == address]
        # Every address this wallet transacted with, excluding the wallet itself
        counterparties = (set(sent["to"].dropna()) | set(received["from"].dropna())) - {address}
        return {
            "address": address,
            "label": self.labels.get(address, "unknown"),
            "first_seen": addr_txs["timestamp"].min(),
            "last_seen": addr_txs["timestamp"].max(),
            "tx_count": len(addr_txs),
            "unique_counterparties": len(counterparties),
            "total_eth_sent": sent["value"].sum() / 10**18,
            "total_eth_received": received["value"].sum() / 10**18,
            "avg_tx_value": addr_txs["value"].mean() / 10**18,
            "protocols_used": self._identify_protocols(addr_txs),
        }

    def _identify_protocols(self, txs):
        # Map each recipient to its label; unlabeled recipients become NaN and are dropped.
        # Any labeled recipient counts, so exchange wallets appear here alongside protocols.
        labels = txs["to"].map(self.labels).dropna()
        return sorted(labels.unique())
Combine on-chain data with off-chain label databases (Etherscan labels, Arkham Intelligence, Nansen) for comprehensive profiles.
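When labels come from several sources, the same address often appears with different casing (checksummed in one export, lower-case in another), and sources may disagree on the label. The sketch below shows one way to merge them. Both helper names are hypothetical, the addresses are placeholders, and giving priority to earlier sources is an assumption you may want to change.

```python
def normalize_address(address: str) -> str:
    """Lower-case a hex address so lookups ignore EIP-55 checksum casing."""
    return address.strip().lower()

def merge_label_sources(*sources: dict) -> dict:
    """Merge label dicts; earlier sources take priority on conflicts."""
    merged = {}
    for source in sources:
        for address, label in source.items():
            merged.setdefault(normalize_address(address), label)
    return merged

# Placeholder addresses, not real contracts
curated = {"0x" + "AB" * 20: "exchange_hot_wallet"}
community = {"0x" + "ab" * 20: "unknown_whale", "0x" + "cd" * 20: "dex_router"}

labels = merge_label_sources(curated, community)
```

Lookups against the merged dictionary then need the same normalization, for example `labels.get(normalize_address(addr), "unknown")`.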
Cross-chain correlation
With DeFi spanning multiple chains, analysts need to track assets moving between them:
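def track_bridge_flows(l1_deposits, l2_withdrawals):
    """Correlate L1 bridge deposits with L2 appearances.

    l2_withdrawals holds the L2-side transfers in which bridged funds appear.
    Timestamps are assumed to be Unix seconds.
    """
    flows = []
    matched = set()  # index labels of L2 rows already paired with a deposit
    l2_sorted = l2_withdrawals.sort_values("timestamp")
    for _, deposit in l1_deposits.sort_values("timestamp").iterrows():
        # Earliest unmatched L2 row with the same amount within one hour of the deposit
        candidates = l2_sorted[
            (l2_sorted["amount"] == deposit["amount"]) &
            (l2_sorted["timestamp"] > deposit["timestamp"]) &
            (l2_sorted["timestamp"] < deposit["timestamp"] + 3600) &
            ~l2_sorted.index.isin(matched)
        ]
        if len(candidates) > 0:
            match = candidates.iloc[0]
            matched.add(match.name)  # prevent one L2 row from matching several deposits
            flows.append({
                "l1_tx": deposit["tx_hash"],
                "l2_tx": match["tx_hash"],
                "amount": deposit["amount"],
                "delay_seconds": match["timestamp"] - deposit["timestamp"],
                "l1_address": deposit["from"],
                "l2_address": match["to"],
            })
    return pd.DataFrame(flows)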
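Matching on exactly equal amounts works only when the bridge delivers the full deposit. Where a bridge deducts a fee, the L2 amount is slightly smaller, and a tolerance check could replace the equality test. The helper below is a sketch under that assumption: the name `amounts_match` and the 50 basis point default are illustrative, not taken from any bridge's documentation.

```python
def amounts_match(deposit_amount: int, arrival_amount: int, max_fee_bps: int = 50) -> bool:
    """True if the L2 amount equals the L1 amount minus at most max_fee_bps basis points."""
    if arrival_amount > deposit_amount:
        return False  # more arrived than was deposited, so this is not the same flow
    fee = deposit_amount - arrival_amount
    # Integer arithmetic avoids float rounding on wei-sized values
    return fee * 10_000 <= deposit_amount * max_fee_bps

exact = amounts_match(1_000_000, 1_000_000)
at_limit = amounts_match(1_000_000, 995_000)    # 0.5% fee, exactly 50 bps
over_limit = amounts_match(1_000_000, 994_999)  # just over 50 bps
```

A looser tolerance finds more flows but also more false matches, so it is worth combining with the time window and, where available, the recipient address.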
Real-time dashboards
For live monitoring, combine WebSocket event streams with a dashboard framework:
import asyncio
from web3 import AsyncWeb3, AsyncHTTPProvider

class RealtimeAnalytics:
    def __init__(self, w3, contracts_to_watch):
        # w3 is expected to be an async client, e.g. AsyncWeb3(AsyncHTTPProvider(url))
        self.w3 = w3
        self.contracts = contracts_to_watch
        self.metrics = {
            "swaps_per_minute": 0,  # running count; reset it once a minute to get a true rate
            "volume_usd": 0,
            "unique_traders": set(),
            "largest_swap": 0,
        }

    async def process_block(self, block_number):
        block = await self.w3.eth.get_block(block_number, full_transactions=True)
        for tx in block.transactions:
            # Only direct calls to watched contracts are inspected
            if tx["to"] in self.contracts:
                receipt = await self.w3.eth.get_transaction_receipt(tx["hash"])
                events = self.decode_events(receipt.logs)
                self.update_metrics(events)

    def decode_events(self, logs):
        # Placeholder: the decoder is not shown in this article. An implementation
        # should return dicts with "type", "volume_usd", and "sender" keys.
        raise NotImplementedError

    def update_metrics(self, events):
        for event in events:
            if event["type"] == "Swap":
                self.metrics["swaps_per_minute"] += 1
                self.metrics["volume_usd"] += event["volume_usd"]
                self.metrics["unique_traders"].add(event["sender"])
                self.metrics["largest_swap"] = max(
                    self.metrics["largest_swap"], event["volume_usd"]
                )
Pair this with Streamlit or Dash for visualization, pushing updates every block (roughly 12 seconds on Ethereum).
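The `swaps_per_minute` counter above only ever increases, so a dashboard reading it would see a running total rather than a rate. One way to get a true per-minute figure is a sliding window over event timestamps. The class below is a minimal sketch; the name `SlidingWindowCounter` is hypothetical, and it assumes timestamps arrive in non-decreasing order, as block timestamps do.

```python
from collections import deque

class SlidingWindowCounter:
    """Counts events whose timestamps fall within the last window_seconds."""

    def __init__(self, window_seconds: int = 60):
        self.window_seconds = window_seconds
        self._timestamps = deque()

    def add(self, timestamp: float) -> None:
        self._timestamps.append(timestamp)

    def count(self, now: float) -> int:
        # Drop entries that are window_seconds old or older
        cutoff = now - self.window_seconds
        while self._timestamps and self._timestamps[0] <= cutoff:
            self._timestamps.popleft()
        return len(self._timestamps)

counter = SlidingWindowCounter(window_seconds=60)
for block_time in (0, 12, 24, 70):  # one swap in each of four blocks
    counter.add(block_time)

recent = counter.count(now=72)
later = counter.count(now=200)
```

At `now=72` only the swaps at 24 and 70 are still inside the window; by `now=200` all of them have expired.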
Storage and query optimization
Large-scale analysis requires thoughtful storage:
- Parquet files: Column-oriented, compressed, excellent for pandas. Store partitioned by date.
- DuckDB: In-process analytical database that queries Parquet files directly — fast SQL without a server.
- TimescaleDB: PostgreSQL extension optimized for time-series blockchain data.
import duckdb

con = duckdb.connect()
result = con.execute("""
    SELECT date_trunc('hour', timestamp) AS hour,
           count(*) AS tx_count,
           sum(value_eth) AS volume
    FROM read_parquet('data/transactions/*.parquet')
    WHERE timestamp >= '2024-01-01'
    GROUP BY 1
    ORDER BY 1
""").fetchdf()
DuckDB can scan 100M+ rows of Parquet data in seconds on a laptop, making it ideal for exploratory blockchain analysis.
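The date partitioning suggested for Parquet is commonly laid out Hive-style, with one `date=YYYY-MM-DD` directory per day. The helper below sketches how a writer might choose the directory for a block; the function name and the root path are assumptions for illustration.

```python
from datetime import datetime, timezone
from pathlib import PurePosixPath

def partition_path(root: str, block_timestamp: int) -> str:
    """Return a Hive-style directory (date=YYYY-MM-DD) for a Unix block timestamp."""
    # Block timestamps are in UTC, so derive the date in UTC as well
    day = datetime.fromtimestamp(block_timestamp, tz=timezone.utc).date()
    return str(PurePosixPath(root) / f"date={day.isoformat()}")

new_year = partition_path("data/transactions", 1704067200)      # 2024-01-01 00:00:00 UTC
second_before = partition_path("data/transactions", 1704067199)
```

With this layout the files sit one directory deeper, so a query's glob would need the extra level (for example `data/transactions/*/*.parquet`), and DuckDB's `hive_partitioning` option for `read_parquet` can expose the date as a column.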
One thing to remember
Python blockchain data analysis at scale means building an ingestion pipeline (ethereum-etl or RPC streaming), decoding events through an ABI registry, storing in columnar formats (Parquet + DuckDB), and layering analytical patterns like MEV detection, wallet profiling, and cross-chain correlation on top.