Neo4j Integration with Python — Deep Dive
Driver architecture
The official neo4j Python driver (v5.x) speaks the Bolt protocol; the exact Bolt versions it can negotiate depend on the driver release. Internally, it maintains a connection pool per (host, port) pair. Each connection is a TCP socket with optional TLS. Sessions do not own connections: they borrow one from the pool when they need it and return it afterwards.
Key configuration parameters:
import neo4j
from neo4j import GraphDatabase

driver = GraphDatabase.driver(
    "neo4j://cluster:7687",
    auth=("neo4j", "password"),
    max_connection_pool_size=100,  # default 100
    connection_acquisition_timeout=60,  # seconds
    max_transaction_retry_time=30,  # seconds
    encrypted=True,
    # 5.x spelling of the trust setting; `trust=` was the 4.x parameter
    trusted_certificates=neo4j.TrustSystemCAs(),
)
The neo4j:// scheme enables routing: the driver fetches a routing table from the initial server, then sends read transactions to followers and write transactions to the leader. Routing follows the declared access mode (execute_read vs. execute_write), not the text of the query. The bolt:// scheme connects directly to a single instance.
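As a rough illustration of how the URI scheme decides between routed and direct connections, the sketch below classifies a connection string using only the standard library. The helper name `describe_uri` and the example hostnames are made up for this sketch; the scheme names themselves (`neo4j`, `bolt`, and their `+s` / `+ssc` TLS variants) are the ones the driver accepts.

```python
from urllib.parse import urlparse

# URI schemes accepted by the driver, split by whether they enable routing.
ROUTING_SCHEMES = {"neo4j", "neo4j+s", "neo4j+ssc"}
DIRECT_SCHEMES = {"bolt", "bolt+s", "bolt+ssc"}


def describe_uri(uri: str) -> dict:
    """Hypothetical helper: report how a connection URI would be treated."""
    parsed = urlparse(uri)
    scheme = parsed.scheme
    if scheme not in ROUTING_SCHEMES | DIRECT_SCHEMES:
        raise ValueError(f"Unsupported scheme: {scheme!r}")
    return {
        "host": parsed.hostname,
        "port": parsed.port or 7687,  # 7687 is the default Bolt port
        "routing": scheme in ROUTING_SCHEMES,
        # "+s" and "+ssc" variants switch on TLS through the URI itself
        "tls_in_uri": "+" in scheme,
    }


print(describe_uri("neo4j://cluster:7687"))
print(describe_uri("bolt+s://db.example.com"))
```

A check like this can be useful in configuration validation, for example to warn when a production deployment against a cluster is configured with a direct bolt:// URI.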
Session and transaction lifecycle
Sessions are cheap — they’re logical constructs that borrow connections from the pool. A session pins to a connection only during an active transaction.
def create_person(tx, name):
    query = "CREATE (p:Person {name: $name}) RETURN p.name AS name"
    record = tx.run(query, name=name).single()
    return record["name"]

with driver.session(database="neo4j") as session:
    # execute_write passes a managed transaction object to your function
    result = session.execute_write(create_person, name="Alice")
The function create_person may be called multiple times if the driver encounters transient errors (error codes starting with Neo.TransientError). This means your transaction functions must be idempotent or use MERGE instead of CREATE to avoid duplicate data.
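To make the retry behaviour concrete, here is a deliberately simplified retry loop. It is not the driver's implementation (the real backoff and jitter details differ), and `FakeTransientError`, `run_with_retry`, and `create_person_and_notify` are names invented for this sketch. It shows that any side effect performed inside the function, other than the database writes that are rolled back with the failed attempt, runs once per attempt.

```python
import time


class FakeTransientError(Exception):
    """Stand-in for a Neo.TransientError.* failure (not a driver class)."""


def run_with_retry(work, max_retry_time=30.0, initial_delay=0.01):
    """Simplified retry loop: re-invoke `work` until it succeeds or time runs out."""
    deadline = time.monotonic() + max_retry_time
    delay = initial_delay
    while True:
        try:
            return work()
        except FakeTransientError:
            if time.monotonic() + delay > deadline:
                raise
            time.sleep(delay)
            delay *= 2  # exponential backoff between attempts


calls = []  # side effect outside the database, e.g. sending a notification
attempts = {"n": 0}


def create_person_and_notify():
    attempts["n"] += 1
    calls.append("notify Alice")  # NOT undone when the attempt fails
    if attempts["n"] < 3:
        raise FakeTransientError("simulated leader switch")
    return "Alice"


print(run_with_retry(create_person_and_notify))  # Alice
print(len(calls))  # 3: the side effect ran on every attempt
```

Keeping transaction functions free of external side effects, and returning data for the caller to act on after the transaction succeeds, sidesteps this problem.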
Parameterized queries and injection
Cypher supports parameterized queries natively. Parameters are sent separately from the query string and are never parsed as Cypher, so a value passed as a parameter cannot change the structure of the query.
# SAFE — parameters are bound server-side
tx.run("MATCH (p:Person {name: $name}) RETURN p", name=user_input)
# DANGEROUS — string interpolation allows injection
tx.run(f"MATCH (p:Person {{name: '{user_input}'}}) RETURN p")
Always use $parameter syntax. The driver serializes parameters as PackStream types, not as string interpolation.
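Parameters stand in for values. Labels, relationship types, and property keys have traditionally not been parameterizable in Cypher, so code that needs a caller-chosen label ends up building part of the query string. One hedged way to keep that safe is an allowlist, sketched below. `ALLOWED_LABELS` and `build_lookup_query` are hypothetical names for this example, and the set of labels is made up.

```python
import re

# Hypothetical allowlist: the labels this application permits in dynamic queries.
ALLOWED_LABELS = {"Person", "Company", "City"}
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def build_lookup_query(label: str) -> str:
    """Build a MATCH query for a caller-supplied label; the value stays a parameter."""
    if label not in ALLOWED_LABELS or not _IDENTIFIER.match(label):
        raise ValueError(f"Label not allowed: {label!r}")
    # Only the vetted label is interpolated; $name is still bound by the driver.
    return f"MATCH (n:`{label}` {{name: $name}}) RETURN n"


print(build_lookup_query("Person"))
# MATCH (n:`Person` {name: $name}) RETURN n
```

The returned string is then passed to tx.run together with name=user_input, so user-supplied values still travel as parameters.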
Async support
The driver provides async variants for asyncio applications:
import asyncio
from neo4j import AsyncGraphDatabase

async def fetch_friends(tx, person):
    result = await tx.run(
        "MATCH (p:Person {name: $name})-[:KNOWS]->(f) RETURN f.name",
        name=person,
    )
    records = await result.data()
    return [r["f.name"] for r in records]

async def main():
    # async with closes the driver (and its pool) on exit
    async with AsyncGraphDatabase.driver(
        "neo4j://localhost:7687", auth=("neo4j", "pw")
    ) as async_driver:
        async with async_driver.session() as session:
            return await session.execute_read(fetch_friends, person="Alice")

friends = asyncio.run(main())
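Async sessions follow the same pooling model as the synchronous driver, but with non-blocking I/O; each driver object, sync or async, owns its own pool. This matters for FastAPI or aiohttp applications, where a blocking call stalls the event loop and every request waiting on it.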
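In an asyncio application, many requests may want a connection at once. A common pattern, sketched here under assumptions, is to bound concurrency with a semaphore so a burst of work cannot exhaust the pool. `fetch_friends_stub` is a stand-in that sleeps instead of contacting a server, and `fetch_many` and the limit of 10 are illustrative choices, not driver APIs.

```python
import asyncio


async def fetch_friends_stub(person: str) -> list[str]:
    """Stand-in for an awaitable driver call; sleeps instead of hitting a server."""
    await asyncio.sleep(0.05)  # simulated network round trip
    return [f"{person}-friend"]


async def fetch_many(people: list[str], max_concurrency: int = 10) -> list[list[str]]:
    # Bound concurrency so a burst of requests cannot exhaust the connection pool.
    gate = asyncio.Semaphore(max_concurrency)

    async def guarded(person: str) -> list[str]:
        async with gate:
            return await fetch_friends_stub(person)

    # gather returns results in the same order as its inputs
    return await asyncio.gather(*(guarded(p) for p in people))


results = asyncio.run(fetch_many(["Alice", "Bob", "Carol"]))
print(results)  # [['Alice-friend'], ['Bob-friend'], ['Carol-friend']]
```

In real code the stub would be replaced by a call that opens an async session and runs a managed read transaction.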
Bulk data loading
For large imports (millions of nodes), individual CREATE statements are too slow. Strategies:
1. UNWIND batching — Send data as a list parameter and unwind it server-side:
from more_itertools import chunked  # third-party; any batching helper works

def batch_create(tx, people):
    tx.run(
        "UNWIND $batch AS row CREATE (p:Person {name: row.name, age: row.age})",
        batch=people,
    )

# Send in chunks of 5000-10000
for chunk in chunked(all_people, 5000):
    session.execute_write(batch_create, chunk)
2. neo4j-admin import — For initial loads of tens of millions of nodes, the command-line neo4j-admin database import tool (neo4j-admin database import full in Neo4j 5.x) bypasses the transactional layer entirely and writes directly to the store files. A full import cannot run against an online database, but it can be 10-100x faster than loading through Cypher, depending on data shape and hardware.
3. LOAD CSV — Neo4j can read CSV files directly from the server filesystem or a URL:
LOAD CSV WITH HEADERS FROM 'file:///people.csv' AS row
CREATE (p:Person {name: row.name, age: toInteger(row.age)})
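The UNWIND batching loop in strategy 1 relies on a `chunked` helper that is not part of the driver. A minimal standard-library sketch, similar in spirit to `more_itertools.chunked`, could look like this; the sample rows are made up to match the shape of the `$batch` parameter.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield consecutive lists of at most `size` items from `items`."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


# Made-up sample rows shaped like the $batch parameter
all_people = [{"name": f"person-{i}", "age": 20 + i} for i in range(7)]
for chunk in chunked(all_people, 3):
    print(len(chunk))  # prints 3, then 3, then 1
```

Because the helper works on any iterable, the source rows can come from a generator that reads a file lazily, so the full dataset never has to sit in memory at once.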
Index management
Without an index, a MATCH that filters on a property has to scan every node carrying that label. Create indexes for properties used in WHERE clauses and MERGE operations:
session.run("CREATE INDEX person_name IF NOT EXISTS FOR (p:Person) ON (p.name)")
session.run("CREATE CONSTRAINT person_email IF NOT EXISTS FOR (p:Person) REQUIRE p.email IS UNIQUE")
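Neo4j 5.x supports range indexes (the default), text indexes for string predicates such as CONTAINS and ENDS WITH, full-text indexes (Lucene-backed) for full-text search, point indexes for geospatial queries, and, in later 5.x releases, vector indexes for similarity search.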
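For reference, the creation statements for several of these index types can be kept together and applied at startup. The sketch below is one possible arrangement: the index names, labels, and properties are invented for illustration, `create_indexes` is a hypothetical helper, and `RecordingSession` is a test double so the example runs without a server.

```python
# Index names, labels, and properties below are made up for illustration.
INDEX_STATEMENTS = {
    "range": "CREATE RANGE INDEX person_name_range IF NOT EXISTS "
             "FOR (p:Person) ON (p.name)",
    "text": "CREATE TEXT INDEX person_bio_text IF NOT EXISTS "
            "FOR (p:Person) ON (p.bio)",
    "point": "CREATE POINT INDEX place_location_point IF NOT EXISTS "
             "FOR (pl:Place) ON (pl.location)",
    "fulltext": "CREATE FULLTEXT INDEX person_search IF NOT EXISTS "
                "FOR (p:Person) ON EACH [p.name, p.bio]",
}


def create_indexes(session, statements=INDEX_STATEMENTS):
    """Run each DDL statement through anything exposing a run(query) method."""
    for statement in statements.values():
        session.run(statement)


class RecordingSession:
    """Test double that records queries instead of sending them to a server."""

    def __init__(self):
        self.queries = []

    def run(self, query, **params):
        self.queries.append(query)


recorder = RecordingSession()
create_indexes(recorder)
print(len(recorder.queries))  # 4
```

Because every statement uses IF NOT EXISTS, running the helper repeatedly against a real session is harmless.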
Result consumption patterns
Results are streamed from the server and can be consumed only once, so pick one of three patterns per result:
# 1. Eager — collect all records into memory
records = list(result)
# 2. Streaming — process one at a time
for record in result:
    process(record)
# 3. Summary only — discard records, get statistics
summary = result.consume()
print(f"Nodes created: {summary.counters.nodes_created}")
For large result sets, streaming avoids memory spikes. For result sets you need to access multiple times, eager collection is necessary.
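The reason eager collection is needed for repeated access is that a streamed result is a single-pass iterator. The sketch below uses a plain generator as a stand-in for a driver result (it is not the driver's Result class) to show the effect.

```python
def stream_records():
    """Stand-in for a streamed result: yields rows lazily, one pass only."""
    for i in range(3):
        yield {"id": i}


result = stream_records()
first_pass = [record["id"] for record in result]   # streaming consumption
second_pass = [record["id"] for record in result]  # stream already exhausted

print(first_pass)   # [0, 1, 2]
print(second_pass)  # []

# To read the rows more than once, materialize them first (eager pattern).
records = list(stream_records())
print(len(records), len(records))  # 3 3
```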
Error handling taxonomy
Neo4j errors fall into categories:
- ClientError — bad query syntax, constraint violations. Not retryable.
- TransientError — leader switch, deadlock. Retryable (driver handles automatically in managed transactions).
- DatabaseError — internal failures. Generally not retryable.
from neo4j.exceptions import (
    ServiceUnavailable,
    SessionExpired,
    TransientError,
    ClientError,
    ConstraintError,
)

try:
    session.execute_write(do_work)
except ConstraintError:
    # Unique constraint violated: handle duplicate
    pass
except ServiceUnavailable:
    # All connections failed: check network/server
    pass
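Neo4j status codes follow the shape Neo.Classification.Category.Title, so the taxonomy above can be read straight off the code string. The function below is a simplified sketch of that rule; `classify` is a hypothetical helper, and the driver's own retry logic makes a few exceptions in both directions that are not modelled here.

```python
def classify(code: str) -> tuple[str, bool]:
    """Return (classification, retryable) for a Neo4j status code string."""
    parts = code.split(".")
    if len(parts) < 4 or parts[0] != "Neo":
        raise ValueError(f"Not a Neo4j status code: {code!r}")
    classification = parts[1]
    # Simplified rule: only transient errors are treated as retryable.
    return classification, classification == "TransientError"


for code in (
    "Neo.ClientError.Schema.ConstraintValidationFailed",
    "Neo.TransientError.Transaction.DeadlockDetected",
    "Neo.DatabaseError.General.UnknownError",
):
    print(code, classify(code))
```

A helper like this is mainly useful in logging and alerting, where the classification determines whether a failure should page someone or be counted as expected churn.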
Integration with pandas and NetworkX
Pull graph data into familiar Python tools:
import pandas as pd
import networkx as nx

with driver.session() as session:
    result = session.run(
        "MATCH (a)-[r:KNOWS]->(b) RETURN a.name AS source, b.name AS target, r.since AS year"
    )
    df = pd.DataFrame(result.data())

# Build a NetworkX graph from the DataFrame
# (undirected by default; pass create_using=nx.DiGraph to keep edge direction)
G = nx.from_pandas_edgelist(df, "source", "target", edge_attr="year")
print(f"Nodes: {G.number_of_nodes()}, Edges: {G.number_of_edges()}")
print(f"Most connected: {max(G.degree(), key=lambda x: x[1])}")
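It can help to see the shape of the data before it reaches pandas. result.data() returns one dictionary per record, keyed by the RETURN aliases. The sketch below uses made-up rows in that shape and computes the same "most connected" answer with only the standard library, assuming an undirected view with no duplicate edges.

```python
from collections import Counter

# Made-up rows in the shape result.data() returns: one dict per record,
# keyed by the RETURN aliases (source, target, year).
rows = [
    {"source": "Alice", "target": "Bob", "year": 2019},
    {"source": "Alice", "target": "Carol", "year": 2021},
    {"source": "Bob", "target": "Carol", "year": 2020},
    {"source": "Dave", "target": "Alice", "year": 2022},
]

# Undirected degree: count each endpoint of every edge once.
degree = Counter()
for row in rows:
    degree[row["source"]] += 1
    degree[row["target"]] += 1

name, count = degree.most_common(1)[0]
print(name, count)  # Alice 3
```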
Testing patterns
Run tests against a disposable Neo4j instance, for example one started by the testcontainers package:
import pytest
from neo4j import GraphDatabase
from testcontainers.neo4j import Neo4jContainer

@pytest.fixture(scope="session")
def neo4j_driver():
    with Neo4jContainer("neo4j:5.15") as container:
        driver = GraphDatabase.driver(
            container.get_connection_url(),
            # attribute name varies by testcontainers release;
            # newer versions may expose container.password instead
            auth=("neo4j", container.NEO4J_ADMIN_PASSWORD),
        )
        yield driver
        driver.close()

@pytest.fixture(autouse=True)
def clean_db(neo4j_driver):
    # Wipe all nodes and relationships before each test
    with neo4j_driver.session() as session:
        session.run("MATCH (n) DETACH DELETE n")
Production checklist
- Use neo4j:// URIs for cluster routing; bolt:// for single-instance only
- Set max_connection_pool_size based on expected concurrent sessions
- Enable TLS with encrypted=True and proper trust configuration
- Use managed transactions (execute_read/execute_write); never raw session.run() in production
- Create indexes before bulk loading data
- Monitor connection pool metrics via driver callbacks
- Close the driver on application shutdown to release connections cleanly
One thing to remember: The Neo4j Python driver is designed around managed transactions and connection pooling. Use these correctly and the driver handles retries, routing, and connection lifecycle — try to work around them and you’ll fight the framework.
See Also
- Python Knowledge Graph Construction How Python builds a web of facts about the world — connecting people, places, and ideas so computers can answer real questions.
- Python Property Graph Modeling How Python designs rich maps of connected data where every dot and line can carry extra details.
- Python Rdf Sparql Queries How Python reads and asks questions about the web's universal language for describing things and their connections.