Neo4j Integration with Python — Deep Dive

Driver architecture

The official neo4j Python driver (v5.x) implements the Bolt protocol versions 4.3 through 5.4. Internally, it maintains a connection pool per (host, port) pair. Each connection is a TCP socket with optional TLS, and the driver multiplexes sessions across pooled connections.

Key configuration parameters:

from neo4j import GraphDatabase

driver = GraphDatabase.driver(
    "neo4j://cluster:7687",
    auth=("neo4j", "password"),
    max_connection_pool_size=100,      # default 100
    connection_acquisition_timeout=60,  # seconds
    max_transaction_retry_time=30,      # seconds
    encrypted=True,
    trust=neo4j.TrustSystemCAs(),
)

The neo4j:// scheme enables routing — the driver discovers cluster topology from the initial server and routes read queries to followers, write queries to the leader. The bolt:// scheme connects directly to a single instance.

Session and transaction lifecycle

Sessions are cheap — they’re logical constructs that borrow connections from the pool. A session pins to a connection only during an active transaction.

with driver.session(database="neo4j") as session:
    # execute_write passes a Transaction object to your function
    result = session.execute_write(create_person, name="Alice")

def create_person(tx, name):
    query = "CREATE (p:Person {name: $name}) RETURN p.name AS name"
    record = tx.run(query, name=name).single()
    return record["name"]

The function create_person may be called multiple times if the driver encounters transient errors (error codes starting with Neo.TransientError). This means your transaction functions must be idempotent or use MERGE instead of CREATE to avoid duplicate data.

Parameterized queries and injection

Cypher supports parameterized queries natively. Parameters are sent separately from the query string, making Cypher injection impossible when used correctly.

# SAFE — parameters are bound server-side
tx.run("MATCH (p:Person {name: $name}) RETURN p", name=user_input)

# DANGEROUS — string interpolation allows injection
tx.run(f"MATCH (p:Person {{name: '{user_input}'}}) RETURN p")

Always use $parameter syntax. The driver serializes parameters as PackStream types, not as string interpolation.

Async support

The driver provides async variants for asyncio applications:

from neo4j import AsyncGraphDatabase

async_driver = AsyncGraphDatabase.driver("neo4j://localhost:7687", auth=("neo4j", "pw"))

async with async_driver.session() as session:
    result = await session.execute_read(fetch_friends, person="Alice")

async def fetch_friends(tx, person):
    result = await tx.run(
        "MATCH (p:Person {name: $name})-[:KNOWS]->(f) RETURN f.name",
        name=person,
    )
    records = await result.data()
    return [r["f.name"] for r in records]

Async sessions use the same connection pool but with non-blocking I/O. This is critical for FastAPI or aiohttp applications where blocking the event loop kills throughput.

Bulk data loading

For large imports (millions of nodes), individual CREATE statements are too slow. Strategies:

1. UNWIND batching — Send data as a list parameter and unwind it server-side:

def batch_create(tx, people):
    tx.run(
        "UNWIND $batch AS row CREATE (p:Person {name: row.name, age: row.age})",
        batch=people,
    )

# Send in chunks of 5000-10000
for chunk in chunked(all_people, 5000):
    session.execute_write(batch_create, chunk)

2. neo4j-admin import — For initial loads of tens of millions of nodes, the command-line neo4j-admin database import tool bypasses the transactional layer entirely and writes directly to the store files. It requires the database to be offline but is 10-100x faster than Cypher.

3. LOAD CSV — Neo4j can read CSV files directly from the server filesystem or a URL:

LOAD CSV WITH HEADERS FROM 'file:///people.csv' AS row
CREATE (p:Person {name: row.name, age: toInteger(row.age)})

Index management

Without indexes, every MATCH performs a full label scan. Create indexes for properties used in WHERE clauses and MERGE operations:

session.run("CREATE INDEX person_name IF NOT EXISTS FOR (p:Person) ON (p.name)")
session.run("CREATE CONSTRAINT person_email IF NOT EXISTS FOR (p:Person) REQUIRE p.email IS UNIQUE")

Neo4j 5.x supports range indexes (default), text indexes for full-text search, point indexes for geospatial queries, and vector indexes for similarity search.

Result consumption patterns

Results are streamed from the server. Three consumption patterns:

# 1. Eager — collect all records into memory
records = list(result)

# 2. Streaming — process one at a time
for record in result:
    process(record)

# 3. Summary only — discard records, get statistics
summary = result.consume()
print(f"Nodes created: {summary.counters.nodes_created}")

For large result sets, streaming avoids memory spikes. For result sets you need to access multiple times, eager collection is necessary.

Error handling taxonomy

Neo4j errors fall into categories:

  • ClientError — bad query syntax, constraint violations. Not retryable.
  • TransientError — leader switch, deadlock. Retryable (driver handles automatically in managed transactions).
  • DatabaseError — internal failures. Generally not retryable.
from neo4j.exceptions import (
    ServiceUnavailable,
    SessionExpired,
    TransientError,
    ClientError,
    ConstraintError,
)

try:
    session.execute_write(do_work)
except ConstraintError:
    # Unique constraint violated — handle duplicate
    pass
except ServiceUnavailable:
    # All connections failed — check network/server
    pass

Integration with pandas and NetworkX

Pull graph data into familiar Python tools:

import pandas as pd
import networkx as nx

with driver.session() as session:
    result = session.run(
        "MATCH (a)-[r:KNOWS]->(b) RETURN a.name AS source, b.name AS target, r.since AS year"
    )
    df = pd.DataFrame(result.data())

# Build a NetworkX graph from the DataFrame
G = nx.from_pandas_edgelist(df, "source", "target", edge_attr="year")
print(f"Nodes: {G.number_of_nodes()}, Edges: {G.number_of_edges()}")
print(f"Most connected: {max(G.degree(), key=lambda x: x[1])}")

Testing patterns

Use the neo4j test containers or an embedded test instance:

import pytest
from testcontainers.neo4j import Neo4jContainer

@pytest.fixture(scope="session")
def neo4j_driver():
    with Neo4jContainer("neo4j:5.15") as container:
        driver = GraphDatabase.driver(
            container.get_connection_url(),
            auth=("neo4j", container.NEO4J_ADMIN_PASSWORD),
        )
        yield driver
        driver.close()

@pytest.fixture(autouse=True)
def clean_db(neo4j_driver):
    with neo4j_driver.session() as session:
        session.run("MATCH (n) DETACH DELETE n")

Production checklist

  • Use neo4j:// URI for cluster routing; bolt:// for single-instance only
  • Set max_connection_pool_size based on expected concurrent sessions
  • Enable TLS with encrypted=True and proper trust configuration
  • Use managed transactions (execute_read/execute_write) — never raw session.run() in production
  • Create indexes before bulk loading data
  • Monitor connection pool metrics via driver callbacks
  • Close the driver on application shutdown to release connections cleanly

One thing to remember: The Neo4j Python driver is designed around managed transactions and connection pooling. Use these correctly and the driver handles retries, routing, and connection lifecycle — try to work around them and you’ll fight the framework.

pythondatabasesgraph-databases

See Also