Python Blockchain Data Analysis — Deep Dive

Building a blockchain data pipeline

Production blockchain analytics requires a pipeline that continuously ingests, decodes, and stores on-chain data for fast querying.

Node RPC ──→ Block Fetcher ──→ Event Decoder ──→ Transformer ──→ Database
                                      ↑                              ↓
                                 ABI Registry                  Query Layer
                                                                     ↓
                                                            Dashboard / API
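The stages in the diagram can be chained as Python generators, so each block flows through fetching, decoding, and transformation before it is stored. The sketch below is a minimal in-memory illustration under stated assumptions. get_block, decode_log, and store are hypothetical callables that you would back with an RPC client, the ABI registry, and a database writer. The raw_amount field is an assumed output of the decoder.

```python
from typing import Callable, Iterable, Iterator, Optional


def fetch_stage(block_numbers: Iterable[int],
                get_block: Callable[[int], dict]) -> Iterator[dict]:
    # Block Fetcher: pull each block (with its logs) from the node
    for number in block_numbers:
        yield get_block(number)


def decode_stage(blocks: Iterable[dict],
                 decode_log: Callable[[dict], Optional[dict]]) -> Iterator[dict]:
    # Event Decoder: keep only logs the (hypothetical) decoder recognizes
    for block in blocks:
        for log in block["logs"]:
            event = decode_log(log)
            if event is not None:
                yield {**event, "block_number": block["number"]}


def transform_stage(events: Iterable[dict]) -> Iterator[dict]:
    # Transformer: e.g. scale an assumed 18-decimal raw_amount to token units
    for event in events:
        yield {**event, "amount": event["raw_amount"] / 10**18}


def run_pipeline(block_numbers: Iterable[int],
                 get_block: Callable[[int], dict],
                 decode_log: Callable[[dict], Optional[dict]],
                 store: Callable[[dict], None]) -> int:
    # Database: hand each transformed row to the sink; return rows written
    written = 0
    rows = transform_stage(decode_stage(fetch_stage(block_numbers, get_block), decode_log))
    for row in rows:
        store(row)
        written += 1
    return written
```

Because every stage is lazy, only one block is in flight at a time. A production pipeline would typically add batching, retries, and a checkpoint of the last stored block on top of this shape.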

Block ingestion with ethereum-etl

The ethereum-etl project exports blockchain data into structured formats:

# Export blocks and transactions to CSV
# ethereumetl export_blocks_and_transactions \
#   --start-block 18000000 --end-block 18001000 \
#   --provider-uri https://eth.llamarpc.com \
#   --blocks-output blocks.csv --transactions-output txs.csv

import pandas as pd
blocks = pd.read_csv("blocks.csv")
txs = pd.read_csv("txs.csv")

# Gas analysis
txs["gas_price_gwei"] = txs["gas_price"] / 10**9
avg_gas = txs.groupby("block_number")["gas_price_gwei"].mean()

For continuous ingestion, run ethereumetl stream. It tails new blocks as they are produced and writes them to Kafka, PostgreSQL, or one of the other supported outputs.
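If you poll a node yourself instead of using ethereum-etl, a common safeguard is to process a block only once it sits a few blocks below the chain head. That way, short reorganizations are unlikely to invalidate it. The sketch below assumes a get_latest_block callable; with web3.py this could be lambda: w3.eth.block_number. The 12-block confirmation depth, the injectable sleep, and the stop_after parameter are illustrative choices and are not taken from ethereum-etl.

```python
import time
from typing import Callable, Iterator, Optional


def tail_blocks(get_latest_block: Callable[[], int], start_block: int,
                confirmations: int = 12, poll_interval: float = 12.0,
                sleep: Callable[[float], None] = time.sleep,
                stop_after: Optional[int] = None) -> Iterator[int]:
    """Yield block numbers in order once each is `confirmations` blocks deep."""
    next_block = start_block
    emitted = 0

    def done() -> bool:
        return stop_after is not None and emitted >= stop_after

    while not done():
        # Blocks at or below this height are treated as safe from shallow reorgs
        safe_head = get_latest_block() - confirmations
        if next_block > safe_head:
            sleep(poll_interval)  # nothing safe to emit yet; wait about one slot
            continue
        while next_block <= safe_head and not done():
            yield next_block
            next_block += 1
            emitted += 1
```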

Event decoding with ABI registry

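Raw event logs store their fields as hex-encoded topics and data. For a non-anonymous event, topics[0] is the keccak-256 hash of the event signature, indexed parameters fill the remaining topics (up to three), and non-indexed parameters are ABI-encoded in data. Decoding therefore starts by matching topics[0] against known event signatures: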

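from eth_abi import decode
from web3 import Web3

# topic0 is the keccak-256 hash of the canonical event signature
TRANSFER_TOPIC = Web3.keccak(text="Transfer(address,address,uint256)")

def decode_transfer_log(log):
    """Decode an ERC-20 Transfer log as returned by web3.py, else return None."""
    topics = log["topics"]
    # ERC-721 Transfer shares this signature but also indexes tokenId (4 topics),
    # so the ERC-20 form must have exactly 3 topics
    if len(topics) != 3 or topics[0] != TRANSFER_TOPIC:
        return None
    # Indexed addresses are left-padded to 32 bytes; the address is the last 20
    from_addr = Web3.to_checksum_address("0x" + bytes(topics[1])[-20:].hex())
    to_addr = Web3.to_checksum_address("0x" + bytes(topics[2])[-20:].hex())
    # The non-indexed amount is ABI-encoded in the data field
    (value,) = decode(["uint256"], bytes(log["data"]))
    return {"from": from_addr, "to": to_addr, "value": value}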

For multi-protocol analysis, maintain an ABI registry that maps contract addresses to their ABIs, enabling automatic decoding of any event from any known contract.
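Below is a minimal sketch of such a registry. It assumes logs arrive as JSON-RPC dictionaries with hex-string fields. Instead of storing full ABI JSON, each contract address maps topic0 hashes to small decoder functions. The AbiRegistry class and the decoder names are hypothetical. The two hashes are the standard keccak-256 topics of the ERC-20 Transfer and Approval events. Keying by address also sidesteps the ERC-20/ERC-721 Transfer collision, because only contracts registered as ERC-20 use the ERC-20 decoder. In practice you would likely generate these entries from ABI files with web3.py or eth-abi rather than write them by hand.

```python
from typing import Callable, Dict, Optional, Tuple

# keccak-256 of "Transfer(address,address,uint256)" and "Approval(address,address,uint256)"
TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
APPROVAL_TOPIC = "0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925"

Decoder = Callable[[dict], dict]


def topic_to_address(topic: str) -> str:
    # Indexed addresses are left-padded to 32 bytes; keep the last 20 bytes
    return "0x" + topic[-40:].lower()


def _two_addresses_and_value(log: dict, first: str, second: str) -> dict:
    # Shared ERC-20 layout: two indexed addresses, one uint256 in data
    topics = log["topics"]
    return {first: topic_to_address(topics[1]),
            second: topic_to_address(topics[2]),
            "value": int(log["data"], 16)}


def decode_transfer(log: dict) -> dict:
    return _two_addresses_and_value(log, "from", "to")


def decode_approval(log: dict) -> dict:
    return _two_addresses_and_value(log, "owner", "spender")


class AbiRegistry:
    """Hypothetical registry: contract address -> {topic0 -> (event name, decoder)}."""

    def __init__(self) -> None:
        self._events: Dict[str, Dict[str, Tuple[str, Decoder]]] = {}

    def register(self, address: str, topic0: str, name: str, decoder: Decoder) -> None:
        self._events.setdefault(address.lower(), {})[topic0.lower()] = (name, decoder)

    def register_erc20(self, address: str) -> None:
        self.register(address, TRANSFER_TOPIC, "Transfer", decode_transfer)
        self.register(address, APPROVAL_TOPIC, "Approval", decode_approval)

    def decode(self, log: dict) -> Optional[dict]:
        topics = log.get("topics") or []
        if not topics:
            return None  # anonymous events have no topic0 to dispatch on
        entry = self._events.get(log["address"].lower(), {}).get(topics[0].lower())
        if entry is None:
            return None  # unknown contract or unknown event
        name, decoder = entry
        return {"event": name, "contract": log["address"].lower(), **decoder(log)}
```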

MEV detection and analysis

Maximal Extractable Value (MEV) is the profit that block producers and searchers can extract by including, excluding, or reordering transactions. Detecting MEV patterns shows who captures that value and which users pay for it.

Sandwich attack detection

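A sandwich attack wraps a victim’s swap between two attacker transactions. The front-run buys first and pushes the price up. The victim then buys at a worse price, and the back-run sells into the inflated price: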

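import pandas as pd

def detect_sandwiches(swaps: pd.DataFrame) -> list:
    """Find sandwich attacks among the swaps of a single block.

    Expected columns: transaction_index, pool, sender, direction ("buy" or
    "sell" from the sender's side), amount_in, amount_out, block_number.
    "sender" should identify the transaction's originator (EOA or bot
    contract); the Swap event's own sender field is often just a router.
    """
    sandwiches = []
    swaps_sorted = swaps.sort_values("transaction_index")

    # Only strictly adjacent triples are checked: front-run, victim, back-run
    for i in range(len(swaps_sorted) - 2):
        tx_a = swaps_sorted.iloc[i]
        tx_victim = swaps_sorted.iloc[i + 1]
        tx_b = swaps_sorted.iloc[i + 2]

        # Same pool, same attacker on both sides, a different sender in the middle
        if (tx_a["pool"] == tx_victim["pool"] == tx_b["pool"] and
            tx_a["sender"] == tx_b["sender"] and
            tx_a["sender"] != tx_victim["sender"]):

            # Attacker buys before, sells after
            if tx_a["direction"] == "buy" and tx_b["direction"] == "sell":
                # Gross profit in the token spent on the front-run (gas ignored)
                profit = tx_b["amount_out"] - tx_a["amount_in"]
                sandwiches.append({
                    "attacker": tx_a["sender"],
                    "victim": tx_victim["sender"],
                    "pool": tx_a["pool"],
                    "profit": profit,
                    "block": tx_a["block_number"],
                })

    return sandwiches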
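The adjacency check above misses sandwiches where several victim swaps sit between the front-run and the back-run. Here is a hedged variant that assumes swaps are plain dictionaries with the same fields as the DataFrame columns. For each buy, it finds the same sender's next swap in that pool. If that swap is a sell, the other senders' buys in between count as victims. Requiring victims to trade in the front-run's direction is an added assumption, not part of the original check, and the function name is hypothetical.

```python
from collections import defaultdict
from typing import Dict, List


def detect_multi_victim_sandwiches(swaps: List[dict]) -> List[dict]:
    """Hypothetical variant: allow any number of victim swaps in the middle."""
    by_pool: Dict[str, List[dict]] = defaultdict(list)
    for swap in sorted(swaps, key=lambda s: s["transaction_index"]):
        by_pool[swap["pool"]].append(swap)

    found = []
    for pool, pool_swaps in by_pool.items():
        for i, front in enumerate(pool_swaps):
            if front["direction"] != "buy":
                continue
            for j in range(i + 1, len(pool_swaps)):
                back = pool_swaps[j]
                if back["sender"] != front["sender"]:
                    continue
                # The attacker's next swap in this pool decides the candidate
                if back["direction"] == "sell":
                    # Everything between has a different sender; keep the buys
                    victims = [s["sender"] for s in pool_swaps[i + 1:j]
                               if s["direction"] == "buy"]
                    if victims:
                        found.append({
                            "attacker": front["sender"],
                            "victims": victims,
                            "pool": pool,
                            "profit": back["amount_out"] - front["amount_in"],
                            "block": front["block_number"],
                        })
                break
    return found
```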

Arbitrage detection

Atomic arbitrage happens when a single transaction trades across multiple pools to profit from price differences:

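from typing import Optional

def detect_arbitrage(tx_swaps: list) -> Optional[dict]:
    """Detect whether one transaction's swaps (in log order) form a profitable cycle."""
    if len(tx_swaps) < 2:
        return None

    # Each hop must spend the token the previous hop produced
    for prev, nxt in zip(tx_swaps, tx_swaps[1:]):
        if prev["token_out"] != nxt["token_in"]:
            return None

    # Cycle detection: does the last output match the first input?
    if tx_swaps[0]["token_in"] != tx_swaps[-1]["token_out"]:
        return None

    # Gross profit in units of the start token; gas is not deducted
    profit = tx_swaps[-1]["amount_out"] - tx_swaps[0]["amount_in"]
    if profit <= 0:
        return None  # a cycle that loses tokens is not a profitable arbitrage
    return {
        "type": "cyclic_arbitrage",
        "hops": len(tx_swaps),
        "profit": profit,
        "tokens": [s["token_in"] for s in tx_swaps],
    }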

Wallet profiling and entity labeling

Assigning labels to addresses transforms raw data into actionable intelligence:

import pandas as pd

KNOWN_LABELS = {
    "0xdead...": "burn_address",
    "0x28c6...": "binance_hot_wallet",
    "0x21a3...": "aave_v3_pool",
}

class WalletProfiler:
    def __init__(self, transactions_df: pd.DataFrame, labels=None):
        # Expects columns: from, to, value (wei), timestamp; addresses lowercase
        self.txs = transactions_df
        self.labels = labels or KNOWN_LABELS

    def profile(self, address):
        address = address.lower()
        mask = (self.txs["from"] == address) | (self.txs["to"] == address)
        addr_txs = self.txs[mask]
        sent = addr_txs[addr_txs["from"] == address]
        received = addr_txs[addr_txs["to"] == address]

        # Counterparties: every other address on the opposite side of a transfer
        counterparties = set(sent["to"].dropna()) | set(received["from"].dropna())
        counterparties.discard(address)

        return {
            "address": address,
            "label": self.labels.get(address, "unknown"),
            "first_seen": addr_txs["timestamp"].min(),
            "last_seen": addr_txs["timestamp"].max(),
            "tx_count": len(addr_txs),
            "unique_counterparties": len(counterparties),
            "total_eth_sent": sent["value"].sum() / 10**18,
            "total_eth_received": received["value"].sum() / 10**18,
            "avg_tx_value": addr_txs["value"].mean() / 10**18,
            "protocols_used": self._identify_protocols(sent),
        }

    def _identify_protocols(self, sent_txs):
        # Labels of the contracts this wallet sent transactions to; incoming
        # transactions are excluded so the wallet's own label is not counted
        labels = sent_txs["to"].map(self.labels).dropna()
        return sorted(labels.unique())

Combine on-chain data with off-chain label databases (Etherscan labels, Arkham Intelligence, Nansen) for comprehensive profiles.
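One way to combine several label sources is to normalize addresses to lowercase, let the source you trust most win on conflicts, and keep the losing labels for review. The sketch below assumes each source has already been loaded into an address-to-label dict. The source names are placeholders, and the code does not call any Etherscan, Arkham, or Nansen API. merge_labels and to_label_map are hypothetical helpers.

```python
from typing import Dict, List, Tuple


def merge_labels(sources: List[Tuple[str, Dict[str, str]]]) -> Dict[str, dict]:
    """Merge (source_name, {address: label}) pairs; earlier sources take priority."""
    merged: Dict[str, dict] = {}
    for source_name, labels in sources:
        for address, label in labels.items():
            key = address.lower()  # checksummed and lowercase forms must collide
            entry = merged.get(key)
            if entry is None:
                merged[key] = {"label": label, "source": source_name, "conflicts": []}
            elif entry["label"] != label:
                # Keep disagreeing labels so an analyst can review them later
                entry["conflicts"].append((source_name, label))
    return merged


def to_label_map(merged: Dict[str, dict]) -> Dict[str, str]:
    # Flatten into the plain address -> label dict that WalletProfiler accepts
    return {address: entry["label"] for address, entry in merged.items()}
```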

Cross-chain correlation

With DeFi spanning multiple chains, analysts need to track assets moving between them:

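import pandas as pd

def track_bridge_flows(l1_deposits: pd.DataFrame, l2_credits: pd.DataFrame,
                       window_seconds: int = 3600) -> pd.DataFrame:
    """Correlate L1 bridge deposits with the matching credits on L2.

    Both frames need tx_hash, timestamp (Unix seconds) and amount; deposits
    also need "from" and credits need "to". Matching is heuristic: same
    amount, earliest L2 credit within window_seconds after the deposit.
    """
    flows = []
    l2_sorted = l2_credits.sort_values("timestamp")
    for _, deposit in l1_deposits.iterrows():
        # Find matching L2 credits within the time window
        candidates = l2_sorted[
            (l2_sorted["amount"] == deposit["amount"]) &
            (l2_sorted["timestamp"] > deposit["timestamp"]) &
            (l2_sorted["timestamp"] < deposit["timestamp"] + window_seconds)
        ]
        if len(candidates) > 0:
            match = candidates.iloc[0]  # earliest credit after the deposit
            flows.append({
                "l1_tx": deposit["tx_hash"],
                "l2_tx": match["tx_hash"],
                "amount": deposit["amount"],
                "delay_seconds": match["timestamp"] - deposit["timestamp"],
                "l1_address": deposit["from"],
                "l2_address": match["to"],
            })
    return pd.DataFrame(flows)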
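Exact amount matching breaks when a bridge deducts a fee, and nothing stops two deposits from claiming the same L2 transaction. A greedy one-to-one variant that tolerates a small fee is sketched below. fee_bps, the field names, and the earliest-credit rule are assumptions to tune per bridge. Amounts are integers in the token's smallest unit, so the comparison stays exact.

```python
from typing import List, Tuple


def match_bridge_transfers(deposits: List[dict], credits: List[dict],
                           window_seconds: int = 3600,
                           fee_bps: int = 10) -> List[Tuple[str, str, int]]:
    """Greedily pair each deposit with the earliest unused credit that fits."""
    used = set()
    matches = []
    for dep in sorted(deposits, key=lambda d: d["timestamp"]):
        # Credited amount may be up to fee_bps basis points below the deposit
        min_amount = dep["amount"] * (10_000 - fee_bps) // 10_000
        best = None
        for idx, credit in enumerate(credits):
            if idx in used:
                continue
            delay = credit["timestamp"] - dep["timestamp"]
            if not 0 < delay < window_seconds:
                continue
            if not min_amount <= credit["amount"] <= dep["amount"]:
                continue
            if best is None or credit["timestamp"] < credits[best]["timestamp"]:
                best = idx
        if best is not None:
            used.add(best)  # each L2 credit can explain at most one deposit
            matches.append((dep["tx_hash"], credits[best]["tx_hash"],
                            credits[best]["timestamp"] - dep["timestamp"]))
    return matches
```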

Real-time dashboards

For live monitoring, process each new block as it arrives (through a WebSocket subscription or by polling) and feed the resulting metrics to a dashboard framework:

from web3 import AsyncWeb3, AsyncHTTPProvider

class RealtimeAnalytics:
    def __init__(self, w3: AsyncWeb3, contracts_to_watch):
        # e.g. w3 = AsyncWeb3(AsyncHTTPProvider("https://eth.llamarpc.com"))
        self.w3 = w3
        # Checksummed addresses of the pools or contracts to monitor
        self.contracts = list(contracts_to_watch)
        self.metrics = {
            "swap_count": 0,  # running total; derive per-minute rates separately
            "volume_usd": 0.0,
            "unique_traders": set(),
            "largest_swap": 0.0,
        }

    async def process_block(self, block_number):
        # Fetch only logs emitted by watched contracts. Unlike checking tx["to"],
        # this also catches swaps routed through routers or aggregators.
        logs = await self.w3.eth.get_logs({
            "fromBlock": block_number,
            "toBlock": block_number,
            "address": self.contracts,
        })
        self.update_metrics(self.decode_events(logs))

    def decode_events(self, logs):
        """Turn raw logs into dicts with type, sender and volume_usd.

        Left unimplemented: it needs the ABI registry for decoding and a
        price source for the USD conversion.
        """
        raise NotImplementedError

    def update_metrics(self, events):
        for event in events:
            if event["type"] == "Swap":
                self.metrics["swap_count"] += 1
                self.metrics["volume_usd"] += event["volume_usd"]
                self.metrics["unique_traders"].add(event["sender"])
                self.metrics["largest_swap"] = max(
                    self.metrics["largest_swap"], event["volume_usd"]
                )

Pair this with Streamlit or Dash for visualization, pushing updates every block (roughly 12 seconds on Ethereum).
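A running counter, like the swap tally kept in the class above, cannot by itself report swaps per minute. One option is a sliding-window counter, assuming events are fed in timestamp order. Using block timestamps rather than wall-clock time keeps the metric reproducible when blocks are replayed. The class name and the 60-second default are illustrative.

```python
from collections import deque
from typing import Deque


class SlidingWindowRate:
    """Count events whose timestamp falls within the last `window_seconds`."""

    def __init__(self, window_seconds: float = 60.0) -> None:
        self.window = window_seconds
        self._times: Deque[float] = deque()

    def add(self, timestamp: float) -> None:
        # Timestamps are assumed to arrive in non-decreasing order
        self._times.append(timestamp)

    def count(self, now: float) -> int:
        # Evict events that are at least one full window older than `now`
        while self._times and self._times[0] <= now - self.window:
            self._times.popleft()
        return len(self._times)
```

Calling add(block["timestamp"]) for each decoded swap and count(latest_block_timestamp) on every refresh would give the dashboard a true swaps-per-minute figure.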

Storage and query optimization

Large-scale analysis requires thoughtful storage:

  • Parquet files: Column-oriented, compressed, excellent for pandas. Store partitioned by date.
  • DuckDB: In-process analytical database that queries Parquet files directly — fast SQL without a server.
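  • TimescaleDB: PostgreSQL extension optimized for time-series data, a good fit for per-block and per-hour metrics.

import duckdb

# Assumes Parquet files with a TIMESTAMP column "timestamp" and a numeric "value_eth"
con = duckdb.connect()  # in-memory database; no server to set up
result = con.execute("""
    SELECT date_trunc('hour', timestamp) AS hour,
           count(*) AS tx_count,
           sum(value_eth) AS volume
    FROM read_parquet('data/transactions/*.parquet')
    WHERE timestamp >= '2024-01-01'
    GROUP BY 1
    ORDER BY 1
""").fetchdf()  # returns a pandas DataFrame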

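DuckDB reads only the columns a query references and parallelizes scans across CPU cores, so aggregations over 100M+ rows of Parquet can finish in seconds on a laptop. That makes it well suited to exploratory blockchain analysis.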
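To follow the "partitioned by date" advice, rows can be grouped into Hive-style directories such as data/transactions/date=2024-01-01/ before writing. The sketch below only computes that grouping with the standard library. It assumes each row carries a Unix timestamp in seconds, and partition_by_date is a hypothetical helper. Writing each group to Parquet would be done with a library such as pyarrow; pandas' to_parquet also accepts partition_cols. A nested layout needs a glob like 'data/transactions/*/*.parquet' rather than 'data/transactions/*.parquet'. DuckDB's read_parquet accepts hive_partitioning = true to expose date as a queryable column.

```python
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import PurePosixPath
from typing import Dict, Iterable, List


def partition_by_date(rows: Iterable[dict], base: str = "data/transactions",
                      ts_key: str = "timestamp") -> Dict[str, List[dict]]:
    """Group rows into Hive-style partition directories keyed by UTC date."""
    partitions: Dict[str, List[dict]] = defaultdict(list)
    for row in rows:
        # Use UTC so a partition boundary does not depend on the machine's timezone
        day = datetime.fromtimestamp(row[ts_key], tz=timezone.utc).date().isoformat()
        partitions[str(PurePosixPath(base) / f"date={day}")].append(row)
    return dict(partitions)
```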

One thing to remember

Python blockchain data analysis at scale means building an ingestion pipeline (ethereum-etl or RPC streaming), decoding events through an ABI registry, storing in columnar formats (Parquet + DuckDB), and layering analytical patterns like MEV detection, wallet profiling, and cross-chain correlation on top.
