Embedding Pipelines in Python — Deep Dive

Embedding pipelines are the data infrastructure behind every semantic search and RAG system. At scale, they must handle millions of documents, multiple embedding models, incremental updates, and quality monitoring — all while keeping costs under control.

1) Pipeline architecture

A production pipeline separates concerns into independently scalable components:

Source Connectors → Preprocessor → Chunker → Embedder → Writer → Monitor

Each component communicates through a queue (Redis, SQS, or an in-process asyncio queue for simpler setups). This lets you scale the embedder independently — the bottleneck in most pipelines.
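As a rough illustration of two stages connected by a queue, here is a minimal in-process sketch using asyncio.Queue. The names chunk_stage, embed_stage, fake_embed, and SENTINEL are hypothetical, the paragraph-split chunking is a placeholder, and fake_embed stands in for a real embedding call.

```python
import asyncio

SENTINEL = None  # hypothetical end-of-stream marker


def fake_embed(texts: list[str]) -> list[list[float]]:
    """Stand-in embedder: returns a 1-dimensional vector holding the text length."""
    return [[float(len(t))] for t in texts]


async def chunk_stage(docs: list[str], out_q: asyncio.Queue) -> None:
    """Producer: split each document on blank lines and enqueue the pieces."""
    for doc in docs:
        for piece in doc.split("\n\n"):  # placeholder chunking, not production logic
            if piece.strip():
                await out_q.put(piece.strip())
    await out_q.put(SENTINEL)


async def embed_stage(in_q: asyncio.Queue, results: list, batch_size: int = 2) -> None:
    """Consumer: drain the queue in batches and embed each batch."""
    batch: list[str] = []
    while True:
        item = await in_q.get()
        if item is SENTINEL:
            break
        batch.append(item)
        if len(batch) >= batch_size:
            results.extend(fake_embed(batch))
            batch = []
    if batch:  # flush the final partial batch
        results.extend(fake_embed(batch))


async def run(docs: list[str]) -> list[list[float]]:
    # A bounded queue applies backpressure when the embedder falls behind.
    q: asyncio.Queue = asyncio.Queue(maxsize=100)
    results: list[list[float]] = []
    await asyncio.gather(chunk_stage(docs, q), embed_stage(q, results))
    return results


if __name__ == "__main__":
    print(asyncio.run(run(["a\n\nbb", "ccc"])))
```

Swapping the in-process queue for Redis or SQS keeps the same producer/consumer shape, with the stages running as separate processes.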

import asyncio
from dataclasses import dataclass, field

@dataclass
class Chunk:
    doc_id: str
    chunk_index: int
    text: str
    metadata: dict = field(default_factory=dict)
    vector: list[float] | None = None

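# load_source, preprocess, and chunk_text are assumed to be defined elsewhere;
# chunk_text is expected to return a list of Chunk objects.
async def pipeline(sources: list[str], embed_fn, store_fn, batch_size: int = 100):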
    chunks: list[Chunk] = []
    for source in sources:
        text = await load_source(source)
        cleaned = preprocess(text)
        chunks.extend(chunk_text(cleaned, source_id=source))

    # Batch embed
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i + batch_size]
        texts = [c.text for c in batch]
        vectors = await embed_fn(texts)
        for chunk, vec in zip(batch, vectors):
            chunk.vector = vec

    # Store
    await store_fn(chunks)

2) Async batch embedding

API-based embedding benefits from concurrent requests with rate limiting:

import asyncio
from openai import AsyncOpenAI

aclient = AsyncOpenAI()
semaphore = asyncio.Semaphore(5)  # max concurrent requests

async def embed_batch(texts: list[str], model: str = "text-embedding-3-small") -> list[list[float]]:
    async with semaphore:
        response = await aclient.embeddings.create(input=texts, model=model)
        return [item.embedding for item in response.data]

async def embed_all(chunks: list[str], batch_size: int = 100) -> list[list[float]]:
    tasks = []
    for i in range(0, len(chunks), batch_size):
        tasks.append(embed_batch(chunks[i:i + batch_size]))
    results = await asyncio.gather(*tasks)
    return [vec for batch in results for vec in batch]

For local models, use sentence-transformers with GPU batching:

from sentence_transformers import SentenceTransformer

model = SentenceTransformer("BAAI/bge-large-en-v1.5", device="cuda")

def embed_local(texts: list[str], batch_size: int = 64) -> list[list[float]]:
    vectors = model.encode(texts, batch_size=batch_size, show_progress_bar=True)
    return vectors.tolist()

3) Embedding model selection

Model choice often has a bigger impact on retrieval quality than any other pipeline decision.

Model                            Dimensions  Speed            Quality (MTEB avg)  Cost
text-embedding-3-small (OpenAI)  1536        Fast (API)       Good                $0.02/1M tokens
text-embedding-3-large (OpenAI)  3072        Fast (API)       Very good           $0.13/1M tokens
bge-large-en-v1.5                1024        GPU-dependent    Very good           Free (local)
e5-mistral-7b-instruct           4096        Slow (7B model)  Excellent           Free (local)
Cohere embed-v3                  1024        Fast (API)       Excellent           $0.10/1M tokens

Benchmark on your data, not just public leaderboards. Create an evaluation set of 50-100 queries with known relevant documents and measure recall@10.
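The recall@10 measurement can be sketched as follows. The function names and the dictionary layout (query ID mapped to ranked document IDs, and query ID mapped to the set of known relevant IDs) are assumptions for illustration, not a prescribed format.

```python
def recall_at_k(retrieved: list[str], relevant: set[str], k: int = 10) -> float:
    """Fraction of the relevant documents that appear in the top-k results."""
    if not relevant:
        raise ValueError("each query needs at least one known relevant document")
    top_k = set(retrieved[:k])
    return len(top_k & relevant) / len(relevant)


def mean_recall_at_k(
    results: dict[str, list[str]],
    labels: dict[str, set[str]],
    k: int = 10,
) -> float:
    """Average recall@k over every labeled query."""
    scores = [recall_at_k(results[q], labels[q], k) for q in labels]
    return sum(scores) / len(scores)


if __name__ == "__main__":
    results = {"q1": ["d1", "d2"], "q2": ["d5"]}
    labels = {"q1": {"d1"}, "q2": {"d6"}}
    print(mean_recall_at_k(results, labels))
```

Running the same evaluation set against each candidate model gives a like-for-like comparison on your own data.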

4) Incremental processing

Re-embedding everything on each run is wasteful. Track state:

import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path

class EmbeddingState:
    """Tracks a content hash per document so unchanged documents are skipped."""

    def __init__(self, state_file: str = "embedding_state.json"):
        self.state_file = Path(state_file)
        self.state = self._load()

    def _load(self) -> dict:
        if self.state_file.exists():
            return json.loads(self.state_file.read_text())
        return {"documents": {}, "model_version": None}

    def needs_embedding(self, doc_id: str, content: str) -> bool:
        # A document needs embedding if it is new or its content hash changed.
        content_hash = hashlib.sha256(content.encode()).hexdigest()
        existing = self.state["documents"].get(doc_id)
        return existing is None or existing["hash"] != content_hash

    def mark_embedded(self, doc_id: str, content: str):
        content_hash = hashlib.sha256(content.encode()).hexdigest()
        self.state["documents"][doc_id] = {
            "hash": content_hash,
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated).
            "embedded_at": datetime.now(timezone.utc).isoformat(),
        }

    def save(self):
        self.state_file.write_text(json.dumps(self.state, indent=2))

When you change embedding models, bump the model_version and re-embed everything. Vectors from different models are incompatible.
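One way to enforce that rule is to compare the stored model_version with the model in use and discard the per-document hashes on a mismatch. This is a minimal sketch operating on a plain state dictionary with "documents" and "model_version" keys; the helper names are hypothetical.

```python
def needs_full_reembed(state: dict, current_model: str) -> bool:
    """True when the stored vectors came from a different (or unknown) model."""
    return state.get("model_version") != current_model


def reset_for_model(state: dict, current_model: str) -> dict:
    """Return a fresh state if the model changed, otherwise the state unchanged."""
    if needs_full_reembed(state, current_model):
        # Dropping the hashes forces every document to be embedded again.
        return {"documents": {}, "model_version": current_model}
    return state


if __name__ == "__main__":
    old = {"documents": {"a": {"hash": "x"}}, "model_version": "model-v1"}
    print(reset_for_model(old, "model-v2"))
```

The vectors already written to the store under the old model would also need to be deleted or written to a separate index; this sketch only covers the state file.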

5) Chunking implementation details

A production chunker handles edge cases:

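import tiktoken

def chunk_with_overlap(
    text: str,
    max_tokens: int = 500,
    overlap_tokens: int = 50,
    encoding_name: str = "cl100k_base",
) -> list[str]:
    # An overlap >= max_tokens would stop the window from advancing (infinite loop).
    if not 0 <= overlap_tokens < max_tokens:
        raise ValueError("overlap_tokens must be >= 0 and smaller than max_tokens")
    enc = tiktoken.get_encoding(encoding_name)
    tokens = enc.encode(text)
    chunks = []
    start = 0
    while start < len(tokens):
        end = min(start + max_tokens, len(tokens))
        chunk_tokens = tokens[start:end]
        chunks.append(enc.decode(chunk_tokens))
        if end == len(tokens):
            break  # last window reached; avoid emitting a redundant tail chunk
        start = end - overlap_tokens  # step back so consecutive chunks overlap
    return chunks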

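Token-based chunking is more reliable than character-based because embedding models have token limits, not character limits. Chunks are only guaranteed to fit when you count tokens with the same tokenizer as the embedding model: cl100k_base matches OpenAI's text-embedding-3 models, while local models such as bge-large-en-v1.5 ship their own tokenizer and will produce different counts.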
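For capacity planning it helps to know how many chunks a document of a given token length produces. The sketch below derives the count for a sliding window with overlap and checks it against the same windowing loop applied to a pre-tokenized list; both function names are hypothetical and no tokenizer is involved.

```python
def expected_chunk_count(n_tokens: int, max_tokens: int = 500, overlap_tokens: int = 50) -> int:
    """Number of windows a sliding-window chunker emits for n_tokens tokens."""
    if not 0 <= overlap_tokens < max_tokens:
        raise ValueError("overlap_tokens must be >= 0 and smaller than max_tokens")
    if n_tokens <= 0:
        return 0
    if n_tokens <= max_tokens:
        return 1
    stride = max_tokens - overlap_tokens  # tokens advanced per window
    remaining = n_tokens - max_tokens     # tokens not covered by the first window
    return 1 + -(-remaining // stride)    # integer ceiling division


def sliding_windows(tokens: list[int], max_tokens: int = 500, overlap_tokens: int = 50) -> list[list[int]]:
    """Sliding-window loop over an already tokenized sequence."""
    windows = []
    start = 0
    while start < len(tokens):
        end = min(start + max_tokens, len(tokens))
        windows.append(tokens[start:end])
        if end == len(tokens):
            break
        start = end - overlap_tokens
    return windows


if __name__ == "__main__":
    for n in (0, 500, 501, 950, 951, 1000):
        print(n, expected_chunk_count(n), len(sliding_windows(list(range(n)))))
```

With the defaults, each window after the first adds 450 new tokens, so a 1000-token document yields three chunks.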

6) Dimensionality reduction

High-dimensional vectors cost more to store and query. OpenAI’s text-embedding-3 models support Matryoshka representations: you can truncate vectors to lower dimensions with minimal quality loss, either client-side as shown here or by passing the dimensions parameter to the embeddings API:

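def truncate_embedding(vector: list[float], target_dim: int = 256) -> list[float]:
    truncated = vector[:target_dim]
    # L2 normalize after truncation so cosine and dot-product scores stay comparable
    norm = sum(x**2 for x in truncated) ** 0.5
    if norm == 0:
        return truncated  # all-zero vector: nothing to normalize
    return [x / norm for x in truncated]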

Going from 1536 to 256 dimensions reduces storage by 6x with typically less than 5% quality loss on retrieval benchmarks. Test on your data before committing.
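The storage arithmetic can be checked with a short calculation. This sketch assumes float32 values (4 bytes each) and ignores index overhead and metadata, which vary by vector store; the function name is hypothetical.

```python
def raw_vector_bytes(n_vectors: int, dims: int, bytes_per_value: int = 4) -> int:
    """Raw storage for the vectors alone, assuming float32 by default."""
    return n_vectors * dims * bytes_per_value


if __name__ == "__main__":
    full = raw_vector_bytes(1_000_000, 1536)
    small = raw_vector_bytes(1_000_000, 256)
    print(f"1536 dims: {full / 1e9:.3f} GB")
    print(f" 256 dims: {small / 1e9:.3f} GB")
    print(f"reduction: {full / small:.0f}x")
```

For one million vectors this works out to about 6.1 GB at 1536 dimensions versus about 1.0 GB at 256.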

7) Error handling and retries

API embedding calls fail. Build resilience:

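import logging

import openai
import tenacity

logger = logging.getLogger(__name__)

# The OpenAI SDK raises its own exception classes, which do not inherit from the
# built-in TimeoutError or ConnectionError, so list them explicitly.
RETRYABLE = (
    TimeoutError,
    ConnectionError,
    openai.APITimeoutError,
    openai.APIConnectionError,
    openai.RateLimitError,
)

@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_exponential(min=1, max=30),
    retry=tenacity.retry_if_exception_type(RETRYABLE),
    before_sleep=lambda retry_state: logger.warning(
        f"Embedding retry {retry_state.attempt_number}: {retry_state.outcome.exception()}"
    ),
)
async def embed_with_retry(texts: list[str]) -> list[list[float]]:
    return await embed_batch(texts)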

For local models, handle CUDA out-of-memory errors by reducing batch size dynamically:

import torch

# Reuses the SentenceTransformer `model` loaded in the local-embedding example.
def embed_local_safe(texts: list[str], initial_batch: int = 64) -> list[list[float]]:
    batch_size = initial_batch
    while batch_size >= 1:
        try:
            return model.encode(texts, batch_size=batch_size).tolist()
        except RuntimeError as e:
            if "out of memory" in str(e).lower():
                batch_size //= 2
                torch.cuda.empty_cache()
            else:
                raise
    raise RuntimeError("Cannot embed even with batch_size=1")

8) Monitoring and observability

Track these metrics per pipeline run:

  • Documents processed — total and delta from last run.
  • Chunks generated — with size distribution (min, max, mean, p95).
  • Embedding latency — per batch and total.
  • Token usage — for cost tracking with API providers.
  • Failures — retries, permanent failures, and affected document IDs.
  • Quality probe — run a fixed set of test queries after each pipeline run and alert if recall drops below a threshold.

Store metrics in your existing observability stack (Prometheus, Datadog, or even a simple JSON log). The quality probe is the most important — it catches model regressions, chunking bugs, and data quality issues before they affect users.

The one thing to remember: A production embedding pipeline is a data engineering system — async batching, incremental processing, model benchmarking, and quality monitoring are what separate a working prototype from a reliable production feature.
