Meilisearch Integration in Python — Deep Dive

Meilisearch’s simplicity makes getting started trivial, but production deployments need careful attention to index design, data synchronization, multi-tenancy, and the new hybrid search capabilities.

1) Index design strategies

Single index vs multi-index

Use a single index when all documents share the same schema and search context. Use separate indices when document types differ significantly:

import meilisearch

client = meilisearch.Client('http://localhost:7700', 'master-key')

# Separate indices for different content types
products_index = client.index('products')
articles_index = client.index('articles')
users_index = client.index('users')

# Configure each independently
products_index.update_searchable_attributes(['name', 'description', 'brand'])
products_index.update_filterable_attributes(['category', 'price', 'in_stock'])

articles_index.update_searchable_attributes(['title', 'body', 'tags'])
articles_index.update_filterable_attributes(['published_at', 'author', 'category'])

Meilisearch v1.1+ supports searching across multiple indices in a single request through the multi-search endpoint. Each query runs independently and returns its own result list:

results = client.multi_search([
    {"indexUid": "products", "q": "python", "limit": 5},
    {"indexUid": "articles", "q": "python", "limit": 5},
    {"indexUid": "users", "q": "python", "limit": 3}
])

for index_result in results['results']:
    print(f"\n{index_result['indexUid']}:")
    for hit in index_result['hits']:
        print(f"  {hit.get('name') or hit.get('title')}")
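
Because each query comes back as its own result list, a UI that wants one combined list has to merge them itself. The following is a minimal sketch, assuming the response shape used above (a `results` list whose entries carry `indexUid` and `hits`). The helper name `flatten_multi_search` and the `_index` key are illustrative and not part of the SDK:

```python
def flatten_multi_search(response, per_index_limit=None):
    """Flatten a multi-search response into one list of hits tagged with their index.

    Hits keep the order they had within each index; no cross-index
    relevance ranking is attempted here.
    """
    merged = []
    for index_result in response.get("results", []):
        hits = index_result.get("hits", [])
        if per_index_limit is not None:
            hits = hits[:per_index_limit]
        for hit in hits:
            # Copy the hit so the original response is left untouched
            merged.append({**hit, "_index": index_result["indexUid"]})
    return merged


# Hand-written response in the same shape as the multi-search result above
sample = {
    "results": [
        {"indexUid": "products", "hits": [{"id": 1, "name": "Python mug"}]},
        {"indexUid": "articles", "hits": [{"id": 7, "title": "Python tips"}]},
    ]
}
for hit in flatten_multi_search(sample):
    print(hit["_index"], hit.get("name") or hit.get("title"))
```

Tagging each hit with its source index lets the front end group or label results without a second lookup.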

2) Hybrid search with embeddings

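Meilisearch v1.6+ supports hybrid search, which merges keyword results (ranked by Meilisearch's ordered ranking rules rather than BM25) with vector results ranked by semantic similarity. The feature was introduced as experimental, so older versions may require enabling it first: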

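# Configure embedder ("userProvided" means our own code computes the vectors)
index = client.index('articles')  # assumed target index; reuses `client` from section 1
index.update_embedders({
    "default": {
        "source": "userProvided",
        "dimensions": 384  # must match the model's output size
    }
})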

# Index with embeddings
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('all-MiniLM-L6-v2')

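# `documents` is the list of dicts to index; each needs a primary key
docs_with_vectors = []
for doc in documents:
    text = f"{doc['title']} {doc.get('description', '')}"
    embedding = model.encode(text).tolist()
    # Build a new dict so the caller's documents are not mutated
    docs_with_vectors.append({**doc, "_vectors": {"default": embedding}})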

task = index.add_documents(docs_with_vectors)
client.wait_for_task(task.task_uid)

# Hybrid search: combines keyword + semantic
query_vector = model.encode("stories about time travel").tolist()
results = index.search("time travel", {
    "hybrid": {
        "semanticRatio": 0.5,  # 0 = pure keyword, 1 = pure semantic
        "embedder": "default"
    },
    "vector": query_vector
})

Tune semanticRatio per use case: product search benefits from lower ratios (keywords matter for SKUs/brands), while article search benefits from higher ratios (meaning matters more than exact words).
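
One way to keep these choices in one place is a small table of presets per search surface. The sketch below only builds the search parameters dict in the shape shown above. The preset names and ratio values are illustrative assumptions, not recommendations from Meilisearch:

```python
# Hypothetical presets; tune the numbers against your own relevance tests
SEMANTIC_RATIO_PRESETS = {
    "products": 0.2,   # exact SKUs and brand names dominate
    "articles": 0.7,   # meaning matters more than exact wording
}


def hybrid_search_params(surface, query_vector, embedder="default", limit=20):
    """Build search parameters for a hybrid query on the given search surface."""
    ratio = SEMANTIC_RATIO_PRESETS.get(surface, 0.5)
    if not 0.0 <= ratio <= 1.0:
        raise ValueError(f"semanticRatio must be within [0, 1], got {ratio}")
    return {
        "hybrid": {"semanticRatio": ratio, "embedder": embedder},
        "vector": list(query_vector),
        "limit": limit,
    }


params = hybrid_search_params("products", [0.1, 0.2, 0.3])
print(params["hybrid"])
```

The resulting dict can be passed as the second argument to index.search, with the real query embedding in place of the toy vector.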

3) Multi-tenant architecture

Tenant tokens for data isolation

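Meilisearch supports tenant tokens: JWTs signed with an API key that force a filter onto every search made with them. The attribute used in the filter (tenant_id here) must be listed in the index's filterable attributes. The Python SDK also provides client.generate_tenant_token; the manual version below shows what such a token contains: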

import meilisearch
import json
import base64
import hmac
import hashlib
import time

def create_tenant_token(api_key_uid, api_key, tenant_id, expires_at=None):
    """Create a token that restricts search to a specific tenant."""
    header = base64.urlsafe_b64encode(json.dumps(
        {"alg": "HS256", "typ": "JWT"}
    ).encode()).rstrip(b'=').decode()

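    payload_data = {
        "searchRules": {
            # tenant_id is interpolated as-is: fine for integers, but string IDs
            # must be validated and quoted before they go into a filter
            "*": {"filter": f"tenant_id = {tenant_id}"}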
        },
        "apiKeyUid": api_key_uid,
        "exp": expires_at or int(time.time()) + 3600
    }
    payload = base64.urlsafe_b64encode(
        json.dumps(payload_data).encode()
    ).rstrip(b'=').decode()

    signature = base64.urlsafe_b64encode(
        hmac.new(api_key.encode(), f"{header}.{payload}".encode(), hashlib.sha256).digest()
    ).rstrip(b'=').decode()

    return f"{header}.{payload}.{signature}"

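# Generate the token on your server, then hand it to the client, which can only
# see its own data. api_key_uid and search_api_key are the uid and value of an
# API key that has search permission.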
tenant_client = meilisearch.Client(
    'http://localhost:7700',
    create_tenant_token(api_key_uid, search_api_key, tenant_id=42)
)
results = tenant_client.index('documents').search("quarterly report")
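
When a tenant token is rejected or returns unexpected results, it helps to look inside it. The sketch below uses only the standard library to check the HS256 signature and expiry and return the payload. It assumes the three-part token layout produced by create_tenant_token above; the helper names (`sign_token`, `inspect_tenant_token`) and the sample key values are hypothetical:

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that JWT encoding strips
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_token(payload: dict, api_key: str) -> str:
    """Build an HS256 token the same way as create_tenant_token above."""
    header = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64url_encode(json.dumps(payload).encode())
    digest = hmac.new(api_key.encode(), f"{header}.{body}".encode(), hashlib.sha256).digest()
    return f"{header}.{body}.{_b64url_encode(digest)}"


def inspect_tenant_token(token: str, api_key: str) -> dict:
    """Verify the signature and expiry, then return the decoded payload."""
    header, body, signature = token.split(".")
    expected = hmac.new(api_key.encode(), f"{header}.{body}".encode(), hashlib.sha256).digest()
    # Constant-time comparison avoids leaking how many bytes matched
    if not hmac.compare_digest(expected, _b64url_decode(signature)):
        raise ValueError("signature does not match this API key")
    payload = json.loads(_b64url_decode(body))
    if "exp" in payload and payload["exp"] < time.time():
        raise ValueError("token has expired")
    return payload


token = sign_token(
    {
        "searchRules": {"*": {"filter": "tenant_id = 42"}},
        "apiKeyUid": "00000000-0000-0000-0000-000000000000",  # placeholder uid
        "exp": int(time.time()) + 3600,
    },
    "example-search-key",  # placeholder key
)
print(inspect_tenant_token(token, "example-search-key")["searchRules"])
```

Keep a helper like this on the server side only, since it needs the signing key.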

Index-per-tenant

For strict isolation with different schemas per tenant, create separate indices:

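def get_tenant_index(client, tenant_id):
    """Return the tenant's index, creating and configuring it on first use."""
    index_name = f"docs_{tenant_id}"
    try:
        return client.get_index(index_name)
    except meilisearch.errors.MeilisearchApiError as exc:
        # Only create the index when it is missing; re-raise auth or server errors
        if exc.code != "index_not_found":
            raise
        task = client.create_index(index_name, {"primaryKey": "id"})
        client.wait_for_task(task.task_uid)
        idx = client.index(index_name)
        idx.update_filterable_attributes(['category', 'created_at'])
        return idx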
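
Because the tenant ID ends up inside the index uid, it is worth validating before the name is built. Index uids may contain only letters, digits, hyphens and underscores, and the helper below enforces that character set. The function name is hypothetical, the `docs_` prefix follows the example above, and the sketch does not check any length limit:

```python
import re

# Index uids accept letters, digits, hyphens and underscores
_UID_PATTERN = re.compile(r"[A-Za-z0-9_-]+")


def tenant_index_uid(tenant_id, prefix="docs_"):
    """Return the index uid for a tenant, rejecting characters a uid cannot contain."""
    uid = f"{prefix}{tenant_id}"
    if not _UID_PATTERN.fullmatch(uid):
        raise ValueError(f"invalid index uid: {uid!r}")
    return uid


print(tenant_index_uid(42))
```

Rejecting bad IDs up front gives a clearer error than a failed index creation and keeps user-supplied strings out of index names.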

4) Data synchronization pipeline

Keep Meilisearch in sync with your primary database:

import asyncio
from datetime import datetime, timedelta

class MeilisearchSyncer:
    def __init__(self, db, meili_client, index_name, batch_size=1000):
        self.db = db
        self.index = meili_client.index(index_name)
        self.client = meili_client
        self.batch_size = batch_size
        self.last_sync = None

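    async def incremental_sync(self):
        """Sync only changed documents since last run."""
        # Record the start time before querying so that rows modified while
        # this run is in progress are picked up by the next one
        started_at = datetime.utcnow()
        since = self.last_sync or started_at - timedelta(hours=1)

        # Fetch changed records from primary DB
        changed = await self.db.fetch(
            "SELECT * FROM articles WHERE updated_at > $1 ORDER BY updated_at",
            since
        )

        # Batch upsert to Meilisearch
        for i in range(0, len(changed), self.batch_size):
            batch = changed[i:i + self.batch_size]
            docs = [self._transform(row) for row in batch]
            task = self.index.add_documents(docs)
            await self._wait(task.task_uid)

        # Handle deletions, even when no rows were updated
        deleted_ids = await self.db.fetch(
            "SELECT id FROM deleted_articles WHERE deleted_at > $1", since
        )
        if deleted_ids:
            task = self.index.delete_documents([str(r['id']) for r in deleted_ids])
            await self._wait(task.task_uid)

        # Only advance the watermark once every task has succeeded
        self.last_sync = started_at
        return len(changed)

    async def _wait(self, task_uid):
        # The SDK client is synchronous; wait in a worker thread so the
        # event loop is not blocked while Meilisearch processes the task
        result = await asyncio.to_thread(
            self.client.wait_for_task, task_uid, timeout_in_ms=60000
        )
        if result.status == 'failed':
            raise RuntimeError(f"Task {task_uid} failed: {result.error}")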

    def _transform(self, row):
        return {
            "id": str(row['id']),
            "title": row['title'],
            "content": row['content'],
            "category": row['category'],
            "published_at": row['published_at'].isoformat(),
        }

Run incremental syncs every 30-60 seconds for near-real-time freshness. Run a full reindex weekly to catch any drift.
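
The schedule above can be driven by a small asyncio loop. This is a sketch under assumptions: `sync_once` stands for any coroutine function such as syncer.incremental_sync, and `max_runs` exists only so the example terminates. A real service would loop until shutdown and would likely add logging and backoff:

```python
import asyncio


async def run_periodically(sync_once, interval_seconds=30.0, max_runs=None):
    """Call sync_once() repeatedly, sleeping between runs and surviving failures."""
    runs = 0
    failures = 0
    while max_runs is None or runs < max_runs:
        try:
            await sync_once()
        except Exception as exc:  # keep the loop alive; the next run retries
            failures += 1
            print(f"sync failed: {exc!r}")
        runs += 1
        if max_runs is None or runs < max_runs:
            await asyncio.sleep(interval_seconds)
    return runs, failures


async def _demo():
    calls = []

    async def fake_sync():
        # Stand-in for syncer.incremental_sync(); fails on the second call
        calls.append(len(calls))
        if len(calls) == 2:
            raise RuntimeError("database unavailable")

    return await run_periodically(fake_sync, interval_seconds=0.01, max_runs=3)


demo_result = asyncio.run(_demo())
print(demo_result)
```

Catching the exception inside the loop means a transient database or Meilisearch outage delays freshness by one interval instead of stopping synchronization.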

5) Performance tuning

Payload size limits

Meilisearch has a default payload limit of 100MB. For large batch imports:

def chunked_import(index, documents, chunk_size=10000):
    """Import in chunks to stay within payload limits."""
    task_uids = []
    for i in range(0, len(documents), chunk_size):
        chunk = documents[i:i + chunk_size]
        task = index.add_documents(chunk)
        task_uids.append(task.task_uid)

    # Wait for all tasks; use the index handle so no global client is needed
    for uid in task_uids:
        result = index.wait_for_task(uid, timeout_in_ms=120000)
        if result.status == 'failed':
            raise RuntimeError(f"Task {uid} failed: {result.error}")
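
A fixed document count does not bound the request size when documents vary a lot in length. A complementary sketch groups documents by their serialized JSON size instead. The helper name and the default budget are assumptions, and the estimate ignores HTTP overhead and any compression:

```python
import json


def chunk_by_bytes(documents, max_bytes=50 * 1024 * 1024):
    """Yield lists of documents whose combined JSON size stays under max_bytes."""
    chunk, chunk_bytes = [], 2  # 2 bytes for the enclosing "[" and "]"
    for doc in documents:
        # +2 accounts for the ", " separator json.dumps puts between items
        doc_bytes = len(json.dumps(doc).encode("utf-8")) + 2
        if doc_bytes + 2 > max_bytes:
            raise ValueError("a single document exceeds the byte budget")
        if chunk and chunk_bytes + doc_bytes > max_bytes:
            yield chunk
            chunk, chunk_bytes = [], 2
        chunk.append(doc)
        chunk_bytes += doc_bytes
    if chunk:
        yield chunk


docs = [{"id": i, "body": "x" * 100} for i in range(10)]
sizes = [len(batch) for batch in chunk_by_bytes(docs, max_bytes=400)]
print(sizes)
```

Each yielded batch can be passed to index.add_documents in place of the fixed-size slices used above.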

Search performance

  • Reduce searchable attributes — fewer searchable fields means faster indexing and search
  • Use attributesToRetrieve to return only needed fields:
results = index.search("python", {
    "attributesToRetrieve": ["title", "id", "rating"],
    "limit": 20
})
  • Keep filterable attributes minimal: only attributes marked filterable can be used in filters, and each one adds work at indexing time

Distinct attribute

When results may contain many near-duplicates (the same product in different colors), set a distinct attribute so that only one document per value is returned:

index.update_distinct_attribute('product_group_id')

6) Monitoring and health checks

def meilisearch_health_check(client):
    try:
        health = client.health()
        stats = client.get_all_stats()

        report = {
            "status": health['status'],
            "indexes": {}
        }

        for name, index_stats in stats['indexes'].items():
            report["indexes"][name] = {
                "documents": index_stats['numberOfDocuments'],
                "indexing": index_stats['isIndexing'],
                "field_distribution": index_stats.get('fieldDistribution', {})
            }

        # Count pending tasks (capped by `limit`, so 100 means "100 or more")
        tasks = client.get_tasks({"statuses": ["processing", "enqueued"], "limit": 100})
        report["pending_tasks"] = len(getattr(tasks, 'results', []))

        return report
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}

Alert on: health endpoint returning non-200, task queue growing beyond 1000, indexing taking longer than expected (indicates resource starvation), and disk usage approaching limits.
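
The first two alert conditions can be evaluated directly on the report returned by meilisearch_health_check. This is a minimal sketch, assuming the report shape built above. The function name is illustrative, and indexing duration and disk usage are left out because the report does not contain them:

```python
def evaluate_alerts(report, max_pending_tasks=1000):
    """Return a list of human-readable alert messages for a health report."""
    alerts = []
    # The health endpoint reports "available" when the instance is up
    if report.get("status") != "available":
        detail = report.get("error", "no details")
        alerts.append(f"instance status is {report.get('status')!r}: {detail}")
    pending = report.get("pending_tasks", 0)
    if pending > max_pending_tasks:
        alerts.append(f"{pending} tasks pending (threshold {max_pending_tasks})")
    return alerts


sample_report = {"status": "available", "pending_tasks": 1500, "indexes": {}}
for message in evaluate_alerts(sample_report):
    print(message)
```

Note that the health check above requests at most 100 tasks, so a queue threshold above that only fires if the limit passed to get_tasks is raised as well.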

One thing to remember: Meilisearch’s power comes from its opinionated defaults — fight the urge to over-configure, lean into its strengths for user-facing search, and pair it with a reliable sync pipeline from your source-of-truth database.
